diff --git a/api/core/v1beta2/machine_types.go b/api/core/v1beta2/machine_types.go index 18d97b42a911..7e752f65a733 100644 --- a/api/core/v1beta2/machine_types.go +++ b/api/core/v1beta2/machine_types.go @@ -164,6 +164,24 @@ const ( MachineNotUpToDateReason = "NotUpToDate" ) +// Machine's Updating condition and corresponding reasons. +// Note: Updating condition is set by the Machine controller during in-place updates. +const ( + // MachineUpdatingCondition is true while an in-place update is in progress on the Machine. + // The condition is owned by the Machine controller and is used to track the progress of in-place updates. + // This condition is considered when computing the UpToDate condition. + MachineUpdatingCondition = "Updating" + + // MachineNotUpdatingReason surfaces when the Machine is not performing an in-place update. + MachineNotUpdatingReason = "NotUpdating" + + // MachineInPlaceUpdatingReason surfaces when the Machine is waiting for in-place update to complete. + MachineInPlaceUpdatingReason = "InPlaceUpdating" + + // MachineInPlaceUpdateFailedReason surfaces when the in-place update has failed. + MachineInPlaceUpdateFailedReason = "InPlaceUpdateFailed" +) + // Machine's BootstrapConfigReady condition and corresponding reasons. // Note: when possible, BootstrapConfigReady condition will use reasons surfaced from the underlying bootstrap config object. const ( diff --git a/controllers/alias.go b/controllers/alias.go index 657b1c03477d..0bf00eb64897 100644 --- a/controllers/alias.go +++ b/controllers/alias.go @@ -72,9 +72,10 @@ func (r *ClusterReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manag // MachineReconciler reconciles a Machine object. type MachineReconciler struct { - Client client.Client - APIReader client.Reader - ClusterCache clustercache.ClusterCache + Client client.Client + APIReader client.Reader + ClusterCache clustercache.ClusterCache + RuntimeClient runtimeclient.Client // WatchFilterValue is the label value used to filter events prior to reconciliation. WatchFilterValue string @@ -90,6 +91,7 @@ func (r *MachineReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manag Client: r.Client, APIReader: r.APIReader, ClusterCache: r.ClusterCache, + RuntimeClient: r.RuntimeClient, WatchFilterValue: r.WatchFilterValue, RemoteConditionsGracePeriod: r.RemoteConditionsGracePeriod, AdditionalSyncMachineLabels: r.AdditionalSyncMachineLabels, diff --git a/internal/controllers/machine/machine_controller.go b/internal/controllers/machine/machine_controller.go index 428ed8195fad..8cf90e54a063 100644 --- a/internal/controllers/machine/machine_controller.go +++ b/internal/controllers/machine/machine_controller.go @@ -52,6 +52,7 @@ import ( "sigs.k8s.io/cluster-api/controllers/clustercache" "sigs.k8s.io/cluster-api/controllers/external" "sigs.k8s.io/cluster-api/controllers/noderefutil" + runtimeclient "sigs.k8s.io/cluster-api/exp/runtime/client" "sigs.k8s.io/cluster-api/feature" "sigs.k8s.io/cluster-api/internal/contract" "sigs.k8s.io/cluster-api/internal/controllers/machine/drain" @@ -93,9 +94,10 @@ var ( // Reconciler reconciles a Machine object. type Reconciler struct { - Client client.Client - APIReader client.Reader - ClusterCache clustercache.ClusterCache + Client client.Client + APIReader client.Reader + ClusterCache clustercache.ClusterCache + RuntimeClient runtimeclient.Client // WatchFilterValue is the label value used to filter events prior to reconciliation. 
WatchFilterValue string @@ -129,6 +131,9 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt // to have some buffer. return errors.New("Client, APIReader and ClusterCache must not be nil and RemoteConditionsGracePeriod must not be < 2m") } + if feature.Gates.Enabled(feature.InPlaceUpdates) && r.RuntimeClient == nil { + return errors.New("RuntimeClient must not be nil when InPlaceUpdates feature gate is enabled") + } r.predicateLog = ptr.To(ctrl.LoggerFrom(ctx).WithValues("controller", "machine")) clusterToMachines, err := util.ClusterToTypedObjectsMapper(mgr.GetClient(), &clusterv1.MachineList{}, mgr.GetScheme()) @@ -282,7 +287,12 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Re } // Handle normal reconciliation loop. - return doReconcile(ctx, alwaysReconcile, s) + reconcileNormal := append( + alwaysReconcile, + r.reconcileInPlaceUpdate, + ) + + return doReconcile(ctx, reconcileNormal, s) } func patchMachine(ctx context.Context, patchHelper *patch.Helper, machine *clusterv1.Machine, options ...patch.Option) error { @@ -326,6 +336,7 @@ func patchMachine(ctx context.Context, patchHelper *patch.Helper, machine *clust clusterv1.MachineNodeReadyCondition, clusterv1.MachineNodeHealthyCondition, clusterv1.MachineDeletingCondition, + clusterv1.MachineUpdatingCondition, }}, ) @@ -397,6 +408,12 @@ type scope struct { // deletingMessage is the message that should be used when setting the Deleting condition. deletingMessage string + + // updatingReason is the reason that should be used when setting the Updating condition. + updatingReason string + + // updatingMessage is the message that should be used when setting the Updating condition. + updatingMessage string } func (r *Reconciler) reconcileMachineOwnerAndLabels(_ context.Context, s *scope) (ctrl.Result, error) { diff --git a/internal/controllers/machine/machine_controller_inplace_update.go b/internal/controllers/machine/machine_controller_inplace_update.go new file mode 100644 index 000000000000..ea1733ccd359 --- /dev/null +++ b/internal/controllers/machine/machine_controller_inplace_update.go @@ -0,0 +1,267 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package machine + +import ( + "context" + "fmt" + "time" + + "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/klog/v2" + "k8s.io/utils/ptr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + + clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2" + runtimehooksv1 "sigs.k8s.io/cluster-api/api/runtime/hooks/v1alpha1" + "sigs.k8s.io/cluster-api/feature" + "sigs.k8s.io/cluster-api/internal/hooks" +) + +// reconcileInPlaceUpdate handles the in-place update workflow for a Machine. 
+func (r *Reconciler) reconcileInPlaceUpdate(ctx context.Context, s *scope) (ctrl.Result, error) { + if !feature.Gates.Enabled(feature.InPlaceUpdates) { + return ctrl.Result{}, nil + } + + log := ctrl.LoggerFrom(ctx) + + machineAnnotations := s.machine.GetAnnotations() + _, inPlaceUpdateInProgress := machineAnnotations[clusterv1.UpdateInProgressAnnotation] + hasUpdateMachinePending := hooks.IsPending(runtimehooksv1.UpdateMachine, s.machine) + + if !inPlaceUpdateInProgress { + // Clean up any orphaned pending hooks and annotations before exiting. + // This can happen if the in-place update annotation was removed from Machine + // but the UpdateMachine hook is still pending or annotations are still on InfraMachine/BootstrapConfig. + if hasUpdateMachinePending { + log.Info("In-place update annotation removed but UpdateMachine hook still pending, cleaning up orphaned hook and annotations") + if err := r.completeInPlaceUpdate(ctx, s); err != nil { + return ctrl.Result{}, errors.Wrap(err, "failed to clean up orphaned UpdateMachine hook and annotations") + } + } + + return ctrl.Result{}, nil + } + + // If hook is not pending, we're waiting for the owner controller to mark it as pending. + if !hasUpdateMachinePending { + log.Info("In-place update annotation is set, waiting for UpdateMachine hook to be marked as pending") + return ctrl.Result{}, nil + } + + if !ptr.Deref(s.machine.Status.Initialization.InfrastructureProvisioned, false) { + log.V(5).Info("Infrastructure not yet provisioned, skipping in-place update") + return ctrl.Result{}, nil + } + if !ptr.Deref(s.machine.Status.Initialization.BootstrapDataSecretCreated, false) { + log.V(5).Info("Bootstrap data secret not yet created, skipping in-place update") + return ctrl.Result{}, nil + } + + if s.infraMachine == nil { + s.updatingReason = clusterv1.MachineInPlaceUpdateFailedReason + s.updatingMessage = "In-place update not possible: InfraMachine not found" + return ctrl.Result{}, errors.New("in-place update failed: InfraMachine not found") + } + + infraReady := r.isInfraMachineReadyForUpdate(s) + bootstrapReady := r.isBootstrapConfigReadyForUpdate(s) + + if !infraReady || !bootstrapReady { + log.Info("Waiting for InfraMachine and BootstrapConfig to be marked for in-place update") + return ctrl.Result{}, nil + } + + result, message, err := r.callUpdateMachineHook(ctx, s) + if err != nil { + s.updatingReason = clusterv1.MachineInPlaceUpdateFailedReason + s.updatingMessage = "UpdateMachine hook failed: please check controller logs for errors" + return ctrl.Result{}, errors.Wrap(err, "in-place update failed") + } + + if result.RequeueAfter > 0 { + s.updatingReason = clusterv1.MachineInPlaceUpdatingReason + if message != "" { + s.updatingMessage = fmt.Sprintf("In-place update in progress: %s", message) + } else { + s.updatingMessage = "In-place update in progress" + } + return result, nil + } + + log.Info("In-place update completed successfully") + if err := r.completeInPlaceUpdate(ctx, s); err != nil { + return ctrl.Result{}, errors.Wrap(err, "failed to complete in-place update") + } + + return ctrl.Result{}, nil +} + +// isInfraMachineReadyForUpdate checks if the InfraMachine has the in-place update annotation. +func (r *Reconciler) isInfraMachineReadyForUpdate(s *scope) bool { + _, hasAnnotation := s.infraMachine.GetAnnotations()[clusterv1.UpdateInProgressAnnotation] + return hasAnnotation +} + +// isBootstrapConfigReadyForUpdate checks if the BootstrapConfig has the in-place update annotation. 
+func (r *Reconciler) isBootstrapConfigReadyForUpdate(s *scope) bool {
+	if s.bootstrapConfig == nil {
+		return true
+	}
+	_, hasAnnotation := s.bootstrapConfig.GetAnnotations()[clusterv1.UpdateInProgressAnnotation]
+	return hasAnnotation
+}
+
+// callUpdateMachineHook calls the UpdateMachine runtime hook for the machine.
+func (r *Reconciler) callUpdateMachineHook(ctx context.Context, s *scope) (ctrl.Result, string, error) {
+	log := ctrl.LoggerFrom(ctx)
+
+	// Validate that exactly one extension is registered for the UpdateMachine hook.
+	// For the current iteration, we only support a single extension to ensure safe behavior.
+	// Support for multiple extensions will be introduced in a future iteration.
+	extensions, err := r.RuntimeClient.GetAllExtensions(ctx, runtimehooksv1.UpdateMachine, s.machine)
+	if err != nil {
+		return ctrl.Result{}, "", err
+	}
+
+	if len(extensions) == 0 {
+		return ctrl.Result{}, "", errors.New("no extensions registered for UpdateMachine hook")
+	}
+
+	if len(extensions) > 1 {
+		return ctrl.Result{}, "", errors.Errorf("multiple extensions registered for UpdateMachine hook: only one extension is supported, found %d extensions: %v", len(extensions), extensions)
+	}
+
+	// Note: When building the request, status is dropped; the Runtime Extension should treat UpdateMachine
+	// requests as desired state; it is up to the extension to compare with the current state and perform the necessary actions.
+	request := &runtimehooksv1.UpdateMachineRequest{
+		Desired: runtimehooksv1.UpdateMachineRequestObjects{
+			Machine:               *cleanupMachine(s.machine),
+			InfrastructureMachine: runtime.RawExtension{Object: cleanupUnstructured(s.infraMachine)},
+		},
+	}
+
+	if s.bootstrapConfig != nil {
+		request.Desired.BootstrapConfig = runtime.RawExtension{Object: cleanupUnstructured(s.bootstrapConfig)}
+	}
+
+	response := &runtimehooksv1.UpdateMachineResponse{}
+
+	if err := r.RuntimeClient.CallAllExtensions(ctx, runtimehooksv1.UpdateMachine, s.machine, request, response); err != nil {
+		return ctrl.Result{}, "", err
+	}
+
+	if response.GetRetryAfterSeconds() != 0 {
+		log.Info(fmt.Sprintf("UpdateMachine hook requested retry after %d seconds", response.GetRetryAfterSeconds()))
+		return ctrl.Result{RequeueAfter: time.Duration(response.GetRetryAfterSeconds()) * time.Second}, response.GetMessage(), nil
+	}
+
+	log.Info("UpdateMachine hook completed successfully")
+	return ctrl.Result{}, response.GetMessage(), nil
+}
+
+// completeInPlaceUpdate removes in-place update annotations from InfraMachine, BootstrapConfig, Machine,
+// and then marks the UpdateMachine hook as done (removes it from the pending-hooks annotation).
+func (r *Reconciler) completeInPlaceUpdate(ctx context.Context, s *scope) error {
+	log := ctrl.LoggerFrom(ctx)
+
+	if err := r.removeInPlaceUpdateAnnotation(ctx, s.machine); err != nil {
+		return err
+	}
+
+	if s.infraMachine == nil {
+		log.Info("InfraMachine not found during in-place update completion, skipping annotation removal")
+	} else {
+		if err := r.removeInPlaceUpdateAnnotation(ctx, s.infraMachine); err != nil {
+			return err
+		}
+	}
+
+	if s.bootstrapConfig != nil {
+		if err := r.removeInPlaceUpdateAnnotation(ctx, s.bootstrapConfig); err != nil {
+			return err
+		}
+	}
+
+	if err := hooks.MarkAsDone(ctx, r.Client, s.machine, runtimehooksv1.UpdateMachine); err != nil {
+		return err
+	}
+
+	log.Info("In-place update completed")
+	return nil
+}
+
+// removeInPlaceUpdateAnnotation removes the in-place update annotation from an object and patches it immediately.
+func (r *Reconciler) removeInPlaceUpdateAnnotation(ctx context.Context, obj client.Object) error { + annotations := obj.GetAnnotations() + if _, exists := annotations[clusterv1.UpdateInProgressAnnotation]; !exists { + return nil + } + + gvk, err := apiutil.GVKForObject(obj, r.Client.Scheme()) + if err != nil { + return errors.Wrapf(err, "failed to remove %s annotation from object %s", clusterv1.UpdateInProgressAnnotation, klog.KObj(obj)) + } + + orig := obj.DeepCopyObject().(client.Object) + delete(annotations, clusterv1.UpdateInProgressAnnotation) + obj.SetAnnotations(annotations) + + if err := r.Client.Patch(ctx, obj, client.MergeFrom(orig)); err != nil { + return errors.Wrapf(err, "failed to remove %s annotation from %s %s", clusterv1.UpdateInProgressAnnotation, gvk.Kind, klog.KObj(obj)) + } + + return nil +} + +func cleanupMachine(machine *clusterv1.Machine) *clusterv1.Machine { + return &clusterv1.Machine{ + // Set GVK because object is later marshalled with json.Marshal when the hook request is sent. + TypeMeta: metav1.TypeMeta{ + APIVersion: clusterv1.GroupVersion.String(), + Kind: "Machine", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: machine.Name, + Namespace: machine.Namespace, + Labels: machine.Labels, + Annotations: machine.Annotations, + }, + Spec: *machine.Spec.DeepCopy(), + } +} + +func cleanupUnstructured(u *unstructured.Unstructured) *unstructured.Unstructured { + cleanedUpU := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": u.GetAPIVersion(), + "kind": u.GetKind(), + "spec": u.Object["spec"], + }, + } + cleanedUpU.SetName(u.GetName()) + cleanedUpU.SetNamespace(u.GetNamespace()) + cleanedUpU.SetLabels(u.GetLabels()) + cleanedUpU.SetAnnotations(u.GetAnnotations()) + return cleanedUpU +} diff --git a/internal/controllers/machine/machine_controller_inplace_update_test.go b/internal/controllers/machine/machine_controller_inplace_update_test.go new file mode 100644 index 000000000000..05e2a55d9898 --- /dev/null +++ b/internal/controllers/machine/machine_controller_inplace_update_test.go @@ -0,0 +1,636 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package machine + +import ( + "context" + "testing" + "time" + + . 
"github.com/onsi/gomega" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + utilfeature "k8s.io/component-base/featuregate/testing" + "k8s.io/utils/ptr" + ctrl "sigs.k8s.io/controller-runtime" + ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2" + runtimehooksv1 "sigs.k8s.io/cluster-api/api/runtime/hooks/v1alpha1" + runtimev1 "sigs.k8s.io/cluster-api/api/runtime/v1beta2" + runtimecatalog "sigs.k8s.io/cluster-api/exp/runtime/catalog" + "sigs.k8s.io/cluster-api/feature" + fakeruntimeclient "sigs.k8s.io/cluster-api/internal/runtime/client/fake" +) + +func TestReconcileInPlaceUpdate(t *testing.T) { + tests := []struct { + name string + featureEnabled bool + setup func(*testing.T) (*Reconciler, *scope) + wantResult ctrl.Result + wantErr bool + wantErrContains string + wantReason string + wantMessage string + verify func(*testing.T, *WithT, context.Context, *Reconciler, *scope) + }{ + { + name: "feature gate disabled returns immediately", + featureEnabled: false, + setup: func(t *testing.T) (*Reconciler, *scope) { + t.Helper() + return &Reconciler{}, &scope{machine: newTestMachine()} + }, + wantResult: ctrl.Result{}, + }, + { + name: "cleans up orphaned hook and annotations", + featureEnabled: true, + setup: func(t *testing.T) (*Reconciler, *scope) { + t.Helper() + + scheme := runtime.NewScheme() + if err := clusterv1.AddToScheme(scheme); err != nil { + t.Fatalf("failed to add clusterv1 to scheme: %v", err) + } + + machine := newTestMachine() + machine.Annotations[runtimev1.PendingHooksAnnotation] = runtimecatalog.HookName(runtimehooksv1.UpdateMachine) + + infra := newTestUnstructured("GenericInfrastructureMachine", "infrastructure.cluster.x-k8s.io/v1beta2", "infra") + infra.SetAnnotations(map[string]string{clusterv1.UpdateInProgressAnnotation: ""}) + + bootstrap := newTestUnstructured("GenericBootstrapConfig", "bootstrap.cluster.x-k8s.io/v1beta2", "bootstrap") + bootstrap.SetAnnotations(map[string]string{clusterv1.UpdateInProgressAnnotation: ""}) + + client := fake.NewClientBuilder().WithScheme(scheme).WithObjects(machine, infra, bootstrap).Build() + + return &Reconciler{Client: client}, &scope{ + machine: machine, + infraMachine: infra, + bootstrapConfig: bootstrap, + } + }, + wantResult: ctrl.Result{}, + verify: func(t *testing.T, g *WithT, ctx context.Context, r *Reconciler, s *scope) { + t.Helper() + + updatedMachine := &clusterv1.Machine{} + g.Expect(r.Client.Get(ctx, ctrlclient.ObjectKeyFromObject(s.machine), updatedMachine)).To(Succeed()) + g.Expect(updatedMachine.Annotations).ToNot(HaveKey(runtimev1.PendingHooksAnnotation)) + + updatedInfra := &unstructured.Unstructured{} + updatedInfra.SetGroupVersionKind(s.infraMachine.GroupVersionKind()) + g.Expect(r.Client.Get(ctx, ctrlclient.ObjectKeyFromObject(s.infraMachine), updatedInfra)).To(Succeed()) + g.Expect(updatedInfra.GetAnnotations()).ToNot(HaveKey(clusterv1.UpdateInProgressAnnotation)) + + if s.bootstrapConfig != nil { + updatedBootstrap := &unstructured.Unstructured{} + updatedBootstrap.SetGroupVersionKind(s.bootstrapConfig.GroupVersionKind()) + g.Expect(r.Client.Get(ctx, ctrlclient.ObjectKeyFromObject(s.bootstrapConfig), updatedBootstrap)).To(Succeed()) + g.Expect(updatedBootstrap.GetAnnotations()).ToNot(HaveKey(clusterv1.UpdateInProgressAnnotation)) + } + }, + }, + { + name: "waits for pending hook to be marked", + featureEnabled: 
true, + setup: func(t *testing.T) (*Reconciler, *scope) { + t.Helper() + machine := newTestMachine() + machine.Annotations[clusterv1.UpdateInProgressAnnotation] = "" + return &Reconciler{}, &scope{machine: machine} + }, + wantResult: ctrl.Result{}, + }, + { + name: "fails when infra machine is missing", + featureEnabled: true, + setup: func(t *testing.T) (*Reconciler, *scope) { + t.Helper() + machine := newTestMachine() + machine.Annotations[clusterv1.UpdateInProgressAnnotation] = "" + machine.Annotations[runtimev1.PendingHooksAnnotation] = runtimecatalog.HookName(runtimehooksv1.UpdateMachine) + machine.Status.Initialization.InfrastructureProvisioned = ptr.To(true) + machine.Status.Initialization.BootstrapDataSecretCreated = ptr.To(true) + return &Reconciler{}, &scope{machine: machine} + }, + wantResult: ctrl.Result{}, + wantErr: true, + wantErrContains: "InfraMachine not found", + wantReason: clusterv1.MachineInPlaceUpdateFailedReason, + wantMessage: "In-place update not possible: InfraMachine not found", + }, + { + name: "requeues while UpdateMachine hook is in progress", + featureEnabled: true, + setup: func(t *testing.T) (*Reconciler, *scope) { + t.Helper() + + catalog := runtimecatalog.New() + if err := runtimehooksv1.AddToCatalog(catalog); err != nil { + t.Fatalf("failed to add hooks to catalog: %v", err) + } + updateGVH, err := catalog.GroupVersionHook(runtimehooksv1.UpdateMachine) + if err != nil { + t.Fatalf("failed to look up UpdateMachine hook: %v", err) + } + + runtimeClient := fakeruntimeclient.NewRuntimeClientBuilder(). + WithCatalog(catalog). + WithGetAllExtensionResponses(map[runtimecatalog.GroupVersionHook][]string{ + updateGVH: {"test-extension"}, + }). + WithCallAllExtensionResponses(map[runtimecatalog.GroupVersionHook]runtimehooksv1.ResponseObject{ + updateGVH: &runtimehooksv1.UpdateMachineResponse{ + CommonRetryResponse: runtimehooksv1.CommonRetryResponse{ + CommonResponse: runtimehooksv1.CommonResponse{ + Status: runtimehooksv1.ResponseStatusSuccess, + Message: "processing", + }, + RetryAfterSeconds: 30, + }, + }, + }). 
+ Build() + + scheme := runtime.NewScheme() + if err := clusterv1.AddToScheme(scheme); err != nil { + t.Fatalf("failed to add clusterv1 to scheme: %v", err) + } + + machine := newTestMachine() + machine.Annotations[clusterv1.UpdateInProgressAnnotation] = "" + machine.Annotations[runtimev1.PendingHooksAnnotation] = runtimecatalog.HookName(runtimehooksv1.UpdateMachine) + machine.Status.Initialization.InfrastructureProvisioned = ptr.To(true) + machine.Status.Initialization.BootstrapDataSecretCreated = ptr.To(true) + + infra := newTestUnstructured("GenericInfrastructureMachine", "infrastructure.cluster.x-k8s.io/v1beta2", "infra") + infra.SetAnnotations(map[string]string{clusterv1.UpdateInProgressAnnotation: ""}) + + client := fake.NewClientBuilder().WithScheme(scheme).WithObjects(machine, infra).Build() + + return &Reconciler{ + Client: client, + RuntimeClient: runtimeClient, + }, &scope{ + machine: machine, + infraMachine: infra, + } + }, + wantResult: ctrl.Result{RequeueAfter: 30 * time.Second}, + wantReason: clusterv1.MachineInPlaceUpdatingReason, + wantMessage: "In-place update in progress: processing", + }, + { + name: "completes successfully and cleans annotations", + featureEnabled: true, + setup: func(t *testing.T) (*Reconciler, *scope) { + t.Helper() + + catalog := runtimecatalog.New() + if err := runtimehooksv1.AddToCatalog(catalog); err != nil { + t.Fatalf("failed to add hooks to catalog: %v", err) + } + updateGVH, err := catalog.GroupVersionHook(runtimehooksv1.UpdateMachine) + if err != nil { + t.Fatalf("failed to look up UpdateMachine hook: %v", err) + } + + runtimeClient := fakeruntimeclient.NewRuntimeClientBuilder(). + WithCatalog(catalog). + WithGetAllExtensionResponses(map[runtimecatalog.GroupVersionHook][]string{ + updateGVH: {"test-extension"}, + }). + WithCallAllExtensionResponses(map[runtimecatalog.GroupVersionHook]runtimehooksv1.ResponseObject{ + updateGVH: &runtimehooksv1.UpdateMachineResponse{ + CommonRetryResponse: runtimehooksv1.CommonRetryResponse{ + CommonResponse: runtimehooksv1.CommonResponse{ + Status: runtimehooksv1.ResponseStatusSuccess, + Message: "done", + }, + RetryAfterSeconds: 0, + }, + }, + }). 
+ Build() + + scheme := runtime.NewScheme() + if err := clusterv1.AddToScheme(scheme); err != nil { + t.Fatalf("failed to add clusterv1 to scheme: %v", err) + } + + machine := newTestMachine() + machine.Annotations[clusterv1.UpdateInProgressAnnotation] = "" + machine.Annotations[runtimev1.PendingHooksAnnotation] = runtimecatalog.HookName(runtimehooksv1.UpdateMachine) + machine.Status.Initialization.InfrastructureProvisioned = ptr.To(true) + machine.Status.Initialization.BootstrapDataSecretCreated = ptr.To(true) + + infra := newTestUnstructured("GenericInfrastructureMachine", "infrastructure.cluster.x-k8s.io/v1beta2", "infra") + infra.SetAnnotations(map[string]string{clusterv1.UpdateInProgressAnnotation: ""}) + + bootstrap := newTestUnstructured("GenericBootstrapConfig", "bootstrap.cluster.x-k8s.io/v1beta2", "bootstrap") + bootstrap.SetAnnotations(map[string]string{clusterv1.UpdateInProgressAnnotation: ""}) + + client := fake.NewClientBuilder().WithScheme(scheme).WithObjects(machine, infra, bootstrap).Build() + + return &Reconciler{ + Client: client, + RuntimeClient: runtimeClient, + }, &scope{ + machine: machine, + infraMachine: infra, + bootstrapConfig: bootstrap, + } + }, + wantResult: ctrl.Result{}, + verify: func(t *testing.T, g *WithT, ctx context.Context, r *Reconciler, s *scope) { + t.Helper() + + updatedMachine := &clusterv1.Machine{} + g.Expect(r.Client.Get(ctx, ctrlclient.ObjectKeyFromObject(s.machine), updatedMachine)).To(Succeed()) + g.Expect(updatedMachine.Annotations).ToNot(HaveKey(clusterv1.UpdateInProgressAnnotation)) + g.Expect(updatedMachine.Annotations).ToNot(HaveKey(runtimev1.PendingHooksAnnotation)) + + updatedInfra := &unstructured.Unstructured{} + updatedInfra.SetGroupVersionKind(s.infraMachine.GroupVersionKind()) + g.Expect(r.Client.Get(ctx, ctrlclient.ObjectKeyFromObject(s.infraMachine), updatedInfra)).To(Succeed()) + g.Expect(updatedInfra.GetAnnotations()).ToNot(HaveKey(clusterv1.UpdateInProgressAnnotation)) + + if s.bootstrapConfig != nil { + updatedBootstrap := &unstructured.Unstructured{} + updatedBootstrap.SetGroupVersionKind(s.bootstrapConfig.GroupVersionKind()) + g.Expect(r.Client.Get(ctx, ctrlclient.ObjectKeyFromObject(s.bootstrapConfig), updatedBootstrap)).To(Succeed()) + g.Expect(updatedBootstrap.GetAnnotations()).ToNot(HaveKey(clusterv1.UpdateInProgressAnnotation)) + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + utilfeature.SetFeatureGateDuringTest(t, feature.Gates, feature.InPlaceUpdates, tt.featureEnabled) + + r, scope := tt.setup(t) + ctx := context.Background() + + result, err := r.reconcileInPlaceUpdate(ctx, scope) + + if tt.wantErr { + g.Expect(err).To(HaveOccurred()) + if tt.wantErrContains != "" { + g.Expect(err.Error()).To(ContainSubstring(tt.wantErrContains)) + } + } else { + g.Expect(err).ToNot(HaveOccurred()) + } + + g.Expect(result).To(Equal(tt.wantResult)) + + if tt.wantReason != "" { + g.Expect(scope.updatingReason).To(Equal(tt.wantReason)) + } else { + g.Expect(scope.updatingReason).To(BeEmpty()) + } + + if tt.wantMessage != "" { + g.Expect(scope.updatingMessage).To(Equal(tt.wantMessage)) + } else { + g.Expect(scope.updatingMessage).To(BeEmpty()) + } + + if tt.verify != nil { + tt.verify(t, g, ctx, r, scope) + } + }) + } +} + +func TestCallUpdateMachineHook(t *testing.T) { + catalog := runtimecatalog.New() + if err := runtimehooksv1.AddToCatalog(catalog); err != nil { + t.Fatalf("failed to add hooks to catalog: %v", err) + } + updateGVH, err := 
catalog.GroupVersionHook(runtimehooksv1.UpdateMachine) + if err != nil { + t.Fatalf("failed to determine UpdateMachine hook: %v", err) + } + + tests := []struct { + name string + setup func(*testing.T) (*Reconciler, *scope) + wantResult ctrl.Result + wantMessage string + wantErr bool + wantErrSubstrings []string + }{ + { + name: "fails if no extensions registered", + setup: func(t *testing.T) (*Reconciler, *scope) { + t.Helper() + runtimeClient := fakeruntimeclient.NewRuntimeClientBuilder(). + WithCatalog(catalog). + WithGetAllExtensionResponses(map[runtimecatalog.GroupVersionHook][]string{}). + Build() + return &Reconciler{RuntimeClient: runtimeClient}, &scope{machine: newTestMachine(), infraMachine: newTestUnstructured("GenericInfrastructureMachine", "infrastructure.cluster.x-k8s.io/v1beta2", "infra")} + }, + wantErr: true, + wantErrSubstrings: []string{"no extensions registered for UpdateMachine hook"}, + }, + { + name: "fails if multiple extensions registered", + setup: func(t *testing.T) (*Reconciler, *scope) { + t.Helper() + runtimeClient := fakeruntimeclient.NewRuntimeClientBuilder(). + WithCatalog(catalog). + WithGetAllExtensionResponses(map[runtimecatalog.GroupVersionHook][]string{ + updateGVH: {"ext-a", "ext-b"}, + }). + Build() + return &Reconciler{RuntimeClient: runtimeClient}, &scope{machine: newTestMachine(), infraMachine: newTestUnstructured("GenericInfrastructureMachine", "infrastructure.cluster.x-k8s.io/v1beta2", "infra")} + }, + wantErr: true, + wantErrSubstrings: []string{ + "multiple extensions registered for UpdateMachine hook", + "only one extension is supported", + "ext-a", + "ext-b", + }, + }, + { + name: "fails when hook invocation returns error", + setup: func(t *testing.T) (*Reconciler, *scope) { + t.Helper() + runtimeClient := fakeruntimeclient.NewRuntimeClientBuilder(). + WithCatalog(catalog). + WithGetAllExtensionResponses(map[runtimecatalog.GroupVersionHook][]string{ + updateGVH: {"ext"}, + }). + WithCallAllExtensionResponses(map[runtimecatalog.GroupVersionHook]runtimehooksv1.ResponseObject{ + updateGVH: &runtimehooksv1.UpdateMachineResponse{ + CommonRetryResponse: runtimehooksv1.CommonRetryResponse{ + CommonResponse: runtimehooksv1.CommonResponse{Status: runtimehooksv1.ResponseStatusFailure}, + }, + }, + }). + Build() + return &Reconciler{RuntimeClient: runtimeClient}, &scope{machine: newTestMachine(), infraMachine: newTestUnstructured("GenericInfrastructureMachine", "infrastructure.cluster.x-k8s.io/v1beta2", "infra")} + }, + wantErr: true, + wantErrSubstrings: []string{"runtime hook", "UpdateMachine", "failed"}, + }, + { + name: "returns requeue when hook succeeds with retry", + setup: func(t *testing.T) (*Reconciler, *scope) { + t.Helper() + runtimeClient := fakeruntimeclient.NewRuntimeClientBuilder(). + WithCatalog(catalog). + WithGetAllExtensionResponses(map[runtimecatalog.GroupVersionHook][]string{ + updateGVH: {"ext"}, + }). + WithCallAllExtensionResponses(map[runtimecatalog.GroupVersionHook]runtimehooksv1.ResponseObject{ + updateGVH: &runtimehooksv1.UpdateMachineResponse{ + CommonRetryResponse: runtimehooksv1.CommonRetryResponse{ + CommonResponse: runtimehooksv1.CommonResponse{ + Status: runtimehooksv1.ResponseStatusSuccess, + Message: "processing", + }, + RetryAfterSeconds: 45, + }, + }, + }). 
+ Build() + return &Reconciler{RuntimeClient: runtimeClient}, &scope{machine: newTestMachine(), infraMachine: newTestUnstructured("GenericInfrastructureMachine", "infrastructure.cluster.x-k8s.io/v1beta2", "infra")} + }, + wantResult: ctrl.Result{RequeueAfter: 45 * time.Second}, + wantMessage: "processing", + }, + { + name: "returns message when hook succeeds", + setup: func(t *testing.T) (*Reconciler, *scope) { + t.Helper() + runtimeClient := fakeruntimeclient.NewRuntimeClientBuilder(). + WithCatalog(catalog). + WithGetAllExtensionResponses(map[runtimecatalog.GroupVersionHook][]string{ + updateGVH: {"ext"}, + }). + WithCallAllExtensionResponses(map[runtimecatalog.GroupVersionHook]runtimehooksv1.ResponseObject{ + updateGVH: &runtimehooksv1.UpdateMachineResponse{ + CommonRetryResponse: runtimehooksv1.CommonRetryResponse{ + CommonResponse: runtimehooksv1.CommonResponse{ + Status: runtimehooksv1.ResponseStatusSuccess, + Message: "done", + }, + }, + }, + }). + Build() + return &Reconciler{RuntimeClient: runtimeClient}, &scope{machine: newTestMachine(), infraMachine: newTestUnstructured("GenericInfrastructureMachine", "infrastructure.cluster.x-k8s.io/v1beta2", "infra")} + }, + wantResult: ctrl.Result{}, + wantMessage: "done", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + r, scope := tt.setup(t) + result, message, err := r.callUpdateMachineHook(context.Background(), scope) + + if tt.wantErr { + g.Expect(err).To(HaveOccurred()) + for _, substr := range tt.wantErrSubstrings { + g.Expect(err.Error()).To(ContainSubstring(substr)) + } + return + } + + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(result).To(Equal(tt.wantResult)) + g.Expect(message).To(Equal(tt.wantMessage)) + }) + } +} + +func TestRemoveInPlaceUpdateAnnotation(t *testing.T) { + tests := []struct { + name string + setup func(*testing.T) (*Reconciler, ctrlclient.Client, *clusterv1.Machine) + verify func(*WithT, context.Context, ctrlclient.Client, *clusterv1.Machine) + }{ + { + name: "removes annotation when present", + setup: func(t *testing.T) (*Reconciler, ctrlclient.Client, *clusterv1.Machine) { + t.Helper() + scheme := runtime.NewScheme() + if err := clusterv1.AddToScheme(scheme); err != nil { + t.Fatalf("failed to add clusterv1 to scheme: %v", err) + } + + machine := &clusterv1.Machine{ObjectMeta: metav1.ObjectMeta{Name: "machine", Namespace: "default", Annotations: map[string]string{clusterv1.UpdateInProgressAnnotation: ""}}} + client := fake.NewClientBuilder().WithScheme(scheme).WithObjects(machine).Build() + return &Reconciler{Client: client}, client, machine + }, + verify: func(g *WithT, ctx context.Context, c ctrlclient.Client, machine *clusterv1.Machine) { + updated := &clusterv1.Machine{} + g.Expect(c.Get(ctx, ctrlclient.ObjectKeyFromObject(machine), updated)).To(Succeed()) + g.Expect(updated.Annotations).ToNot(HaveKey(clusterv1.UpdateInProgressAnnotation)) + }, + }, + { + name: "no-op when annotation missing", + setup: func(t *testing.T) (*Reconciler, ctrlclient.Client, *clusterv1.Machine) { + t.Helper() + scheme := runtime.NewScheme() + if err := clusterv1.AddToScheme(scheme); err != nil { + t.Fatalf("failed to add clusterv1 to scheme: %v", err) + } + + machine := &clusterv1.Machine{ObjectMeta: metav1.ObjectMeta{Name: "machine", Namespace: "default"}} + client := fake.NewClientBuilder().WithScheme(scheme).WithObjects(machine).Build() + return &Reconciler{Client: client}, client, machine + }, + verify: func(g *WithT, ctx context.Context, c ctrlclient.Client, machine 
*clusterv1.Machine) { + updated := &clusterv1.Machine{} + g.Expect(c.Get(ctx, ctrlclient.ObjectKeyFromObject(machine), updated)).To(Succeed()) + g.Expect(updated.Annotations).ToNot(HaveKey(clusterv1.UpdateInProgressAnnotation)) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + r, client, machine := tt.setup(t) + ctx := context.Background() + g.Expect(r.removeInPlaceUpdateAnnotation(ctx, machine)).To(Succeed()) + + if tt.verify != nil { + tt.verify(g, ctx, client, machine) + } + }) + } +} + +func TestCompleteInPlaceUpdate_MissingInfra(t *testing.T) { + g := NewWithT(t) + + scheme := runtime.NewScheme() + if err := clusterv1.AddToScheme(scheme); err != nil { + t.Fatalf("failed to add clusterv1 to scheme: %v", err) + } + + machine := &clusterv1.Machine{ + ObjectMeta: metav1.ObjectMeta{ + Name: "machine", + Namespace: "default", + Annotations: map[string]string{}, + }, + } + client := fake.NewClientBuilder().WithScheme(scheme).WithObjects(machine).Build() + + r := &Reconciler{Client: client} + scope := &scope{machine: machine, infraMachine: nil} + + err := r.completeInPlaceUpdate(context.Background(), scope) + g.Expect(err).ToNot(HaveOccurred()) +} + +func TestCleanupMachine(t *testing.T) { + g := NewWithT(t) + + original := &clusterv1.Machine{ + ObjectMeta: metav1.ObjectMeta{ + Name: "machine", + Namespace: "default", + Labels: map[string]string{"key": "value"}, + Annotations: map[string]string{"anno": "value"}, + }, + } + original.Status.Phase = "Running" + + cleaned := cleanupMachine(original) + + g.Expect(cleaned.APIVersion).To(Equal(clusterv1.GroupVersion.String())) + g.Expect(cleaned.Kind).To(Equal("Machine")) + g.Expect(cleaned.Name).To(Equal("machine")) + g.Expect(cleaned.Namespace).To(Equal("default")) + g.Expect(cleaned.Labels).To(HaveKeyWithValue("key", "value")) + g.Expect(cleaned.Annotations).To(HaveKeyWithValue("anno", "value")) + g.Expect(cleaned.Status).To(BeZero()) +} + +func TestCleanupUnstructured(t *testing.T) { + g := NewWithT(t) + + original := &unstructured.Unstructured{Object: map[string]interface{}{}} + original.SetAPIVersion("infrastructure.cluster.x-k8s.io/v1beta2") + original.SetKind("GenericInfrastructureMachine") + original.SetName("infra") + original.SetNamespace("default") + original.SetLabels(map[string]string{"key": "value"}) + original.SetAnnotations(map[string]string{"anno": "value"}) + original.Object["spec"] = map[string]interface{}{"field": "value"} + original.Object["status"] = map[string]interface{}{"state": "ready"} + + cleaned := cleanupUnstructured(original) + + g.Expect(cleaned.GetAPIVersion()).To(Equal(original.GetAPIVersion())) + g.Expect(cleaned.GetKind()).To(Equal(original.GetKind())) + g.Expect(cleaned.GetName()).To(Equal(original.GetName())) + g.Expect(cleaned.GetNamespace()).To(Equal(original.GetNamespace())) + g.Expect(cleaned.GetLabels()).To(HaveKeyWithValue("key", "value")) + g.Expect(cleaned.GetAnnotations()).To(HaveKeyWithValue("anno", "value")) + + spec, found, err := unstructured.NestedMap(cleaned.Object, "spec") + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(found).To(BeTrue()) + g.Expect(spec).To(HaveKeyWithValue("field", "value")) + + _, found, err = unstructured.NestedFieldCopy(cleaned.Object, "status") + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(found).To(BeFalse()) +} + +func newTestMachine() *clusterv1.Machine { + return &clusterv1.Machine{ + ObjectMeta: metav1.ObjectMeta{ + Name: "machine", + Namespace: "default", + Labels: map[string]string{}, + Annotations: 
map[string]string{}, + }, + Spec: clusterv1.MachineSpec{}, + } +} + +func newTestUnstructured(kind, apiVersion, name string) *unstructured.Unstructured { + u := &unstructured.Unstructured{Object: map[string]interface{}{}} + u.SetAPIVersion(apiVersion) + u.SetKind(kind) + u.SetNamespace("default") + u.SetName(name) + u.SetLabels(map[string]string{}) + u.SetAnnotations(map[string]string{}) + u.Object["spec"] = map[string]interface{}{"field": "value"} + return u +} diff --git a/internal/controllers/machine/machine_controller_status.go b/internal/controllers/machine/machine_controller_status.go index 59c70cac013f..c1e7a74dc837 100644 --- a/internal/controllers/machine/machine_controller_status.go +++ b/internal/controllers/machine/machine_controller_status.go @@ -65,6 +65,7 @@ func (r *Reconciler) updateStatus(ctx context.Context, s *scope) { // Note: also other controllers adds conditions to the machine object (machine's owner controller sets the UpToDate condition, // MHC controller sets HealthCheckSucceeded and OwnerRemediated conditions, KCP sets conditions about etcd and control plane pods). setDeletingCondition(ctx, s.machine, s.reconcileDeleteExecuted, s.deletingReason, s.deletingMessage) + setUpdatingCondition(ctx, s.machine, s.updatingReason, s.updatingMessage) setReadyCondition(ctx, s.machine) setAvailableCondition(ctx, s.machine) @@ -633,6 +634,24 @@ func setDeletingCondition(_ context.Context, machine *clusterv1.Machine, reconci }) } +func setUpdatingCondition(_ context.Context, machine *clusterv1.Machine, updatingReason, updatingMessage string) { + if updatingReason == "" { + conditions.Set(machine, metav1.Condition{ + Type: clusterv1.MachineUpdatingCondition, + Status: metav1.ConditionFalse, + Reason: clusterv1.MachineNotUpdatingReason, + }) + return + } + + conditions.Set(machine, metav1.Condition{ + Type: clusterv1.MachineUpdatingCondition, + Status: metav1.ConditionTrue, + Reason: updatingReason, + Message: updatingMessage, + }) +} + func setReadyCondition(ctx context.Context, machine *clusterv1.Machine) { log := ctrl.LoggerFrom(ctx) diff --git a/main.go b/main.go index f8056b6efc17..8f6f050fafd8 100644 --- a/main.go +++ b/main.go @@ -672,6 +672,7 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager, watchNamespaces map Client: mgr.GetClient(), APIReader: mgr.GetAPIReader(), ClusterCache: clusterCache, + RuntimeClient: runtimeClient, WatchFilterValue: watchFilterValue, RemoteConditionsGracePeriod: remoteConditionsGracePeriod, AdditionalSyncMachineLabels: additionalSyncMachineLabelRegexes,