@@ -265,7 +265,14 @@ func (r *Reconciler) validateAllSequenceCriteria(ctx context.Context, rule confi
265265 continue
266266 }
267267
268- facets = append (facets , getFacet (facetName , timeWindow , matchCriteria ))
268+ // Check if burst grouping is enabled for this sequence
269+ if rule .BurstGapSeconds > 0 {
270+ facets = append (facets , getFacetWithBurstDetection (facetName , timeWindow , rule .BurstGapSeconds ,
271+ matchCriteria , healthEventWithStatus .HealthEvent .ErrorCode [0 ]))
272+ slog .Info ("Burst detection facet" , "facet" , facets [len (facets )- 1 ])
273+ } else {
274+ facets = append (facets , getFacet (facetName , timeWindow , matchCriteria ))
275+ }
269276 }
270277
271278 if len (facets ) == 0 {
@@ -330,6 +337,125 @@ func getFacet(facetName string, timeWindow time.Duration, matchCriteria bson.D)
330337 }
331338}
332339
340+ // getFacetWithBurstDetection creates a facet pipeline with gap-based burst detection
341+ // Events are grouped into bursts when separated by more than burstGapSeconds
342+ func getFacetWithBurstDetection (facetName string , timeWindow time.Duration ,
343+ burstGapSeconds int , matchCriteria bson.D ,
344+ targetXID string ) bson.E {
345+ return bson.E {
346+ Key : facetName ,
347+ Value : bson.A {
348+ // Stage 1: Match events within time window
349+ bson.D {{Key : "$match" , Value : bson.D {
350+ {Key : "healthevent.generatedtimestamp.seconds" , Value : bson.D {
351+ {Key : "$gte" , Value : time .Now ().UTC ().Add (- timeWindow ).Unix ()},
352+ }},
353+ {Key : "healthevent.checkname" , Value : bson.D {{Key : "$ne" , Value : "HealthEventsAnalyzer" }}},
354+ }}},
355+
356+ // Stage 2: Match node, GPU criteria (but NOT XID - we need all XIDs for gap detection)
357+ bson.D {{Key : "$match" , Value : matchCriteria }},
358+
359+ // Stage 3: Sort by timestamp (required for window functions)
360+ bson.D {{Key : "$sort" , Value : bson.D {
361+ {Key : "healthevent.generatedtimestamp.seconds" , Value : 1 },
362+ }}},
363+
364+ // Stage 4: Use window function to get previous event timestamp (ANY XID)
365+ bson.D {{Key : "$setWindowFields" , Value : bson.D {
366+ {Key : "sortBy" , Value : bson.D {
367+ {Key : "healthevent.generatedtimestamp.seconds" , Value : 1 },
368+ }},
369+ {Key : "output" , Value : bson.D {
370+ {Key : "prevTimestamp" , Value : bson.D {
371+ {Key : "$shift" , Value : bson.D {
372+ {Key : "output" , Value : "$healthevent.generatedtimestamp.seconds" },
373+ {Key : "by" , Value : - 1 },
374+ }},
375+ }},
376+ }},
377+ }}},
378+
379+ // Stage 5: Calculate gap and mark new bursts
380+ bson.D {{Key : "$addFields" , Value : bson.D {
381+ {Key : "timeSincePrev" , Value : bson.D {
382+ {Key : "$cond" , Value : bson.D {
383+ {Key : "if" , Value : bson.D {{Key : "$eq" , Value : bson.A {"$prevTimestamp" , nil }}}},
384+ {Key : "then" , Value : 0 },
385+ {Key : "else" , Value : bson.D {
386+ {Key : "$subtract" , Value : bson.A {
387+ "$healthevent.generatedtimestamp.seconds" ,
388+ "$prevTimestamp" ,
389+ }},
390+ }},
391+ }},
392+ }},
393+ {Key : "isNewBurst" , Value : bson.D {
394+ {Key : "$cond" , Value : bson.D {
395+ {Key : "if" , Value : bson.D {{Key : "$eq" , Value : bson.A {"$prevTimestamp" , nil }}}}, // First event
396+ {Key : "then" , Value : 1 },
397+ {Key : "else" , Value : bson.D {
398+ {Key : "$cond" , Value : bson.D {
399+ {Key : "if" , Value : bson.D {
400+ {Key : "$gt" , Value : bson.A {
401+ bson.D {{Key : "$subtract" , Value : bson.A {
402+ "$healthevent.generatedtimestamp.seconds" ,
403+ "$prevTimestamp" ,
404+ }}},
405+ burstGapSeconds , // Gap threshold in seconds
406+ }},
407+ }},
408+ {Key : "then" , Value : 1 }, // New burst
409+ {Key : "else" , Value : 0 }, // Same burst continues
410+ }},
411+ }},
412+ }},
413+ }},
414+ }}},
415+
416+ // Stage 6: Assign burst IDs using cumulative sum - CRITICAL window bounds
417+ bson.D {{Key : "$setWindowFields" , Value : bson.D {
418+ {Key : "sortBy" , Value : bson.D {
419+ {Key : "healthevent.generatedtimestamp.seconds" , Value : 1 },
420+ }},
421+ {Key : "output" , Value : bson.D {
422+ {Key : "burstId" , Value : bson.D {
423+ {Key : "$sum" , Value : "$isNewBurst" },
424+ {Key : "window" , Value : bson.D {
425+ {Key : "documents" , Value : bson.A {"unbounded" , "current" }}, // Running sum - CRITICAL
426+ }},
427+ }},
428+ }},
429+ }}},
430+
431+ // Stage 7: Group by burst and collect XIDs
432+ bson.D {{Key : "$group" , Value : bson.D {
433+ {Key : "_id" , Value : bson.D {
434+ {Key : "burstId" , Value : "$burstId" },
435+ }},
436+ {Key : "uniqueXidsInBurst" , Value : bson.D {{Key : "$push" , Value : bson.D {
437+ {Key : "$arrayElemAt" , Value : bson.A {"$healthevent.errorcode" , 0 }},
438+ }}}},
439+ }}},
440+
441+ // Stage 8: Filter to only bursts containing the target XID
442+ bson.D {{Key : "$match" , Value : bson.D {
443+ {Key : "uniqueXidsInBurst" , Value : targetXID },
444+ }}},
445+
446+ // Stage 9: Count distinct bursts and collect burst details
447+ bson.D {{Key : "$group" , Value : bson.D {
448+ {Key : "_id" , Value : nil },
449+ {Key : "count" , Value : bson.D {{Key : "$sum" , Value : 1 }}},
450+ {Key : "bursts" , Value : bson.D {{Key : "$push" , Value : bson.D {
451+ {Key : "burstId" , Value : "$_id.burstId" },
452+ {Key : "uniqueXids" , Value : "$uniqueXidsInBurst" },
453+ }}}},
454+ }}},
455+ },
456+ }
457+ }
458+
333459func getPipeline (facets bson.D , rule config.HealthEventsAnalyzerRule ) mongo.Pipeline {
334460 return mongo.Pipeline {
335461 {{Key : "$facet" , Value : facets }},
0 commit comments