Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 119 additions & 0 deletions controllers/crds/cleanup_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package crds

import (
"context"
"time"

ec2API "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api/cleanup"
"github.com/go-logr/logr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

// NodeCleanupObject is the object carried inside the generic events that are
// pushed onto the ENICleanup.Events channel. It embeds the standard Kubernetes
// type and object metadata and provides DeepCopyObject so it can be used where
// a runtime.Object is expected.
//
// The event handler in this file builds the reconcile request from the
// embedded ObjectMeta.Name, not from the NodeID field.
type NodeCleanupObject struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

// NodeID identifies the node to clean up.
// NOTE(review): presumably the EC2 instance ID of the node; confirm against producers.
NodeID string `json:"nodeID"`
// ClusterName is not read anywhere in this file.
// NOTE(review): presumably the name of the owning cluster; confirm against producers.
ClusterName string `json:"clusterName"`
}

// DeepCopyObject returns an independent copy of the receiver as a
// runtime.Object. A nil receiver yields a nil interface value.
//
// The scalar fields (TypeMeta, NodeID, ClusterName) are copied by value. The
// embedded ObjectMeta holds maps and slices (labels, annotations, owner
// references, finalizers, managed fields), so it is copied with its own
// DeepCopyInto to avoid sharing that state between the original and the copy.
func (o *NodeCleanupObject) DeepCopyObject() runtime.Object {
	if o == nil {
		// Return an untyped nil so callers comparing against nil see a nil interface.
		return nil
	}
	cp := new(NodeCleanupObject)
	*cp = *o
	// The plain assignment above only copied the reference-typed metadata
	// fields by reference; replace them with deep copies.
	o.ObjectMeta.DeepCopyInto(&cp.ObjectMeta)
	return cp
}

// ENICleanup is the handle returned by SetupENICleanupController. Producers
// request a node cleanup by sending a generic event on Events.
type ENICleanup struct {
// Events is the channel watched by the "NodeENICleaner" controller. The
// handler in this file only acts on events whose Object is a
// *NodeCleanupObject with a non-empty Name.
Events chan event.GenericEvent
}

// SetupENICleanupController registers the "NodeENICleaner" controller with mgr
// and returns the ENICleanup handle whose Events channel feeds it.
//
// Parameters:
//   - mgr: the controller-runtime manager the controller is registered with.
//   - log: logger handed to both the event handler and the reconciler.
//   - vpcID: VPC identifier passed through to the reconciler.
//   - ec2Wrapper: EC2 API wrapper passed through to the reconciler.
//
// The controller watches a buffered channel (capacity 2048) of generic events
// and runs up to 5 reconciles concurrently. On registration failure the error
// from the builder is returned and no handle is produced.
func SetupENICleanupController(mgr ctrl.Manager, log logr.Logger, vpcID string, ec2Wrapper ec2API.EC2Wrapper) (*ENICleanup, error) {
	enicleanup := &ENICleanup{
		Events: make(chan event.GenericEvent, 2048),
	}
	nodecleanupHandler := &enqueueRequestForNodeIDEvent{
		log: log,
	}
	rec := &ENICleanupReconciler{
		vpcID:      vpcID,
		ec2Wrapper: ec2Wrapper,
		log:        log,
	}

	// The channel source is constructed exactly once, here, as the controller's
	// only watch; a second source built and thrown away would serve no purpose.
	err := ctrl.NewControllerManagedBy(mgr).
		Named("NodeENICleaner").
		WatchesRawSource(source.Channel(enicleanup.Events, nodecleanupHandler)).
		WithOptions(controller.Options{
			MaxConcurrentReconciles: 5,
		}).
		Complete(rec)
	if err != nil {
		return nil, err
	}

	return enicleanup, nil
}

// Compile-time assertion that *enqueueRequestForNodeIDEvent satisfies the
// typed event handler interface for client.Object events producing
// reconcile.Request items.
var _ handler.TypedEventHandler[client.Object, reconcile.Request] = (*enqueueRequestForNodeIDEvent)(nil)

