@@ -16,6 +16,7 @@ package worker
1616import (
1717 "context"
1818 "fmt"
19+ "sync"
1920 "testing"
2021 "time"
2122
@@ -34,15 +35,18 @@ var (
3435 maxRequeue = 3
3536)
3637
38+ var mu sync.RWMutex
39+
3740func GetMockWorkerPool (ctx context.Context ) Worker {
3841 log := zap .New (zap .UseDevMode (true )).WithValues ("worker resource Id" , resourceName )
3942 return NewDefaultWorkerPool (resourceName , workerCount , maxRequeue , log , ctx )
4043}
4144
4245func MockWorkerFunc (job interface {}) (result ctrl.Result , err error ) {
46+ mu .Lock ()
47+ defer mu .Unlock ()
4348 v := job .(* int )
4449 * v ++
45- time .Sleep (time .Millisecond * mockTimeToProcessWorkerFunc )
4650
4751 return ctrl.Result {}, nil
4852}
@@ -75,15 +79,20 @@ func TestWorker_SubmitJob(t *testing.T) {
7579 time .Sleep (time .Millisecond * (mockTimeToProcessWorkerFunc + bufferTimeBwWorkerFuncExecution ) * time .Duration (jobCount ))
7680
7781 // Verify job completed.
78- assert .Equal (t , job1 , 1 )
79- assert .Equal (t , job2 , 1 )
82+ mu .RLock ()
83+ defer mu .RUnlock ()
84+ for _ , j := range []int {job1 , job2 } {
85+ assert .Equal (t , j , 1 )
86+ }
8087}
8188
8289func TestWorker_SubmitJob_RequeueOnError (t * testing.T ) {
8390 ctx , cancel := context .WithCancel (context .Background ())
8491 defer cancel ()
8592
8693 workerFunc := func (job interface {}) (result ctrl.Result , err error ) {
94+ mu .Lock ()
95+ defer mu .Unlock ()
8796 invoked := job .(* int )
8897 * invoked ++
8998
@@ -100,14 +109,18 @@ func TestWorker_SubmitJob_RequeueOnError(t *testing.T) {
100109 time .Sleep ((mockTimeToProcessWorkerFunc + bufferTimeBwWorkerFuncExecution ) * time .Millisecond * time .Duration (maxRequeue ))
101110
102111 // expected invocation = max requeue + the first invocation
112+ mu .RLock ()
103113 assert .Equal (t , maxRequeue + 1 , invoked )
114+ mu .RUnlock ()
104115}
105116
106117func TestWorker_SubmitJob_NotRequeueOnError (t * testing.T ) {
107118 ctx , cancel := context .WithCancel (context .Background ())
108119 defer cancel ()
109120
110121 workerFunc := func (job interface {}) (result ctrl.Result , err error ) {
122+ mu .Lock ()
123+ defer mu .Unlock ()
111124 invoked := job .(* int )
112125 * invoked ++
113126
@@ -127,5 +140,7 @@ func TestWorker_SubmitJob_NotRequeueOnError(t *testing.T) {
127140 actualInqueue := 1
128141 // invoked should be only incremented once
129142 assert .NotEqual (t , maxRequeue , actualInqueue )
143+ mu .RLock ()
130144 assert .Equal (t , actualInqueue , invoked )
145+ mu .RUnlock ()
131146}
0 commit comments