Skip to content

Commit 28117ba

Browse files
committed
try to fix ci failures
Signed-off-by: Davanum Srinivas <[email protected]>
1 parent 58510c0 commit 28117ba

File tree

4 files changed

+266
-19
lines changed

4 files changed

+266
-19
lines changed

node-drainer/main.go

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,13 @@ import (
2424
"strconv"
2525
"syscall"
2626

27+
"github.com/nvidia/nvsentinel/commons/pkg/eventutil"
2728
"github.com/nvidia/nvsentinel/commons/pkg/flags"
2829
"github.com/nvidia/nvsentinel/commons/pkg/logger"
2930
"github.com/nvidia/nvsentinel/commons/pkg/server"
3031
"github.com/nvidia/nvsentinel/data-models/pkg/model"
3132
"github.com/nvidia/nvsentinel/node-drainer/pkg/initializer"
3233
"github.com/nvidia/nvsentinel/store-client/pkg/client"
33-
"github.com/nvidia/nvsentinel/store-client/pkg/datastore"
3434
"github.com/nvidia/nvsentinel/store-client/pkg/query"
3535
"golang.org/x/sync/errgroup"
3636
)
@@ -274,29 +274,37 @@ func handleColdStart(ctx context.Context, components *initializer.Components) er
274274

275275
slog.Info("Found events to re-process", "count", len(healthEvents))
276276