// enqueueRequestForNodeIDEvent is the event handler attached to the cleanup
// channel source. Only its Generic method enqueues work; Create, Update and
// Delete are empty.
type enqueueRequestForNodeIDEvent struct {
// log is used to report events that cannot be turned into a request.
log logr.Logger
}

// Create ignores create events; this handler only enqueues work from Generic.
func (*enqueueRequestForNodeIDEvent) Create(_ context.Context, _ event.CreateEvent, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
	// Intentionally empty.
}

// Update ignores update events; this handler only enqueues work from Generic.
func (*enqueueRequestForNodeIDEvent) Update(_ context.Context, _ event.UpdateEvent, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
	// Intentionally empty.
}

// Delete ignores delete events; this handler only enqueues work from Generic.
func (*enqueueRequestForNodeIDEvent) Delete(_ context.Context, _ event.DeleteEvent, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
	// Intentionally empty.
}

// Generic converts a generic event carrying a *NodeCleanupObject into a
// reconcile request and adds it to q.
//
// The request is keyed on the object's Name with an empty namespace. Events
// are dropped, without enqueuing anything, when:
//   - the event has no object,
//   - the object is not a *NodeCleanupObject (logged),
//   - the object is a nil *NodeCleanupObject, or
//   - the object's Name is empty (logged).
func (h *enqueueRequestForNodeIDEvent) Generic(ctx context.Context, e event.GenericEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
	if e.Object == nil {
		return
	}
	nodeCleanup, ok := e.Object.(*NodeCleanupObject)
	if !ok {
		h.log.Error(nil, "Failed to convert object to NodeCleanupObject")
		return
	}
	// An interface holding a typed-nil *NodeCleanupObject passes both the
	// interface nil check and the type assertion above; guard against it here
	// so the field access below cannot panic.
	if nodeCleanup == nil {
		return
	}
	if nodeCleanup.Name == "" {
		h.log.Error(nil, "NodeID is empty")
		return
	}
	q.Add(reconcile.Request{
		NamespacedName: types.NamespacedName{
			Name: nodeCleanup.Name,
		},
	})
}

// ENICleanupReconciler handles the requests enqueued by the cleanup event
// handler. Each request name is passed to cleanup.NewNodeResourceCleaner as the
// node identifier, together with the fields below.
type ENICleanupReconciler struct {
// vpcID is forwarded to the resource cleaner.
vpcID string
// ec2Wrapper is the EC2 API wrapper forwarded to the resource cleaner.
ec2Wrapper ec2API.EC2Wrapper
// log is the logger forwarded to the resource cleaner.
log logr.Logger
}

// Reconcile runs the leaked-resource cleanup for the node named in req.
//
// The request name is handed to cleanup.NewNodeResourceCleaner as the node
// identifier, and DeleteLeakedResources is invoked with a context that is
// cancelled after five minutes. Any error from the cleaner is returned
// unchanged alongside an empty result; on success an empty result and a nil
// error are returned.
func (r *ENICleanupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// Bound the whole cleanup so a single node cannot occupy a worker forever.
	boundedCtx, release := context.WithTimeout(ctx, 5*time.Minute)
	defer release()

	nodeID := req.NamespacedName.Name
	cleaner := cleanup.NewNodeResourceCleaner(nodeID, r.ec2Wrapper, r.vpcID, r.log)

	cleanupErr := cleaner.DeleteLeakedResources(boundedCtx)
	if cleanupErr != nil {
		return ctrl.Result{}, cleanupErr
	}
	return ctrl.Result{}, nil
}
66 changes: 30 additions & 36 deletions controllers/crds/cninode_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@ import (

"github.com/aws/amazon-vpc-resource-controller-k8s/apis/vpcresources/v1alpha1"
ec2API "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api/cleanup"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/k8s"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/utils"
"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/semaphore"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -37,6 +35,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)

