@@ -100,83 +100,77 @@ func ProcessEvents(t *testing.T, ctx context.Context, eventFn EventFn, wgStarted
100100
101101 wgStarted .Done ()
102102
103- errChan := make (chan error )
103+ errChan := make (chan error , 2 )
104104 defer close (errChan )
105105
106- complChan := make (chan bool )
106+ complChan := make (chan bool , 2 )
107107 defer close (complChan )
108108
109+ // Create an events queue that both the ring buffers can send to.
110+ // This means we can handle the events sequentially by reading from the
111+ // queue, avoiding issues of handling events concurrently.
112+ type testEvent struct {
113+ RawSample * []byte
114+ ComplChecker * testsensor.CompletionChecker
115+ Ctx context.Context
116+ CancelFunc context.CancelFunc
117+ }
118+
119+ eventsQueue := make (chan * testEvent , 65536 ) // arbitrary size for tests
120+
109121 var wg sync.WaitGroup
110122 wg .Add (1 )
123+ ctxPerfRing , cancelPerfRing := context .WithCancel (ctx )
111124 defer wg .Wait ()
112125
113126 go func () {
114127 defer wg .Done ()
115128
116129 complChecker := testsensor .NewCompletionChecker ()
117130
118- for ctx .Err () == nil {
131+ for ctxPerfRing .Err () == nil {
119132
120133 record , err := perfReader .Read ()
121134 if err != nil {
122- if ctx .Err () == nil && ! errors .Is (err , os .ErrClosed ) {
135+ if ctxPerfRing .Err () == nil && ! errors .Is (err , os .ErrClosed ) {
123136 errChan <- fmt .Errorf ("error reading perfring data: %w" , err )
124137 }
125138 break
126139 }
127140
128- if len (record .RawSample ) > 0 {
129- _ , events , handlerErr := observer .HandlePerfData (record .RawSample )
130- if handlerErr != nil {
131- errChan <- fmt .Errorf ("error handling perfring data: %w" , handlerErr )
132- break
133- }
134- err = loopEvents (events , eventFn , complChecker )
135- if err != nil {
136- errChan <- fmt .Errorf ("error loop event function returned: %w" , err )
137- break
138- }
141+ if complChecker .Done () || ctxPerfRing .Err () != nil {
142+ break
139143 }
140144
141- if complChecker .Done () {
142- complChan <- true
143- break
145+ if len (record .RawSample ) > 0 {
146+ eventsQueue <- & testEvent {& record .RawSample , complChecker , ctxPerfRing , cancelPerfRing }
144147 }
145148 }
146149 }()
147150
148151 if useBPFRingBuffer {
149152 // Service the BPF ring buffer.
153+ ctxBPFRing , cancelBPFRing := context .WithCancel (ctx )
150154 wg .Go (func () {
151155
152156 complChecker := testsensor .NewCompletionChecker ()
153157
154- for ctx .Err () == nil {
158+ for ctxBPFRing .Err () == nil {
155159
156160 record , err := ringBufReader .Read ()
157161 if err != nil {
158- if ctx .Err () == nil && ! errors .Is (err , os .ErrClosed ) {
162+ if ctxBPFRing .Err () == nil && ! errors .Is (err , os .ErrClosed ) {
159163 errChan <- fmt .Errorf ("error reading ringbuf data: %w" , err )
160164 }
161165 break
162166 }
163167
164- if len (record .RawSample ) > 0 {
165- _ , events , handlerErr := observer .HandlePerfData (record .RawSample )
166- if handlerErr != nil {
167- errChan <- fmt .Errorf ("error handling ringbuf data: %w" , handlerErr )
168- break
169- }
170- err = loopEvents (events , eventFn , complChecker )
171- if err != nil {
172- errChan <- fmt .Errorf ("error loop event function returned: %w" , err )
173- break
174- }
168+ if complChecker .Done () || ctxBPFRing .Err () != nil {
169+ break
175170 }
176171
177- if complChecker .Done () {
178- complChan <- true
179- break
172+ if len (record .RawSample ) > 0 {
173+ eventsQueue <- & testEvent {& record .RawSample , complChecker , ctxBPFRing , cancelBPFRing }
180174 }
181175 }
182176 })
@@ -189,6 +183,21 @@ func ProcessEvents(t *testing.T, ctx context.Context, eventFn EventFn, wgStarted
189183 }
190184 for {
191185 select {
186+ case event := <- eventsQueue :
187+ _ , events , handlerErr := observer .HandlePerfData (* event .RawSample )
188+ if handlerErr != nil {
189+ errChan <- fmt .Errorf ("error handling ringbuf data: %w" , handlerErr )
190+ break
191+ }
192+ err = loopEvents (events , eventFn , event .ComplChecker )
193+ if err != nil {
194+ errChan <- fmt .Errorf ("error loop event function returned: %w" , err )
195+ break
196+ }
197+ if event .ComplChecker .Done () && event .Ctx .Err () == nil {
198+ event .CancelFunc ()
199+ complChan <- true
200+ }
192201 case err := <- errChan :
193202 t .Fatal (err )
194203 case <- complChan :
0 commit comments