Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions api/core/v1beta2/machine_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,24 @@ const (
MachineNotUpToDateReason = "NotUpToDate"
)

// Machine's Updating condition and corresponding reasons.
// Note: Updating condition is set by the Machine controller during in-place updates.
const (
// MachineUpdatingCondition is true while an in-place update is in progress on the Machine.
// The condition is owned by the Machine controller and is used to track the progress of in-place updates.
// This condition is considered when computing the UpToDate condition.
MachineUpdatingCondition = "Updating"

// MachineNotUpdatingReason surfaces when the Machine is not performing an in-place update.
MachineNotUpdatingReason = "NotUpdating"

// MachineInPlaceUpdatingReason surfaces when the Machine is waiting for an in-place update to complete.
MachineInPlaceUpdatingReason = "InPlaceUpdating"

// MachineInPlaceUpdateFailedReason surfaces when the in-place update has failed,
// e.g. because the UpdateMachine hook returned an error or the InfraMachine could not be found.
MachineInPlaceUpdateFailedReason = "InPlaceUpdateFailed"
)

// Machine's BootstrapConfigReady condition and corresponding reasons.
// Note: when possible, BootstrapConfigReady condition will use reasons surfaced from the underlying bootstrap config object.
const (
Expand Down
8 changes: 5 additions & 3 deletions controllers/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,10 @@ func (r *ClusterReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manag

// MachineReconciler reconciles a Machine object.
type MachineReconciler struct {
Client client.Client
APIReader client.Reader
ClusterCache clustercache.ClusterCache
Client client.Client
APIReader client.Reader
ClusterCache clustercache.ClusterCache
RuntimeClient runtimeclient.Client

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
Expand All @@ -90,6 +91,7 @@ func (r *MachineReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manag
Client: r.Client,
APIReader: r.APIReader,
ClusterCache: r.ClusterCache,
RuntimeClient: r.RuntimeClient,
WatchFilterValue: r.WatchFilterValue,
RemoteConditionsGracePeriod: r.RemoteConditionsGracePeriod,
AdditionalSyncMachineLabels: r.AdditionalSyncMachineLabels,
Expand Down
25 changes: 21 additions & 4 deletions internal/controllers/machine/machine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"sigs.k8s.io/cluster-api/controllers/clustercache"
"sigs.k8s.io/cluster-api/controllers/external"
"sigs.k8s.io/cluster-api/controllers/noderefutil"
runtimeclient "sigs.k8s.io/cluster-api/exp/runtime/client"
"sigs.k8s.io/cluster-api/feature"
"sigs.k8s.io/cluster-api/internal/contract"
"sigs.k8s.io/cluster-api/internal/controllers/machine/drain"
Expand Down Expand Up @@ -93,9 +94,10 @@ var (

// Reconciler reconciles a Machine object.
type Reconciler struct {
Client client.Client
APIReader client.Reader
ClusterCache clustercache.ClusterCache
Client client.Client
APIReader client.Reader
ClusterCache clustercache.ClusterCache
RuntimeClient runtimeclient.Client

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
Expand Down Expand Up @@ -129,6 +131,9 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt
// to have some buffer.
return errors.New("Client, APIReader and ClusterCache must not be nil and RemoteConditionsGracePeriod must not be < 2m")
}
if feature.Gates.Enabled(feature.InPlaceUpdates) && r.RuntimeClient == nil {
return errors.New("RuntimeClient must not be nil when InPlaceUpdates feature gate is enabled")
}

r.predicateLog = ptr.To(ctrl.LoggerFrom(ctx).WithValues("controller", "machine"))
clusterToMachines, err := util.ClusterToTypedObjectsMapper(mgr.GetClient(), &clusterv1.MachineList{}, mgr.GetScheme())
Expand Down Expand Up @@ -282,7 +287,12 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Re
}

// Handle normal reconciliation loop.
return doReconcile(ctx, alwaysReconcile, s)
reconcileNormal := append(
alwaysReconcile,
r.reconcileInPlaceUpdate,
)

return doReconcile(ctx, reconcileNormal, s)
}

func patchMachine(ctx context.Context, patchHelper *patch.Helper, machine *clusterv1.Machine, options ...patch.Option) error {
Expand Down Expand Up @@ -326,6 +336,7 @@ func patchMachine(ctx context.Context, patchHelper *patch.Helper, machine *clust
clusterv1.MachineNodeReadyCondition,
clusterv1.MachineNodeHealthyCondition,
clusterv1.MachineDeletingCondition,
clusterv1.MachineUpdatingCondition,
}},
)

Expand Down Expand Up @@ -397,6 +408,12 @@ type scope struct {

// deletingMessage is the message that should be used when setting the Deleting condition.
deletingMessage string

// updatingReason is the reason that should be used when setting the Updating condition.
updatingReason string

// updatingMessage is the message that should be used when setting the Updating condition.
updatingMessage string
}

func (r *Reconciler) reconcileMachineOwnerAndLabels(_ context.Context, s *scope) (ctrl.Result, error) {
Expand Down
267 changes: 267 additions & 0 deletions internal/controllers/machine/machine_controller_inplace_update.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package machine

import (
"context"
"fmt"
"time"

"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"

clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2"
runtimehooksv1 "sigs.k8s.io/cluster-api/api/runtime/hooks/v1alpha1"
"sigs.k8s.io/cluster-api/feature"
"sigs.k8s.io/cluster-api/internal/hooks"
)

// reconcileInPlaceUpdate drives the in-place update workflow for a Machine.
//
// It is a no-op unless the InPlaceUpdates feature gate is enabled. The workflow state is derived from
// two markers on the Machine: the UpdateInProgress annotation and the pending UpdateMachine hook.
//   - annotation missing, hook pending: the leftovers are cleaned up via completeInPlaceUpdate;
//   - annotation missing, hook not pending: nothing to do;
//   - annotation set, hook not pending: wait for the hook to be marked as pending;
//   - both set: once the Machine is initialized and the InfraMachine/BootstrapConfig carry the
//     annotation as well, the UpdateMachine hook is called.
//
// When the hook asks for a retry, the returned result carries the requested RequeueAfter and the
// Updating reason/message in the scope are set to "in progress"; when the hook fails (or the
// InfraMachine is missing) they are set to "failed" and an error is returned.
func (r *Reconciler) reconcileInPlaceUpdate(ctx context.Context, s *scope) (ctrl.Result, error) {
	if !feature.Gates.Enabled(feature.InPlaceUpdates) {
		return ctrl.Result{}, nil
	}

	log := ctrl.LoggerFrom(ctx)

	_, updateRequested := s.machine.GetAnnotations()[clusterv1.UpdateInProgressAnnotation]
	hookPending := hooks.IsPending(runtimehooksv1.UpdateMachine, s.machine)

	switch {
	case !updateRequested && hookPending:
		// The annotation was removed from the Machine while the UpdateMachine hook is still pending
		// (or annotations are still on InfraMachine/BootstrapConfig): clean up the orphaned state.
		log.Info("In-place update annotation removed but UpdateMachine hook still pending, cleaning up orphaned hook and annotations")
		if err := r.completeInPlaceUpdate(ctx, s); err != nil {
			return ctrl.Result{}, errors.Wrap(err, "failed to clean up orphaned UpdateMachine hook and annotations")
		}
		return ctrl.Result{}, nil
	case !updateRequested:
		// No in-place update requested and nothing to clean up.
		return ctrl.Result{}, nil
	case !hookPending:
		// The owner controller has not yet marked the hook as pending.
		log.Info("In-place update annotation is set, waiting for UpdateMachine hook to be marked as pending")
		return ctrl.Result{}, nil
	}

	// The Machine must be fully initialized before an in-place update can start.
	if !ptr.Deref(s.machine.Status.Initialization.InfrastructureProvisioned, false) {
		log.V(5).Info("Infrastructure not yet provisioned, skipping in-place update")
		return ctrl.Result{}, nil
	}
	if !ptr.Deref(s.machine.Status.Initialization.BootstrapDataSecretCreated, false) {
		log.V(5).Info("Bootstrap data secret not yet created, skipping in-place update")
		return ctrl.Result{}, nil
	}

	if s.infraMachine == nil {
		s.updatingMessage = "In-place update not possible: InfraMachine not found"
		s.updatingReason = clusterv1.MachineInPlaceUpdateFailedReason
		return ctrl.Result{}, errors.New("in-place update failed: InfraMachine not found")
	}

	// Both the InfraMachine and (if present) the BootstrapConfig must be marked before calling the hook.
	if !(r.isInfraMachineReadyForUpdate(s) && r.isBootstrapConfigReadyForUpdate(s)) {
		log.Info("Waiting for InfraMachine and BootstrapConfig to be marked for in-place update")
		return ctrl.Result{}, nil
	}

	hookResult, hookMessage, err := r.callUpdateMachineHook(ctx, s)
	if err != nil {
		s.updatingMessage = "UpdateMachine hook failed: please check controller logs for errors"
		s.updatingReason = clusterv1.MachineInPlaceUpdateFailedReason
		return ctrl.Result{}, errors.Wrap(err, "in-place update failed")
	}

	if hookResult.RequeueAfter <= 0 {
		// No retry requested: the update is done, remove annotations and mark the hook as done.
		log.Info("In-place update completed successfully")
		if err := r.completeInPlaceUpdate(ctx, s); err != nil {
			return ctrl.Result{}, errors.Wrap(err, "failed to complete in-place update")
		}
		return ctrl.Result{}, nil
	}

	// The hook asked to be called again later: surface progress and requeue.
	s.updatingReason = clusterv1.MachineInPlaceUpdatingReason
	s.updatingMessage = "In-place update in progress"
	if hookMessage != "" {
		s.updatingMessage = fmt.Sprintf("In-place update in progress: %s", hookMessage)
	}
	return hookResult, nil
}

// isInfraMachineReadyForUpdate reports whether the InfraMachine has the in-place update annotation.
// A missing (nil) InfraMachine is reported as not ready instead of being dereferenced, so the helper
// is safe to call even when the caller has not checked s.infraMachine beforehand.
func (r *Reconciler) isInfraMachineReadyForUpdate(s *scope) bool {
	if s.infraMachine == nil {
		return false
	}
	_, hasAnnotation := s.infraMachine.GetAnnotations()[clusterv1.UpdateInProgressAnnotation]
	return hasAnnotation
}

// isBootstrapConfigReadyForUpdate reports whether the BootstrapConfig has the in-place update annotation.
// When the scope has no BootstrapConfig there is nothing to wait for, so it is reported as ready.
func (r *Reconciler) isBootstrapConfigReadyForUpdate(s *scope) bool {
	config := s.bootstrapConfig
	if config != nil {
		_, marked := config.GetAnnotations()[clusterv1.UpdateInProgressAnnotation]
		return marked
	}
	return true
}

// callUpdateMachineHook calls the UpdateMachine runtime hook for the machine.
//
// It returns the reconcile result (with RequeueAfter set when the hook asks for a retry), the message
// reported in the hook response, and an error if the extension lookup or the hook call fails or if the
// number of registered extensions is not exactly one.
func (r *Reconciler) callUpdateMachineHook(ctx context.Context, s *scope) (ctrl.Result, string, error) {
	log := ctrl.LoggerFrom(ctx)

	// For the current iteration exactly one extension must be registered for the UpdateMachine hook,
	// to ensure safe behavior. Support for multiple extensions will be introduced in a future iteration.
	registered, err := r.RuntimeClient.GetAllExtensions(ctx, runtimehooksv1.UpdateMachine, s.machine)
	if err != nil {
		return ctrl.Result{}, "", err
	}
	switch count := len(registered); {
	case count == 0:
		return ctrl.Result{}, "", errors.New("no extensions registered for UpdateMachine hook")
	case count > 1:
		return ctrl.Result{}, "", errors.Errorf("multiple extensions registered for UpdateMachine hook: only one extension is supported, found %d extensions: %v", count, registered)
	}

	// Note: status is dropped when building the request; runtime extensions should treat UpdateMachine
	// requests as desired state and compare it with the current state to decide what to do.
	desired := runtimehooksv1.UpdateMachineRequestObjects{
		Machine:               *cleanupMachine(s.machine),
		InfrastructureMachine: runtime.RawExtension{Object: cleanupUnstructured(s.infraMachine)},
	}
	if s.bootstrapConfig != nil {
		desired.BootstrapConfig = runtime.RawExtension{Object: cleanupUnstructured(s.bootstrapConfig)}
	}
	request := &runtimehooksv1.UpdateMachineRequest{Desired: desired}
	response := &runtimehooksv1.UpdateMachineResponse{}

	if err := r.RuntimeClient.CallAllExtensions(ctx, runtimehooksv1.UpdateMachine, s.machine, request, response); err != nil {
		return ctrl.Result{}, "", err
	}

	retryAfterSeconds := response.GetRetryAfterSeconds()
	if retryAfterSeconds == 0 {
		log.Info("UpdateMachine hook completed successfully")
		return ctrl.Result{}, response.GetMessage(), nil
	}

	// The extension is still working on the update: requeue after the requested delay.
	log.Info(fmt.Sprintf("UpdateMachine hook requested retry after %d seconds", retryAfterSeconds))
	requeue := ctrl.Result{RequeueAfter: time.Duration(retryAfterSeconds) * time.Second}
	return requeue, response.GetMessage(), nil
}

// completeInPlaceUpdate removes the in-place update annotation from the Machine, the InfraMachine and
// the BootstrapConfig (in this order, skipping objects that are not set in the scope), and then marks
// the UpdateMachine hook as done (removes it from pending-hooks annotation).
// The first error encountered is returned and the remaining steps are not executed.
func (r *Reconciler) completeInPlaceUpdate(ctx context.Context, s *scope) error {
	log := ctrl.LoggerFrom(ctx)

	if err := r.removeInPlaceUpdateAnnotation(ctx, s.machine); err != nil {
		return err
	}

	if s.infraMachine != nil {
		if err := r.removeInPlaceUpdateAnnotation(ctx, s.infraMachine); err != nil {
			return err
		}
	} else {
		log.Info("InfraMachine not found during in-place update completion, skipping annotation removal")
	}

	if s.bootstrapConfig != nil {
		if err := r.removeInPlaceUpdateAnnotation(ctx, s.bootstrapConfig); err != nil {
			return err
		}
	}

	// Only after all annotations are gone the hook is removed from the pending hooks.
	err := hooks.MarkAsDone(ctx, r.Client, s.machine, runtimehooksv1.UpdateMachine)
	if err != nil {
		return err
	}

	log.Info("In place upgrade completed!")
	return nil
}

// removeInPlaceUpdateAnnotation drops the in-place update annotation from obj and immediately patches
// the object using a merge patch computed against a copy taken before the change.
// It is a no-op when the annotation is not present.
func (r *Reconciler) removeInPlaceUpdateAnnotation(ctx context.Context, obj client.Object) error {
	const key = clusterv1.UpdateInProgressAnnotation

	current := obj.GetAnnotations()
	if _, found := current[key]; !found {
		return nil
	}

	// Resolve the kind up front; it is only used to build a meaningful error message.
	gvk, err := apiutil.GVKForObject(obj, r.Client.Scheme())
	if err != nil {
		return errors.Wrapf(err, "failed to remove %s annotation from object %s", key, klog.KObj(obj))
	}

	// Snapshot the object before mutating it so that the patch only contains the annotation removal.
	before := obj.DeepCopyObject().(client.Object)
	delete(current, key)
	obj.SetAnnotations(current)

	err = r.Client.Patch(ctx, obj, client.MergeFrom(before))
	if err != nil {
		return errors.Wrapf(err, "failed to remove %s annotation from %s %s", key, gvk.Kind, klog.KObj(obj))
	}
	return nil
}

// cleanupMachine returns a trimmed-down Machine to be sent in hook requests: it carries only
// the GVK, name, namespace, labels, annotations and a deep copy of the spec (status is dropped).
// Note: labels and annotations maps are shared with the input Machine, not copied.
func cleanupMachine(machine *clusterv1.Machine) *clusterv1.Machine {
	// Set GVK because object is later marshalled with json.Marshal when the hook request is sent.
	typeMeta := metav1.TypeMeta{
		APIVersion: clusterv1.GroupVersion.String(),
		Kind:       "Machine",
	}
	objectMeta := metav1.ObjectMeta{
		Name:        machine.Name,
		Namespace:   machine.Namespace,
		Labels:      machine.Labels,
		Annotations: machine.Annotations,
	}

	cleaned := &clusterv1.Machine{TypeMeta: typeMeta, ObjectMeta: objectMeta}
	cleaned.Spec = *machine.Spec.DeepCopy()
	return cleaned
}

// cleanupUnstructured returns a trimmed-down copy of u to be sent in hook requests: it carries only
// apiVersion, kind, name, namespace, labels, annotations and the "spec" field of the input.
// Note: the "spec" value is taken as-is from u.Object (it is not deep copied).
func cleanupUnstructured(u *unstructured.Unstructured) *unstructured.Unstructured {
	content := make(map[string]interface{}, 3)
	content["apiVersion"] = u.GetAPIVersion()
	content["kind"] = u.GetKind()
	content["spec"] = u.Object["spec"]

	trimmed := &unstructured.Unstructured{Object: content}
	trimmed.SetName(u.GetName())
	trimmed.SetNamespace(u.GetNamespace())
	trimmed.SetLabels(u.GetLabels())
	trimmed.SetAnnotations(u.GetAnnotations())
	return trimmed
}
Loading