277-
// Convert to Event format for compatibility with existing code
278-
events := make([]datastore.Event, len(healthEvents))
279-
for i, he := range healthEvents {
280-
events[i] = he.RawEvent
281-
}
282-
283277
// Re-process each event
284-
for _, event := range events {
285-
// Extract health event (datastore.Event is map[string]interface{})
286-
healthEvent, ok := event["healthevent"].(datastore.Event)
287-
if !ok {
288-
slog.Error("Failed to extract healthevent from cold start event")
278+
for _, he := range healthEvents {
279+
// Use the RawEvent from the database query which includes _id
280+
// This is critical for status updates to work properly
281+
event := he.RawEvent
282+
if len(event) == 0 {
283+
slog.Error("RawEvent is empty, skipping cold start event")
284+
continue
285+
}
286+
287+
// Parse the event to extract node name
288+
parsedEvent, err := eventutil.ParseHealthEventFromEvent(event)
289+
if err != nil {
290+
slog.Error("Failed to parse health event from cold start event", "error", err)
289291
continue
290292
}
291293

292-
nodeName, ok := healthEvent["nodename"].(string)
293-
if !ok {
294-
slog.Error("Failed to extract node name from cold start event")
294+
if parsedEvent.HealthEvent == nil {
295+
slog.Error("Health event is nil in cold start event")
296+
continue
297+
}
298+
299+
nodeName := parsedEvent.HealthEvent.GetNodeName()
300+
if nodeName == "" {
301+
slog.Error("Node name is empty in cold start event")
295302
continue
296303
}
297304

298305
// Create adapter to bridge interface differences
299306
dbAdapter := &dataStoreAdapter{DatabaseClient: components.DatabaseClient}
307+
300308
if err := components.QueueManager.EnqueueEventGeneric(ctx, nodeName, event, dbAdapter); err != nil {
301309
slog.Error("Failed to enqueue cold start event", "error", err, "nodeName", nodeName)
302310
} else {

store-client/pkg/datastore/providers/mongodb/health_store.go

Lines changed: 129 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ import (
1818
"context"
1919
"fmt"
2020

21+
"go.mongodb.org/mongo-driver/bson"
22+
"go.mongodb.org/mongo-driver/bson/primitive"
23+
2124
"github.com/nvidia/nvsentinel/store-client/pkg/client"
2225
"github.com/nvidia/nvsentinel/store-client/pkg/datastore"
2326
)
@@ -100,6 +103,9 @@ func (h *MongoHealthEventStore) FindHealthEventsByNode(ctx context.Context,
100103
).WithMetadata("nodeName", nodeName)
101104
}
102105

106+
// Normalize HealthEvent fields from bson.M to map[string]interface{}
107+
normalizeHealthEvents(events)
108+
103109
return events, nil
104110
}
105111

@@ -118,15 +124,55 @@ func (h *MongoHealthEventStore) FindHealthEventsByFilter(ctx context.Context,
118124

119125
var events []datastore.HealthEventWithStatus
120126

121-
err = cursor.All(ctx, &events)
122-
if err != nil {
127+
// Iterate manually to populate both the struct and RawEvent with _id
128+
for cursor.Next(ctx) {
129+
// First decode into a raw map to preserve _id
130+
var rawDoc map[string]interface{}
131+
if err := cursor.Decode(&rawDoc); err != nil {
132+
return nil, datastore.NewQueryError(
133+
datastore.ProviderMongoDB,
134+
"failed to decode raw document",
135+
err,
136+
).WithMetadata("filter", filter)
137+
}
138+
139+
// Then use bson to convert the raw map into our struct
140+
var event datastore.HealthEventWithStatus
141+
142+
bsonBytes, err := bson.Marshal(rawDoc)
143+
if err != nil {
144+
return nil, datastore.NewQueryError(
145+
datastore.ProviderMongoDB,
146+
"failed to marshal raw document to BSON",
147+
err,
148+
).WithMetadata("filter", filter)
149+
}
150+
151+
if err := bson.Unmarshal(bsonBytes, &event); err != nil {
152+
return nil, datastore.NewQueryError(
153+
datastore.ProviderMongoDB,
154+
"failed to unmarshal BSON to health event",
155+
err,
156+
).WithMetadata("filter", filter)
157+
}
158+
159+
// Store the raw document (with _id) in RawEvent
160+
event.RawEvent = rawDoc
161+
162+
events = append(events, event)
163+
}
164+
165+
if err := cursor.Err(); err != nil {
123166
return nil, datastore.NewQueryError(
124167
datastore.ProviderMongoDB,
125-
"failed to decode health events by filter",
168+
"cursor error while iterating health events",
126169
err,
127170
).WithMetadata("filter", filter)
128171
}
129172

173+
// Normalize HealthEvent fields from bson.M to map[string]interface{}
174+
normalizeHealthEvents(events)
175+
130176
return events, nil
131177
}
132178

@@ -162,6 +208,9 @@ func (h *MongoHealthEventStore) FindHealthEventsByStatus(ctx context.Context,
162208
).WithMetadata("status", string(status))
163209
}
164210

211+
// Normalize HealthEvent fields from bson.M to map[string]interface{}
212+
normalizeHealthEvents(events)
213+
165214
return events, nil
166215
}
167216

@@ -281,3 +330,80 @@ func (h *MongoHealthEventStore) UpdateHealthEventsByQuery(ctx context.Context,
281330

282331
return nil
283332
}
333+
334+
// normalizeHealthEvents converts bson.M types to map[string]interface{} in HealthEvent fields
335+
// This ensures consistency across database providers (MongoDB-specific types -> generic maps)
336+
func normalizeHealthEvents(events []datastore.HealthEventWithStatus) {
337+
for i := range events {
338+
if events[i].HealthEvent != nil {
339+
events[i].HealthEvent = normalizeValue(events[i].HealthEvent)
340+
}
341+
}
342+
}
343+
344+
// normalizeValue recursively converts MongoDB types (bson.M, primitive.D, primitive.A, etc.) to standard Go types
345+
func normalizeValue(v interface{}) interface{} {
346+
switch val := v.(type) {
347+
case primitive.D:
348+
return normalizePrimitiveD(val)
349+
case primitive.A:
350+
return normalizeArray(val)
351+
case map[string]interface{}:
352+
return normalizeMap(val)
353+
case []interface{}:
354+
return normalizeArray(val)
355+
default:
356+
// Primitive types, return as-is
357+
return val
358+
}
359+
}
360+
361+
// normalizePrimitiveD converts primitive.D to map[string]interface{} and normalizes nested values
362+
func normalizePrimitiveD(val primitive.D) interface{} {
363+
bsonBytes, err := bson.Marshal(val)
364+
if err != nil {
365+
return val // Return as-is if marshal fails
366+
}
367+
368+
var m map[string]interface{}
369+
if err := bson.Unmarshal(bsonBytes, &m); err != nil {
370+
return val // Return as-is if unmarshal fails
371+
}
372+
373+
return normalizeMap(m)
374+
}
375+
376+
// normalizeMap recursively normalizes all values in a map
377+
func normalizeMap(m map[string]interface{}) map[string]interface{} {
378+
result := make(map[string]interface{}, len(m))
379+
for k, v := range m {
380+
result[k] = normalizeValue(v)
381+
}
382+
383+
return result
384+
}
385+
386+
// normalizeArray recursively normalizes all elements in an array
387+
func normalizeArray(arr interface{}) []interface{} {
388+
var length int
389+
390+
var getValue func(int) interface{}
391+
392+
switch v := arr.(type) {
393+
case primitive.A:
394+
length = len(v)
395+
getValue = func(i int) interface{} { return v[i] }
396+
case []interface{}:
397+
length = len(v)
398+
getValue = func(i int) interface{} { return v[i] }
399+
default:
400+
return nil
401+
}
402+
403+
result := make([]interface{}, length)
404+
for i := 0; i < length; i++ {
405+
result[i] = normalizeValue(getValue(i))
406+
}
407+
408+
return result
409+
}

store-client/pkg/datastore/providers/mongodb/health_store_test.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@ package mongodb
1717
import (
1818
"context"
1919
"errors"
20+
"reflect"
2021
"testing"
2122
"time"
2223

2324
"github.com/stretchr/testify/assert"
2425
"github.com/stretchr/testify/mock"
26+
"go.mongodb.org/mongo-driver/bson/primitive"
2527

2628
"github.com/nvidia/nvsentinel/store-client/pkg/client"
2729
"github.com/nvidia/nvsentinel/store-client/pkg/datastore"
@@ -287,3 +289,113 @@ func TestMongoHealthEventStore_FindLatestEventForNode(t *testing.T) {
287289
mockResult.AssertExpectations(t)
288290
})
289291
}
292+
293+
func TestNormalizeValue(t *testing.T) {
294+
tests := []struct {
295+
name string
296+
input interface{}
297+
expected interface{}
298+
}{
299+
{
300+
name: "primitive.D with nested values",
301+
input: primitive.D{
302+
{Key: "nodename", Value: "test-node"},
303+
{Key: "status", Value: "healthy"},
304+
{Key: "metadata", Value: primitive.D{
305+
{Key: "region", Value: "us-west"},
306+
{Key: "zone", Value: "a"},
307+
}},
308+
},
309+
expected: map[string]interface{}{
310+
"nodename": "test-node",
311+
"status": "healthy",
312+
"metadata": map[string]interface{}{
313+
"region": "us-west",
314+
"zone": "a",
315+
},
316+
},
317+
},
318+
{
319+
name: "primitive.D with array containing primitive.D",
320+
input: primitive.D{
321+
{Key: "items", Value: []interface{}{
322+
primitive.D{
323+
{Key: "name", Value: "item1"},
324+
{Key: "value", Value: int32(100)},
325+
},
326+
primitive.D{
327+
{Key: "name", Value: "item2"},
328+
{Key: "value", Value: int32(200)},
329+
},
330+
}},
331+
},
332+
expected: map[string]interface{}{
333+
"items": []interface{}{
334+
map[string]interface{}{
335+
"name": "item1",
336+
"value": int32(100),
337+
},
338+
map[string]interface{}{
339+
"name": "item2",
340+
"value": int32(200),
341+
},
342+
},
343+
},
344+
},
345+
{
346+
name: "already normalized map",
347+
input: map[string]interface{}{
348+
"key1": "value1",
349+
"key2": map[string]interface{}{
350+
"nested": "value2",
351+
},
352+
},
353+
expected: map[string]interface{}{
354+
"key1": "value1",
355+
"key2": map[string]interface{}{
356+
"nested": "value2",
357+
},
358+
},
359+
},
360+
{
361+
name: "array with mixed types",
362+
input: []interface{}{
363+
"string",
364+
123,
365+
primitive.D{{Key: "key", Value: "value"}},
366+
map[string]interface{}{"already": "normalized"},
367+
},
368+
expected: []interface{}{
369+
"string",
370+
123,
371+
map[string]interface{}{"key": "value"},
372+
map[string]interface{}{"already": "normalized"},
373+
},
374+
},
375+
{
376+
name: "primitive string",
377+
input: "test-string",
378+
expected: "test-string",
379+
},
380+
{
381+
name: "primitive int",
382+
input: 42,
383+
expected: 42,
384+
},
385+
{
386+
name: "nil value",
387+
input: nil,
388+
expected: nil,
389+
},
390+
}
391+
392+
for _, tt := range tests {
393+
t.Run(tt.name, func(t *testing.T) {
394+
result := normalizeValue(tt.input)
395+
if !reflect.DeepEqual(result, tt.expected) {
396+
t.Errorf("normalizeValue() = %v (type: %T), expected %v (type: %T)",
397+
result, result, tt.expected, tt.expected)
398+
}
399+
})
400+
}
401+
}

store-client/pkg/datastore/providers/postgresql/datastore.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,8 @@ func (p *PostgreSQLDataStore) NewChangeStreamWatcher(
195195
watcher := NewPostgreSQLChangeStreamWatcher(p.db, clientName, tableName)
196196
watcher.pipelineFilter = pipelineFilter
197197

198-
return watcher, nil
198+
// Wrap the watcher to provide Unwrap() support for backward compatibility
199+
return NewPostgreSQLChangeStreamWatcherWithUnwrap(watcher), nil
199200
}
200201

201202
// --- Backward Compatibility Methods for MongoDB-style Type Assertions ---

0 commit comments

Comments
 (0)