diff --git a/cmd/compute-domain-kubelet-plugin/driver.go b/cmd/compute-domain-kubelet-plugin/driver.go index 72e5cbda6..d2a846ecd 100644 --- a/cmd/compute-domain-kubelet-plugin/driver.go +++ b/cmd/compute-domain-kubelet-plugin/driver.go @@ -157,7 +157,7 @@ func (d *driver) PrepareResourceClaims(ctx context.Context, claims []*resourceap var wg sync.WaitGroup ctx, cancel := context.WithTimeout(ctx, ErrorRetryMaxTimeout) - workQueue := workqueue.New(workqueue.DefaultControllerRateLimiter()) + workQueue := workqueue.New(workqueue.DefaultPrepUnprepRateLimiter()) results := make(map[types.UID]kubeletplugin.PrepareResult) for _, claim := range claims { @@ -188,7 +188,10 @@ func (d *driver) UnprepareResourceClaims(ctx context.Context, claimRefs []kubele var wg sync.WaitGroup ctx, cancel := context.WithTimeout(ctx, ErrorRetryMaxTimeout) - workQueue := workqueue.New(workqueue.DefaultControllerRateLimiter()) + + // Review: do we want to have a new queue per incoming Prepare/Unprepare + // request? + workQueue := workqueue.New(workqueue.DefaultPrepUnprepRateLimiter()) results := make(map[types.UID]error) for _, claim := range claimRefs { diff --git a/go.mod b/go.mod index 85982ccb8..04a5359f7 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/spf13/pflag v1.0.10 github.com/stretchr/testify v1.11.1 github.com/urfave/cli/v2 v2.27.7 + golang.org/x/time v0.9.0 google.golang.org/grpc v1.75.1 k8s.io/api v0.34.0 k8s.io/apimachinery v0.34.0 @@ -80,7 +81,6 @@ require ( golang.org/x/sys v0.36.0 // indirect golang.org/x/term v0.34.0 // indirect golang.org/x/text v0.28.0 // indirect - golang.org/x/time v0.9.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 // indirect google.golang.org/protobuf v1.36.8 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect diff --git a/pkg/workqueue/workqueue.go b/pkg/workqueue/workqueue.go index bee84f03b..3c9f25e98 100644 --- a/pkg/workqueue/workqueue.go +++ b/pkg/workqueue/workqueue.go @@ -20,7 +20,9 @@ import ( "context" "fmt" "sync" + "time" + "golang.org/x/time/rate" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" @@ -38,6 +40,24 @@ type WorkItem struct { Callback func(ctx context.Context, obj any) error } +// Return composite rate limiter that combines both per-item exponential backoff +// and an overall token bucket rate-limiting strategy. It calculates the +// exponential backoff for the individual item (based on its personal retry +// history), checks the global rate against the token bucket, and picks the +// longest delay from either strategy, ensuring that both per-item and overall +// queue health are respected. +func DefaultPrepUnprepRateLimiter() workqueue.TypedRateLimiter[any] { + return workqueue.NewTypedMaxOfRateLimiter( + // This is a per-item exponential backoff limiter. Each time an item + // fails and is retried, the delay grows exponentially starting from the + // lower value up to the upper bound. + workqueue.NewTypedItemExponentialFailureRateLimiter[any](250*time.Millisecond, 3000*time.Second), + // Global (not per-item) rate limiter. Allows up to 5 retries per + // second, with bursts of up to 10. + &workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(5), 10)}, + ) +} + func DefaultControllerRateLimiter() workqueue.TypedRateLimiter[any] { return workqueue.DefaultTypedControllerRateLimiter[any]() }