Skip to content

Commit 02ed5d1

Browse files
committed
fix problems 4
Signed-off-by: Davanum Srinivas <[email protected]>
1 parent d638c57 commit 02ed5d1

File tree

3 files changed

+70
-9
lines changed

3 files changed

+70
-9
lines changed

health-events-analyzer/pkg/parser/parser.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@ package parser
1717
import (
1818
"encoding/json"
1919
"fmt"
20+
"log/slog"
2021
"reflect"
2122
"regexp"
2223
"strconv"
2324
"strings"
2425

2526
datamodels "github.com/nvidia/nvsentinel/data-models/pkg/model"
27+
"github.com/nvidia/nvsentinel/store-client/pkg/utils"
2628
)
2729

2830
// ParseSequenceStage parses a JSON stage string and replaces "this." references with actual event values
@@ -56,7 +58,17 @@ func processValue(value interface{}, event datamodels.HealthEventWithStatus) (in
5658
return nil, fmt.Errorf("error in getting value from path '%s': %w", fieldPath, err)
5759
}
5860

59-
return resolvedValue, nil
61+
// CRITICAL: Normalize field names for protobuf values embedded in MongoDB aggregation pipelines.
62+
// This converts camelCase JSON field names (entityType, entityValue) to lowercase BSON field names
63+
// (entitytype, entityvalue) so they match MongoDB's storage and filter expectations.
64+
normalized := utils.NormalizeFieldNamesForMongoDB(resolvedValue)
65+
66+
slog.Debug("Resolved this. reference",
67+
"path", fieldPath,
68+
"original_value", resolvedValue,
69+
"normalized_value", normalized)
70+
71+
return normalized, nil
6072
}
6173

6274
return v, nil

