
Commit 9ada771

fix broken things
Signed-off-by: Davanum Srinivas <[email protected]>
1 parent 1f8a23b commit 9ada771

3 files changed: +80 additions, -86 deletions


health-events-analyzer/pkg/reconciler/reconciler.go

Lines changed: 12 additions & 17 deletions
@@ -32,13 +32,9 @@ import (
     "github.com/nvidia/nvsentinel/store-client/pkg/client"
     "github.com/nvidia/nvsentinel/store-client/pkg/datastore"
     "github.com/nvidia/nvsentinel/store-client/pkg/helper"
-    "google.golang.org/protobuf/proto"
 )
 
-const (
-    maxRetries int           = 5
-    delay      time.Duration = 10 * time.Second
-)
+// No retry constants needed - EventProcessor no longer retries internally
 
 type HealthEventsAnalyzerReconcilerConfig struct {
     DataStoreConfig *datastore.DataStoreConfig
@@ -74,11 +70,12 @@ func (r *Reconciler) Start(ctx context.Context) error {
     r.databaseClient = bundle.DatabaseClient
 
     // Create and configure the unified EventProcessor
+    // Note: EventProcessor no longer retries internally to prevent blocking the event stream
+    // Failed events will be retried on next pod restart (via resume token)
     processorConfig := client.EventProcessorConfig{
-        MaxRetries:    maxRetries,
-        RetryDelay:    delay,
-        EnableMetrics: true,
-        MetricsLabels: map[string]string{"module": "health-events-analyzer"},
+        EnableMetrics:        true,
+        MetricsLabels:        map[string]string{"module": "health-events-analyzer"},
+        MarkProcessedOnError: false, // IMPORTANT: Don't mark failed events as processed
     }
 
     r.eventProcessor = client.NewEventProcessor(bundle.ChangeStreamWatcher, bundle.DatabaseClient, processorConfig)
@@ -115,8 +112,10 @@ func (r *Reconciler) processHealthEvent(ctx context.Context, event *datamodels.H
     // Process the event using existing business logic
     publishedNewEvent, err := r.handleEvent(ctx, event)
     if err != nil {
-        // Log error but let the EventProcessor handle retry logic
+        // Return error - EventProcessor will NOT mark as processed
+        // Event will be retried on next pod restart
         totalEventProcessingError.WithLabelValues("handle_event_error").Inc()
+        slog.Error("Failed to process health event", "error", err, "nodeName", labelValue)
         return fmt.Errorf("failed to handle event: %w", err)
     }
 
@@ -195,13 +194,9 @@ func (r *Reconciler) publishMatchedEvent(ctx context.Context,
 
     actionVal := r.getRecommendedActionValue(rule.RecommendedAction, rule.Name)
 
-    // CRITICAL FIX: Clone the HealthEvent BEFORE passing to publisher
-    // The event pointer comes from EventProcessor's reusable variable and may be mutated
-    // by subsequent change stream events before the async gRPC call completes.
-    // This prevents data corruption where published events have wrong field values.
-    clonedHealthEvent := proto.Clone(event.HealthEvent).(*protos.HealthEvent)
-
-    err := r.config.Publisher.Publish(ctx, clonedHealthEvent, protos.RecommendedAction(actionVal), rule.Name)
+    // No need to clone here - Publisher.Publish already clones the event
+    // The EventProcessor creates a fresh stack variable for each event, so no mutation risk
+    err := r.config.Publisher.Publish(ctx, event.HealthEvent, protos.RecommendedAction(actionVal), rule.Name)
     if err != nil {
         slog.Error("Error in publishing the new fatal event", "error", err)
         return fmt.Errorf("error in publishing the new fatal event: %w", err)
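The last hunk leans on the claim that Publisher.Publish performs its own deep copy before the asynchronous gRPC send. A minimal sketch of what that defensive copy looks like is below, assuming a simplified Publisher type and send callback; it illustrates the pattern the new comment describes, not the repository's actual publisher.

package publishsketch

import (
    "context"

    "google.golang.org/protobuf/proto"
)

// Publisher is a simplified stand-in for the repository's publisher type.
type Publisher struct {
    send func(ctx context.Context, msg proto.Message) error // e.g. the async gRPC call
}

// Publish deep-copies the message before handing it to the send function,
// so the caller's pointer can be reused or mutated afterwards without
// corrupting the payload that is eventually sent.
func (p *Publisher) Publish(ctx context.Context, event proto.Message) error {
    cloned := proto.Clone(event)
    return p.send(ctx, cloned)
}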

store-client/pkg/client/event_processor.go

Lines changed: 41 additions & 55 deletions
@@ -25,14 +25,18 @@ import (
 
 // EventProcessorConfig holds configuration for the unified event processor
 type EventProcessorConfig struct {
-    // MaxRetries for event processing (0 = no retries, -1 = infinite retries)
+    // MaxRetries for event processing (used by QueueEventProcessor for retry limit)
+    // DefaultEventProcessor ignores this and does not retry
+    // Set to 0 for no retries, -1 for unlimited retries
     MaxRetries int
-    // RetryDelay between attempts
-    RetryDelay time.Duration
     // EnableMetrics controls whether to track processing metrics
     EnableMetrics bool
     // MetricsLabels for module-specific metrics categorization
     MetricsLabels map[string]string
+    // MarkProcessedOnError determines whether to mark events as processed even if handler returns error
+    // Set to false (default) to preserve event for retry on next restart
+    // Set to true to skip failed events and continue (use with caution - may lose events)
+    MarkProcessedOnError bool
 }
 
 // EventProcessor provides a unified interface for processing change stream events
@@ -71,14 +75,6 @@ type DefaultEventProcessor struct {
 func NewEventProcessor(
     watcher ChangeStreamWatcher, dbClient DatabaseClient, config EventProcessorConfig,
 ) EventProcessor {
-    if config.MaxRetries == 0 {
-        config.MaxRetries = 3 // Default retry attempts
-    }
-
-    if config.RetryDelay == 0 {
-        config.RetryDelay = time.Second * 2 // Default retry delay
-    }
-
     return &DefaultEventProcessor{
         changeStreamWatcher: watcher,
         databaseClient:      dbClient,
@@ -143,73 +139,63 @@ func (p *DefaultEventProcessor) processEvents(ctx context.Context) error {
         }
     }
 
-// handleSingleEvent processes a single event with retry logic
+// handleSingleEvent processes a single event
+// IMPORTANT: Does NOT retry internally - handler is responsible for its own retries if needed
+// This prevents retry-induced blocking of the event stream
 func (p *DefaultEventProcessor) handleSingleEvent(ctx context.Context, event Event) error {
     startTime := time.Now()
 
     // Unmarshal the event
     var healthEventWithStatus model.HealthEventWithStatus
     if err := event.UnmarshalDocument(&healthEventWithStatus); err != nil {
         p.updateMetrics("unmarshal_error", "", time.Since(startTime), false)
+        // Unmarshal errors are non-recoverable, mark as processed to skip bad data
+        if markErr := p.changeStreamWatcher.MarkProcessed(ctx, []byte{}); markErr != nil {
+            slog.Error("Failed to mark processed after unmarshal error", "error", markErr)
+        }
         return fmt.Errorf("failed to unmarshal event: %w", err)
     }
 
     eventID, err := event.GetDocumentID()
     if err != nil {
         p.updateMetrics("document_id_error", "", time.Since(startTime), false)
+        // Document ID errors are non-recoverable, mark as processed
+        if markErr := p.changeStreamWatcher.MarkProcessed(ctx, []byte{}); markErr != nil {
+            slog.Error("Failed to mark processed after document ID error", "error", markErr)
+        }
         return fmt.Errorf("failed to get document ID: %w", err)
     }
 
-    // DEBUG: Log RAW event immediately after unmarshal, before any business logic
-    if healthEventWithStatus.HealthEvent != nil {
-        slog.Info("EventProcessor - RAW event received from change stream",
-            "eventID", eventID,
-            "agent", healthEventWithStatus.HealthEvent.Agent,
-            "checkName", healthEventWithStatus.HealthEvent.CheckName,
-            "isFatal", healthEventWithStatus.HealthEvent.IsFatal,
-            "isHealthy", healthEventWithStatus.HealthEvent.IsHealthy,
-            "recommendedAction", healthEventWithStatus.HealthEvent.RecommendedAction,
-            "errorCode", healthEventWithStatus.HealthEvent.ErrorCode,
-            "nodeName", healthEventWithStatus.HealthEvent.NodeName,
-            "message", healthEventWithStatus.HealthEvent.Message,
-            "createdAt", healthEventWithStatus.CreatedAt)
-    }
-
     slog.Debug("Processing event", "eventID", eventID, "event", healthEventWithStatus)
 
-    // Process with retry logic
-    var processErr error
-
-    maxRetries := p.config.MaxRetries
-    if maxRetries < 0 {
-        maxRetries = 1000000 // Treat -1 as infinite retries (large number)
-    }
-
-    for attempt := 1; attempt <= maxRetries; attempt++ {
-        slog.Debug("Processing event", "attempt", attempt, "eventID", eventID, "maxRetries", p.config.MaxRetries)
-
-        processErr = p.eventHandler.ProcessEvent(ctx, &healthEventWithStatus)
-        if processErr == nil {
-            // Success
-            p.updateMetrics("processing_success", eventID, time.Since(startTime), true)
-            break
-        }
-
-        slog.Error("Event processing failed", "attempt", attempt, "eventID", eventID, "error", processErr)
-
-        // Don't sleep on the last attempt
-        if attempt < maxRetries {
-            time.Sleep(p.config.RetryDelay)
-        }
-    }
+    // Process event - NO internal retries
+    // Handler is responsible for any retries it needs
+    processErr := p.eventHandler.ProcessEvent(ctx, &healthEventWithStatus)
 
     if processErr != nil {
         p.updateMetrics("processing_failed", eventID, time.Since(startTime), false)
-        slog.Error("Max retries reached, event processing failed", "eventID", eventID, "error", processErr)
+        slog.Error("Event processing failed", "eventID", eventID, "error", processErr)
+
+        // CRITICAL FIX: Only mark as processed if configured to do so OR if handler succeeded
+        if p.config.MarkProcessedOnError {
+            slog.Warn("Marking failed event as processed due to MarkProcessedOnError=true",
+                "eventID", eventID, "error", processErr)
+            if markErr := p.changeStreamWatcher.MarkProcessed(ctx, []byte{}); markErr != nil {
+                slog.Error("Failed to mark processed after error", "error", markErr)
+                return fmt.Errorf("failed to mark event as processed: %w", markErr)
+            }
+        } else {
+            // Do NOT mark as processed - event will be retried on next restart
+            slog.Error("Event processing failed, NOT marking as processed - will retry on restart",
                "eventID", eventID, "error", processErr)
+            return processErr
+        }
+
+        return processErr
     }
 
-    // Always mark as processed to advance the resume token
-    // Note: Passing empty token allows MongoDB watcher to use its internal cursor state
+    // Success - mark as processed
+    p.updateMetrics("processing_success", eventID, time.Since(startTime), true)
     if err := p.changeStreamWatcher.MarkProcessed(ctx, []byte{}); err != nil {
         p.updateMetrics("mark_processed_error", eventID, time.Since(startTime), false)
         return fmt.Errorf("failed to mark event as processed: %w", err)
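Both branches above ultimately decide whether to call MarkProcessed, which is what advances the change-stream resume token. The sketch below shows the underlying resume-token behaviour using the MongoDB Go driver directly, as an assumption about how the watcher behaves: only acknowledged events move the persisted token forward, so an unacknowledged failure is replayed after the next restart. The watch, handle, and persistToken names are illustrative; the repository wraps this logic inside its ChangeStreamWatcher.

package resumesketch

import (
    "context"
    "log"

    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
)

// watch resumes the change stream from the last persisted token and only
// advances that token for events the handler processed successfully.
func watch(ctx context.Context, coll *mongo.Collection, savedToken bson.Raw) error {
    opts := options.ChangeStream()
    if savedToken != nil {
        opts.SetResumeAfter(savedToken)
    }

    cs, err := coll.Watch(ctx, mongo.Pipeline{}, opts)
    if err != nil {
        return err
    }
    defer cs.Close(ctx)

    for cs.Next(ctx) {
        var event bson.M
        if err := cs.Decode(&event); err != nil {
            return err
        }

        if err := handle(ctx, event); err != nil {
            // Token is not persisted: after a restart the stream resumes
            // before this event and it is delivered again.
            log.Printf("handler failed, leaving resume token untouched: %v", err)
            continue
        }

        // Success: persist the token so restarts skip this event.
        persistToken(cs.ResumeToken())
    }

    return cs.Err()
}

func handle(ctx context.Context, event bson.M) error { return nil } // placeholder handler
func persistToken(token bson.Raw)                    {}             // placeholder persistence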

store-client/pkg/client/queue_event_processor.go

Lines changed: 27 additions & 14 deletions
@@ -233,25 +233,38 @@ func (p *QueueEventProcessor) processQueuedEvent(ctx context.Context, queuedEven
             // Re-queue for retry with exponential backoff
             p.queue.AddRateLimited(queuedEvent)
             p.updateMetrics("processing_retry", queuedEvent.EventID, time.Since(startTime), false)
-
+            // Do NOT mark as processed yet - still retrying
             return
+        }
+
+        // Max retries reached - final failure
+        slog.Error("Max retries reached for event",
+            "eventID", queuedEvent.EventID,
+            "attempts", queuedEvent.Attempt)
+        p.updateMetrics("processing_failed", queuedEvent.EventID, time.Since(startTime), false)
+
+        // CRITICAL FIX: Only mark as processed if configured to do so
+        if p.config.MarkProcessedOnError {
+            slog.Warn("Marking failed event as processed due to MarkProcessedOnError=true",
+                "eventID", queuedEvent.EventID, "attempts", queuedEvent.Attempt)
+            if markErr := p.changeStreamWatcher.MarkProcessed(ctx, []byte{}); markErr != nil {
+                slog.Error("Failed to mark event as processed", "error", markErr)
+            }
         } else {
-            // Max retries reached
-            slog.Error("Max retries reached for event",
-                "eventID", queuedEvent.EventID,
-                "attempts", queuedEvent.Attempt)
-            p.updateMetrics("processing_failed", queuedEvent.EventID, time.Since(startTime), false)
+            // Do NOT mark as processed - event will be retried on next restart
+            slog.Error("Event processing failed after max retries, NOT marking as processed - will retry on restart",
+                "eventID", queuedEvent.EventID, "attempts", queuedEvent.Attempt)
         }
-    } else {
-        // Success
-        slog.Debug("Event processed successfully",
-            "workerID", workerID,
-            "eventID", queuedEvent.EventID)
-        p.updateMetrics("processing_success", queuedEvent.EventID, time.Since(startTime), true)
+
+        return
     }
 
-    // Always mark as processed in the change stream to advance resume token
-    // This prevents the change stream from getting stuck on failed events
+    // Success - mark as processed
+    slog.Debug("Event processed successfully",
+        "workerID", workerID,
+        "eventID", queuedEvent.EventID)
+    p.updateMetrics("processing_success", queuedEvent.EventID, time.Since(startTime), true)
+
    if markErr := p.changeStreamWatcher.MarkProcessed(ctx, []byte{}); markErr != nil {
        slog.Error("Failed to mark event as processed in change stream",
            "eventID", queuedEvent.EventID,