From 7e18c334086592c6899ec22ab043115cbe25fd5d Mon Sep 17 00:00:00 2001 From: "Dr. Jan-Philip Gehrcke" Date: Tue, 30 Sep 2025 12:18:41 +0000 Subject: [PATCH 1/2] Introduce DefaultPrepUnprepRateLimiter (less aggressive) Signed-off-by: Dr. Jan-Philip Gehrcke --- cmd/compute-domain-kubelet-plugin/driver.go | 7 +++++-- pkg/workqueue/workqueue.go | 20 ++++++++++++++++++++ 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/cmd/compute-domain-kubelet-plugin/driver.go b/cmd/compute-domain-kubelet-plugin/driver.go index 72e5cbda6..d2a846ecd 100644 --- a/cmd/compute-domain-kubelet-plugin/driver.go +++ b/cmd/compute-domain-kubelet-plugin/driver.go @@ -157,7 +157,7 @@ func (d *driver) PrepareResourceClaims(ctx context.Context, claims []*resourceap var wg sync.WaitGroup ctx, cancel := context.WithTimeout(ctx, ErrorRetryMaxTimeout) - workQueue := workqueue.New(workqueue.DefaultControllerRateLimiter()) + workQueue := workqueue.New(workqueue.DefaultPrepUnprepRateLimiter()) results := make(map[types.UID]kubeletplugin.PrepareResult) for _, claim := range claims { @@ -188,7 +188,10 @@ func (d *driver) UnprepareResourceClaims(ctx context.Context, claimRefs []kubele var wg sync.WaitGroup ctx, cancel := context.WithTimeout(ctx, ErrorRetryMaxTimeout) - workQueue := workqueue.New(workqueue.DefaultControllerRateLimiter()) + + // Review: do we want to have a new queue per incoming Prepare/Unprepare + // request? 
+ workQueue := workqueue.New(workqueue.DefaultPrepUnprepRateLimiter()) results := make(map[types.UID]error) for _, claim := range claimRefs { diff --git a/pkg/workqueue/workqueue.go b/pkg/workqueue/workqueue.go index bee84f03b..3c9f25e98 100644 --- a/pkg/workqueue/workqueue.go +++ b/pkg/workqueue/workqueue.go @@ -20,7 +20,9 @@ import ( "context" "fmt" "sync" + "time" + "golang.org/x/time/rate" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" @@ -38,6 +40,24 @@ type WorkItem struct { Callback func(ctx context.Context, obj any) error } +// DefaultPrepUnprepRateLimiter returns a composite rate limiter built with +// workqueue.NewTypedMaxOfRateLimiter from two limiters: a per-item exponential +// failure limiter and a single token bucket shared by all items. The intent is +// that each retry waits for the longer of the item's own exponential backoff +// (driven by its retry history) and the delay imposed by the shared bucket, +// so that both per-item and overall queue health are respected. +func DefaultPrepUnprepRateLimiter() workqueue.TypedRateLimiter[any] { + return workqueue.NewTypedMaxOfRateLimiter( + // Per-item exponential backoff limiter. Each time an item fails and is + // retried, its delay grows exponentially from 250 ms up to the upper bound. + // NOTE(review): the bound is 3000*time.Second (50 min); confirm the unit. + workqueue.NewTypedItemExponentialFailureRateLimiter[any](250*time.Millisecond, 3000*time.Second), + // Global (not per-item) token bucket limiter: allows up to 5 retries per + // second, with bursts of up to 10. + &workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(5), 10)}, + ) +} + func DefaultControllerRateLimiter() workqueue.TypedRateLimiter[any] { return workqueue.DefaultTypedControllerRateLimiter[any]() } From d34a12f7aea43ba9142758d539bee92cbafc1e62 Mon Sep 17 00:00:00 2001 From: "Dr. Jan-Philip Gehrcke" Date: Wed, 8 Oct 2025 15:18:53 +0200 Subject: [PATCH 2/2] Adjust go.mod to recent changes Signed-off-by: Dr.
Jan-Philip Gehrcke --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 85982ccb8..04a5359f7 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/spf13/pflag v1.0.10 github.com/stretchr/testify v1.11.1 github.com/urfave/cli/v2 v2.27.7 + golang.org/x/time v0.9.0 google.golang.org/grpc v1.75.1 k8s.io/api v0.34.0 k8s.io/apimachinery v0.34.0 @@ -80,7 +81,6 @@ require ( golang.org/x/sys v0.36.0 // indirect golang.org/x/term v0.34.0 // indirect golang.org/x/text v0.28.0 // indirect - golang.org/x/time v0.9.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 // indirect google.golang.org/protobuf v1.36.8 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect