Skip to content

Commit d82c249

Browse files
committed
moving node cleaner to separate controller
1 parent 02c28c7 commit d82c249

File tree

5 files changed

+167
-64
lines changed

5 files changed

+167
-64
lines changed
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
package crds
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
ec2API "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api"
8+
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api/cleanup"
9+
"github.com/go-logr/logr"
10+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11+
"k8s.io/apimachinery/pkg/runtime"
12+
"k8s.io/apimachinery/pkg/types"
13+
"k8s.io/client-go/util/workqueue"
14+
ctrl "sigs.k8s.io/controller-runtime"
15+
"sigs.k8s.io/controller-runtime/pkg/client"
16+
"sigs.k8s.io/controller-runtime/pkg/controller"
17+
"sigs.k8s.io/controller-runtime/pkg/event"
18+
"sigs.k8s.io/controller-runtime/pkg/handler"
19+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
20+
"sigs.k8s.io/controller-runtime/pkg/source"
21+
)
22+
23+
type NodeCleanupObject struct {
24+
metav1.TypeMeta `json:",inline"`
25+
metav1.ObjectMeta `json:"metadata,omitempty"`
26+
27+
NodeID string `json:"nodeID"`
28+
ClusterName string `json:"clusterName"`
29+
}
30+
31+
func (o *NodeCleanupObject) DeepCopyObject() runtime.Object {
32+
if o == nil {
33+
return nil
34+
}
35+
cp := *o
36+
return &cp
37+
}
38+
39+
type ENICleanup struct {
40+
Events chan event.GenericEvent
41+
}
42+
43+
func SetupENICleanupController(mgr ctrl.Manager, log logr.Logger, vpcID string, ec2Wrapper ec2API.EC2Wrapper) (*ENICleanup, error) {
44+
45+
enicleanup := &ENICleanup{}
46+
enicleanup.Events = make(chan event.GenericEvent, 2048)
47+
nodecleanupHandler := enqueueRequestForNodeIDEvent{
48+
log: log,
49+
}
50+
rec := &ENICleanupReconciler{
51+
vpcID: vpcID,
52+
ec2Wrapper: ec2Wrapper,
53+
log: log,
54+
}
55+
source.Channel(enicleanup.Events, &nodecleanupHandler)
56+
err := ctrl.NewControllerManagedBy(mgr).
57+
Named("NodeENICleaner").
58+
WatchesRawSource(source.Channel(enicleanup.Events, &nodecleanupHandler)).
59+
WithOptions(controller.Options{
60+
MaxConcurrentReconciles: 5,
61+
}).
62+
Complete(rec)
63+
if err != nil {
64+
return nil, err
65+
}
66+
67+
return enicleanup, nil
68+
}
69+
70+
var _ handler.TypedEventHandler[client.Object, reconcile.Request] = (*enqueueRequestForNodeIDEvent)(nil)
71+
72+
type enqueueRequestForNodeIDEvent struct {
73+
log logr.Logger
74+
}
75+
76+
func (h *enqueueRequestForNodeIDEvent) Create(ctx context.Context, _ event.CreateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
77+
}
78+
79+
func (h *enqueueRequestForNodeIDEvent) Update(ctx context.Context, _ event.UpdateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
80+
}
81+
82+
func (h *enqueueRequestForNodeIDEvent) Delete(ctx context.Context, _ event.DeleteEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
83+
}
84+
85+
func (h *enqueueRequestForNodeIDEvent) Generic(ctx context.Context, e event.GenericEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
86+
if e.Object == nil {
87+
return
88+
}
89+
nodeCleanup, ok := e.Object.(*NodeCleanupObject)
90+
if !ok {
91+
h.log.Error(nil, "Failed to convert object to NodeCleanupObject")
92+
return
93+
}
94+
if nodeCleanup.Name == "" {
95+
h.log.Error(nil, "NodeID is empty")
96+
return
97+
}
98+
q.Add(reconcile.Request{
99+
NamespacedName: types.NamespacedName{
100+
Name: nodeCleanup.Name,
101+
},
102+
})
103+
}
104+
105+
type ENICleanupReconciler struct {
106+
vpcID string
107+
ec2Wrapper ec2API.EC2Wrapper
108+
log logr.Logger
109+
}
110+
111+
func (r *ENICleanupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
112+
ctx, cancel := context.WithTimeout(ctx, time.Minute*5)
113+
defer cancel()
114+
if err := cleanup.NewNodeResourceCleaner(req.NamespacedName.Name, r.ec2Wrapper, r.vpcID, r.log).DeleteLeakedResources(ctx); err != nil {
115+
return ctrl.Result{}, err
116+
117+
}
118+
return ctrl.Result{}, nil
119+
}

controllers/crds/cninode_controller.go

Lines changed: 30 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,11 @@ import (
1919

2020
"github.com/aws/amazon-vpc-resource-controller-k8s/apis/vpcresources/v1alpha1"
2121
ec2API "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api"
22-
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api/cleanup"
2322
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config"
2423
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/k8s"
2524
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/utils"
2625
"github.com/go-logr/logr"
2726
"github.com/prometheus/client_golang/prometheus"
28-
"golang.org/x/sync/semaphore"
2927
v1 "k8s.io/api/core/v1"
3028
apierrors "k8s.io/apimachinery/pkg/api/errors"
3129
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -37,6 +35,7 @@ import (
3735
"sigs.k8s.io/controller-runtime/pkg/client"
3836
"sigs.k8s.io/controller-runtime/pkg/controller"
3937
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
38+
"sigs.k8s.io/controller-runtime/pkg/event"
4039
"sigs.k8s.io/controller-runtime/pkg/metrics"
4140
)
4241

@@ -69,16 +68,15 @@ func prometheusRegister() {
6968
// CNINodeReconciler reconciles a CNINode object
7069
type CNINodeReconciler struct {
7170
client.Client
72-
scheme *runtime.Scheme
73-
context context.Context
74-
log logr.Logger
75-
eC2Wrapper ec2API.EC2Wrapper
76-
k8sAPI k8s.K8sWrapper
77-
clusterName string
78-
vpcId string
79-
finalizerManager k8s.FinalizerManager
80-
deletePool *semaphore.Weighted
81-
newResourceCleaner func(nodeID string, eC2Wrapper ec2API.EC2Wrapper, vpcID string, log logr.Logger) cleanup.ResourceCleaner
71+
scheme *runtime.Scheme
72+
context context.Context
73+
log logr.Logger
74+
eC2Wrapper ec2API.EC2Wrapper
75+
k8sAPI k8s.K8sWrapper
76+
clusterName string
77+
vpcId string
78+
finalizerManager k8s.FinalizerManager
79+
nodeCleanupQueue *ENICleanup
8280
}
8381

8482
func NewCNINodeReconciler(
@@ -92,20 +90,19 @@ func NewCNINodeReconciler(
9290
vpcId string,
9391
finalizerManager k8s.FinalizerManager,
9492
maxConcurrentWorkers int,
95-
newResourceCleaner func(nodeID string, eC2Wrapper ec2API.EC2Wrapper, vpcID string, log logr.Logger) cleanup.ResourceCleaner,
93+
nodeCleanupQueue *ENICleanup,
9694
) *CNINodeReconciler {
9795
return &CNINodeReconciler{
98-
Client: client,
99-
scheme: scheme,
100-
context: ctx,
101-
log: logger,
102-
eC2Wrapper: ec2Wrapper,
103-
k8sAPI: k8sWrapper,
104-
clusterName: clusterName,
105-
vpcId: vpcId,
106-
finalizerManager: finalizerManager,
107-
deletePool: semaphore.NewWeighted(int64(maxConcurrentWorkers)),
108-
newResourceCleaner: newResourceCleaner,
96+
Client: client,
97+
scheme: scheme,
98+
context: ctx,
99+
log: logger,
100+
eC2Wrapper: ec2Wrapper,
101+
k8sAPI: k8sWrapper,
102+
clusterName: clusterName,
103+
vpcId: vpcId,
104+
finalizerManager: finalizerManager,
105+
nodeCleanupQueue: nodeCleanupQueue,
109106
}
110107
}
111108

@@ -156,19 +153,16 @@ func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
156153
r.log.Info("running the finalizer routine on cniNode", "cniNode", cniNode.Name)
157154
// run cleanup when node id is present
158155
if nodeID, ok := cniNode.Spec.Tags[config.NetworkInterfaceNodeIDKey]; ok && nodeID != "" {
159-
if !r.deletePool.TryAcquire(1) {
160-
r.log.Info("d, will requeue request")
161-
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
156+
obj := &NodeCleanupObject{
157+
ObjectMeta: metav1.ObjectMeta{
158+
Name: nodeID,
159+
Namespace: "",
160+
},
161+
NodeID: nodeID,
162+
}
163+
r.nodeCleanupQueue.Events <- event.GenericEvent{
164+
Object: obj,
162165
}
163-
go func(nodeID string) {
164-
defer r.deletePool.Release(1)
165-
childCtx, cancel := context.WithTimeout(ctx, config.NodeTerminationTimeout)
166-
defer cancel()
167-
if err := r.newResourceCleaner(nodeID, r.eC2Wrapper, r.vpcId, r.log).DeleteLeakedResources(childCtx); err != nil {
168-
r.log.Error(err, "failed to cleanup resources during node termination")
169-
ec2API.NodeTerminationENICleanupFailure.Inc()
170-
}
171-
}(nodeID)
172166
}
173167
}
174168

controllers/crds/cninode_controller_test.go

Lines changed: 11 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,16 @@ import (
88
mock_api "github.com/aws/amazon-vpc-resource-controller-k8s/mocks/amazon-vcp-resource-controller-k8s/pkg/aws/ec2/api"
99
mock_cleanup "github.com/aws/amazon-vpc-resource-controller-k8s/mocks/amazon-vcp-resource-controller-k8s/pkg/aws/ec2/api/cleanup"
1010
mock_k8s "github.com/aws/amazon-vpc-resource-controller-k8s/mocks/amazon-vcp-resource-controller-k8s/pkg/k8s"
11-
ec2API "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api"
12-
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api/cleanup"
1311
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config"
14-
"github.com/go-logr/logr"
1512
"github.com/golang/mock/gomock"
1613
"github.com/stretchr/testify/assert"
17-
"golang.org/x/sync/semaphore"
1814
corev1 "k8s.io/api/core/v1"
1915
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2016
"k8s.io/apimachinery/pkg/runtime"
2117
"k8s.io/apimachinery/pkg/types"
2218
"sigs.k8s.io/controller-runtime/pkg/client"
2319
fakeClient "sigs.k8s.io/controller-runtime/pkg/client/fake"
20+
"sigs.k8s.io/controller-runtime/pkg/event"
2421
"sigs.k8s.io/controller-runtime/pkg/log/zap"
2522
"sigs.k8s.io/controller-runtime/pkg/reconcile"
2623
)
@@ -55,14 +52,17 @@ func NewCNINodeMock(ctrl *gomock.Controller, mockObjects ...client.Object) *CNIN
5552
_ = corev1.AddToScheme(scheme)
5653
_ = v1alpha1.AddToScheme(scheme)
5754
client := fakeClient.NewClientBuilder().WithScheme(scheme).WithObjects(mockObjects...).Build()
55+
cleanupQueue := &ENICleanup{
56+
Events: make(chan event.GenericEvent, 2048),
57+
}
5858
return &CNINodeMock{
5959
Reconciler: CNINodeReconciler{
60-
Client: client,
61-
scheme: scheme,
62-
log: zap.New(),
63-
clusterName: mockClusterName,
64-
vpcId: "vpc-000000000000",
65-
deletePool: semaphore.NewWeighted(10),
60+
Client: client,
61+
scheme: scheme,
62+
log: zap.New(),
63+
clusterName: mockClusterName,
64+
vpcId: "vpc-000000000000",
65+
nodeCleanupQueue: cleanupQueue,
6666
},
6767
}
6868
}
@@ -118,12 +118,7 @@ func TestCNINodeReconcile(t *testing.T) {
118118
},
119119
},
120120
},
121-
prepare: func(f *fields) {
122-
f.mockCNINode.Reconciler.newResourceCleaner = func(nodeID string, eC2Wrapper ec2API.EC2Wrapper, vpcID string, log logr.Logger) cleanup.ResourceCleaner {
123-
return f.mockResourceCleaner
124-
}
125-
f.mockResourceCleaner.EXPECT().DeleteLeakedResources(gomock.Any()).Times(0)
126-
},
121+
127122
asserts: func(res reconcile.Result, err error, cniNode *v1alpha1.CNINode) {
128123
assert.NoError(t, err)
129124
assert.Equal(t, res, reconcile.Result{})
@@ -149,14 +144,6 @@ func TestCNINodeReconcile(t *testing.T) {
149144
},
150145
},
151146
},
152-
prepare: func(f *fields) {
153-
f.mockCNINode.Reconciler.newResourceCleaner = func(nodeID string, eC2Wrapper ec2API.EC2Wrapper, vpcID string, log logr.Logger) cleanup.ResourceCleaner {
154-
assert.Equal(t, "i-0123456789abcdef0", nodeID)
155-
return f.mockResourceCleaner
156-
}
157-
f.mockResourceCleaner.EXPECT().DeleteLeakedResources(gomock.Any()).Times(1).Return(nil)
158-
159-
},
160147
asserts: func(res reconcile.Result, err error, cniNode *v1alpha1.CNINode) {
161148
assert.NoError(t, err)
162149
assert.Equal(t, res, reconcile.Result{})

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ require (
2424
github.com/prometheus/common v0.62.0
2525
github.com/stretchr/testify v1.10.0
2626
go.uber.org/zap v1.27.0
27-
golang.org/x/sync v0.17.0
2827
golang.org/x/time v0.11.0
2928
gomodules.xyz/jsonpatch/v2 v2.5.0
3029
k8s.io/api v0.33.0
@@ -48,6 +47,7 @@ require (
4847
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect
4948
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
5049
github.com/x448/float16 v0.8.4 // indirect
50+
golang.org/x/sync v0.17.0 // indirect
5151
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
5252
sigs.k8s.io/randfill v1.0.0 // indirect
5353
)

main.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
crdcontroller "github.com/aws/amazon-vpc-resource-controller-k8s/controllers/crds"
3030
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/api"
3131
ec2API "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api"
32-
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api/cleanup"
3332
eniCleaner "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api/cleanup"
3433
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/condition"
3534
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config"
@@ -427,7 +426,11 @@ func main() {
427426
setupLog.Error(err, "unable to create introspect API")
428427
os.Exit(1)
429428
}
430-
429+
nodeCleanup, err := crdcontroller.SetupENICleanupController(mgr, ctrl.Log.WithName("controllers").WithName("NodeENICleaner"), vpcID, ec2Wrapper)
430+
if err != nil {
431+
setupLog.Error(err, "unable to create controller", "controller", "NodeENICleaner")
432+
os.Exit(1)
433+
}
431434
finalizerManager := k8s.NewDefaultFinalizerManager(mgr.GetClient(), ctrl.Log.WithName("finalizer manager"))
432435
if err = (crdcontroller.NewCNINodeReconciler(
433436
mgr.GetClient(),
@@ -440,7 +443,7 @@ func main() {
440443
vpcID,
441444
finalizerManager,
442445
maxNodeConcurrentReconciles,
443-
cleanup.NewNodeResourceCleaner,
446+
nodeCleanup,
444447
).SetupWithManager(mgr, maxNodeConcurrentReconciles)); err != nil {
445448
setupLog.Error(err, "unable to create controller", "controller", "CNINode")
446449
os.Exit(1)

0 commit comments

Comments
 (0)