Skip to content

Commit 3ac1786

Browse files
committed
fix priority queue
Signed-off-by: dongjiang <[email protected]>
1 parent b9a9ca0 commit 3ac1786

File tree

2 files changed

+27
-0
lines changed

2 files changed

+27
-0
lines changed

pkg/controller/priorityqueue/priorityqueue.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,13 @@ func (w *priorityqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool)
277277
w.waiters.Add(1)
278278

279279
w.notifyItemOrWaiterAdded()
280+
281+
// ref: https://github.com/kubernetes-sigs/controller-runtime/issues/3239
282+
if w.shutdown.Load() == true {
283+
var zero T
284+
return zero, 0, true
285+
}
286+
280287
item := <-w.get
281288

282289
return item.Key, item.Priority, w.shutdown.Load()

pkg/controller/priorityqueue/priorityqueue_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,26 @@ var _ = Describe("Controllerworkqueue", func() {
300300
Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 2}))
301301
})
302302

303+
// ref: https://github.com/kubernetes-sigs/controller-runtime/issues/3239
304+
It("Get from priority queue might get stuck when the priority queue is shut down", func() {
305+
q, _ := newQueue()
306+
307+
q.Add("baz")
308+
// shut down
309+
q.ShutDown()
310+
q.AddWithOpts(AddOpts{After: time.Second}, "foo")
311+
312+
item, priority, isShutDown := q.GetWithPriority()
313+
Expect(item).To(Equal(""))
314+
Expect(priority).To(Equal(0))
315+
Expect(isShutDown).To(Equal(true))
316+
317+
item1, priority1, isShutDown := q.GetWithPriority()
318+
Expect(item1).To(Equal(""))
319+
Expect(priority1).To(Equal(0))
320+
Expect(isShutDown).To(Equal(true))
321+
})
322+
303323
It("items are included in Len() and the queueDepth metric once they are ready", func() {
304324
q, metrics := newQueue()
305325
defer q.ShutDown()

0 commit comments

Comments
 (0)