Skip to content

Commit 7e18c33

Browse files
committed
Introduce DefaultPrepUnprepRateLimiter (less aggressive)
Signed-off-by: Dr. Jan-Philip Gehrcke <[email protected]>
1 parent 2b7e899 commit 7e18c33

File tree

2 files changed

+25
-2
lines changed

2 files changed

+25
-2
lines changed

cmd/compute-domain-kubelet-plugin/driver.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ func (d *driver) PrepareResourceClaims(ctx context.Context, claims []*resourceap
157157

158158
var wg sync.WaitGroup
159159
ctx, cancel := context.WithTimeout(ctx, ErrorRetryMaxTimeout)
160-
workQueue := workqueue.New(workqueue.DefaultControllerRateLimiter())
160+
workQueue := workqueue.New(workqueue.DefaultPrepUnprepRateLimiter())
161161
results := make(map[types.UID]kubeletplugin.PrepareResult)
162162

163163
for _, claim := range claims {
@@ -188,7 +188,10 @@ func (d *driver) UnprepareResourceClaims(ctx context.Context, claimRefs []kubele
188188

189189
var wg sync.WaitGroup
190190
ctx, cancel := context.WithTimeout(ctx, ErrorRetryMaxTimeout)
191-
workQueue := workqueue.New(workqueue.DefaultControllerRateLimiter())
191+
192+
// Review: do we want to have a new queue per incoming Prepare/Unprepare
193+
// request?
194+
workQueue := workqueue.New(workqueue.DefaultPrepUnprepRateLimiter())
192195
results := make(map[types.UID]error)
193196

194197
for _, claim := range claimRefs {

pkg/workqueue/workqueue.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ import (
2020
"context"
2121
"fmt"
2222
"sync"
23+
"time"
2324

25+
"golang.org/x/time/rate"
2426
"k8s.io/apimachinery/pkg/runtime"
2527
"k8s.io/client-go/util/workqueue"
2628
"k8s.io/klog/v2"
@@ -38,6 +40,24 @@ type WorkItem struct {
3840
Callback func(ctx context.Context, obj any) error
3941
}
4042

43+
// Return composite rate limiter that combines both per-item exponential backoff
44+
// and an overall token bucket rate-limiting strategy. It calculates the
45+
// exponential backoff for the individual item (based on its personal retry
46+
// history), checks the global rate against the token bucket, and picks the
47+
// longest delay from either strategy, ensuring that both per-item and overall
48+
// queue health are respected.
49+
func DefaultPrepUnprepRateLimiter() workqueue.TypedRateLimiter[any] {
50+
return workqueue.NewTypedMaxOfRateLimiter(
51+
// This is a per-item exponential backoff limiter. Each time an item
52+
// fails and is retried, the delay grows exponentially starting from the
53+
// lower value up to the upper bound.
54+
workqueue.NewTypedItemExponentialFailureRateLimiter[any](250*time.Millisecond, 3000*time.Second),
55+
// Global (not per-item) rate limiter. Allows up to 5 retries per
56+
// second, with bursts of up to 10.
57+
&workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(5), 10)},
58+
)
59+
}
60+
4161
func DefaultControllerRateLimiter() workqueue.TypedRateLimiter[any] {
4262
return workqueue.DefaultTypedControllerRateLimiter[any]()
4363
}

0 commit comments

Comments
 (0)