diff --git a/pkg/testutils/perfring/perfring.go b/pkg/testutils/perfring/perfring.go index aedbe7b0e1d..1cde45d7202 100644 --- a/pkg/testutils/perfring/perfring.go +++ b/pkg/testutils/perfring/perfring.go @@ -100,14 +100,27 @@ func ProcessEvents(t *testing.T, ctx context.Context, eventFn EventFn, wgStarted wgStarted.Done() - errChan := make(chan error) + errChan := make(chan error, 2) defer close(errChan) - complChan := make(chan bool) + complChan := make(chan bool, 2) defer close(complChan) + // Create an events queue that both the ring buffers can send to. + // This means we can handle the events sequentially by reading from the + // queue, avoiding issues of handling events concurrently. + type testEvent struct { + RawSample *[]byte + ComplChecker *testsensor.CompletionChecker + Ctx context.Context + CancelFunc context.CancelFunc + } + + eventsQueue := make(chan *testEvent, 65536) // arbitrary size for tests + var wg sync.WaitGroup wg.Add(1) + ctxPerfRing, cancelPerfRing := context.WithCancel(ctx) defer wg.Wait() go func() { @@ -115,68 +128,49 @@ func ProcessEvents(t *testing.T, ctx context.Context, eventFn EventFn, wgStarted complChecker := testsensor.NewCompletionChecker() - for ctx.Err() == nil { + for ctxPerfRing.Err() == nil { record, err := perfReader.Read() if err != nil { - if ctx.Err() == nil && !errors.Is(err, os.ErrClosed) { + if ctxPerfRing.Err() == nil && !errors.Is(err, os.ErrClosed) { errChan <- fmt.Errorf("error reading perfring data: %w", err) } break } - if len(record.RawSample) > 0 { - _, events, handlerErr := observer.HandlePerfData(record.RawSample) - if handlerErr != nil { - errChan <- fmt.Errorf("error handling perfring data: %w", handlerErr) - break - } - err = loopEvents(events, eventFn, complChecker) - if err != nil { - errChan <- fmt.Errorf("error loop event function returned: %w", err) - break - } + if complChecker.Done() || ctxPerfRing.Err() != nil { + break } - if complChecker.Done() { - complChan <- true - break + if len(record.RawSample) > 0 { + eventsQueue <- &testEvent{&record.RawSample, complChecker, ctxPerfRing, cancelPerfRing} } } }() if useBPFRingBuffer { // Service the BPF ring buffer. + ctxBPFRing, cancelBPFRing := context.WithCancel(ctx) wg.Go(func() { complChecker := testsensor.NewCompletionChecker() - for ctx.Err() == nil { + for ctxBPFRing.Err() == nil { record, err := ringBufReader.Read() if err != nil { - if ctx.Err() == nil && !errors.Is(err, os.ErrClosed) { + if ctxBPFRing.Err() == nil && !errors.Is(err, os.ErrClosed) { errChan <- fmt.Errorf("error reading ringbuf data: %w", err) } break } - if len(record.RawSample) > 0 { - _, events, handlerErr := observer.HandlePerfData(record.RawSample) - if handlerErr != nil { - errChan <- fmt.Errorf("error handling ringbuf data: %w", handlerErr) - break - } - err = loopEvents(events, eventFn, complChecker) - if err != nil { - errChan <- fmt.Errorf("error loop event function returned: %w", err) - break - } + if complChecker.Done() || ctxBPFRing.Err() != nil { + break } - if complChecker.Done() { - complChan <- true - break + if len(record.RawSample) > 0 { + eventsQueue <- &testEvent{&record.RawSample, complChecker, ctxBPFRing, cancelBPFRing} } } }) @@ -189,6 +183,21 @@ func ProcessEvents(t *testing.T, ctx context.Context, eventFn EventFn, wgStarted } for { select { + case event := <-eventsQueue: + _, events, handlerErr := observer.HandlePerfData(*event.RawSample) + if handlerErr != nil { + errChan <- fmt.Errorf("error handling ringbuf data: %w", handlerErr) + break + } + err = loopEvents(events, eventFn, event.ComplChecker) + if err != nil { + errChan <- fmt.Errorf("error loop event function returned: %w", err) + break + } + if event.ComplChecker.Done() && event.Ctx.Err() == nil { + event.CancelFunc() + complChan <- true + } case err := <-errChan: t.Fatal(err) case <-complChan: