From 68c069e21aa1ada714254ba9d2e0f7bbe43f06d3 Mon Sep 17 00:00:00 2001 From: Kevin Sheldrake Date: Tue, 4 Nov 2025 17:13:10 +0000 Subject: [PATCH] tetragon: handle test events sequentially The test infrastructure that monitors the ring buffers and handles events during tests does not mirror how tetragon does this in production. When monitoring events for real, we have an event cache channel that the ring buffer monitors write to, and the event handler reads from. This means that events are handled sequentially. In the test infrastructure, the ring buffer monitors handle the events directly, so two events can be handled concurrently, which causes problems. This commit introduces an event cache to the test infrastructure and uses that to handle events sequentially, avoiding the problem. Signed-off-by: Kevin Sheldrake --- pkg/testutils/perfring/perfring.go | 77 +++++++++++++++++------------- 1 file changed, 43 insertions(+), 34 deletions(-) diff --git a/pkg/testutils/perfring/perfring.go b/pkg/testutils/perfring/perfring.go index aedbe7b0e1d..1cde45d7202 100644 --- a/pkg/testutils/perfring/perfring.go +++ b/pkg/testutils/perfring/perfring.go @@ -100,14 +100,27 @@ func ProcessEvents(t *testing.T, ctx context.Context, eventFn EventFn, wgStarted wgStarted.Done() - errChan := make(chan error) + errChan := make(chan error, 2) defer close(errChan) - complChan := make(chan bool) + complChan := make(chan bool, 2) defer close(complChan) + // Create an events queue that both the ring buffers can send to. + // This means we can handle the events sequentially by reading from the + // queue, avoiding issues of handling events concurrently. 
+ type testEvent struct { + RawSample *[]byte + ComplChecker *testsensor.CompletionChecker + Ctx context.Context + CancelFunc context.CancelFunc + } + + eventsQueue := make(chan *testEvent, 65536) // arbitrary size for tests + var wg sync.WaitGroup wg.Add(1) + ctxPerfRing, cancelPerfRing := context.WithCancel(ctx) defer wg.Wait() go func() { @@ -115,68 +128,49 @@ func ProcessEvents(t *testing.T, ctx context.Context, eventFn EventFn, wgStarted complChecker := testsensor.NewCompletionChecker() - for ctx.Err() == nil { + for ctxPerfRing.Err() == nil { record, err := perfReader.Read() if err != nil { - if ctx.Err() == nil && !errors.Is(err, os.ErrClosed) { + if ctxPerfRing.Err() == nil && !errors.Is(err, os.ErrClosed) { errChan <- fmt.Errorf("error reading perfring data: %w", err) } break } - if len(record.RawSample) > 0 { - _, events, handlerErr := observer.HandlePerfData(record.RawSample) - if handlerErr != nil { - errChan <- fmt.Errorf("error handling perfring data: %w", handlerErr) - break - } - err = loopEvents(events, eventFn, complChecker) - if err != nil { - errChan <- fmt.Errorf("error loop event function returned: %w", err) - break - } + if complChecker.Done() || ctxPerfRing.Err() != nil { + break } - if complChecker.Done() { - complChan <- true - break + if len(record.RawSample) > 0 { + eventsQueue <- &testEvent{&record.RawSample, complChecker, ctxPerfRing, cancelPerfRing} } } }() if useBPFRingBuffer { // Service the BPF ring buffer. 
+ ctxBPFRing, cancelBPFRing := context.WithCancel(ctx) wg.Go(func() { complChecker := testsensor.NewCompletionChecker() - for ctx.Err() == nil { + for ctxBPFRing.Err() == nil { record, err := ringBufReader.Read() if err != nil { - if ctx.Err() == nil && !errors.Is(err, os.ErrClosed) { + if ctxBPFRing.Err() == nil && !errors.Is(err, os.ErrClosed) { errChan <- fmt.Errorf("error reading ringbuf data: %w", err) } break } - if len(record.RawSample) > 0 { - _, events, handlerErr := observer.HandlePerfData(record.RawSample) - if handlerErr != nil { - errChan <- fmt.Errorf("error handling ringbuf data: %w", handlerErr) - break - } - err = loopEvents(events, eventFn, complChecker) - if err != nil { - errChan <- fmt.Errorf("error loop event function returned: %w", err) - break - } + if complChecker.Done() || ctxBPFRing.Err() != nil { + break } - if complChecker.Done() { - complChan <- true - break + if len(record.RawSample) > 0 { + eventsQueue <- &testEvent{&record.RawSample, complChecker, ctxBPFRing, cancelBPFRing} } } }) @@ -189,6 +183,21 @@ func ProcessEvents(t *testing.T, ctx context.Context, eventFn EventFn, wgStarted } for { select { + case event := <-eventsQueue: + _, events, handlerErr := observer.HandlePerfData(*event.RawSample) + if handlerErr != nil { + errChan <- fmt.Errorf("error handling ringbuf data: %w", handlerErr) + break + } + err = loopEvents(events, eventFn, event.ComplChecker) + if err != nil { + errChan <- fmt.Errorf("error loop event function returned: %w", err) + break + } + if event.ComplChecker.Done() && event.Ctx.Err() == nil { + event.CancelFunc() + complChan <- true + } case err := <-errChan: t.Fatal(err) case <-complChan: