Skip to content

Commit 14dc9fe

Browse files
authored
Merge pull request #656 from jgehrcke/jp/custom-rate-limiting
Introduce DefaultPrepUnprepRateLimiter (less aggressive)
2 parents 8788dd1 + d34a12f commit 14dc9fe

File tree

3 files changed

+26
-3
lines changed

3 files changed

+26
-3
lines changed

cmd/compute-domain-kubelet-plugin/driver.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ func (d *driver) PrepareResourceClaims(ctx context.Context, claims []*resourceap
157157

158158
var wg sync.WaitGroup
159159
ctx, cancel := context.WithTimeout(ctx, ErrorRetryMaxTimeout)
160-
workQueue := workqueue.New(workqueue.DefaultControllerRateLimiter())
160+
workQueue := workqueue.New(workqueue.DefaultPrepUnprepRateLimiter())
161161
results := make(map[types.UID]kubeletplugin.PrepareResult)
162162

163163
for _, claim := range claims {
@@ -188,7 +188,10 @@ func (d *driver) UnprepareResourceClaims(ctx context.Context, claimRefs []kubele
188188

189189
var wg sync.WaitGroup
190190
ctx, cancel := context.WithTimeout(ctx, ErrorRetryMaxTimeout)
191-
workQueue := workqueue.New(workqueue.DefaultControllerRateLimiter())
191+
192+
// Review: do we want to have a new queue per incoming Prepare/Unprepare
193+
// request?
194+
workQueue := workqueue.New(workqueue.DefaultPrepUnprepRateLimiter())
192195
results := make(map[types.UID]error)
193196

194197
for _, claim := range claimRefs {

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ require (
1515
github.com/spf13/pflag v1.0.10
1616
github.com/stretchr/testify v1.11.1
1717
github.com/urfave/cli/v2 v2.27.7
18+
golang.org/x/time v0.9.0
1819
google.golang.org/grpc v1.75.1
1920
k8s.io/api v0.34.0
2021
k8s.io/apimachinery v0.34.0
@@ -80,7 +81,6 @@ require (
8081
golang.org/x/sys v0.36.0 // indirect
8182
golang.org/x/term v0.34.0 // indirect
8283
golang.org/x/text v0.28.0 // indirect
83-
golang.org/x/time v0.9.0 // indirect
8484
google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 // indirect
8585
google.golang.org/protobuf v1.36.8 // indirect
8686
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect

pkg/workqueue/workqueue.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ import (
2020
"context"
2121
"fmt"
2222
"sync"
23+
"time"
2324

25+
"golang.org/x/time/rate"
2426
"k8s.io/apimachinery/pkg/runtime"
2527
"k8s.io/client-go/util/workqueue"
2628
"k8s.io/klog/v2"
@@ -38,6 +40,24 @@ type WorkItem struct {
3840
Callback func(ctx context.Context, obj any) error
3941
}
4042

43+
// Return composite rate limiter that combines both per-item exponential backoff
44+
// and an overall token bucket rate-limiting strategy. It calculates the
45+
// exponential backoff for the individual item (based on its personal retry
46+
// history), checks the global rate against the token bucket, and picks the
47+
// longest delay from either strategy, ensuring that both per-item and overall
48+
// queue health are respected.
49+
func DefaultPrepUnprepRateLimiter() workqueue.TypedRateLimiter[any] {
50+
return workqueue.NewTypedMaxOfRateLimiter(
51+
// This is a per-item exponential backoff limiter. Each time an item
52+
// fails and is retried, the delay grows exponentially starting from the
53+
// lower value up to the upper bound.
54+
workqueue.NewTypedItemExponentialFailureRateLimiter[any](250*time.Millisecond, 3000*time.Second),
55+
// Global (not per-item) rate limiter. Allows up to 5 retries per
56+
// second, with bursts of up to 10.
57+
&workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(5), 10)},
58+
)
59+
}
60+
4161
func DefaultControllerRateLimiter() workqueue.TypedRateLimiter[any] {
4262
return workqueue.DefaultTypedControllerRateLimiter[any]()
4363
}

0 commit comments

Comments
 (0)