health-events-analyzer/pkg/parser/parser_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,33 @@ func TestParseSequenceStage(t *testing.T) {
253253
},
254254
wantErr: false,
255255
},
256+
{
257+
name: "stage with full entitiesimpacted array (MongoDB filter context)",
258+
stage: `{"$match": {"$filter": {"input": "this.healthevent.entitiesimpacted", "cond": {"$eq": ["$$this.entitytype", "GPU"]}}}}`,
259+
event: datamodels.HealthEventWithStatus{
260+
HealthEvent: &protos.HealthEvent{
261+
EntitiesImpacted: []*protos.Entity{
262+
{EntityType: "GPU", EntityValue: "0"},
263+
{EntityType: "CPU", EntityValue: "1"},
264+
},
265+
},
266+
},
267+
// After normalization, protobuf array should have lowercase field names
268+
want: map[string]interface{}{
269+
"$match": map[string]interface{}{
270+
"$filter": map[string]interface{}{
271+
"input": []interface{}{
272+
map[string]interface{}{"entitytype": "GPU", "entityvalue": "0"},
273+
map[string]interface{}{"entitytype": "CPU", "entityvalue": "1"},
274+
},
275+
"cond": map[string]interface{}{
276+
"$eq": []interface{}{"$$this.entitytype", "GPU"},
277+
},
278+
},
279+
},
280+
},
281+
wantErr: false,
282+
},
256283
{
257284
name: "invalid JSON stage",
258285
stage: `{invalid json}`,

health-events-analyzer/pkg/reconciler/reconciler.go

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,11 @@ func (r *Reconciler) getRecommendedActionValue(recommendedAction, ruleName strin
223223

224224
func (r *Reconciler) validateAllSequenceCriteria(ctx context.Context, rule config.HealthEventsAnalyzerRule,
225225
healthEventWithStatus datamodels.HealthEventWithStatus) (bool, error) {
226-
slog.Debug("Evaluating rule for event", "rule_name", rule.Name, "event", healthEventWithStatus)
226+
slog.Info("→ Evaluating rule for event",
227+
"rule_name", rule.Name,
228+
"node", healthEventWithStatus.HealthEvent.NodeName,
229+
"error_code", healthEventWithStatus.HealthEvent.ErrorCode,
230+
"agent", healthEventWithStatus.HealthEvent.Agent)
227231

228232
// Build aggregation pipeline from stages
229233
pipelineStages, err := r.getPipelineStages(rule, healthEventWithStatus)
@@ -234,12 +238,14 @@ func (r *Reconciler) validateAllSequenceCriteria(ctx context.Context, rule confi
234238
return false, fmt.Errorf("failed to build pipeline stages: %w", err)
235239
}
236240

241+
slog.Debug("Generated pipeline", "rule_name", rule.Name, "pipeline_stages", pipelineStages)
242+
237243
var result []map[string]interface{}
238244

239245
// Execute aggregation using store-client abstraction
240246
cursor, err := r.databaseClient.Aggregate(ctx, pipelineStages)
241247
if err != nil {
242-
slog.Error("Failed to execute aggregation pipeline", "error", err)
248+
slog.Error("Failed to execute aggregation pipeline", "error", err, "rule_name", rule.Name)
243249
totalEventProcessingError.WithLabelValues("execute_pipeline_error").Inc()
244250

245251
return false, fmt.Errorf("failed to execute aggregation pipeline: %w", err)
@@ -248,33 +254,45 @@ func (r *Reconciler) validateAllSequenceCriteria(ctx context.Context, rule confi
248254
defer cursor.Close(ctx)
249255

250256
if err = cursor.All(ctx, &result); err != nil {
251-
slog.Error("Failed to decode cursor", "error", err)
257+
slog.Error("Failed to decode cursor", "error", err, "rule_name", rule.Name)
252258
totalEventProcessingError.WithLabelValues("decode_cursor_error").Inc()
253259

254260
return false, fmt.Errorf("failed to decode cursor: %w", err)
255261
}
256262

263+
slog.Debug("Aggregation results", "rule_name", rule.Name, "result_count", len(result), "results", result)
264+
257265
// Check if we have results (rule matched)
258266
if len(result) > 0 {
259267
// Check for explicit ruleMatched field (used in tests and by SequenceFacet pipelines)
260268
if matched, ok := result[0]["ruleMatched"].(bool); ok {
261269
if matched {
262-
slog.Debug("Rule matched (via ruleMatched field)", "rule_name", rule.Name)
270+
slog.Info("✓ Rule matched via ruleMatched field",
271+
"rule_name", rule.Name,
272+
"node", healthEventWithStatus.HealthEvent.NodeName)
263273
return true, nil
264274
}
265275

266-
slog.Debug("Rule did not match (ruleMatched=false)", "rule_name", rule.Name)
276+
slog.Info("✗ Rule did not match (ruleMatched=false)",
277+
"rule_name", rule.Name,
278+
"node", healthEventWithStatus.HealthEvent.NodeName,
279+
"result", result[0])
267280

268281
return false, nil
269282
}
270283

271284
// For Stage-based pipelines, presence of results indicates a match
272-
slog.Debug("Rule matched (results exist)", "rule_name", rule.Name, "result_count", len(result))
285+
slog.Info("✓ Rule matched via results existence",
286+
"rule_name", rule.Name,
287+
"node", healthEventWithStatus.HealthEvent.NodeName,
288+
"result_count", len(result))
273289

274290
return true, nil
275291
}
276292

277-
slog.Debug("Rule did not match (no results)", "rule_name", rule.Name)
293+
slog.Info("✗ Rule did not match (no results)",
294+
"rule_name", rule.Name,
295+
"node", healthEventWithStatus.HealthEvent.NodeName)
278296

279297
return false, nil
280298
}
@@ -297,14 +315,18 @@ func (r *Reconciler) getPipelineStages(
297315

298316
for i, stageStr := range rule.Stage {
299317
// Parse the stage and resolve "this." references
318+
slog.Debug("Parsing stage", "stage_index", i, "stage_string", stageStr)
319+
300320
stageMap, err := parser.ParseSequenceStage(stageStr, healthEventWithStatus)
301321
if err != nil {
302-
slog.Error("Failed to parse stage", "stage_index", i, "error", err)
322+
slog.Error("Failed to parse stage", "stage_index", i, "error", err, "stage_string", stageStr)
303323
totalEventProcessingError.WithLabelValues("parse_stage_error").Inc()
304324

305325
return nil, fmt.Errorf("failed to parse stage %d: %w", i, err)
306326
}
307327

328+
slog.Debug("Parsed stage successfully", "stage_index", i, "parsed_stage", stageMap)
329+
308330
pipeline = append(pipeline, stageMap)
309331
}
310332

0 commit comments

Comments
 (0)