@@ -227,3 +227,120 @@ func TestWatchDeadCancel(t *testing.T) {
227227 }
228228 })
229229}
230+
231+ //nolint:gocognit,gocyclo,cyclop
232+ func TestWatchKindStress (t * testing.T ) {
233+ t .Cleanup (func () { goleak .VerifyNone (t , goleak .IgnoreCurrent ()) })
234+
235+ withEtcd (t , func (s state.State ) {
236+ ctx , cancel := context .WithTimeout (context .Background (), 60 * time .Second )
237+ defer cancel ()
238+
239+ const (
240+ initialCreated = 100
241+ dummyWatches = 3
242+ iterations = 20
243+ numObjects = 100
244+ watchEventTimeout = 10 * time .Second
245+ )
246+
247+ for i := range initialCreated {
248+ require .NoError (t , s .Create (ctx , conformance .NewPathResource ("default" , fmt .Sprintf ("path-%d" , i ))))
249+ }
250+
251+ var watchChannels []chan state.Event
252+
253+ for iteration := range iterations {
254+ // test setup:
255+ // - on each iteration, set yet another watch with bootstrap contents
256+ // - the new watch should receive 'initialContents'
257+ // - add new watch to the list of watches
258+ // - issue create, update, and destroy for numObjects resources
259+ // - all watch channels should receive each update
260+ t .Logf ("iteration %d" , iteration )
261+
262+ dummyCh := make (chan state.Event )
263+
264+ // these watches never receive any events, as they use a different namespace
265+ for j := range dummyWatches {
266+ require .NoError (t , s .WatchKind (ctx , conformance .NewPathResource (fmt .Sprintf ("default-%d" , j + iteration * dummyWatches ), "" ).Metadata (), dummyCh , state .WithBootstrapContents (true )))
267+ }
268+
269+ watchCh := make (chan state.Event , numObjects )
270+
271+ require .NoError (t , s .WatchKind (ctx , conformance .NewPathResource ("default" , "" ).Metadata (), watchCh , state .WithBootstrapContents (true )))
272+
273+ for range initialCreated {
274+ select {
275+ case <- time .After (watchEventTimeout ):
276+ t .Fatal ("timeout waiting for event" )
277+ case ev := <- watchCh :
278+ assert .Equal (t , state .Created , ev .Type )
279+ }
280+ }
281+
282+ select {
283+ case <- time .After (watchEventTimeout ):
284+ t .Fatal ("timeout waiting for event" )
285+ case ev := <- watchCh :
286+ assert .Equal (t , state .Bootstrapped , ev .Type )
287+ }
288+
289+ watchChannels = append (watchChannels , watchCh )
290+
291+ for i := range numObjects {
292+ r := conformance .NewPathResource ("default" , fmt .Sprintf ("o-%d" , iteration * numObjects + i ))
293+
294+ // add some metadata to make the object bigger
295+ for j := range 5 {
296+ r .Metadata ().Labels ().Set (fmt .Sprintf ("label-%d" , j ), "prettybigvalueIwanttoputherejusttomakeitbig" )
297+ }
298+
299+ for j := range 5 {
300+ r .Metadata ().Finalizers ().Add (fmt .Sprintf ("finalizer-%d" , j ))
301+ }
302+
303+ require .NoError (t , s .Create (ctx , r ))
304+
305+ for j := range 5 {
306+ r .Metadata ().Finalizers ().Remove (fmt .Sprintf ("finalizer-%d" , j ))
307+ }
308+
309+ require .NoError (t , s .Update (ctx , r ))
310+ }
311+
312+ for i := range numObjects {
313+ require .NoError (t , s .Destroy (ctx , conformance .NewPathResource ("default" , fmt .Sprintf ("o-%d" , iteration * numObjects + i )).Metadata ()))
314+ }
315+
316+ for range numObjects {
317+ for _ , watchCh := range watchChannels {
318+ select {
319+ case <- time .After (watchEventTimeout ):
320+ t .Fatal ("timeout waiting for event" )
321+ case ev := <- watchCh :
322+ assert .Equal (t , state .Created , ev .Type )
323+ }
324+
325+ select {
326+ case <- time .After (watchEventTimeout ):
327+ t .Fatal ("timeout waiting for event" )
328+ case ev := <- watchCh :
329+ assert .Equal (t , state .Updated , ev .Type )
330+ }
331+ }
332+ }
333+
334+ for range numObjects {
335+ for _ , watchCh := range watchChannels {
336+ select {
337+ case <- time .After (watchEventTimeout ):
338+ t .Fatal ("timeout waiting for event" )
339+ case ev := <- watchCh :
340+ assert .Equal (t , state .Destroyed , ev .Type )
341+ }
342+ }
343+ }
344+ }
345+ })
346+ }
0 commit comments