@@ -100,85 +100,79 @@ func ProcessEvents(t *testing.T, ctx context.Context, eventFn EventFn, wgStarted
100100
101101 wgStarted .Done ()
102102
103- errChan := make (chan error )
103+ errChan := make (chan error , 2 )
104104 defer close (errChan )
105105
106- complChan := make (chan bool )
106+ complChan := make (chan bool , 2 )
107107 defer close (complChan )
108108
109+ // Create an events queue that both the ring buffers can send to.
110+ // This means we can handle the events sequentially by reading from the
111+ // queue, avoiding issues of handling events concurrently.
112+ type testEvent struct {
113+ RawSample * []byte
114+ ComplChecker * testsensor.CompletionChecker
115+ Ctx context.Context
116+ CancelFunc context.CancelFunc
117+ }
118+
119+ eventsQueue := make (chan * testEvent , 65536 ) // arbitrary size for tests
120+
109121 var wg sync.WaitGroup
110122 wg .Add (1 )
123+ ctxPerfRing , cancelPerfRing := context .WithCancel (ctx )
111124 defer wg .Wait ()
112125
113126 go func () {
114127 defer wg .Done ()
115128
116129 complChecker := testsensor .NewCompletionChecker ()
117130
118- for ctx .Err () == nil {
131+ for ctxPerfRing .Err () == nil {
119132
120133 record , err := perfReader .Read ()
121134 if err != nil {
122- if ctx .Err () == nil && ! errors .Is (err , os .ErrClosed ) {
135+ if ctxPerfRing .Err () == nil && ! errors .Is (err , os .ErrClosed ) {
123136 errChan <- fmt .Errorf ("error reading perfring data: %w" , err )
124137 }
125138 break
126139 }
127140
128- if len (record .RawSample ) > 0 {
129- _ , events , handlerErr := observer .HandlePerfData (record .RawSample )
130- if handlerErr != nil {
131- errChan <- fmt .Errorf ("error handling perfring data: %w" , handlerErr )
132- break
133- }
134- err = loopEvents (events , eventFn , complChecker )
135- if err != nil {
136- errChan <- fmt .Errorf ("error loop event function returned: %w" , err )
137- break
138- }
141+ if complChecker .Done () || ctxPerfRing .Err () != nil {
142+ break
139143 }
140144
141- if complChecker .Done () {
142- complChan <- true
143- break
145+ if len (record .RawSample ) > 0 {
146+ eventsQueue <- & testEvent {& record .RawSample , complChecker , ctxPerfRing , cancelPerfRing }
144147 }
145148 }
146149 }()
147150
148151 if useBPFRingBuffer {
149152 // Service the BPF ring buffer.
150153 wg .Add (1 )
154+ ctxBPFRing , cancelBPFRing := context .WithCancel (ctx )
151155 go func () {
152156 defer wg .Done ()
153157
154158 complChecker := testsensor .NewCompletionChecker ()
155159
156- for ctx .Err () == nil {
160+ for ctxBPFRing .Err () == nil {
157161
158162 record , err := ringBufReader .Read ()
159163 if err != nil {
160- if ctx .Err () == nil && ! errors .Is (err , os .ErrClosed ) {
164+ if ctxBPFRing .Err () == nil && ! errors .Is (err , os .ErrClosed ) {
161165 errChan <- fmt .Errorf ("error reading ringbuf data: %w" , err )
162166 }
163167 break
164168 }
165169
166- if len (record .RawSample ) > 0 {
167- _ , events , handlerErr := observer .HandlePerfData (record .RawSample )
168- if handlerErr != nil {
169- errChan <- fmt .Errorf ("error handling ringbuf data: %w" , handlerErr )
170- break
171- }
172- err = loopEvents (events , eventFn , complChecker )
173- if err != nil {
174- errChan <- fmt .Errorf ("error loop event function returned: %w" , err )
175- break
176- }
170+ if complChecker .Done () || ctxBPFRing .Err () != nil {
171+ break
177172 }
178173
179- if complChecker .Done () {
180- complChan <- true
181- break
174+ if len (record .RawSample ) > 0 {
175+ eventsQueue <- & testEvent {& record .RawSample , complChecker , ctxBPFRing , cancelBPFRing }
182176 }
183177 }
184178 }()
@@ -191,6 +185,21 @@ func ProcessEvents(t *testing.T, ctx context.Context, eventFn EventFn, wgStarted
191185 }
192186 for {
193187 select {
188+ case event := <- eventsQueue :
189+ _ , events , handlerErr := observer .HandlePerfData (* event .RawSample )
190+ if handlerErr != nil {
191+ errChan <- fmt .Errorf ("error handling ringbuf data: %w" , handlerErr )
192+ break
193+ }
194+ err = loopEvents (events , eventFn , event .ComplChecker )
195+ if err != nil {
196+ errChan <- fmt .Errorf ("error loop event function returned: %w" , err )
197+ break
198+ }
199+ if event .ComplChecker .Done () && event .Ctx .Err () == nil {
200+ event .CancelFunc ()
201+ complChan <- true
202+ }
194203 case err := <- errChan :
195204 t .Fatal (err )
196205 case <- complChan :
0 commit comments