From 9b8cfbceda71bebc5f599cbb44f945306a720d86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E7=8E=AE=E6=96=87?= Date: Sun, 20 Jul 2025 00:25:25 +0800 Subject: [PATCH 1/6] fix uncertain cache Fix the wrong error being used when checking for a final error. A plain map is not safe for concurrent read/write from multiple workers; that is fixed as well. Added related tests. --- go.mod | 2 +- pkg/controller/controller.go | 2 +- pkg/modifycontroller/controller.go | 13 +-- pkg/modifycontroller/controller_test.go | 89 ++++++++++++----- pkg/modifycontroller/modify_status.go | 23 ----- pkg/modifycontroller/modify_status_test.go | 107 +-------------------- pkg/modifycontroller/modify_volume.go | 12 +-- pkg/modifycontroller/modify_volume_test.go | 40 ++++++++ 8 files changed, 118 insertions(+), 170 deletions(-) diff --git a/go.mod b/go.mod index 9342b08ac..43e22abba 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( k8s.io/component-base v0.33.1 k8s.io/csi-translation-lib v0.33.0 k8s.io/klog/v2 v2.130.1 + k8s.io/utils v0.0.0-20241210054802-24370beab758 ) require ( @@ -69,7 +70,6 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff // indirect - k8s.io/utils v0.0.0-20241210054802-24370beab758 // indirect sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 // indirect sigs.k8s.io/randfill v1.0.0 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.6.0 // indirect diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index c3d1b39d0..4fba3ccf1 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -291,7 +291,7 @@ func (ctrl *resizeController) Run(workers int, ctx context.Context) { go ctrl.slowSet.Run(stopCh) } - for i := 0; i < workers; i++ { + for range workers { go wait.Until(ctrl.syncPVCs, 0, stopCh) } diff --git a/pkg/modifycontroller/controller.go b/pkg/modifycontroller/controller.go index 1895a3553..2b41ee00f 100644 --- a/pkg/modifycontroller/controller.go 
+++ b/pkg/modifycontroller/controller.go @@ -19,6 +19,7 @@ package modifycontroller import ( "context" "fmt" + "sync" "time" "github.com/kubernetes-csi/external-resizer/pkg/util" @@ -61,7 +62,7 @@ type modifyController struct { vacListerSynced cache.InformerSynced extraModifyMetadata bool // the key of the map is {PVC_NAMESPACE}/{PVC_NAME} - uncertainPVCs map[string]v1.PersistentVolumeClaim + uncertainPVCs sync.Map // slowSet tracks PVCs for which modification failed with infeasible error and should be retried at slower rate. slowSet *slowset.SlowSet } @@ -121,7 +122,6 @@ func NewModifyController( } func (ctrl *modifyController) initUncertainPVCs() error { - ctrl.uncertainPVCs = make(map[string]v1.PersistentVolumeClaim) allPVCs, err := ctrl.pvcLister.List(labels.Everything()) if err != nil { klog.Errorf("Failed to list pvcs when init uncertain pvcs: %v", err) @@ -133,7 +133,7 @@ func (ctrl *modifyController) initUncertainPVCs() error { if err != nil { return err } - ctrl.uncertainPVCs[pvcKey] = *pvc.DeepCopy() + ctrl.uncertainPVCs.Store(pvcKey, pvc) } } @@ -187,10 +187,7 @@ func (ctrl *modifyController) deletePVC(obj interface{}) { } func (ctrl *modifyController) init(ctx context.Context) bool { - informersSyncd := []cache.InformerSynced{ctrl.pvListerSynced, ctrl.pvcListerSynced} - informersSyncd = append(informersSyncd, ctrl.vacListerSynced) - - if !cache.WaitForCacheSync(ctx.Done(), informersSyncd...) { + if !cache.WaitForCacheSync(ctx.Done(), ctrl.pvListerSynced, ctrl.pvcListerSynced, ctrl.vacListerSynced) { klog.ErrorS(nil, "Cannot sync pod, pv, pvc or vac caches") return false } @@ -220,7 +217,7 @@ func (ctrl *modifyController) Run( // Starts go-routine that deletes expired slowSet entries. 
go ctrl.slowSet.Run(stopCh) - for i := 0; i < workers; i++ { + for range workers { go wait.Until(ctrl.sync, 0, stopCh) } diff --git a/pkg/modifycontroller/controller_test.go b/pkg/modifycontroller/controller_test.go index 3acd842e1..2eb4de4f6 100644 --- a/pkg/modifycontroller/controller_test.go +++ b/pkg/modifycontroller/controller_test.go @@ -1,7 +1,6 @@ package modifycontroller import ( - "context" "errors" "fmt" "testing" @@ -14,7 +13,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" v1 "k8s.io/api/core/v1" - storagev1beta1 "k8s.io/api/storage/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -321,6 +319,65 @@ func TestInfeasibleRetry(t *testing.T) { } } +// Intended to catch any race conditions in the controller +func TestConcurrentSync(t *testing.T) { + cases := []struct { + name string + waitCount int + err error + }{ + // TODO: This case is flaky due to fake client lacks resourceVersion support. 
+ // { + // name: "success", + // waitCount: 10, + // }, + { + name: "uncertain", + waitCount: 30, + err: nonFinalErr, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + client := csi.NewMockClient(testDriverName, true, true, true, true, true, false) + client.SetModifyError(tc.err) + + initialObjects := []runtime.Object{testVacObject, targetVacObject} + for i := range 10 { + initialObjects = append(initialObjects, + &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("foo-%d", i), Namespace: pvcNamespace}, + Spec: v1.PersistentVolumeClaimSpec{ + VolumeAttributesClassName: &testVac, + VolumeName: fmt.Sprintf("testPV-%d", i), + }, + Status: v1.PersistentVolumeClaimStatus{ + Phase: v1.ClaimBound, + }, + }, + &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("testPV-%d", i)}, + Spec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{ + CSI: &v1.CSIPersistentVolumeSource{ + Driver: testDriverName, + VolumeHandle: fmt.Sprintf("foo-%d", i), + }, + }, + }, + }, + ) + } + ctrlInstance := setupFakeK8sEnvironment(t, client, initialObjects) + go ctrlInstance.Run(3, t.Context()) + + for client.GetModifyCount() < tc.waitCount { + time.Sleep(20 * time.Millisecond) + } + }) + } +} + // setupFakeK8sEnvironment creates fake K8s environment and starts Informers and ModifyController func setupFakeK8sEnvironment(t *testing.T, client *csi.MockClient, initialObjects []runtime.Object) *modifyController { t.Helper() @@ -329,11 +386,9 @@ func setupFakeK8sEnvironment(t *testing.T, client *csi.MockClient, initialObject /* Create fake kubeClient, Informers, and ModifyController */ kubeClient, informerFactory := fakeK8s(initialObjects) - pvInformer := informerFactory.Core().V1().PersistentVolumes() - pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims() - vacInformer := informerFactory.Storage().V1beta1().VolumeAttributesClasses() - driverName, _ := 
client.GetDriverName(context.TODO()) + ctx := t.Context() + driverName, _ := client.GetDriverName(ctx) csiModifier, err := modifier.NewModifierFromClient(client, 15*time.Second, kubeClient, informerFactory, false, driverName) if err != nil { @@ -346,26 +401,10 @@ func setupFakeK8sEnvironment(t *testing.T, client *csi.MockClient, initialObject workqueue.DefaultTypedControllerRateLimiter[string]()) /* Start informers and ModifyController*/ - stopCh := make(chan struct{}) - informerFactory.Start(stopCh) - - go controller.Run(1, t.Context()) - - /* Add initial objects to informer caches */ - for _, obj := range initialObjects { - switch obj.(type) { - case *v1.PersistentVolume: - pvInformer.Informer().GetStore().Add(obj) - case *v1.PersistentVolumeClaim: - pvcInformer.Informer().GetStore().Add(obj) - case *storagev1beta1.VolumeAttributesClass: - vacInformer.Informer().GetStore().Add(obj) - default: - t.Fatalf("Test %s: Unknown initalObject type: %+v", t.Name(), obj) - } - } + informerFactory.Start(ctx.Done()) - ctrlInstance, _ := controller.(*modifyController) + ctrlInstance := controller.(*modifyController) + ctrlInstance.init(ctx) return ctrlInstance } diff --git a/pkg/modifycontroller/modify_status.go b/pkg/modifycontroller/modify_status.go index eb7c70320..f39c08419 100644 --- a/pkg/modifycontroller/modify_status.go +++ b/pkg/modifycontroller/modify_status.go @@ -22,7 +22,6 @@ import ( "github.com/kubernetes-csi/external-resizer/pkg/util" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/tools/cache" "k8s.io/utils/ptr" ) @@ -61,16 +60,6 @@ func (ctrl *modifyController) markControllerModifyVolumeStatus( if err != nil { return pvc, fmt.Errorf("mark PVC %q as modify volume failed, errored with: %v", pvc.Name, err) } - // Remove this PVC from the uncertain cache since the status is known now - if modifyVolumeStatus == v1.PersistentVolumeClaimModifyVolumeInfeasible { - pvcKey, err := cache.MetaNamespaceKeyFunc(pvc) - if err != nil 
{ - return pvc, err - } - - ctrl.removePVCFromModifyVolumeUncertainCache(pvcKey) - ctrl.markForSlowRetry(pvc, pvcKey) - } return updatedPVC, nil } @@ -144,15 +133,3 @@ func clearModifyVolumeConditions(conditions []v1.PersistentVolumeClaimCondition) } return knownConditions } - -// removePVCFromModifyVolumeUncertainCache removes the pvc from the uncertain cache -func (ctrl *modifyController) removePVCFromModifyVolumeUncertainCache(pvcKey string) { - if ctrl.uncertainPVCs == nil { - return - } - // Format of the key of the uncertainPVCs is NAMESPACE/NAME of the pvc - _, ok := ctrl.uncertainPVCs[pvcKey] - if ok { - delete(ctrl.uncertainPVCs, pvcKey) - } -} diff --git a/pkg/modifycontroller/modify_status_test.go b/pkg/modifycontroller/modify_status_test.go index 50c7de27a..431befc7a 100644 --- a/pkg/modifycontroller/modify_status_test.go +++ b/pkg/modifycontroller/modify_status_test.go @@ -14,13 +14,11 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" v1 "k8s.io/api/core/v1" - storagev1beta1 "k8s.io/api/storage/v1beta1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" featuregatetesting "k8s.io/component-base/featuregate/testing" ) @@ -38,6 +36,7 @@ var ( testDriverName = "mock" infeasibleErr = status.Errorf(codes.InvalidArgument, "Parameters in VolumeAttributesClass is invalid") finalErr = status.Errorf(codes.Internal, "Final error") + nonFinalErr = status.Errorf(codes.Aborted, "Non-final error") pvcConditionInProgress = v1.PersistentVolumeClaimCondition{ Type: v1.PersistentVolumeClaimVolumeModifyingVolume, Status: v1.ConditionTrue, @@ -273,110 +272,6 @@ func TestMarkControllerModifyVolumeCompleted(t *testing.T) { } } -func TestRemovePVCFromModifyVolumeUncertainCache(t *testing.T) { - basePVC := 
testutil.MakeTestPVC([]v1.PersistentVolumeClaimCondition{}) - basePVC.WithModifyVolumeStatus(v1.PersistentVolumeClaimModifyVolumeInProgress) - secondPVC := testutil.GetTestPVC("test-vol0", "2G", "1G", "", "") - secondPVC.Status.Phase = v1.ClaimBound - secondPVC.Status.ModifyVolumeStatus = &v1.ModifyVolumeStatus{} - secondPVC.Status.ModifyVolumeStatus.Status = v1.PersistentVolumeClaimModifyVolumeInfeasible - - tests := []struct { - name string - pvc *v1.PersistentVolumeClaim - }{ - { - name: "should delete the target pvc but keep the others in the cache", - pvc: basePVC.Get(), - }, - } - - for _, test := range tests { - tc := test - t.Run(tc.name, func(t *testing.T) { - featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeAttributesClass, true) - client := csi.NewMockClient("foo", true, true, true, true, true, false) - driverName, _ := client.GetDriverName(context.TODO()) - - var initialObjects []runtime.Object - initialObjects = append(initialObjects, test.pvc) - initialObjects = append(initialObjects, secondPVC) - - kubeClient, informerFactory := fakeK8s(initialObjects) - pvInformer := informerFactory.Core().V1().PersistentVolumes() - pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims() - podInformer := informerFactory.Core().V1().Pods() - vacInformer := informerFactory.Storage().V1beta1().VolumeAttributesClasses() - - csiModifier, err := modifier.NewModifierFromClient(client, 15*time.Second, kubeClient, informerFactory, false, driverName) - if err != nil { - t.Fatalf("Test %s: Unable to create modifier: %v", test.name, err) - } - controller := NewModifyController(driverName, - csiModifier, kubeClient, - time.Second, 2*time.Minute, false, informerFactory, - workqueue.DefaultTypedControllerRateLimiter[string]()) - - ctrlInstance, _ := controller.(*modifyController) - - stopCh := make(chan struct{}) - informerFactory.Start(stopCh) - - success := ctrlInstance.init(t.Context()) - if !success { - t.Fatal("failed 
to init controller") - } - - for _, obj := range initialObjects { - switch obj.(type) { - case *v1.PersistentVolume: - pvInformer.Informer().GetStore().Add(obj) - case *v1.PersistentVolumeClaim: - pvcInformer.Informer().GetStore().Add(obj) - case *v1.Pod: - podInformer.Informer().GetStore().Add(obj) - case *storagev1beta1.VolumeAttributesClass: - vacInformer.Informer().GetStore().Add(obj) - default: - t.Fatalf("Test %s: Unknown initalObject type: %+v", test.name, obj) - } - } - - time.Sleep(time.Second * 2) - - pvcKey, err := cache.MetaNamespaceKeyFunc(tc.pvc) - if err != nil { - t.Errorf("failed to extract pvc key from pvc %v", tc.pvc) - } - ctrlInstance.removePVCFromModifyVolumeUncertainCache(pvcKey) - - deletedPVCKey, err := cache.MetaNamespaceKeyFunc(tc.pvc) - if err != nil { - t.Errorf("failed to extract pvc key from pvc %v", tc.pvc) - } - _, ok := ctrlInstance.uncertainPVCs[deletedPVCKey] - if ok { - t.Errorf("pvc %v should be deleted but it is still in the uncertainPVCs cache", tc.pvc) - } - if err != nil { - t.Errorf("err get pvc %v from uncertainPVCs: %v", tc.pvc, err) - } - - notDeletedPVCKey, err := cache.MetaNamespaceKeyFunc(secondPVC) - if err != nil { - t.Errorf("failed to extract pvc key from secondPVC %v", secondPVC) - } - _, ok = ctrlInstance.uncertainPVCs[notDeletedPVCKey] - if !ok { - t.Errorf("pvc %v should not be deleted, uncertainPVCs list %v", secondPVC, ctrlInstance.uncertainPVCs) - } - if err != nil { - t.Errorf("err get pvc %v from uncertainPVCs: %v", secondPVC, err) - } - }) - } -} - func createTestPV(capacityGB int, pvcName, pvcNamespace string, pvcUID types.UID, volumeMode *v1.PersistentVolumeMode, vacName string) *v1.PersistentVolume { capacity := testutil.QuantityGB(capacityGB) diff --git a/pkg/modifycontroller/modify_volume.go b/pkg/modifycontroller/modify_volume.go index 7523ab624..763ec02b7 100644 --- a/pkg/modifycontroller/modify_volume.go +++ b/pkg/modifycontroller/modify_volume.go @@ -57,7 +57,7 @@ func (ctrl *modifyController) 
modify(pvc *v1.PersistentVolumeClaim, pv *v1.Persi return ctrl.validateVACAndModifyVolumeWithTarget(pvc, pv) } else if pvcSpecVacName != nil && curVacName != nil && *pvcSpecVacName != *curVacName { // Check if PVC in uncertain state - _, inUncertainState := ctrl.uncertainPVCs[pvcKey] + _, inUncertainState := ctrl.uncertainPVCs.Load(pvcKey) if !inUncertainState { klog.V(3).InfoS("previous operation on the PVC failed with a final error, retrying") return ctrl.validateVACAndModifyVolumeWithTarget(pvc, pv) @@ -119,7 +119,7 @@ func (ctrl *modifyController) controllerModifyVolumeWithTarget( if err == nil { klog.V(4).Infof("Update volumeAttributesClass of PV %q to %s succeeded", pv.Name, *pvcSpecVacName) // Record an event to indicate that modify operation is successful. - ctrl.eventRecorder.Eventf(pvc, v1.EventTypeNormal, util.VolumeModifySuccess, fmt.Sprintf("external resizer modified volume %s with vac %s successfully ", pvc.Name, vacObj.Name)) + ctrl.eventRecorder.Eventf(pvc, v1.EventTypeNormal, util.VolumeModifySuccess, fmt.Sprintf("external resizer modified volume %s with vac %s successfully", pvc.Name, vacObj.Name)) return pvc, pv, nil, true } else { errStatus, ok := status.FromError(err) @@ -132,9 +132,9 @@ func (ctrl *modifyController) controllerModifyVolumeWithTarget( if keyErr != nil { return pvc, pv, keyErr, false } - if !util.IsFinalError(keyErr) { + if !util.IsFinalError(err) { // update conditions and cache pvc as uncertain - ctrl.uncertainPVCs[pvcKey] = *pvc + ctrl.uncertainPVCs.Store(pvcKey, pvc) } else { // Only InvalidArgument can be set to Infeasible state // Final errors other than InvalidArgument will still be in InProgress state @@ -146,10 +146,10 @@ func (ctrl *modifyController) controllerModifyVolumeWithTarget( } ctrl.markForSlowRetry(pvc, pvcKey) } - ctrl.removePVCFromModifyVolumeUncertainCache(pvcKey) + ctrl.uncertainPVCs.Delete(pvcKey) } } else { - return pvc, pv, fmt.Errorf("cannot get error status from modify volume err: %v ", err), false + 
return pvc, pv, fmt.Errorf("cannot get error status from modify volume err: %v", err), false } // Record an event to indicate that modify operation is failed. ctrl.eventRecorder.Eventf(pvc, v1.EventTypeWarning, util.VolumeModifyFailed, err.Error()) diff --git a/pkg/modifycontroller/modify_volume_test.go b/pkg/modifycontroller/modify_volume_test.go index 97490c478..e35b78d5f 100644 --- a/pkg/modifycontroller/modify_volume_test.go +++ b/pkg/modifycontroller/modify_volume_test.go @@ -1,6 +1,8 @@ package modifycontroller import ( + "errors" + "fmt" "testing" "github.com/google/go-cmp/cmp" @@ -153,6 +155,44 @@ func TestModify(t *testing.T) { } } +func TestModifyUncertain(t *testing.T) { + basePVC := createTestPVC(pvcName, targetVac /*vacName*/, testVac /*curVacName*/, targetVac /*targetVacName*/) + basePVC.Status.ModifyVolumeStatus.Status = v1.PersistentVolumeClaimModifyVolumeInProgress + basePV := createTestPV(1, pvcName, pvcNamespace, "foobaz" /*pvcUID*/, &fsVolumeMode, testVac) + + client := csi.NewMockClient(testDriverName, true, true, true, true, true, false) + initialObjects := []runtime.Object{testVacObject, targetVacObject, basePVC, basePV} + ctrlInstance := setupFakeK8sEnvironment(t, client, initialObjects) + + pvcKey := fmt.Sprintf("%s/%s", pvcNamespace, pvcName) + assertUncertain := func(uncertain bool) { + t.Helper() + _, ok := ctrlInstance.uncertainPVCs.Load(pvcKey) + if ok != uncertain { + t.Fatalf("expected uncertain state to be %v, got %v", uncertain, ok) + } + } + + // initialized to uncertain + assertUncertain(true) + + client.SetModifyError(finalErr) + pvc, pv, err, _ := ctrlInstance.modify(basePVC, basePV) + if !errors.Is(err, finalErr) { + t.Fatalf("expected error to be %v, got %v", finalErr, err) + } + // should clear uncertain state + assertUncertain(false) + + client.SetModifyError(nonFinalErr) + _, _, err, _ = ctrlInstance.modify(pvc, pv) + if !errors.Is(err, nonFinalErr) { + t.Fatalf("expected error to be %v, got %v", nonFinalErr, err) + } + // 
should enter uncertain state again + assertUncertain(true) +} + func createTestPVC(pvcName string, vacName string, curVacName string, targetVacName string) *v1.PersistentVolumeClaim { pvc := &v1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{Name: pvcName, Namespace: pvcNamespace}, From 5459e0763c9a0830d77af3bfbf5d5433f03e0598 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E7=8E=AE=E6=96=87?= Date: Sun, 24 Aug 2025 01:02:05 +0800 Subject: [PATCH 2/6] add a missing space in condition message --- pkg/modifycontroller/modify_status.go | 2 +- pkg/modifycontroller/modify_status_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/modifycontroller/modify_status.go b/pkg/modifycontroller/modify_status.go index f39c08419..a57684ea4 100644 --- a/pkg/modifycontroller/modify_status.go +++ b/pkg/modifycontroller/modify_status.go @@ -47,7 +47,7 @@ func (ctrl *modifyController) markControllerModifyVolumeStatus( case v1.PersistentVolumeClaimModifyVolumeInProgress: conditionMessage = "ModifyVolume operation in progress." case v1.PersistentVolumeClaimModifyVolumeInfeasible: - conditionMessage = "ModifyVolume failed with error" + err.Error() + ". Waiting for retry." + conditionMessage = "ModifyVolume failed with error " + err.Error() + ". Waiting for retry." } pvcCondition.Message = conditionMessage // Do not change conditions for pending modifications and keep existing conditions diff --git a/pkg/modifycontroller/modify_status_test.go b/pkg/modifycontroller/modify_status_test.go index 431befc7a..737dfb06a 100644 --- a/pkg/modifycontroller/modify_status_test.go +++ b/pkg/modifycontroller/modify_status_test.go @@ -46,7 +46,7 @@ var ( pvcConditionInfeasible = v1.PersistentVolumeClaimCondition{ Type: v1.PersistentVolumeClaimVolumeModifyingVolume, Status: v1.ConditionTrue, - Message: "ModifyVolume failed with errorrpc error: code = InvalidArgument desc = Parameters in VolumeAttributesClass is invalid. 
Waiting for retry.", + Message: "ModifyVolume failed with error rpc error: code = InvalidArgument desc = Parameters in VolumeAttributesClass is invalid. Waiting for retry.", } pvcConditionError = v1.PersistentVolumeClaimCondition{ From 0df2a99800ab2f6b66cf16c58d230facb5dd3115 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E7=8E=AE=E6=96=87?= Date: Sun, 24 Aug 2025 01:17:30 +0800 Subject: [PATCH 3/6] don't change target in uncertain state We should keep retrying the previously specified target. --- pkg/modifycontroller/modify_status.go | 1 + pkg/modifycontroller/modify_volume.go | 9 +++++---- pkg/modifycontroller/modify_volume_test.go | 13 ++++++++++++- 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/pkg/modifycontroller/modify_status.go b/pkg/modifycontroller/modify_status.go index a57684ea4..c1b354963 100644 --- a/pkg/modifycontroller/modify_status.go +++ b/pkg/modifycontroller/modify_status.go @@ -47,6 +47,7 @@ func (ctrl *modifyController) markControllerModifyVolumeStatus( case v1.PersistentVolumeClaimModifyVolumeInProgress: conditionMessage = "ModifyVolume operation in progress." case v1.PersistentVolumeClaimModifyVolumeInfeasible: + newPVC.Status.ModifyVolumeStatus.TargetVolumeAttributesClassName = pvc.Status.ModifyVolumeStatus.TargetVolumeAttributesClassName conditionMessage = "ModifyVolume failed with error " + err.Error() + ". Waiting for retry." 
} pvcCondition.Message = conditionMessage diff --git a/pkg/modifycontroller/modify_volume.go b/pkg/modifycontroller/modify_volume.go index 763ec02b7..dd6204d12 100644 --- a/pkg/modifycontroller/modify_volume.go +++ b/pkg/modifycontroller/modify_volume.go @@ -58,14 +58,15 @@ func (ctrl *modifyController) modify(pvc *v1.PersistentVolumeClaim, pv *v1.Persi } else if pvcSpecVacName != nil && curVacName != nil && *pvcSpecVacName != *curVacName { // Check if PVC in uncertain state _, inUncertainState := ctrl.uncertainPVCs.Load(pvcKey) - if !inUncertainState { - klog.V(3).InfoS("previous operation on the PVC failed with a final error, retrying") + status := pvc.Status.ModifyVolumeStatus + if !inUncertainState || status == nil { + klog.V(3).InfoS("previous operation on the PVC succeeded or failed with a final error, retrying") return ctrl.validateVACAndModifyVolumeWithTarget(pvc, pv) } else { - vac, err := ctrl.vacLister.Get(*pvcSpecVacName) + vac, err := ctrl.vacLister.Get(status.TargetVolumeAttributesClassName) if err != nil { if apierrors.IsNotFound(err) { - ctrl.eventRecorder.Eventf(pvc, v1.EventTypeWarning, util.VolumeModifyFailed, "VAC "+*pvcSpecVacName+" does not exist.") + ctrl.eventRecorder.Eventf(pvc, v1.EventTypeWarning, util.VolumeModifyFailed, "VAC "+status.TargetVolumeAttributesClassName+" does not exist.") } return pvc, pv, err, false } diff --git a/pkg/modifycontroller/modify_volume_test.go b/pkg/modifycontroller/modify_volume_test.go index e35b78d5f..7b327706d 100644 --- a/pkg/modifycontroller/modify_volume_test.go +++ b/pkg/modifycontroller/modify_volume_test.go @@ -15,6 +15,7 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" + "k8s.io/utils/ptr" ) var ( @@ -185,12 +186,22 @@ func TestModifyUncertain(t *testing.T) { assertUncertain(false) client.SetModifyError(nonFinalErr) - _, _, err, _ = ctrlInstance.modify(pvc, pv) + pvc, pv, err, _ = ctrlInstance.modify(pvc, pv) if !errors.Is(err, nonFinalErr) { 
t.Fatalf("expected error to be %v, got %v", nonFinalErr, err) } // should enter uncertain state again assertUncertain(true) + + pvc.Spec.VolumeAttributesClassName = ptr.To("yet-another-vac") + pvc, _, err, _ = ctrlInstance.modify(pvc, pv) + if !errors.Is(err, nonFinalErr) { + t.Fatalf("expected error to be %v, got %v", nonFinalErr, err) + } + // target should not change, yet-another-vac should be ignored + if pvc.Status.ModifyVolumeStatus.TargetVolumeAttributesClassName != targetVac { + t.Fatalf("expected target to be %v, got %v", targetVac, pvc.Status.ModifyVolumeStatus.TargetVolumeAttributesClassName) + } } func createTestPVC(pvcName string, vacName string, curVacName string, targetVacName string) *v1.PersistentVolumeClaim { From 677f58224979a965e6c3273ddefbc6afdcaedab5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E7=8E=AE=E6=96=87?= Date: Sat, 19 Jul 2025 12:04:38 +0800 Subject: [PATCH 4/6] extraModifyMetadata should not modify VAC in informer Also fix the test. Previous test does not actually cover the relevant code path. 
--- pkg/controller/controller_test.go | 4 +-- pkg/controller/expand_and_recover_test.go | 2 +- pkg/controller/resize_status_test.go | 2 +- pkg/csi/mock_client.go | 17 +++++++++--- pkg/modifier/csi_modifier_test.go | 2 +- pkg/modifycontroller/controller_test.go | 10 ++++---- pkg/modifycontroller/modify_status_test.go | 6 ++--- pkg/modifycontroller/modify_volume.go | 15 ++++++++--- pkg/modifycontroller/modify_volume_test.go | 30 ++++++++-------------- pkg/resizer/csi_resizer_test.go | 8 +++--- 10 files changed, 53 insertions(+), 43 deletions(-) diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 4e6dc8d41..239fef686 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -211,7 +211,7 @@ func TestController(t *testing.T) { disableVolumeInUseErrorHandler: true, }, } { - client := csi.NewMockClient("mock", test.NodeResize, true, false, true, true, false) + client := csi.NewMockClient("mock", test.NodeResize, true, false, true, true) driverName, _ := client.GetDriverName(context.TODO()) var expectedCap resource.Quantity @@ -378,7 +378,7 @@ func TestResizePVC(t *testing.T) { }, } { t.Run(test.Name, func(t *testing.T) { - client := csi.NewMockClient("mock", test.NodeResize, true, false, true, true, false) + client := csi.NewMockClient("mock", test.NodeResize, true, false, true, true) if test.expansionError != nil { client.SetExpansionError(test.expansionError) } diff --git a/pkg/controller/expand_and_recover_test.go b/pkg/controller/expand_and_recover_test.go index 8d86d21e4..625359150 100644 --- a/pkg/controller/expand_and_recover_test.go +++ b/pkg/controller/expand_and_recover_test.go @@ -159,7 +159,7 @@ func TestExpandAndRecover(t *testing.T) { test := tests[i] t.Run(test.name, func(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RecoverVolumeExpansionFailure, true) - client := csi.NewMockClient("foo", !test.disableNodeExpansion, 
!test.disableControllerExpansion, false, true, true, false) + client := csi.NewMockClient("foo", !test.disableNodeExpansion, !test.disableControllerExpansion, false, true, true) driverName, _ := client.GetDriverName(context.TODO()) if test.expansionError != nil { client.SetExpansionError(test.expansionError) diff --git a/pkg/controller/resize_status_test.go b/pkg/controller/resize_status_test.go index 7e13bfea6..67afc6844 100644 --- a/pkg/controller/resize_status_test.go +++ b/pkg/controller/resize_status_test.go @@ -77,7 +77,7 @@ func TestResizeFunctions(t *testing.T) { tc := test t.Run(tc.name, func(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RecoverVolumeExpansionFailure, true) - client := csi.NewMockClient("foo", true, true, false, true, true, false) + client := csi.NewMockClient("foo", true, true, false, true, true) driverName, _ := client.GetDriverName(context.TODO()) pvc := test.pvc diff --git a/pkg/csi/mock_client.go b/pkg/csi/mock_client.go index 49c93c3e2..dc8a03b7e 100644 --- a/pkg/csi/mock_client.go +++ b/pkg/csi/mock_client.go @@ -3,6 +3,8 @@ package csi import ( "context" "fmt" + "maps" + "sync" "sync/atomic" "github.com/container-storage-interface/spec/lib/go/csi" @@ -16,7 +18,6 @@ func NewMockClient( supportsControllerModify bool, supportsPluginControllerService bool, supportsControllerSingleNodeMultiWriter bool, - supportsExtraModifyMetada bool, ) *MockClient { return &MockClient{ name: name, @@ -25,7 +26,7 @@ func NewMockClient( supportsControllerModify: supportsControllerModify, supportsPluginControllerService: supportsPluginControllerService, supportsControllerSingleNodeMultiWriter: supportsControllerSingleNodeMultiWriter, - extraModifyMetadata: supportsExtraModifyMetada, + modifiedParameters: make(map[string]string), } } @@ -43,7 +44,8 @@ type MockClient struct { checkMigratedLabel bool usedSecrets atomic.Pointer[map[string]string] usedCapability atomic.Pointer[csi.VolumeCapability] - 
extraModifyMetadata bool + modifyMu sync.Mutex + modifiedParameters map[string]string } func (c *MockClient) GetDriverName(context.Context) (string, error) { @@ -116,6 +118,12 @@ func (c *MockClient) GetModifyCount() int { return int(c.modifyCalled.Load()) } +func (c *MockClient) GetModifiedParameters() map[string]string { + c.modifyMu.Lock() + defer c.modifyMu.Unlock() + return maps.Clone(c.modifiedParameters) +} + func (c *MockClient) GetCapability() *csi.VolumeCapability { return c.usedCapability.Load() } @@ -138,5 +146,8 @@ func (c *MockClient) Modify( if c.modifyError != nil { return c.modifyError } + c.modifyMu.Lock() + defer c.modifyMu.Unlock() + maps.Copy(c.modifiedParameters, mutableParameters) return nil } diff --git a/pkg/modifier/csi_modifier_test.go b/pkg/modifier/csi_modifier_test.go index de68df9f2..179eb0341 100644 --- a/pkg/modifier/csi_modifier_test.go +++ b/pkg/modifier/csi_modifier_test.go @@ -28,7 +28,7 @@ func TestNewModifier(t *testing.T) { SupportsControllerModify: false, }, } { - client := csi.NewMockClient("mock", false, false, c.SupportsControllerModify, false, false, false) + client := csi.NewMockClient("mock", false, false, c.SupportsControllerModify, false, false) driverName := "mock-driver" k8sClient, informerFactory := fakeK8s() _, err := NewModifierFromClient(client, 0, k8sClient, informerFactory, false, driverName) diff --git a/pkg/modifycontroller/controller_test.go b/pkg/modifycontroller/controller_test.go index 2eb4de4f6..d8decd71a 100644 --- a/pkg/modifycontroller/controller_test.go +++ b/pkg/modifycontroller/controller_test.go @@ -63,7 +63,7 @@ func TestController(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { // Setup - client := csi.NewMockClient(testDriverName, true, true, true, true, true, false) + client := csi.NewMockClient(testDriverName, true, true, true, true, true) initialObjects := []runtime.Object{test.pvc, test.pv, testVacObject, targetVacObject} ctrlInstance := 
setupFakeK8sEnvironment(t, client, initialObjects) @@ -114,7 +114,7 @@ func TestModifyPVC(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - client := csi.NewMockClient(testDriverName, true, true, true, true, true, false) + client := csi.NewMockClient(testDriverName, true, true, true, true, true) if test.modifyFailure { client.SetModifyError(fmt.Errorf("fake modification error")) } @@ -215,7 +215,7 @@ func TestSyncPVC(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - client := csi.NewMockClient(testDriverName, true, true, true, true, true, false) + client := csi.NewMockClient(testDriverName, true, true, true, true, true) initialObjects := []runtime.Object{test.pvc, test.pv, testVacObject, targetVacObject} ctrlInstance := setupFakeK8sEnvironment(t, client, initialObjects) @@ -275,7 +275,7 @@ func TestInfeasibleRetry(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { // Setup - client := csi.NewMockClient(testDriverName, true, true, true, true, true, false) + client := csi.NewMockClient(testDriverName, true, true, true, true, true) if test.csiModifyError != nil { client.SetModifyError(test.csiModifyError) } @@ -339,7 +339,7 @@ func TestConcurrentSync(t *testing.T) { } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - client := csi.NewMockClient(testDriverName, true, true, true, true, true, false) + client := csi.NewMockClient(testDriverName, true, true, true, true, true) client.SetModifyError(tc.err) initialObjects := []runtime.Object{testVacObject, targetVacObject} diff --git a/pkg/modifycontroller/modify_status_test.go b/pkg/modifycontroller/modify_status_test.go index 737dfb06a..9012a064d 100644 --- a/pkg/modifycontroller/modify_status_test.go +++ b/pkg/modifycontroller/modify_status_test.go @@ -103,7 +103,7 @@ func TestMarkControllerModifyVolumeStatus(t *testing.T) { tc := test t.Run(tc.name, func(t *testing.T) { 
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeAttributesClass, true) - client := csi.NewMockClient("foo", true, true, true, true, true, false) + client := csi.NewMockClient("foo", true, true, true, true, true) driverName, _ := client.GetDriverName(context.TODO()) pvc := test.pvc @@ -163,7 +163,7 @@ func TestUpdateConditionBasedOnError(t *testing.T) { tc := test t.Run(tc.name, func(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeAttributesClass, true) - client := csi.NewMockClient("foo", true, true, true, true, true, false) + client := csi.NewMockClient("foo", true, true, true, true, true) driverName, _ := client.GetDriverName(context.TODO()) pvc := test.pvc @@ -232,7 +232,7 @@ func TestMarkControllerModifyVolumeCompleted(t *testing.T) { tc := test t.Run(tc.name, func(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeAttributesClass, true) - client := csi.NewMockClient("foo", true, true, true, true, true, false) + client := csi.NewMockClient("foo", true, true, true, true, true) driverName, _ := client.GetDriverName(context.TODO()) var initialObjects []runtime.Object diff --git a/pkg/modifycontroller/modify_volume.go b/pkg/modifycontroller/modify_volume.go index dd6204d12..8c18facef 100644 --- a/pkg/modifycontroller/modify_volume.go +++ b/pkg/modifycontroller/modify_volume.go @@ -18,6 +18,7 @@ package modifycontroller import ( "fmt" + "maps" "time" "github.com/kubernetes-csi/csi-lib-utils/slowset" @@ -162,12 +163,18 @@ func (ctrl *modifyController) callModifyVolumeOnPlugin( pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume, vac *storagev1beta1.VolumeAttributesClass) (*v1.PersistentVolumeClaim, *v1.PersistentVolume, error) { + parameters := vac.Parameters if ctrl.extraModifyMetadata { - vac.Parameters[pvcNameKey] = pvc.GetName() - vac.Parameters[pvcNamespaceKey] = pvc.GetNamespace() - 
vac.Parameters[pvNameKey] = pv.GetName() + if len(parameters) == 0 { + parameters = make(map[string]string, 3) + } else { + parameters = maps.Clone(parameters) + } + parameters[pvcNameKey] = pvc.GetName() + parameters[pvcNamespaceKey] = pvc.GetNamespace() + parameters[pvNameKey] = pv.GetName() } - err := ctrl.modifier.Modify(pv, vac.Parameters) + err := ctrl.modifier.Modify(pv, parameters) if err != nil { return pvc, pv, err diff --git a/pkg/modifycontroller/modify_volume_test.go b/pkg/modifycontroller/modify_volume_test.go index 7b327706d..cc0567a55 100644 --- a/pkg/modifycontroller/modify_volume_test.go +++ b/pkg/modifycontroller/modify_volume_test.go @@ -28,12 +28,7 @@ var ( targetVacObject = &storagev1beta1.VolumeAttributesClass{ ObjectMeta: metav1.ObjectMeta{Name: targetVac}, DriverName: testDriverName, - Parameters: map[string]string{ - "iops": "4567", - "csi.storage.k8s.io/pvc/name": pvcName, - "csi.storage.k8s.io/pvc/namespace": pvcNamespace, - "csi.storage.k8s.io/pv/name": pvName, - }, + Parameters: map[string]string{"iops": "4567"}, } ) @@ -51,7 +46,7 @@ func TestModify(t *testing.T) { expectedCurrentVolumeAttributesClassName *string expectedPVVolumeAttributesClassName *string withExtraMetadata bool - expectedVacParams map[string]string + expectedMutableParams map[string]string }{ { name: "nothing to modify", @@ -83,6 +78,7 @@ func TestModify(t *testing.T) { expectedModifyVolumeStatus: nil, expectedCurrentVolumeAttributesClassName: &targetVac, expectedPVVolumeAttributesClassName: &targetVac, + expectedMutableParams: map[string]string{"iops": "4567"}, }, { name: "modify volume success with extra metadata", @@ -94,7 +90,7 @@ func TestModify(t *testing.T) { expectedCurrentVolumeAttributesClassName: &targetVac, expectedPVVolumeAttributesClassName: &targetVac, withExtraMetadata: true, - expectedVacParams: map[string]string{ + expectedMutableParams: map[string]string{ "iops": "4567", "csi.storage.k8s.io/pvc/name": basePVC.GetName(), 
"csi.storage.k8s.io/pvc/namespace": basePVC.GetNamespace(), @@ -107,12 +103,13 @@ func TestModify(t *testing.T) { test := tests[i] t.Run(test.name, func(t *testing.T) { // Setup - client := csi.NewMockClient(testDriverName, true, true, true, true, true, test.withExtraMetadata) + client := csi.NewMockClient(testDriverName, true, true, true, true, true) initialObjects := []runtime.Object{test.pvc, test.pv, testVacObject} if test.vacExists { initialObjects = append(initialObjects, targetVacObject) } ctrlInstance := setupFakeK8sEnvironment(t, client, initialObjects) + ctrlInstance.extraModifyMetadata = test.withExtraMetadata // Action pvc, pv, err, modifyCalled := ctrlInstance.modify(test.pvc, test.pv) @@ -141,15 +138,10 @@ func TestModify(t *testing.T) { t.Errorf("expected VolumeAttributesClassName of pv to be %v, got %v", *test.expectedPVVolumeAttributesClassName, *actualPVVolumeAttributesClassName) } - if test.withExtraMetadata { - vacObj, err := ctrlInstance.vacLister.Get(*test.expectedPVVolumeAttributesClassName) - if err != nil { - t.Errorf("failed to get VAC: %v", err) - } else { - vacParams := vacObj.Parameters - if diff := cmp.Diff(test.expectedVacParams, vacParams); diff != "" { - t.Errorf("expected VAC parameters to be %v, got %v", test.expectedVacParams, vacParams) - } + if test.expectedMutableParams != nil { + p := client.GetModifiedParameters() + if diff := cmp.Diff(test.expectedMutableParams, p); diff != "" { + t.Errorf("expected mutable parameters to be %v, got %v", test.expectedMutableParams, p) } } }) @@ -161,7 +153,7 @@ func TestModifyUncertain(t *testing.T) { basePVC.Status.ModifyVolumeStatus.Status = v1.PersistentVolumeClaimModifyVolumeInProgress basePV := createTestPV(1, pvcName, pvcNamespace, "foobaz" /*pvcUID*/, &fsVolumeMode, testVac) - client := csi.NewMockClient(testDriverName, true, true, true, true, true, false) + client := csi.NewMockClient(testDriverName, true, true, true, true, true) initialObjects := []runtime.Object{testVacObject, 
targetVacObject, basePVC, basePV} ctrlInstance := setupFakeK8sEnvironment(t, client, initialObjects) diff --git a/pkg/resizer/csi_resizer_test.go b/pkg/resizer/csi_resizer_test.go index 6d089511f..ea6b9e023 100644 --- a/pkg/resizer/csi_resizer_test.go +++ b/pkg/resizer/csi_resizer_test.go @@ -72,7 +72,7 @@ func TestNewResizer(t *testing.T) { Error: resizeNotSupportErr, }, } { - client := csi.NewMockClient("mock", c.SupportsNodeResize, c.SupportsControllerResize, false, c.SupportsPluginControllerService, c.SupportsControllerSingleNodeMultiWriter, false) + client := csi.NewMockClient("mock", c.SupportsNodeResize, c.SupportsControllerResize, false, c.SupportsPluginControllerService, c.SupportsControllerSingleNodeMultiWriter) driverName := "mock-driver" k8sClient := fake.NewSimpleClientset() resizer, err := NewResizerFromClient(client, 0, k8sClient, driverName) @@ -106,7 +106,7 @@ func TestResizeWithSecret(t *testing.T) { }, } for _, tc := range tests { - client := csi.NewMockClient("mock", true, true, false, true, true, false) + client := csi.NewMockClient("mock", true, true, false, true, true) secret := makeSecret("some-secret", "secret-namespace") k8sClient := fake.NewSimpleClientset(secret) pv := makeTestPV("test-csi", 2, "ebs-csi", "vol-abcde", tc.hasExpansionSecret) @@ -164,7 +164,7 @@ func TestResizeMigratedPV(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { driverName := tc.driverName - client := csi.NewMockClient(driverName, true, true, false, true, true, false) + client := csi.NewMockClient(driverName, true, true, false, true, true) client.SetCheckMigratedLabel() k8sClient := fake.NewSimpleClientset() resizer, err := NewResizerFromClient(client, 0, k8sClient, driverName) @@ -433,7 +433,7 @@ func TestCanSupport(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { driverName := tc.driverName - client := csi.NewMockClient(driverName, true, true, false, true, true, false) + client := 
csi.NewMockClient(driverName, true, true, false, true, true) k8sClient := fake.NewSimpleClientset() resizer, err := NewResizerFromClient(client, 0, k8sClient, driverName) if err != nil { From 19cef74deb1bc7be897103c4f5feb2bbad65ccb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E7=8E=AE=E6=96=87?= Date: Sat, 19 Jul 2025 11:34:31 +0800 Subject: [PATCH 5/6] modify: support rollback Support rollback to VAC A if modifying from A to B failed with a final error. This works just as if we were modifying it again to C after a final error. The significant changes in the sync logic: - Always retry if pvc.Status.ModifyVolumeStatus is not nil, which means the last transaction did not finish successfully. - Keep reconciling to the previous target if spec is rolled back to nil, until it succeeds or we get an infeasible error. Then we just leave it at its current state and stop reconciling for it, since user may not care about it now. --- pkg/modifycontroller/controller.go | 20 ++-- pkg/modifycontroller/controller_test.go | 19 +++- pkg/modifycontroller/modify_status.go | 9 +- pkg/modifycontroller/modify_volume.go | 104 ++++++++++++--------- pkg/modifycontroller/modify_volume_test.go | 7 +- 5 files changed, 94 insertions(+), 65 deletions(-) diff --git a/pkg/modifycontroller/controller.go b/pkg/modifycontroller/controller.go index 2b41ee00f..fb6975e5e 100644 --- a/pkg/modifycontroller/controller.go +++ b/pkg/modifycontroller/controller.go @@ -39,6 +39,7 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/utils/ptr" ) // ModifyController watches PVCs and checks if they are requesting an modify operation. 
@@ -128,7 +129,7 @@ func (ctrl *modifyController) initUncertainPVCs() error { return err } for _, pvc := range allPVCs { - if pvc.Status.ModifyVolumeStatus != nil && (pvc.Status.ModifyVolumeStatus.Status == v1.PersistentVolumeClaimModifyVolumeInProgress || pvc.Status.ModifyVolumeStatus.Status == v1.PersistentVolumeClaimModifyVolumeInfeasible) { + if pvc.Status.ModifyVolumeStatus != nil && (pvc.Status.ModifyVolumeStatus.Status == v1.PersistentVolumeClaimModifyVolumeInProgress) { pvcKey, err := cache.MetaNamespaceKeyFunc(pvc) if err != nil { return err @@ -160,12 +161,11 @@ func (ctrl *modifyController) updatePVC(oldObj, newObj interface{}) { } // Only trigger modify volume if the following conditions are met - // 1. Non empty vac name - // 2. oldVacName != newVacName - // 3. PVC is in Bound state - oldVacName := oldPVC.Spec.VolumeAttributesClassName - newVacName := newPVC.Spec.VolumeAttributesClassName - if newVacName != nil && *newVacName != "" && (oldVacName == nil || *newVacName != *oldVacName) && oldPVC.Status.Phase == v1.ClaimBound { + // 1. VAC changed or modify finished (check pending modify request while we are modifying) + // 2. PVC is in Bound state + oldVacName := ptr.Deref(oldPVC.Spec.VolumeAttributesClassName, "") + newVacName := ptr.Deref(newPVC.Spec.VolumeAttributesClassName, "") + if (newVacName != oldVacName || newPVC.Status.ModifyVolumeStatus == nil) && newPVC.Status.Phase == v1.ClaimBound { _, err := ctrl.pvLister.Get(oldPVC.Spec.VolumeName) if err != nil { klog.Errorf("Get PV %q of pvc %q in PVInformer cache failed: %v", oldPVC.Spec.VolumeName, klog.KObj(oldPVC), err) @@ -267,15 +267,13 @@ func (ctrl *modifyController) syncPVC(key string) error { // Only trigger modify volume if the following conditions are met // 1. PV provisioned by CSI driver AND driver name matches local driver - // 2. Non-empty vac name - // 3. PVC is in Bound state + // 2. 
PVC is in Bound state if pv.Spec.CSI == nil || pv.Spec.CSI.Driver != ctrl.name { klog.V(7).InfoS("Skipping PV provisioned by different driver", "PV", klog.KObj(pv)) return nil } - vacName := pvc.Spec.VolumeAttributesClassName - if vacName != nil && *vacName != "" && pvc.Status.Phase == v1.ClaimBound { + if pvc.Status.Phase == v1.ClaimBound { _, _, err, _ := ctrl.modify(pvc, pv) if err != nil { return err diff --git a/pkg/modifycontroller/controller_test.go b/pkg/modifycontroller/controller_test.go index d8decd71a..10f312707 100644 --- a/pkg/modifycontroller/controller_test.go +++ b/pkg/modifycontroller/controller_test.go @@ -22,13 +22,13 @@ import ( ) func TestController(t *testing.T) { - basePVC := createTestPVC(pvcName, testVac /*vacName*/, testVac /*curVacName*/, testVac /*targetVacName*/) + basePVC := createTestPVC(pvcName, testVac /*vacName*/, testVac /*curVacName*/, "" /*targetVacName*/) + basePVC.Status.ModifyVolumeStatus = nil basePV := createTestPV(1, pvcName, pvcNamespace, "foobaz" /*pvcUID*/, &fsVolumeMode, testVac) firstTimePV := basePV.DeepCopy() firstTimePV.Spec.VolumeAttributesClassName = nil firstTimePVC := basePVC.DeepCopy() firstTimePVC.Status.CurrentVolumeAttributesClassName = nil - firstTimePVC.Status.ModifyVolumeStatus = nil tests := []struct { name string @@ -150,6 +150,9 @@ func TestSyncPVC(t *testing.T) { pvcWithUncreatedPV := createTestPVC(pvcName, targetVac /*vacName*/, testVac /*curVacName*/, testVac /*targetVacName*/) pvcWithUncreatedPV.Spec.VolumeName = "" + inprogressPVC := createTestPVC(pvcName, "" /*vacName*/, "" /*curVacName*/, testVac /*targetVacName*/) + inprogressPVC.Status.ModifyVolumeStatus.Status = v1.PersistentVolumeClaimModifyVolumeInProgress + nonCSIPVC := &v1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{Name: pvcName, Namespace: pvcNamespace}, Spec: v1.PersistentVolumeClaimSpec{ @@ -181,6 +184,12 @@ func TestSyncPVC(t *testing.T) { pv: basePV, callCSIModify: true, }, + { + name: "Should NOT modify when rollback to 
empty VACName", + pvc: createTestPVC(pvcName, "" /*vacName*/, "" /*curVacName*/, testVac /*targetVacName*/), + pv: basePV, + callCSIModify: false, + }, { name: "Should NOT modify if PVC managed by another CSI Driver", pvc: basePVC, @@ -188,10 +197,10 @@ func TestSyncPVC(t *testing.T) { callCSIModify: false, }, { - name: "Should NOT modify if PVC has empty Spec.VACName", - pvc: createTestPVC(pvcName, "" /*vacName*/, testVac /*curVacName*/, testVac /*targetVacName*/), + name: "Should execute ModifyVolume for InProgress target if PVC has empty Spec.VACName", + pvc: inprogressPVC, pv: basePV, - callCSIModify: false, + callCSIModify: true, }, { name: "Should NOT modify if PVC not in bound state", diff --git a/pkg/modifycontroller/modify_status.go b/pkg/modifycontroller/modify_status.go index c1b354963..a31a12f88 100644 --- a/pkg/modifycontroller/modify_status.go +++ b/pkg/modifycontroller/modify_status.go @@ -90,7 +90,10 @@ func (ctrl *modifyController) updateConditionBasedOnError(pvc *v1.PersistentVolu // markControllerModifyVolumeStatus will mark ModifyVolumeStatus as completed in the PVC // and update CurrentVolumeAttributesClassName, clear the conditions func (ctrl *modifyController) markControllerModifyVolumeCompleted(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) (*v1.PersistentVolumeClaim, *v1.PersistentVolume, error) { - modifiedVacName := pvc.Status.ModifyVolumeStatus.TargetVolumeAttributesClassName + var modifiedVacName *string + if pvc.Status.ModifyVolumeStatus.TargetVolumeAttributesClassName != "" { + modifiedVacName = &pvc.Status.ModifyVolumeStatus.TargetVolumeAttributesClassName + } // Update PVC newPVC := pvc.DeepCopy() @@ -99,14 +102,14 @@ func (ctrl *modifyController) markControllerModifyVolumeCompleted(pvc *v1.Persis newPVC.Status.ModifyVolumeStatus = nil // Update CurrentVolumeAttributesClassName - newPVC.Status.CurrentVolumeAttributesClassName = &modifiedVacName + newPVC.Status.CurrentVolumeAttributesClassName = modifiedVacName // Clear all 
the conditions related to modify volume newPVC.Status.Conditions = clearModifyVolumeConditions(newPVC.Status.Conditions) // Update PV newPV := pv.DeepCopy() - newPV.Spec.VolumeAttributesClassName = &modifiedVacName + newPV.Spec.VolumeAttributesClassName = modifiedVacName // Update PV before PVC to avoid PV not getting updated but PVC did updatedPV, err := util.PatchPersistentVolume(ctrl.kubeClient, pv, newPV) diff --git a/pkg/modifycontroller/modify_volume.go b/pkg/modifycontroller/modify_volume.go index 8c18facef..5bc158c9f 100644 --- a/pkg/modifycontroller/modify_volume.go +++ b/pkg/modifycontroller/modify_volume.go @@ -30,6 +30,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" + "k8s.io/utils/ptr" ) const ( @@ -40,8 +41,6 @@ const ( // The return value bool is only used as a sentinel value when function returns without actually performing modification func (ctrl *modifyController) modify(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) (*v1.PersistentVolumeClaim, *v1.PersistentVolume, error, bool) { - pvcSpecVacName := pvc.Spec.VolumeAttributesClassName - curVacName := pvc.Status.CurrentVolumeAttributesClassName pvcKey, err := cache.MetaNamespaceKeyFunc(pvc) if err != nil { return pvc, pv, err, false @@ -53,30 +52,53 @@ func (ctrl *modifyController) modify(pvc *v1.PersistentVolumeClaim, pv *v1.Persi return pvc, pv, delayModificationErr, false } - if pvcSpecVacName != nil && curVacName == nil { - // First time adding VAC to a PVC - return ctrl.validateVACAndModifyVolumeWithTarget(pvc, pv) - } else if pvcSpecVacName != nil && curVacName != nil && *pvcSpecVacName != *curVacName { - // Check if PVC in uncertain state - _, inUncertainState := ctrl.uncertainPVCs.Load(pvcKey) - status := pvc.Status.ModifyVolumeStatus - if !inUncertainState || status == nil { - klog.V(3).InfoS("previous operation on the PVC succeeded or failed with a final error, retrying") - return 
ctrl.validateVACAndModifyVolumeWithTarget(pvc, pv) - } else { - vac, err := ctrl.vacLister.Get(status.TargetVolumeAttributesClassName) - if err != nil { - if apierrors.IsNotFound(err) { - ctrl.eventRecorder.Eventf(pvc, v1.EventTypeWarning, util.VolumeModifyFailed, "VAC "+status.TargetVolumeAttributesClassName+" does not exist.") - } - return pvc, pv, err, false - } - return ctrl.controllerModifyVolumeWithTarget(pvc, pv, vac, pvcSpecVacName) + pvcSpecVacName := ptr.Deref(pvc.Spec.VolumeAttributesClassName, "") + curVacName := ptr.Deref(pvc.Status.CurrentVolumeAttributesClassName, "") + status := pvc.Status.ModifyVolumeStatus + + if status == nil && pvcSpecVacName == curVacName { + // No modification required, already reached target state + return pvc, pv, nil, false + } + + if pvcSpecVacName == "" && + (status == nil || status.Status != v1.PersistentVolumeClaimModifyVolumeInProgress) { + // User don't care the target state, and we've reached a relatively stable state. Just keep it here. + // Note: APIServer generally not allowing setting pvcSpecVacName to empty when curVacName is not empty. + klog.V(4).InfoS("stop reconcile for rolled back PVC", "PV", klog.KObj(pv)) + // Don't try to revert anything here, because we only record the result of the last modification. + // We don't know what happened before. User can switch between InProgress/Infeasible/Pending status + // freely by modifying the spec. 
+ return pvc, pv, nil, false + } + + // Check if we should change our target + _, inUncertainState := ctrl.uncertainPVCs.Load(pvcKey) + if (status != nil && status.Status == v1.PersistentVolumeClaimModifyVolumeInProgress && inUncertainState) || pvcSpecVacName == "" { + vac, err := ctrl.getTargetVAC(pvc, status.TargetVolumeAttributesClassName) + if err != nil { + return pvc, pv, err, false } + return ctrl.controllerModifyVolumeWithTarget(pvc, pv, vac) } - // No modification required - return pvc, pv, nil, false + // If status != InProgress && inUncertainState, we either see a stall PVC, or the status was updated externally. + // For stall PVC, we will get Conflict when marking InProgress. + // For status updated externally, we respect the user choice and try the new target, as if it were not uncertain. + // The in-memory uncertain state will be updated after the next ControllerModifyVolume call. + return ctrl.validateVACAndModifyVolumeWithTarget(pvc, pv) +} + +func (ctrl *modifyController) getTargetVAC(pvc *v1.PersistentVolumeClaim, vacName string) (*storagev1beta1.VolumeAttributesClass, error) { + vac, err := ctrl.vacLister.Get(vacName) + // Check if pvcSpecVac is valid and exist + if err != nil { + if apierrors.IsNotFound(err) { + ctrl.eventRecorder.Eventf(pvc, v1.EventTypeWarning, util.VolumeModifyFailed, "VAC %q does not exist.", vacName) + } + return nil, fmt.Errorf("get VAC with vac name %s in VACInformer cache failed: %w", vacName, err) + } + return vac, nil } // func validateVACAndModifyVolumeWithTarget validate the VAC. 
The function sets pvc.Status.ModifyVolumeStatus @@ -84,29 +106,23 @@ func (ctrl *modifyController) modify(pvc *v1.PersistentVolumeClaim, pv *v1.Persi func (ctrl *modifyController) validateVACAndModifyVolumeWithTarget( pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) (*v1.PersistentVolumeClaim, *v1.PersistentVolume, error, bool) { - // The controller only triggers ModifyVolume if pvcSpecVacName is not nil nor empty - pvcSpecVacName := pvc.Spec.VolumeAttributesClassName - // Check if pvcSpecVac is valid and exist - vac, err := ctrl.vacLister.Get(*pvcSpecVacName) - if err == nil { - // Mark pvc.Status.ModifyVolumeStatus as in progress - pvc, err = ctrl.markControllerModifyVolumeStatus(pvc, v1.PersistentVolumeClaimModifyVolumeInProgress, nil) - if err != nil { - return pvc, pv, err, false - } - // Record an event to indicate that external resizer is modifying this volume. - ctrl.eventRecorder.Event(pvc, v1.EventTypeNormal, util.VolumeModify, - fmt.Sprintf("external resizer is modifying volume %s with vac %s", pvc.Name, *pvcSpecVacName)) - return ctrl.controllerModifyVolumeWithTarget(pvc, pv, vac, pvcSpecVacName) - } else { - if apierrors.IsNotFound(err) { - ctrl.eventRecorder.Eventf(pvc, v1.EventTypeWarning, util.VolumeModifyFailed, "VAC "+*pvcSpecVacName+" does not exist.") - } - klog.Errorf("Get VAC with vac name %s in VACInformer cache failed: %v", *pvcSpecVacName, err) + + vac, err := ctrl.getTargetVAC(pvc, *pvc.Spec.VolumeAttributesClassName) + if err != nil { // Mark pvc.Status.ModifyVolumeStatus as pending pvc, err = ctrl.markControllerModifyVolumeStatus(pvc, v1.PersistentVolumeClaimModifyVolumePending, nil) return pvc, pv, err, false } + + // Mark pvc.Status.ModifyVolumeStatus as in progress + pvc, err = ctrl.markControllerModifyVolumeStatus(pvc, v1.PersistentVolumeClaimModifyVolumeInProgress, nil) + if err != nil { + return pvc, pv, err, false + } + // Record an event to indicate that external resizer is modifying this volume. 
+ ctrl.eventRecorder.Event(pvc, v1.EventTypeNormal, util.VolumeModify, + fmt.Sprintf("external resizer is modifying volume %s with vac %s", pvc.Name, vac.Name)) + return ctrl.controllerModifyVolumeWithTarget(pvc, pv, vac) } // func controllerModifyVolumeWithTarget trigger the CSI ControllerModifyVolume API call @@ -115,11 +131,11 @@ func (ctrl *modifyController) controllerModifyVolumeWithTarget( pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume, vacObj *storagev1beta1.VolumeAttributesClass, - pvcSpecVacName *string) (*v1.PersistentVolumeClaim, *v1.PersistentVolume, error, bool) { +) (*v1.PersistentVolumeClaim, *v1.PersistentVolume, error, bool) { var err error pvc, pv, err = ctrl.callModifyVolumeOnPlugin(pvc, pv, vacObj) if err == nil { - klog.V(4).Infof("Update volumeAttributesClass of PV %q to %s succeeded", pv.Name, *pvcSpecVacName) + klog.V(4).Infof("Update volumeAttributesClass of PV %q to %s succeeded", pv.Name, vacObj.Name) // Record an event to indicate that modify operation is successful. 
ctrl.eventRecorder.Eventf(pvc, v1.EventTypeNormal, util.VolumeModifySuccess, fmt.Sprintf("external resizer modified volume %s with vac %s successfully", pvc.Name, vacObj.Name)) return pvc, pv, nil, true diff --git a/pkg/modifycontroller/modify_volume_test.go b/pkg/modifycontroller/modify_volume_test.go index cc0567a55..723f59ed2 100644 --- a/pkg/modifycontroller/modify_volume_test.go +++ b/pkg/modifycontroller/modify_volume_test.go @@ -33,7 +33,7 @@ var ( ) func TestModify(t *testing.T) { - basePVC := createTestPVC(pvcName, testVac /*vacName*/, testVac /*curVacName*/, testVac /*targetVacName*/) + basePVC := createTestPVC(pvcName, testVac /*vacName*/, testVac /*curVacName*/, "" /*targetVacName*/) basePV := createTestPV(1, pvcName, pvcNamespace, "foobaz" /*pvcUID*/, &fsVolumeMode, testVac) var tests = []struct { @@ -220,10 +220,13 @@ func createTestPVC(pvcName string, vacName string, curVacName string, targetVacN CurrentVolumeAttributesClassName: &curVacName, ModifyVolumeStatus: &v1.ModifyVolumeStatus{ TargetVolumeAttributesClassName: targetVacName, - Status: "", + Status: v1.PersistentVolumeClaimModifyVolumeInfeasible, }, }, } + if targetVacName == "" { + pvc.Status.ModifyVolumeStatus = nil + } return pvc } From d9709c5c943edb0d5d19d2cf0a0c05e8c8599370 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E7=8E=AE=E6=96=87?= Date: Sun, 31 Aug 2025 03:27:06 +0800 Subject: [PATCH 6/6] modify: cancel retry deleted PVC --- pkg/modifycontroller/controller.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/modifycontroller/controller.go b/pkg/modifycontroller/controller.go index fb6975e5e..122160cea 100644 --- a/pkg/modifycontroller/controller.go +++ b/pkg/modifycontroller/controller.go @@ -27,6 +27,7 @@ import ( "github.com/kubernetes-csi/csi-lib-utils/slowset" "github.com/kubernetes-csi/external-resizer/pkg/modifier" v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/wait" 
"k8s.io/client-go/informers" @@ -172,6 +173,7 @@ func (ctrl *modifyController) updatePVC(oldObj, newObj interface{}) { return } // Handle modify volume by adding to the claimQueue to avoid race conditions + klog.V(4).InfoS("Enqueueing PVC for modify", "PVC", klog.KObj(newPVC)) ctrl.addPVC(newObj) } else { klog.V(4).InfoS("No need to modify PVC", "PVC", klog.KObj(newPVC)) @@ -252,6 +254,10 @@ func (ctrl *modifyController) syncPVC(key string) error { pvc, err := ctrl.pvcLister.PersistentVolumeClaims(namespace).Get(name) if err != nil { + if apierrors.IsNotFound(err) { + klog.V(3).InfoS("PVC is deleted or does not exist", "PVC", klog.KRef(namespace, name)) + return nil + } return fmt.Errorf("getting PVC %s/%s failed: %v", namespace, name, err) }