Skip to content

Commit 380a475

Browse files
committed
fix postgres impl
Signed-off-by: Davanum Srinivas <[email protected]>
1 parent 6676af9 commit 380a475

File tree

6 files changed

+172
-5
lines changed

6 files changed

+172
-5
lines changed

fault-quarantine/pkg/informer/k8s_client.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,10 @@ func (c *FaultQuarantineClient) QuarantineNodeAndSetAnnotations(
203203
annotations map[string]string,
204204
labels map[string]string,
205205
) error {
206+
slog.Info("DRY-RUN DEBUG: QuarantineNodeAndSetAnnotations called", "node", nodename,
207+
"numAnnotations", len(annotations), "isCordon", isCordon, "numTaints", len(taints),
208+
"dryRunMode", c.DryRunMode)
209+
206210
updateFn := func(node *v1.Node) error {
207211
if len(taints) > 0 {
208212
if err := c.applyTaints(node, taints, nodename); err != nil {
@@ -264,6 +268,10 @@ func (c *FaultQuarantineClient) applyTaints(node *v1.Node, taints []config.Taint
264268

265269
func (c *FaultQuarantineClient) handleCordon(node *v1.Node, nodename string) bool {
266270
_, exist := node.Annotations[common.QuarantineHealthEventAnnotationKey]
271+
272+
slog.Info("DRY-RUN DEBUG: handleCordon", "node", nodename, "isUnschedulable", node.Spec.Unschedulable,
273+
"hasQuarantineAnnotation", exist, "dryRunMode", c.DryRunMode)
274+
267275
if node.Spec.Unschedulable {
268276
if exist {
269277
slog.Info("Node already cordoned by FQM; skipping taint/annotation updates", "node", nodename)
@@ -287,6 +295,16 @@ func (c *FaultQuarantineClient) applyAnnotations(node *v1.Node, annotations map[
287295
node.Annotations = make(map[string]string)
288296
}
289297

298+
slog.Info("DRY-RUN DEBUG: applyAnnotations called", "node", nodename, "numAnnotations", len(annotations),
299+
"annotationKeys", func() []string {
300+
keys := make([]string, 0, len(annotations))
301+
for k := range annotations {
302+
keys = append(keys, k)
303+
}
304+
305+
return keys
306+
}())
307+
290308
slog.Info("Setting annotations on node", "node", nodename, "annotations", annotations)
291309

292310
for annotationKey, annotationValue := range annotations {

fault-quarantine/pkg/reconciler/reconciler.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -436,7 +436,9 @@ func (r *Reconciler) handleEvent(
436436
annotationsMap := r.prepareAnnotations(taintsToBeApplied, &labelsMap, &isCordoned)
437437

438438
isNodeQuarantined := len(taintsToBeApplied) > 0 || isCordoned.Load()
439-
if !isNodeQuarantined {
439+
440+
// In dry-run mode, always apply annotations for observability even if no actions would be taken
441+
if !isNodeQuarantined && !r.config.DryRun {
440442
return nil
441443
}
442444

@@ -679,6 +681,8 @@ func (r *Reconciler) applyQuarantine(
679681
labelsMap *sync.Map,
680682
isCordoned *atomic.Bool,
681683
) *model.Status {
684+
slog.Info("DRY-RUN DEBUG: applyQuarantine called", "node", event.HealthEvent.NodeName,
685+
"numAnnotations", len(annotationsMap), "dryRunMode", r.config.DryRun)
682686
r.recordCordonEventInCircuitBreaker(event)
683687

684688
healthEvents := healthEventsAnnotation.NewHealthEventsAnnotationMap()
@@ -692,9 +696,21 @@ func (r *Reconciler) applyQuarantine(
692696
}
693697

694698
if err := r.addHealthEventAnnotation(healthEvents, annotationsMap); err != nil {
699+
slog.Error("DRY-RUN DEBUG: Failed to add health event annotation", "error", err, "node", event.HealthEvent.NodeName)
700+
695701
return nil
696702
}
697703

704+
slog.Info("DRY-RUN DEBUG: Added health event annotation", "node", event.HealthEvent.NodeName,
705+
"numAnnotations", len(annotationsMap), "annotationKeys", func() []string {
706+
keys := make([]string, 0, len(annotationsMap))
707+
for k := range annotationsMap {
708+
keys = append(keys, k)
709+
}
710+
711+
return keys
712+
}())
713+
698714
// Remove manual uncordon annotation if present before applying new quarantine
699715
r.cleanupManualUncordonAnnotation(ctx, event.HealthEvent.NodeName, annotations)
700716

platform-connectors/pkg/connectors/store/store_connector.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,28 +136,46 @@ func (r *DatabaseStoreConnector) insertHealthEvents(
136136
// Prepare all documents for batch insertion
137137
healthEventWithStatusList := make([]interface{}, 0, len(healthEvents.GetEvents()))
138138

139-
for _, healthEvent := range healthEvents.GetEvents() {
139+
slog.Info("=== [PLATFORM-CONNECTORS-DEBUG] insertHealthEvents ENTRY ===",
140+
"eventCount", len(healthEvents.GetEvents()))
141+
142+
for i, healthEvent := range healthEvents.GetEvents() {
140143
// CRITICAL FIX: Clone the HealthEvent to avoid pointer reuse issues with gRPC buffers
141144
// Without this clone, the healthEvent pointer may point to reused gRPC buffer memory
142145
// that gets overwritten by subsequent requests, causing data corruption in MongoDB.
143146
// This manifests as events having wrong isfatal/ishealthy/message values.
144147
clonedHealthEvent := proto.Clone(healthEvent).(*protos.HealthEvent)
145148

149+
slog.Info("[PLATFORM-CONNECTORS-DEBUG] Processing health event",
150+
"index", i,
151+
"nodeName", clonedHealthEvent.NodeName,
152+
"checkName", clonedHealthEvent.CheckName,
153+
"isFatal", clonedHealthEvent.IsFatal,
154+
"componentClass", clonedHealthEvent.ComponentClass)
155+
146156
healthEventWithStatusObj := model.HealthEventWithStatus{
147157
CreatedAt: time.Now().UTC(),
148158
HealthEvent: clonedHealthEvent,
149159
}
150160
healthEventWithStatusList = append(healthEventWithStatusList, healthEventWithStatusObj)
151161
}
152162

163+
slog.Info("=== [PLATFORM-CONNECTORS-DEBUG] About to call databaseClient.InsertMany ===",
164+
"documentCount", len(healthEventWithStatusList),
165+
"documentType", fmt.Sprintf("%T", healthEventWithStatusList[0]))
166+
153167
// Insert all documents in a single batch operation
154168
// This ensures MongoDB generates INSERT operations (not UPDATE) for change streams
155169
// Note: InsertMany is already atomic - either all documents are inserted or none are
156170
_, err := r.databaseClient.InsertMany(ctx, healthEventWithStatusList)
157171
if err != nil {
172+
slog.Error("=== [PLATFORM-CONNECTORS-DEBUG] InsertMany FAILED ===",
173+
"error", err)
158174
return fmt.Errorf("insertMany failed: %w", err)
159175
}
160176

177+
slog.Info("=== [PLATFORM-CONNECTORS-DEBUG] InsertMany SUCCESS ===")
178+
161179
return nil
162180
}
163181

store-client/pkg/client/postgresql_client.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"database/sql"
2020
"encoding/json"
2121
"fmt"
22+
"log/slog"
2223
"strings"
2324

2425
_ "github.com/lib/pq" // PostgreSQL driver
@@ -85,10 +86,23 @@ func (c *PostgreSQLClient) Close(ctx context.Context) error {
8586

8687
// InsertMany inserts multiple documents
8788
func (c *PostgreSQLClient) InsertMany(ctx context.Context, documents []interface{}) (*InsertManyResult, error) {
89+
slog.Info("=== [POSTGRESQL-CLIENT-DEBUG] PostgreSQLClient.InsertMany ENTRY ===",
90+
"documentCount", len(documents),
91+
"tableName", c.table)
92+
8893
if len(documents) == 0 {
94+
slog.Info("[POSTGRESQL-CLIENT-DEBUG] Empty documents, returning early")
8995
return &InsertManyResult{InsertedIDs: []interface{}{}}, nil
9096
}
9197

98+
// Check type of first document to see if it's a health event
99+
if len(documents) > 0 {
100+
firstDocType := fmt.Sprintf("%T", documents[0])
101+
slog.Info("=== [POSTGRESQL-CLIENT-DEBUG] Document type check ===",
102+
"documentType", firstDocType,
103+
"documentCount", len(documents))
104+
}
105+
92106
// Build batch insert query
93107
// INSERT INTO table (document) VALUES ($1), ($2), ... RETURNING id
94108
placeholders := make([]string, len(documents))
@@ -98,13 +112,20 @@ func (c *PostgreSQLClient) InsertMany(ctx context.Context, documents []interface
98112
// Marshal document to JSON
99113
docJSON, err := json.Marshal(doc)
100114
if err != nil {
115+
slog.Error("[POSTGRESQL-CLIENT-DEBUG] Failed to marshal document",
116+
"index", i,
117+
"error", err)
101118
return nil, datastore.NewSerializationError(
102119
datastore.ProviderPostgreSQL,
103120
fmt.Sprintf("failed to marshal document at index %d", i),
104121
err,
105122
)
106123
}
107124

125+
slog.Info("[POSTGRESQL-CLIENT-DEBUG] Marshaled document",
126+
"index", i,
127+
"jsonLength", len(docJSON))
128+
108129
placeholders[i] = fmt.Sprintf("($%d)", i+1)
109130
args[i] = docJSON
110131
}
@@ -116,9 +137,16 @@ func (c *PostgreSQLClient) InsertMany(ctx context.Context, documents []interface
116137
strings.Join(placeholders, ", "),
117138
)
118139

140+
slog.Info("=== [POSTGRESQL-CLIENT-DEBUG] About to execute INSERT query ===",
141+
"query", query,
142+
"argCount", len(args))
143+
119144
// Execute query and collect returned IDs
120145
rows, err := c.db.QueryContext(ctx, query, args...)
121146
if err != nil {
147+
slog.Error("=== [POSTGRESQL-CLIENT-DEBUG] INSERT query FAILED ===",
148+
"error", err,
149+
"query", query)
122150
return nil, datastore.NewInsertError(
123151
datastore.ProviderPostgreSQL,
124152
"failed to insert documents",
@@ -127,6 +155,8 @@ func (c *PostgreSQLClient) InsertMany(ctx context.Context, documents []interface
127155
}
128156
defer rows.Close()
129157

158+
slog.Info("[POSTGRESQL-CLIENT-DEBUG] INSERT query succeeded, reading IDs")
159+
130160
var insertedIDs []interface{}
131161

132162
for rows.Next() {

store-client/pkg/datastore/providers/postgresql/changestream.go

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -451,8 +451,29 @@ func (e *PostgreSQLEventAdapter) GetResumeToken() []byte {
451451
func (e *PostgreSQLEventAdapter) UnmarshalDocument(v interface{}) error {
452452
// The fullDocument contains the actual document data
453453
if fullDoc, ok := e.eventData["fullDocument"]; ok {
454+
// Convert to map for easier manipulation
455+
docMap, ok := fullDoc.(map[string]interface{})
456+
if !ok {
457+
return fmt.Errorf("fullDocument is not a map[string]interface{}")
458+
}
459+
460+
// The PostgreSQL provider stores the document in a nested "document" field
461+
// Extract just the document field which contains the actual HealthEventWithStatus
462+
var actualDoc map[string]interface{}
463+
if nestedDoc, ok := docMap["document"].(map[string]interface{}); ok {
464+
actualDoc = nestedDoc
465+
} else {
466+
// Fall back to using the whole docMap if there's no nested document
467+
actualDoc = docMap
468+
}
469+
470+
// Transform lowercase keys to match the struct field names
471+
// This handles the case where PostgreSQL stores lowercase JSON field names
472+
// but Go struct tags may expect different casing
473+
transformedDoc := transformJSONKeys(actualDoc)
474+
454475
// Use JSON marshaling/unmarshaling for type conversion
455-
jsonData, err := json.Marshal(fullDoc)
476+
jsonData, err := json.Marshal(transformedDoc)
456477
if err != nil {
457478
return fmt.Errorf("failed to marshal event document: %w", err)
458479
}
@@ -467,6 +488,45 @@ func (e *PostgreSQLEventAdapter) UnmarshalDocument(v interface{}) error {
467488
return fmt.Errorf("fullDocument not found in event")
468489
}
469490

491+
// transformJSONKeys returns a copy of doc in which the lowercase "createdat"
// key is renamed to "createdAt". Every other key is copied through unchanged.
//
// Values of type map[string]interface{} are transformed recursively, so the
// rename applies at every nesting depth reachable through maps. Maps held
// inside slices are NOT visited; slice values are copied as-is.
//
// If doc contains both "createdat" and "createdAt", the value already stored
// under "createdAt" is kept and the lowercase entry is dropped. Without this
// rule both entries would compete for the same output key and the winner would
// depend on Go's randomized map iteration order.
//
// The input map is never mutated. A nil doc yields an empty, non-nil map.
func transformJSONKeys(doc map[string]interface{}) map[string]interface{} {
	const (
		storedCreatedAtKey = "createdat" // lowercase key as found in the stored document
		createdAtKey       = "createdAt" // key emitted in the transformed document
	)

	result := make(map[string]interface{}, len(doc))

	for key, value := range doc {
		// Handle nested maps recursively.
		if nestedMap, ok := value.(map[string]interface{}); ok {
			value = transformJSONKeys(nestedMap)
		}

		if key == storedCreatedAtKey {
			// An entry that already uses the target key takes precedence, which
			// keeps the output independent of map iteration order.
			if _, exists := doc[createdAtKey]; exists {
				continue
			}

			key = createdAtKey
		}

		result[key] = value
	}

	return result
}
529+
470530
// Verify PostgreSQLEventAdapter implements client.Event interface at compile time
471531
var _ client.Event = (*PostgreSQLEventAdapter)(nil)
472532

store-client/pkg/factory/client_factory.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,15 @@ package factory
1616

1717
import (
1818
"context"
19+
"database/sql"
1920
"fmt"
2021
"os"
2122

23+
_ "github.com/lib/pq" // PostgreSQL driver
2224
"github.com/nvidia/nvsentinel/store-client/pkg/client"
2325
"github.com/nvidia/nvsentinel/store-client/pkg/config"
2426
"github.com/nvidia/nvsentinel/store-client/pkg/datastore"
27+
providers_postgresql "github.com/nvidia/nvsentinel/store-client/pkg/datastore/providers/postgresql"
2528
)
2629

2730
// ClientFactory provides a simple interface for creating database clients
@@ -72,8 +75,30 @@ func (f *ClientFactory) CreateDatabaseClient(ctx context.Context) (client.Databa
7275

7376
switch provider {
7477
case string(datastore.ProviderPostgreSQL):
75-
// PostgreSQL client (Phase 2-3: Basic skeleton with Ping/Close implemented)
76-
return client.NewPostgreSQLClient(ctx, f.dbConfig)
78+
// Create PostgreSQL connection
79+
db, err := sql.Open("postgres", f.dbConfig.GetConnectionURI())
80+
if err != nil {
81+
return nil, datastore.NewConnectionError(
82+
datastore.ProviderPostgreSQL,
83+
"failed to open PostgreSQL connection",
84+
err,
85+
)
86+
}
87+
88+
// Test the connection
89+
if err := db.PingContext(ctx); err != nil {
90+
db.Close()
91+
92+
return nil, datastore.NewConnectionError(
93+
datastore.ProviderPostgreSQL,
94+
"failed to connect to PostgreSQL",
95+
err,
96+
)
97+
}
98+
99+
// Return the provider's PostgreSQL database client which has health event field extraction
100+
tableName := f.dbConfig.GetCollectionName() // In PostgreSQL context, collection = table
101+
return providers_postgresql.NewPostgreSQLDatabaseClient(db, tableName), nil
77102

78103
case string(datastore.ProviderMongoDB), "":
79104
// Default to MongoDB for backward compatibility

0 commit comments

Comments
 (0)