Expand Down Expand Up @@ -69,16 +68,15 @@ func prometheusRegister() {
// CNINodeReconciler reconciles a CNINode object
type CNINodeReconciler struct {
client.Client
scheme *runtime.Scheme
context context.Context
log logr.Logger
eC2Wrapper ec2API.EC2Wrapper
k8sAPI k8s.K8sWrapper
clusterName string
vpcId string
finalizerManager k8s.FinalizerManager
deletePool *semaphore.Weighted
newResourceCleaner func(nodeID string, eC2Wrapper ec2API.EC2Wrapper, vpcID string, log logr.Logger) cleanup.ResourceCleaner
scheme *runtime.Scheme
context context.Context
log logr.Logger
eC2Wrapper ec2API.EC2Wrapper
k8sAPI k8s.K8sWrapper
clusterName string
vpcId string
finalizerManager k8s.FinalizerManager
nodeCleanupQueue *ENICleanup
}

func NewCNINodeReconciler(
Expand All @@ -92,20 +90,19 @@ func NewCNINodeReconciler(
vpcId string,
finalizerManager k8s.FinalizerManager,
maxConcurrentWorkers int,
newResourceCleaner func(nodeID string, eC2Wrapper ec2API.EC2Wrapper, vpcID string, log logr.Logger) cleanup.ResourceCleaner,
nodeCleanupQueue *ENICleanup,
) *CNINodeReconciler {
return &CNINodeReconciler{
Client: client,
scheme: scheme,
context: ctx,
log: logger,
eC2Wrapper: ec2Wrapper,
k8sAPI: k8sWrapper,
clusterName: clusterName,
vpcId: vpcId,
finalizerManager: finalizerManager,
deletePool: semaphore.NewWeighted(int64(maxConcurrentWorkers)),
newResourceCleaner: newResourceCleaner,
Client: client,
scheme: scheme,
context: ctx,
log: logger,
eC2Wrapper: ec2Wrapper,
k8sAPI: k8sWrapper,
clusterName: clusterName,
vpcId: vpcId,
finalizerManager: finalizerManager,
nodeCleanupQueue: nodeCleanupQueue,
}
}

Expand Down Expand Up @@ -156,19 +153,16 @@ func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
r.log.Info("running the finalizer routine on cniNode", "cniNode", cniNode.Name)
// run cleanup when node id is present
if nodeID, ok := cniNode.Spec.Tags[config.NetworkInterfaceNodeIDKey]; ok && nodeID != "" {
if !r.deletePool.TryAcquire(1) {
r.log.Info("d, will requeue request")
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
obj := &NodeCleanupObject{
ObjectMeta: metav1.ObjectMeta{
Name: nodeID,
Namespace: "",
},
NodeID: nodeID,
}
r.nodeCleanupQueue.Events <- event.GenericEvent{
Object: obj,
}
go func(nodeID string) {
defer r.deletePool.Release(1)
childCtx, cancel := context.WithTimeout(ctx, config.NodeTerminationTimeout)
defer cancel()
if err := r.newResourceCleaner(nodeID, r.eC2Wrapper, r.vpcId, r.log).DeleteLeakedResources(childCtx); err != nil {
r.log.Error(err, "failed to cleanup resources during node termination")
ec2API.NodeTerminationENICleanupFailure.Inc()
}
}(nodeID)
}
}

Expand Down
35 changes: 11 additions & 24 deletions controllers/crds/cninode_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,16 @@ import (
mock_api "github.com/aws/amazon-vpc-resource-controller-k8s/mocks/amazon-vcp-resource-controller-k8s/pkg/aws/ec2/api"
mock_cleanup "github.com/aws/amazon-vpc-resource-controller-k8s/mocks/amazon-vcp-resource-controller-k8s/pkg/aws/ec2/api/cleanup"
mock_k8s "github.com/aws/amazon-vpc-resource-controller-k8s/mocks/amazon-vcp-resource-controller-k8s/pkg/k8s"
ec2API "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api/cleanup"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config"
"github.com/go-logr/logr"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"golang.org/x/sync/semaphore"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
fakeClient "sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
Expand Down Expand Up @@ -55,14 +52,17 @@ func NewCNINodeMock(ctrl *gomock.Controller, mockObjects ...client.Object) *CNIN
_ = corev1.AddToScheme(scheme)
_ = v1alpha1.AddToScheme(scheme)
client := fakeClient.NewClientBuilder().WithScheme(scheme).WithObjects(mockObjects...).Build()
cleanupQueue := &ENICleanup{
Events: make(chan event.GenericEvent, 2048),
}
return &CNINodeMock{
Reconciler: CNINodeReconciler{
Client: client,
scheme: scheme,
log: zap.New(),
clusterName: mockClusterName,
vpcId: "vpc-000000000000",
deletePool: semaphore.NewWeighted(10),
Client: client,
scheme: scheme,
log: zap.New(),
clusterName: mockClusterName,
vpcId: "vpc-000000000000",
nodeCleanupQueue: cleanupQueue,
},
}
}
Expand Down Expand Up @@ -118,12 +118,7 @@ func TestCNINodeReconcile(t *testing.T) {
},
},
},
prepare: func(f *fields) {
f.mockCNINode.Reconciler.newResourceCleaner = func(nodeID string, eC2Wrapper ec2API.EC2Wrapper, vpcID string, log logr.Logger) cleanup.ResourceCleaner {
return f.mockResourceCleaner
}
f.mockResourceCleaner.EXPECT().DeleteLeakedResources(gomock.Any()).Times(0)
},

