cmd/compute-domain-kubelet-plugin/driver.go (7 changes: 5 additions & 2 deletions)
@@ -157,7 +157,7 @@ func (d *driver) PrepareResourceClaims(ctx context.Context, claims []*resourceap

var wg sync.WaitGroup
ctx, cancel := context.WithTimeout(ctx, ErrorRetryMaxTimeout)
-	workQueue := workqueue.New(workqueue.DefaultControllerRateLimiter())
+	workQueue := workqueue.New(workqueue.DefaultPrepUnprepRateLimiter())
results := make(map[types.UID]kubeletplugin.PrepareResult)

for _, claim := range claims {
@@ -188,7 +188,10 @@ func (d *driver) UnprepareResourceClaims(ctx context.Context, claimRefs []kubele

var wg sync.WaitGroup
ctx, cancel := context.WithTimeout(ctx, ErrorRetryMaxTimeout)
-	workQueue := workqueue.New(workqueue.DefaultControllerRateLimiter())
+
+	// Review: do we want to have a new queue per incoming Prepare/Unprepare
+	// request?
@jgehrcke (Collaborator, Author) on Oct 10, 2025:
I asked this question here also for my future self; I probably already know everything needed to answer it. Some thoughts:

If this plugin operates on multiple retry loops concurrently (which it does), it could make sense to use a global retry rate limit as some kind of upper bound in case things go unexpectedly crazy. Currently, this is effectively a no-op:

&workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(5), 10)},
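If we wanted the bucket to act as an actual global bound across requests, one option would be to create it once and share it between all Prepare/Unprepare queues, while each call still gets a fresh per-item backoff. A rough, hypothetical sketch; the shared variable and constructor name below are not in this PR and the constants are illustrative:

```go
// Hypothetical sketch, not part of this PR: one token bucket shared by every
// Prepare/Unprepare work queue, so the global rate limit holds across
// concurrently handled requests.
package workqueue

import (
	"time"

	"golang.org/x/time/rate"
	"k8s.io/client-go/util/workqueue"
)

// sharedRetryBucket is created once per process; all queues draw from it.
var sharedRetryBucket = &workqueue.TypedBucketRateLimiter[any]{
	Limiter: rate.NewLimiter(rate.Limit(5), 10),
}

// SharedPrepUnprepRateLimiter (hypothetical name) pairs a fresh per-item
// exponential backoff with the shared bucket; the longer of the two delays
// wins for each item.
func SharedPrepUnprepRateLimiter() workqueue.TypedRateLimiter[any] {
	return workqueue.NewTypedMaxOfRateLimiter(
		workqueue.NewTypedItemExponentialFailureRateLimiter[any](250*time.Millisecond, 30*time.Second),
		sharedRetryBucket,
	)
}
```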

+	workQueue := workqueue.New(workqueue.DefaultPrepUnprepRateLimiter())
results := make(map[types.UID]error)

for _, claim := range claimRefs {
go.mod (2 changes: 1 addition & 1 deletion)
@@ -15,6 +15,7 @@ require (
github.com/spf13/pflag v1.0.10
github.com/stretchr/testify v1.11.1
github.com/urfave/cli/v2 v2.27.7
+	golang.org/x/time v0.9.0
google.golang.org/grpc v1.75.1
k8s.io/api v0.34.0
k8s.io/apimachinery v0.34.0
@@ -80,7 +81,6 @@ require (
golang.org/x/sys v0.36.0 // indirect
golang.org/x/term v0.34.0 // indirect
golang.org/x/text v0.28.0 // indirect
-	golang.org/x/time v0.9.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 // indirect
google.golang.org/protobuf v1.36.8 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
pkg/workqueue/workqueue.go (20 changes: 20 additions & 0 deletions)
@@ -20,7 +20,9 @@ import (
"context"
"fmt"
"sync"
"time"

"golang.org/x/time/rate"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
@@ -38,6 +40,24 @@ type WorkItem struct {
Callback func(ctx context.Context, obj any) error
}

+// DefaultPrepUnprepRateLimiter returns a composite rate limiter that combines
+// per-item exponential backoff with an overall token bucket rate-limiting
+// strategy. It calculates the exponential backoff for the individual item
+// (based on its personal retry history), checks the global rate against the
+// token bucket, and picks the longest delay from either strategy, ensuring
+// that both per-item and overall queue health are respected.
+func DefaultPrepUnprepRateLimiter() workqueue.TypedRateLimiter[any] {
@jgehrcke (Collaborator, Author) on Oct 8, 2025:
Review feedback was:

"Can we call this DefaultKubeletPluginRateLimiter to be symmetrical with DefaultControllerRateLimiter?"

(from #633 (comment))

This is not necessarily for all retrying work in the kubelet plugins.

This is supposed to be specific to retrying Prepare and Unprepare requests.

The context-specific choice of retrying methodology is kinda the point of this patch, and hence I'd like the name to also reflect that.

So, I am not unhappy with the name DefaultPrepUnprepRateLimiter, because it's supposed to be the default rate limiter for prep and unprep work! :)

In the future, we may have different retrying parameters for e.g. each of these:

  • preparing GPU
  • preparing CD
  • unpreparing CD
  • unpreparing GPU (mps/mig variants..)
  • ...

The retrying parameters will become highly relevant when we measure overall duration/latencies, and generally one-size-fits-all doesn't quite work.

(I really like the idea of tuning time constants over time in software! It's a sign of growing maturity and clearer goals.)
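For illustration only, here is one way such per-operation tuning could eventually look: a single parameterized constructor plus per-operation defaults. Every name and constant below is hypothetical and not part of this PR:

```go
// Hypothetical sketch of per-operation retry tuning; names and constants
// are illustrative only.
package workqueue

import (
	"time"

	"golang.org/x/time/rate"
	"k8s.io/client-go/util/workqueue"
)

// newRateLimiter builds the same composite limiter as in this PR, but with
// operation-specific backoff bounds and an operation-specific global ceiling.
func newRateLimiter(base, max time.Duration, perSecond float64, burst int) workqueue.TypedRateLimiter[any] {
	return workqueue.NewTypedMaxOfRateLimiter(
		workqueue.NewTypedItemExponentialFailureRateLimiter[any](base, max),
		&workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(perSecond), burst)},
	)
}

// Per-operation defaults, to be revisited once duration/latency
// measurements exist.
func GPUPrepRateLimiter() workqueue.TypedRateLimiter[any] {
	return newRateLimiter(250*time.Millisecond, 30*time.Second, 5, 10)
}

func CDUnprepRateLimiter() workqueue.TypedRateLimiter[any] {
	return newRateLimiter(500*time.Millisecond, 120*time.Second, 2, 5)
}
```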

+	return workqueue.NewTypedMaxOfRateLimiter(
+		// This is a per-item exponential backoff limiter. Each time an item
+		// fails and is retried, the delay grows exponentially starting from the
+		// lower value up to the upper bound.
+		workqueue.NewTypedItemExponentialFailureRateLimiter[any](250*time.Millisecond, 3000*time.Second),
+		// Global (not per-item) rate limiter. Allows up to 5 retries per
+		// second, with bursts of up to 10.
+		&workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(5), 10)},
+	)
+}

func DefaultControllerRateLimiter() workqueue.TypedRateLimiter[any] {
return workqueue.DefaultTypedControllerRateLimiter[any]()
}
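To round this off, a small standalone sketch (editorial, not part of the diff) of how the composite limiter added in this patch behaves: per-item delays grow with each retry of the same item, Forget resets them, and whenever the token bucket's wait is longer, it wins instead. The item names are made up; the limiter construction mirrors DefaultPrepUnprepRateLimiter.

```go
// Standalone illustration of the max-of-both-strategies behavior.
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	rl := workqueue.NewTypedMaxOfRateLimiter(
		workqueue.NewTypedItemExponentialFailureRateLimiter[any](250*time.Millisecond, 3000*time.Second),
		&workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(5), 10)},
	)

	// Per-item exponential backoff: 250ms, 500ms, 1s, 2s for the same item.
	for i := 0; i < 4; i++ {
		fmt.Println("claim-a delay:", rl.When("claim-a"))
	}

	// Once the item succeeds, its retry history is dropped.
	rl.Forget("claim-a")

	// A different item starts at the base delay again; if the global token
	// bucket (5/s, burst 10) were exhausted, its longer wait would win instead.
	fmt.Println("claim-b delay:", rl.When("claim-b"))
}
```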