Skip to content

Commit 482000d

Browse files
committed
Add a workqueue abstraction for processing objects pulled from informers
Signed-off-by: Kevin Klues <[email protected]>
1 parent 6ff696a commit 482000d

File tree

1 file changed

+102
-0
lines changed

1 file changed

+102
-0
lines changed

pkg/workqueue/workqueue.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Copyright (c) 2025 NVIDIA CORPORATION. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package workqueue
18+
19+
import (
20+
"fmt"
21+
22+
"k8s.io/apimachinery/pkg/runtime"
23+
"k8s.io/client-go/util/workqueue"
24+
"k8s.io/klog/v2"
25+
)
26+
27+
type WorkQueue struct {
28+
queue workqueue.TypedRateLimitingInterface[any]
29+
}
30+
31+
type WorkItem struct {
32+
Object any
33+
Callback func(obj any) error
34+
}
35+
36+
func DefaultControllerRateLimiter() workqueue.TypedRateLimiter[any] {
37+
return workqueue.DefaultTypedControllerRateLimiter[any]()
38+
}
39+
40+
func New(r workqueue.TypedRateLimiter[any]) *WorkQueue {
41+
queue := workqueue.NewTypedRateLimitingQueue(r)
42+
return &WorkQueue{queue}
43+
}
44+
45+
func (q *WorkQueue) Run(done <-chan struct{}) {
46+
go func() {
47+
<-done
48+
q.queue.ShutDown()
49+
}()
50+
for {
51+
select {
52+
case <-done:
53+
return
54+
default:
55+
q.processNextWorkItem()
56+
}
57+
}
58+
}
59+
60+
func (q *WorkQueue) Enqueue(obj any, callback func(obj any) error) {
61+
runtimeObj, ok := obj.(runtime.Object)
62+
if !ok {
63+
klog.Warningf("unexpected object type %T: runtime.Object required", obj)
64+
return
65+
}
66+
67+
workItem := &WorkItem{
68+
Object: runtimeObj.DeepCopyObject(),
69+
Callback: callback,
70+
}
71+
72+
q.queue.AddRateLimited(workItem)
73+
}
74+
75+
func (q *WorkQueue) processNextWorkItem() {
76+
item, shutdown := q.queue.Get()
77+
if shutdown {
78+
return
79+
}
80+
defer q.queue.Done(item)
81+
82+
workItem, ok := item.(*WorkItem)
83+
if !ok {
84+
klog.Errorf("Unexpected item in queue: %v", item)
85+
return
86+
}
87+
88+
err := q.reconcile(workItem)
89+
if err != nil {
90+
klog.Errorf("Failed to reconcile work item %v: %v", workItem.Object, err)
91+
q.queue.AddRateLimited(workItem)
92+
} else {
93+
q.queue.Forget(workItem)
94+
}
95+
}
96+
97+
func (q *WorkQueue) reconcile(workItem *WorkItem) error {
98+
if workItem.Callback == nil {
99+
return fmt.Errorf("no callback to process work item: %+v", workItem)
100+
}
101+
return workItem.Callback(workItem.Object)
102+
}

0 commit comments

Comments
 (0)