asserts: func(res reconcile.Result, err error, cniNode *v1alpha1.CNINode) {
assert.NoError(t, err)
assert.Equal(t, res, reconcile.Result{})
Expand All @@ -149,14 +144,6 @@ func TestCNINodeReconcile(t *testing.T) {
},
},
},
prepare: func(f *fields) {
f.mockCNINode.Reconciler.newResourceCleaner = func(nodeID string, eC2Wrapper ec2API.EC2Wrapper, vpcID string, log logr.Logger) cleanup.ResourceCleaner {
assert.Equal(t, "i-0123456789abcdef0", nodeID)
return f.mockResourceCleaner
}
f.mockResourceCleaner.EXPECT().DeleteLeakedResources(gomock.Any()).Times(1).Return(nil)

},
asserts: func(res reconcile.Result, err error, cniNode *v1alpha1.CNINode) {
assert.NoError(t, err)
assert.Equal(t, res, reconcile.Result{})
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ require (
github.com/prometheus/common v0.62.0
github.com/stretchr/testify v1.10.0
go.uber.org/zap v1.27.0
golang.org/x/sync v0.17.0
golang.org/x/time v0.11.0
gomodules.xyz/jsonpatch/v2 v2.5.0
k8s.io/api v0.33.0
Expand All @@ -48,6 +47,7 @@ require (
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
github.com/x448/float16 v0.8.4 // indirect
golang.org/x/sync v0.17.0 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
sigs.k8s.io/randfill v1.0.0 // indirect
)
Expand Down
9 changes: 6 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
crdcontroller "github.com/aws/amazon-vpc-resource-controller-k8s/controllers/crds"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/api"
ec2API "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api/cleanup"
eniCleaner "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api/cleanup"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/condition"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config"
Expand Down Expand Up @@ -427,7 +426,11 @@ func main() {
setupLog.Error(err, "unable to create introspect API")
os.Exit(1)
}

nodeCleanup, err := crdcontroller.SetupENICleanupController(mgr, ctrl.Log.WithName("controllers").WithName("NodeENICleaner"), vpcID, ec2Wrapper)
if err != nil {
setupLog.Error(err, "unable to create controller", "controller", "NodeENICleaner")
os.Exit(1)
}
finalizerManager := k8s.NewDefaultFinalizerManager(mgr.GetClient(), ctrl.Log.WithName("finalizer manager"))
if err = (crdcontroller.NewCNINodeReconciler(
mgr.GetClient(),
Expand All @@ -440,7 +443,7 @@ func main() {
vpcID,
finalizerManager,
maxNodeConcurrentReconciles,
cleanup.NewNodeResourceCleaner,
nodeCleanup,
).SetupWithManager(mgr, maxNodeConcurrentReconciles)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "CNINode")
os.Exit(1)
Expand Down