Skip to content

Commit 6090a48

Browse files
committed
chore: add node informer
Signed-off-by: Tanisha goyal <[email protected]>
1 parent 0dcf50c commit 6090a48

File tree

5 files changed

+353
-101
lines changed

5 files changed

+353
-101
lines changed

health-monitors/csp-health-monitor/go.mod

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -74,6 +74,7 @@ require (
7474
github.com/gogo/protobuf v1.3.2 // indirect
7575
github.com/golang/snappy v1.0.0 // indirect
7676
github.com/google/gnostic-models v0.7.0 // indirect
77+
github.com/google/go-cmp v0.7.0 // indirect
7778
github.com/google/pprof v0.0.0-20251007162407-5df77e3f7d1d // indirect
7879
github.com/google/s2a-go v0.1.9 // indirect
7980
github.com/google/uuid v1.6.0 // indirect

health-monitors/csp-health-monitor/pkg/csp/aws/aws.go

Lines changed: 10 additions & 67 deletions
Original file line number | Diff line number | Diff line change
@@ -34,7 +34,6 @@ import (
3434
awsConfig "github.com/aws/aws-sdk-go-v2/config"
3535
"github.com/aws/aws-sdk-go-v2/service/health"
3636
"github.com/aws/aws-sdk-go-v2/service/health/types"
37-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3837
"k8s.io/client-go/kubernetes"
3938
"k8s.io/client-go/rest"
4039
"k8s.io/client-go/tools/clientcmd"
@@ -110,6 +109,7 @@ type AWSClient struct {
110109
clusterName string
111110
kubeconfigPath string
112111
store datastore.Store
112+
nodeInformer *NodeInformer
113113
}
114114

115115
func NewClient(
@@ -164,6 +164,11 @@ func NewClient(
164164

165165
slog.Info("AWS Client: Kubernetes clientset initialized successfully.")
166166

167+
nodeInformer := NewNodeInformer(k8sClient)
168+
nodeInformer.Start(ctx)
169+
170+
slog.Info("AWS Client: Node informer started successfully")
171+
167172
normalizer, err := eventpkg.GetNormalizer(model.CSPAWS)
168173
if err != nil {
169174
return nil, fmt.Errorf("failed to get AWS normalizer: %w", err)
@@ -177,6 +182,7 @@ func NewClient(
177182
clusterName: clusterName,
178183
kubeconfigPath: kubeconfigPath,
179184
store: store,
185+
nodeInformer: nodeInformer,
180186
}, nil
181187
}
182188

@@ -289,17 +295,7 @@ func (c *AWSClient) pollNewEvents(ctx context.Context,
289295

290296
slog.Debug("Polling AWS Health API")
291297

292-
instanceIDs, err := c.getClusterInstanceNodeMap(ctx)
293-
if err != nil {
294-
metrics.CSPAPIErrors.WithLabelValues(string(model.CSPAWS), "get_nodes_provider_id_error").Inc()
295-
slog.Error("Error getting nodes provider IDs", "error", err)
296-
297-
return fmt.Errorf("error getting nodes provider IDs: %w", err)
298-
}
299-
300-
slog.Debug("Found nodes with instance IDs", "instanceIDs", instanceIDs)
301-
302-
err = c.handleMaintenanceEvents(ctx, instanceIDs, eventChan, pollStartTime)
298+
err := c.handleMaintenanceEvents(ctx, eventChan, pollStartTime)
303299
if err != nil {
304300
metrics.CSPAPIErrors.WithLabelValues(string(model.CSPAWS), "handle_maintenance_events_error").Inc()
305301
slog.Error("Error polling AWS Health events", "error", err)
@@ -313,7 +309,6 @@ func (c *AWSClient) pollNewEvents(ctx context.Context,
313309
// handleMaintenanceEvents performs a single poll request to the AWS Health API.
314310
func (c *AWSClient) handleMaintenanceEvents(
315311
ctx context.Context,
316-
instanceIDs map[string]string,
317312
eventChan chan<- model.MaintenanceEvent,
318313
pollStartTime time.Time,
319314
) error {
@@ -366,8 +361,6 @@ func (c *AWSClient) handleMaintenanceEvents(
366361

367362
var wg sync.WaitGroup
368363

369-
var mu sync.Mutex
370-
371364
var errs *multierror.Error
372365

373366
for eventID, event := range eventArnsMap {
@@ -384,17 +377,15 @@ func (c *AWSClient) handleMaintenanceEvents(
384377
wg.Done()
385378
}()
386379

380+
instanceIDs := c.nodeInformer.GetInstanceIDs()
381+
387382
err := c.processAWSHealthEvent(ctx, eventID, eventData, instanceIDs, eventChan)
388383
if err != nil {
389384
slog.Error("Error processing AWS Health event",
390385
"eventID", eventID,
391386
"error", err)
392387

393-
mu.Lock()
394-
395388
errs = multierror.Append(errs, err)
396-
397-
mu.Unlock()
398389
}
399390
}(eventID, event)
400391
}
@@ -768,54 +759,6 @@ func (c *AWSClient) checkStatusOfKnownEvents(ctx context.Context, activeEvent mo
768759
return awsEvents.Events[0], string(awsEvents.Events[0].StatusCode), nil
769760
}
770761

771-
// GetNodesProviderId returns a list of EC2 instance IDs for the nodes in this cluster
772-
func (c *AWSClient) getClusterInstanceNodeMap(ctx context.Context) (map[string]string, error) {
773-
slog.Debug("Fetching Kubernetes nodes to derive EC2 instance IDs")
774-
775-
nodes, err := c.k8sClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
776-
if err != nil {
777-
return nil, fmt.Errorf("failed to list nodes: %w", err)
778-
}
779-
780-
instanceIDs := make(map[string]string)
781-
782-
for _, node := range nodes.Items {
783-
if node.Spec.ProviderID == "" {
784-
slog.Info("Node has no providerID", "node", node.Name)
785-
continue
786-
}
787-
788-
// Parse AWS provider ID format: aws:///us-east-1/i-0123456789abcdef0
789-
if !strings.HasPrefix(node.Spec.ProviderID, "aws:///") {
790-
slog.Info("Node has non-AWS providerID",
791-
"node", node.Name,
792-
"providerID", node.Spec.ProviderID)
793-
794-
continue
795-
}
796-
797-
idPart := strings.TrimPrefix(node.Spec.ProviderID, "aws:///")
798-
799-
parts := strings.Split(idPart, "/")
800-
instanceID := parts[len(parts)-1]
801-
802-
if strings.HasPrefix(instanceID, "i-") {
803-
instanceIDs[instanceID] = node.Name
804-
slog.Debug("Found instance ID for node",
805-
"instanceID", instanceID,
806-
"node", node.Name)
807-
} else {
808-
slog.Info("Unexpected instance ID format for node",
809-
"node", node.Name,
810-
"instanceID", instanceID)
811-
}
812-
}
813-
814-
slog.Debug("Found AWS EC2 instances in the cluster", "count", len(instanceIDs))
815-
816-
return instanceIDs, nil
817-
}
818-
819762
func (c *AWSClient) mapToValidAction(desc string) pb.RecommendedAction {
820763
lines := strings.Split(desc, "\n")
821764

health-monitors/csp-health-monitor/pkg/csp/aws/aws_test.go

Lines changed: 31 additions & 34 deletions
Original file line number | Diff line number | Diff line change
@@ -99,16 +99,24 @@ func createTestClient(t *testing.T) (*AWSClient, *MockAWSHealthClient, *fake.Cli
9999
_, err := fakeK8sClient.CoreV1().Nodes().Create(context.Background(), node, metav1.CreateOptions{})
100100
assert.NoError(t, err)
101101

102+
nodeInformer := &NodeInformer{
103+
k8sClient: fakeK8sClient,
104+
instanceIDs: map[string]string{
105+
testInstanceID: testNodeName,
106+
},
107+
}
108+
102109
client := &AWSClient{
103110
config: config.AWSConfig{
104111
Region: testRegion,
105112
PollingIntervalSeconds: 60,
106113
Enabled: true,
107114
},
108-
awsClient: mockAWSClient,
109-
k8sClient: fakeK8sClient,
110-
normalizer: &eventpkg.AWSNormalizer{},
111-
clusterName: "test-cluster",
115+
awsClient: mockAWSClient,
116+
k8sClient: fakeK8sClient,
117+
normalizer: &eventpkg.AWSNormalizer{},
118+
clusterName: "test-cluster",
119+
nodeInformer: nodeInformer,
112120
}
113121

114122
return client, mockAWSClient, fakeK8sClient
@@ -167,12 +175,9 @@ func TestHandleMaintenanceEvents(t *testing.T) {
167175
}, nil)
168176
// Setup test channel and test instance IDs
169177
eventChan := make(chan model.MaintenanceEvent, 10)
170-
instanceIDs := map[string]string{
171-
testInstanceID: testNodeName,
172-
}
173178

174179
// Call the function being tested
175-
err := client.handleMaintenanceEvents(context.Background(), instanceIDs, eventChan, pollStartTime)
180+
err := client.handleMaintenanceEvents(context.Background(), eventChan, pollStartTime)
176181
assert.NoError(t, err)
177182

178183
// Verify we received an event
@@ -202,12 +207,9 @@ func TestNoMaintenanceEvents(t *testing.T) {
202207

203208
// Setup test channel and test instance IDs
204209
eventChan := make(chan model.MaintenanceEvent, 10)
205-
instanceIDs := map[string]string{
206-
testInstanceID: testNodeName,
207-
}
208210

209211
// Call the function being tested
210-
err := client.handleMaintenanceEvents(context.Background(), instanceIDs, eventChan, pollStartTime)
212+
err := client.handleMaintenanceEvents(context.Background(), eventChan, pollStartTime)
211213
assert.NoError(t, err)
212214

213215
mockAWSClient.AssertNotCalled(t, "DescribeAffectedEntities", mock.Anything, mock.Anything)
@@ -304,14 +306,15 @@ func TestMultipleAffectedEntities(t *testing.T) {
304306

305307
// Setup test channel and instance IDs
306308
eventChan := make(chan model.MaintenanceEvent, 10)
307-
instanceIDs := map[string]string{
309+
310+
client.nodeInformer.instanceIDs = map[string]string{
308311
testInstanceID: testNodeName,
309312
testInstanceID1: testNodeName1,
310313
testInstanceID2: testNodeName2,
311314
}
312315

313316
// Call the function being tested
314-
err := client.handleMaintenanceEvents(context.Background(), instanceIDs, eventChan, pollStartTime)
317+
err := client.handleMaintenanceEvents(context.Background(), eventChan, pollStartTime)
315318
assert.NoError(t, err)
316319

317320
// Verify we received events for all affected instances
@@ -427,14 +430,14 @@ func TestCompletedEvent(t *testing.T) {
427430

428431
// Setup test channel and instance IDs
429432
eventChan := make(chan model.MaintenanceEvent, 10)
430-
instanceIDs := map[string]string{
433+
client.nodeInformer.instanceIDs = map[string]string{
431434
testInstanceID: testNodeName,
432435
testInstanceID1: testNodeName1,
433436
testInstanceID2: testNodeName2,
434437
}
435438

436439
// Call the function being tested
437-
err := client.handleMaintenanceEvents(context.Background(), instanceIDs, eventChan, pollStartTime)
440+
err := client.handleMaintenanceEvents(context.Background(), eventChan, pollStartTime)
438441
assert.NoError(t, err)
439442

440443
// Verify we received a completed event
@@ -476,12 +479,12 @@ func TestErrorScenario(t *testing.T) {
476479

477480
// Setup test channel and test instance IDs
478481
eventChan := make(chan model.MaintenanceEvent, 10)
479-
instanceIDs := map[string]string{
482+
client.nodeInformer.instanceIDs = map[string]string{
480483
testInstanceID: testNodeName,
481484
}
482485

483486
// Call the function being tested - should not panic but return error
484-
err := client.handleMaintenanceEvents(context.Background(), instanceIDs, eventChan, pollStartTime)
487+
err := client.handleMaintenanceEvents(context.Background(), eventChan, pollStartTime)
485488
assert.Error(t, err)
486489

487490
mockAWSClient.AssertNotCalled(t, "DescribeAffectedEntities", mock.Anything, mock.Anything)
@@ -520,12 +523,12 @@ func TestTimeWindowFiltering(t *testing.T) {
520523

521524
// Setup test channel and test instance IDs
522525
eventChan := make(chan model.MaintenanceEvent, 10)
523-
instanceIDs := map[string]string{
526+
client.nodeInformer.instanceIDs = map[string]string{
524527
testInstanceID: testNodeName,
525528
}
526529

527530
// Call the function being tested
528-
err := client.handleMaintenanceEvents(context.Background(), instanceIDs, eventChan, pollStartTime)
531+
err := client.handleMaintenanceEvents(context.Background(), eventChan, pollStartTime)
529532
assert.NoError(t, err)
530533

531534
// Verify no events were received (as our filter should exclude the old event)
@@ -608,11 +611,11 @@ func TestInstanceFiltering(t *testing.T) {
608611

609612
// Setup test channel with our cluster's instance IDs only
610613
eventChan := make(chan model.MaintenanceEvent, 10)
611-
instanceIDs := map[string]string{
614+
client.nodeInformer.instanceIDs = map[string]string{
612615
testInstanceID: testNodeName,
613616
}
614617

615-
err := client.handleMaintenanceEvents(context.Background(), instanceIDs, eventChan, pollStartTime)
618+
err := client.handleMaintenanceEvents(context.Background(), eventChan, pollStartTime)
616619

617620
assert.Error(t, err)
618621
assert.Contains(t, err.Error(), "instance ID not found in node map")
@@ -709,12 +712,12 @@ func TestInvalidEntityData(t *testing.T) {
709712

710713
// Setup test channel and test instance IDs
711714
eventChan := make(chan model.MaintenanceEvent, 10)
712-
instanceIDs := map[string]string{
715+
client.nodeInformer.instanceIDs = map[string]string{
713716
testInstanceID: testNodeName,
714717
}
715718

716719
// Call the function - should handle nil values without panicking but return errors
717-
err := client.handleMaintenanceEvents(context.Background(), instanceIDs, eventChan, pollStartTime)
720+
err := client.handleMaintenanceEvents(context.Background(), eventChan, pollStartTime)
718721
// Should return error because 2 entities have invalid/nil values
719722
assert.Error(t, err)
720723
assert.Contains(t, err.Error(), "entity with nil EntityValue")
@@ -786,12 +789,12 @@ func TestInstanceRebootEvent(t *testing.T) {
786789
}, nil)
787790
// Setup test channel and test instance IDs
788791
eventChan := make(chan model.MaintenanceEvent, 10)
789-
instanceIDs := map[string]string{
792+
client.nodeInformer.instanceIDs = map[string]string{
790793
testInstanceID: testNodeName,
791794
}
792795

793796
// Call the function being tested
794-
err := client.handleMaintenanceEvents(context.Background(), instanceIDs, eventChan, pollStartTime)
797+
err := client.handleMaintenanceEvents(context.Background(), eventChan, pollStartTime)
795798
assert.NoError(t, err)
796799

797800
// Verify we received a maintenance event with correct type
@@ -808,11 +811,9 @@ func TestInstanceRebootEvent(t *testing.T) {
808811
}
809812
}
810813

811-
// Test that ignores instance retirement events
812814
func TestIgnoredEventTypes(t *testing.T) {
813815
client, mockAWSClient, _ := createTestClient(t)
814816

815-
// Setup test data for events that should be ignored
816817
startTime := time.Now().Add(24 * time.Hour)
817818
endTime := startTime.Add(2 * time.Hour)
818819
testInstanceIDIgnored := "i-0123456789abcdef1"
@@ -829,7 +830,6 @@ func TestIgnoredEventTypes(t *testing.T) {
829830
testRegion, testService, MAINTENANCE_SCHEDULED,
830831
)
831832

832-
// Setup AWS Health API mock with events to be ignored
833833
mockAWSClient.On("DescribeEvents", mock.Anything, mock.Anything).Return(&health.DescribeEventsOutput{
834834
Events: []types.Event{
835835
{
@@ -886,15 +886,13 @@ func TestIgnoredEventTypes(t *testing.T) {
886886
},
887887
}, nil)
888888

889-
// Setup test channel and test instance IDs
890889
eventChan := make(chan model.MaintenanceEvent, 10)
891-
instanceIDs := map[string]string{
890+
client.nodeInformer.instanceIDs = map[string]string{
892891
testInstanceID: testNodeName,
893892
testInstanceIDIgnored: testNodeNameIgnored,
894893
}
895894

896-
// Call the function being tested
897-
err := client.handleMaintenanceEvents(context.Background(), instanceIDs, eventChan, pollStartTime)
895+
err := client.handleMaintenanceEvents(context.Background(), eventChan, pollStartTime)
898896
assert.NoError(t, err)
899897

900898
// Verify no events were received (as these should be filtered out)
@@ -905,6 +903,5 @@ func TestIgnoredEventTypes(t *testing.T) {
905903
assert.Equal(t, testInstanceIDIgnored, event.ResourceID)
906904
assert.Equal(t, model.StatusDetected, event.Status)
907905
default:
908-
// This is expected, no events should be present
909906
}
910907
}

0 commit comments

Comments
 (0)