diff --git a/controllers/crds/cleanup_controller.go b/controllers/crds/cleanup_controller.go new file mode 100644 index 00000000..4c76124a --- /dev/null +++ b/controllers/crds/cleanup_controller.go @@ -0,0 +1,119 @@ +package crds + +import ( + "context" + "time" + + ec2API "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api" + "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api/cleanup" + "github.com/go-logr/logr" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +type NodeCleanupObject struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + NodeID string `json:"nodeID"` + ClusterName string `json:"clusterName"` +} + +func (o *NodeCleanupObject) DeepCopyObject() runtime.Object { + if o == nil { + return nil + } + cp := *o + return &cp +} + +type ENICleanup struct { + Events chan event.GenericEvent +} + +func SetupENICleanupController(mgr ctrl.Manager, log logr.Logger, vpcID string, ec2Wrapper ec2API.EC2Wrapper) (*ENICleanup, error) { + + enicleanup := &ENICleanup{} + enicleanup.Events = make(chan event.GenericEvent, 2048) + nodecleanupHandler := enqueueRequestForNodeIDEvent{ + log: log, + } + rec := &ENICleanupReconciler{ + vpcID: vpcID, + ec2Wrapper: ec2Wrapper, + log: log, + } + source.Channel(enicleanup.Events, &nodecleanupHandler) + err := ctrl.NewControllerManagedBy(mgr). + Named("NodeENICleaner"). + WatchesRawSource(source.Channel(enicleanup.Events, &nodecleanupHandler)). + WithOptions(controller.Options{ + MaxConcurrentReconciles: 5, + }). + Complete(rec) + if err != nil { + return nil, err + } + + return enicleanup, nil +} + +var _ handler.TypedEventHandler[client.Object, reconcile.Request] = (*enqueueRequestForNodeIDEvent)(nil) + +type enqueueRequestForNodeIDEvent struct { + log logr.Logger +} + +func (h *enqueueRequestForNodeIDEvent) Create(ctx context.Context, _ event.CreateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { +} + +func (h *enqueueRequestForNodeIDEvent) Update(ctx context.Context, _ event.UpdateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { +} + +func (h *enqueueRequestForNodeIDEvent) Delete(ctx context.Context, _ event.DeleteEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { +} + +func (h *enqueueRequestForNodeIDEvent) Generic(ctx context.Context, e event.GenericEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { + if e.Object == nil { + return + } + nodeCleanup, ok := e.Object.(*NodeCleanupObject) + if !ok { + h.log.Error(nil, "Failed to convert object to NodeCleanupObject") + return + } + if nodeCleanup.Name == "" { + h.log.Error(nil, "NodeID is empty") + return + } + q.Add(reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: nodeCleanup.Name, + }, + }) +} + +type ENICleanupReconciler struct { + vpcID string + ec2Wrapper ec2API.EC2Wrapper + log logr.Logger +} + +func (r *ENICleanupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + ctx, cancel := context.WithTimeout(ctx, time.Minute*5) + defer cancel() + if err := cleanup.NewNodeResourceCleaner(req.NamespacedName.Name, r.ec2Wrapper, r.vpcID, r.log).DeleteLeakedResources(ctx); err != nil { + return ctrl.Result{}, err + + } + return ctrl.Result{}, nil +} diff --git a/controllers/crds/cninode_controller.go b/controllers/crds/cninode_controller.go index 18d03978..2eaa5d65 100644 --- a/controllers/crds/cninode_controller.go +++ b/controllers/crds/cninode_controller.go @@ -19,13 +19,11 @@ import ( "github.com/aws/amazon-vpc-resource-controller-k8s/apis/vpcresources/v1alpha1" ec2API "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api" - "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api/cleanup" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/k8s" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/utils" "github.com/go-logr/logr" "github.com/prometheus/client_golang/prometheus" - "golang.org/x/sync/semaphore" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -37,6 +35,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/metrics" ) @@ -69,16 +68,15 @@ func prometheusRegister() { // CNINodeReconciler reconciles a CNINode object type CNINodeReconciler struct { client.Client - scheme *runtime.Scheme - context context.Context - log logr.Logger - eC2Wrapper ec2API.EC2Wrapper - k8sAPI k8s.K8sWrapper - clusterName string - vpcId string - finalizerManager k8s.FinalizerManager - deletePool *semaphore.Weighted - newResourceCleaner func(nodeID string, eC2Wrapper ec2API.EC2Wrapper, vpcID string, log logr.Logger) cleanup.ResourceCleaner + scheme *runtime.Scheme + context context.Context + log logr.Logger + eC2Wrapper ec2API.EC2Wrapper + k8sAPI k8s.K8sWrapper + clusterName string + vpcId string + finalizerManager k8s.FinalizerManager + nodeCleanupQueue *ENICleanup } func NewCNINodeReconciler( @@ -92,20 +90,19 @@ func NewCNINodeReconciler( vpcId string, finalizerManager k8s.FinalizerManager, maxConcurrentWorkers int, - newResourceCleaner func(nodeID string, eC2Wrapper ec2API.EC2Wrapper, vpcID string, log logr.Logger) cleanup.ResourceCleaner, + nodeCleanupQueue *ENICleanup, ) *CNINodeReconciler { return &CNINodeReconciler{ - Client: client, - scheme: scheme, - context: ctx, - log: logger, - eC2Wrapper: ec2Wrapper, - k8sAPI: k8sWrapper, - clusterName: clusterName, - vpcId: vpcId, - finalizerManager: finalizerManager, - deletePool: semaphore.NewWeighted(int64(maxConcurrentWorkers)), - newResourceCleaner: newResourceCleaner, + Client: client, + scheme: scheme, + context: ctx, + log: logger, + eC2Wrapper: ec2Wrapper, + k8sAPI: k8sWrapper, + clusterName: clusterName, + vpcId: vpcId, + finalizerManager: finalizerManager, + nodeCleanupQueue: nodeCleanupQueue, } } @@ -156,19 +153,16 @@ func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct r.log.Info("running the finalizer routine on cniNode", "cniNode", cniNode.Name) // run cleanup when node id is present if nodeID, ok := cniNode.Spec.Tags[config.NetworkInterfaceNodeIDKey]; ok && nodeID != "" { - if !r.deletePool.TryAcquire(1) { - r.log.Info("d, will requeue request") - return ctrl.Result{RequeueAfter: 10 * time.Second}, nil + obj := &NodeCleanupObject{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodeID, + Namespace: "", + }, + NodeID: nodeID, + } + r.nodeCleanupQueue.Events <- event.GenericEvent{ + Object: obj, } - go func(nodeID string) { - defer r.deletePool.Release(1) - childCtx, cancel := context.WithTimeout(ctx, config.NodeTerminationTimeout) - defer cancel() - if err := r.newResourceCleaner(nodeID, r.eC2Wrapper, r.vpcId, r.log).DeleteLeakedResources(childCtx); err != nil { - r.log.Error(err, "failed to cleanup resources during node termination") - ec2API.NodeTerminationENICleanupFailure.Inc() - } - }(nodeID) } } diff --git a/controllers/crds/cninode_controller_test.go b/controllers/crds/cninode_controller_test.go index 3d54e32d..25fbbf73 100644 --- a/controllers/crds/cninode_controller_test.go +++ b/controllers/crds/cninode_controller_test.go @@ -8,19 +8,16 @@ import ( mock_api "github.com/aws/amazon-vpc-resource-controller-k8s/mocks/amazon-vcp-resource-controller-k8s/pkg/aws/ec2/api" mock_cleanup "github.com/aws/amazon-vpc-resource-controller-k8s/mocks/amazon-vcp-resource-controller-k8s/pkg/aws/ec2/api/cleanup" mock_k8s "github.com/aws/amazon-vpc-resource-controller-k8s/mocks/amazon-vcp-resource-controller-k8s/pkg/k8s" - ec2API "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api" - "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api/cleanup" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config" - "github.com/go-logr/logr" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" - "golang.org/x/sync/semaphore" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" fakeClient "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -55,14 +52,17 @@ func NewCNINodeMock(ctrl *gomock.Controller, mockObjects ...client.Object) *CNIN _ = corev1.AddToScheme(scheme) _ = v1alpha1.AddToScheme(scheme) client := fakeClient.NewClientBuilder().WithScheme(scheme).WithObjects(mockObjects...).Build() + cleanupQueue := &ENICleanup{ + Events: make(chan event.GenericEvent, 2048), + } return &CNINodeMock{ Reconciler: CNINodeReconciler{ - Client: client, - scheme: scheme, - log: zap.New(), - clusterName: mockClusterName, - vpcId: "vpc-000000000000", - deletePool: semaphore.NewWeighted(10), + Client: client, + scheme: scheme, + log: zap.New(), + clusterName: mockClusterName, + vpcId: "vpc-000000000000", + nodeCleanupQueue: cleanupQueue, }, } } @@ -118,12 +118,7 @@ func TestCNINodeReconcile(t *testing.T) { }, }, }, - prepare: func(f *fields) { - f.mockCNINode.Reconciler.newResourceCleaner = func(nodeID string, eC2Wrapper ec2API.EC2Wrapper, vpcID string, log logr.Logger) cleanup.ResourceCleaner { - return f.mockResourceCleaner - } - f.mockResourceCleaner.EXPECT().DeleteLeakedResources(gomock.Any()).Times(0) - }, + asserts: func(res reconcile.Result, err error, cniNode *v1alpha1.CNINode) { assert.NoError(t, err) assert.Equal(t, res, reconcile.Result{}) @@ -149,14 +144,6 @@ func TestCNINodeReconcile(t *testing.T) { }, }, }, - prepare: func(f *fields) { - f.mockCNINode.Reconciler.newResourceCleaner = func(nodeID string, eC2Wrapper ec2API.EC2Wrapper, vpcID string, log logr.Logger) cleanup.ResourceCleaner { - assert.Equal(t, "i-0123456789abcdef0", nodeID) - return f.mockResourceCleaner - } - f.mockResourceCleaner.EXPECT().DeleteLeakedResources(gomock.Any()).Times(1).Return(nil) - - }, asserts: func(res reconcile.Result, err error, cniNode *v1alpha1.CNINode) { assert.NoError(t, err) assert.Equal(t, res, reconcile.Result{}) diff --git a/go.mod b/go.mod index 8a8f511c..d7c4608f 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,6 @@ require ( github.com/prometheus/common v0.62.0 github.com/stretchr/testify v1.10.0 go.uber.org/zap v1.27.0 - golang.org/x/sync v0.17.0 golang.org/x/time v0.11.0 gomodules.xyz/jsonpatch/v2 v2.5.0 k8s.io/api v0.33.0 @@ -48,6 +47,7 @@ require ( github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect github.com/x448/float16 v0.8.4 // indirect + golang.org/x/sync v0.17.0 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect sigs.k8s.io/randfill v1.0.0 // indirect ) diff --git a/main.go b/main.go index 7d152bf3..6f1bcec7 100644 --- a/main.go +++ b/main.go @@ -29,7 +29,6 @@ import ( crdcontroller "github.com/aws/amazon-vpc-resource-controller-k8s/controllers/crds" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/api" ec2API "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api" - "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api/cleanup" eniCleaner "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api/cleanup" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/condition" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config" @@ -427,7 +426,11 @@ func main() { setupLog.Error(err, "unable to create introspect API") os.Exit(1) } - + nodeCleanup, err := crdcontroller.SetupENICleanupController(mgr, ctrl.Log.WithName("controllers").WithName("NodeENICleaner"), vpcID, ec2Wrapper) + if err != nil { + setupLog.Error(err, "unable to create controller", "controller", "NodeENICleaner") + os.Exit(1) + } finalizerManager := k8s.NewDefaultFinalizerManager(mgr.GetClient(), ctrl.Log.WithName("finalizer manager")) if err = (crdcontroller.NewCNINodeReconciler( mgr.GetClient(), @@ -440,7 +443,7 @@ func main() { vpcID, finalizerManager, maxNodeConcurrentReconciles, - cleanup.NewNodeResourceCleaner, + nodeCleanup, ).SetupWithManager(mgr, maxNodeConcurrentReconciles)); err != nil { setupLog.Error(err, "unable to create controller", "controller", "CNINode") os.Exit(1)