
Commit b68c3af

Replace in-mem (un)prep lock with file-based lock
Signed-off-by: Dr. Jan-Philip Gehrcke <[email protected]>
1 parent 9732ac3 commit b68c3af

File tree

3 files changed: +183 −14 lines changed

cmd/compute-domain-kubelet-plugin/driver.go

Lines changed: 28 additions & 8 deletions

@@ -30,12 +30,22 @@ import (
 	"k8s.io/dynamic-resource-allocation/resourceslice"
 	"k8s.io/klog/v2"
 
+	"github.com/NVIDIA/k8s-dra-driver-gpu/pkg/flock"
 	"github.com/NVIDIA/k8s-dra-driver-gpu/pkg/workqueue"
 )
 
-// ErrorRetryMaxTimeout is the max amount of time we retry when errors are
-// returned before giving up.
-const ErrorRetryMaxTimeout = 45 * time.Second
+const (
+	// ErrorRetryMaxTimeout limits the amount of time spent in the request
+	// handlers UnprepareResourceClaims() and PrepareResourceClaims(), so that
+	// we send a response to the kubelet in a predictable amount of time. Within
+	// that deadline, retryable errors are retried (with backoff) via the
+	// workqueue abstraction.
+	ErrorRetryMaxTimeout = 45 * time.Second
+	// DriverPrepUprepFlockPath is the path to a lock file used to make sure
+	// that calls to nodePrepareResource() / nodeUnprepareResource() never
+	// interleave, node-globally.
+	DriverPrepUprepFlockPath = DriverPluginPath + "/pu.lock"
+)
 
 // permanentError defines an error indicating that it is permanent.
 // By default, every error will be retried up to ErrorRetryMaxTimeout.
@@ -47,10 +57,10 @@ func isPermanentError(err error) bool {
 }
 
 type driver struct {
-	sync.Mutex
 	client       coreclientset.Interface
 	pluginhelper *kubeletplugin.Helper
 	state        *DeviceState
+	pulock       *flock.Flock
 }
 
 func NewDriver(ctx context.Context, config *Config) (*driver, error) {
@@ -62,6 +72,7 @@ func NewDriver(ctx context.Context, config *Config) (*driver, error) {
 	driver := &driver{
 		client: config.clientsets.Core,
 		state:  state,
+		pulock: flock.NewFlock(DriverPrepUprepFlockPath),
 	}
 
 	helper, err := kubeletplugin.Start(
@@ -184,8 +195,14 @@ func (d *driver) UnprepareResourceClaims(ctx context.Context, claimRefs []kubele
 }
 
 func (d *driver) nodePrepareResource(ctx context.Context, claim *resourceapi.ResourceClaim) (bool, kubeletplugin.PrepareResult) {
-	d.Lock()
-	defer d.Unlock()
+	release, err := d.pulock.Acquire(ctx, flock.WithTimeout(10*time.Second))
+	if err != nil {
+		res := kubeletplugin.PrepareResult{
+			Err: fmt.Errorf("error acquiring prep/unprep lock: %w", err),
+		}
+		return false, res
+	}
+	defer release()
 
 	if claim.Status.Allocation == nil {
 		res := kubeletplugin.PrepareResult{
@@ -207,8 +224,11 @@ func (d *driver) nodePrepareResource(ctx context.Context, claim *resourceapi.Res
 }
 
 func (d *driver) nodeUnprepareResource(ctx context.Context, claimRef kubeletplugin.NamespacedObject) (bool, error) {
-	d.Lock()
-	defer d.Unlock()
+	release, err := d.pulock.Acquire(ctx, flock.WithTimeout(10*time.Second))
+	if err != nil {
+		return false, fmt.Errorf("error acquiring prep/unprep lock: %w", err)
+	}
+	defer release()
 
 	if err := d.state.Unprepare(ctx, claimRef); err != nil {
 		return isPermanentError(err), fmt.Errorf("error unpreparing devices for claim '%v': %w", claimRef.String(), err)
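
For context: the removed sync.Mutex only serializes calls within a single plugin process, whereas the lock file above also serializes prepare/unprepare across driver pods on the same node. Below is a minimal sketch (not part of the diff) of the shared pattern both handlers now follow; the package name, withPrepUnprepLock(), and doCriticalWork() are illustrative stand-ins for the DeviceState.Prepare()/Unprepare() call sites.

// Sketch only: bound lock acquisition to 10s so the kubelet gets a timely
// response; an acquisition failure surfaces as a retryable error that the
// workqueue retries within ErrorRetryMaxTimeout.
package sketch

import (
	"context"
	"fmt"
	"time"

	"github.com/NVIDIA/k8s-dra-driver-gpu/pkg/flock"
)

func withPrepUnprepLock(ctx context.Context, lock *flock.Flock, doCriticalWork func(context.Context) error) error {
	release, err := lock.Acquire(ctx, flock.WithTimeout(10*time.Second))
	if err != nil {
		return fmt.Errorf("error acquiring prep/unprep lock: %w", err)
	}
	// The lock is released when the returned function closes the lock file's
	// descriptor, even if this process crashes first.
	defer release()
	return doCriticalWork(ctx)
}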

cmd/gpu-kubelet-plugin/driver.go

Lines changed: 22 additions & 6 deletions

@@ -19,21 +19,28 @@ package main
 import (
 	"context"
 	"fmt"
-	"sync"
+	"time"
 
 	resourceapi "k8s.io/api/resource/v1beta1"
 	"k8s.io/apimachinery/pkg/types"
 	coreclientset "k8s.io/client-go/kubernetes"
 	"k8s.io/dynamic-resource-allocation/kubeletplugin"
 	"k8s.io/dynamic-resource-allocation/resourceslice"
 	"k8s.io/klog/v2"
+
+	"github.com/NVIDIA/k8s-dra-driver-gpu/pkg/flock"
 )
 
+// DriverPrepUprepFlockPath is the path to a lock file used to make sure
+// that calls to nodePrepareResource() / nodeUnprepareResource() never
+// interleave, node-globally.
+const DriverPrepUprepFlockPath = DriverPluginPath + "/pu.lock"
+
 type driver struct {
-	sync.Mutex
 	client       coreclientset.Interface
 	pluginhelper *kubeletplugin.Helper
 	state        *DeviceState
+	pulock       *flock.Flock
 }
 
 func NewDriver(ctx context.Context, config *Config) (*driver, error) {
@@ -44,6 +51,7 @@ func NewDriver(ctx context.Context, config *Config) (*driver, error) {
 	driver := &driver{
 		client: config.clientsets.Core,
 		state:  state,
+		pulock: flock.NewFlock(DriverPrepUprepFlockPath),
 	}
 
 	helper, err := kubeletplugin.Start(
@@ -110,8 +118,13 @@ func (d *driver) UnprepareResourceClaims(ctx context.Context, claimRefs []kubele
 }
 
 func (d *driver) nodePrepareResource(ctx context.Context, claim *resourceapi.ResourceClaim) kubeletplugin.PrepareResult {
-	d.Lock()
-	defer d.Unlock()
+	release, err := d.pulock.Acquire(ctx, flock.WithTimeout(10*time.Second))
+	if err != nil {
+		return kubeletplugin.PrepareResult{
+			Err: fmt.Errorf("error acquiring prep/unprep lock: %w", err),
+		}
+	}
+	defer release()
 
 	devs, err := d.state.Prepare(ctx, claim)
 
@@ -126,8 +139,11 @@ func (d *driver) nodePrepareResource(ctx context.Context, claim *resourceapi.Res
 }
 
 func (d *driver) nodeUnprepareResource(ctx context.Context, claimNs kubeletplugin.NamespacedObject) error {
-	d.Lock()
-	defer d.Unlock()
+	release, err := d.pulock.Acquire(ctx, flock.WithTimeout(10*time.Second))
+	if err != nil {
+		return fmt.Errorf("error acquiring prep/unprep lock: %w", err)
+	}
+	defer release()
 
 	if err := d.state.Unprepare(ctx, string(claimNs.UID)); err != nil {
 		return fmt.Errorf("error unpreparing devices for claim %v: %w", claimNs.UID, err)

pkg/flock/flock.go

Lines changed: 133 additions & 0 deletions

@@ -0,0 +1,133 @@
+/*
+ * Copyright (c) 2025 NVIDIA CORPORATION. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package flock
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"syscall"
+	"time"
+)
+
+type Flock struct {
+	path string
+}
+
+type AcquireOption func(*acquireConfig)
+
+type acquireConfig struct {
+	timeout    time.Duration
+	pollPeriod time.Duration
+}
+
+func WithTimeout(timeout time.Duration) AcquireOption {
+	return func(cfg *acquireConfig) {
+		cfg.timeout = timeout
+	}
+}
+
+func WithPollPeriod(period time.Duration) AcquireOption {
+	return func(cfg *acquireConfig) {
+		cfg.pollPeriod = period
+	}
+}
+
+func NewFlock(path string) *Flock {
+	return &Flock{
+		path: path,
+	}
+}
+
+// Acquire attempts to acquire an exclusive file lock, polling with the configured
+// PollPeriod until whichever comes first:
+//
+// - lock successfully acquired
+// - timeout (if provided)
+// - external cancellation of context
+//
+// Returns a release function that must be called to unlock the file, typically
+// with defer().
+//
+// Introduced to protect the work in nodePrepareResource() and
+// nodeUnprepareResource() under a file-based lock because more than one driver
+// pod may be running on a node, but at most one such function must execute at
+// any given time.
+func (l *Flock) Acquire(ctx context.Context, opts ...AcquireOption) (func(), error) {
+	cfg := &acquireConfig{
+		// Default: short period to keep lock acquisition rather responsive
+		pollPeriod: 200 * time.Millisecond,
+		// Default: timeout disabled
+		timeout: 0,
+	}
+	for _, opt := range opts {
+		opt(cfg)
+	}
+
+	f, oerr := os.OpenFile(l.path, os.O_RDWR|os.O_CREATE, 0644)
+	if oerr != nil {
+		return nil, fmt.Errorf("error opening lock file (%s): %w", l.path, oerr)
+	}
+
+	t0 := time.Now()
+	ticker := time.NewTicker(cfg.pollPeriod)
+	defer ticker.Stop()
+
+	// Use non-blocking peek with LOCK_NB flag and polling; trade-off:
+	//
+	// - pro: no need for having to reliably cancel a potentially long-blocking
+	//   flock() system call (can only be done with signals).
+	// - con: lock acquisition time after a release is not immediate, but may
+	//   take up to PollPeriod amount of time. Not an issue in this context.
+	for {
+		flerr := syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
+		if flerr == nil {
+			// Lock acquired. Return release function. An exclusive flock() lock
+			// gets released when its file descriptor gets closed (also true
+			// when the lock-holding process crashes).
+			release := func() {
+				f.Close()
+			}
+			return release, nil
+		}
+
+		if flerr != syscall.EWOULDBLOCK {
+			// May be EBADF, EINTR, EINVAL, ENOLCK, and in general we want an
+			// outer retry mechanism to retry in view of any of those.
+			f.Close()
+			return nil, fmt.Errorf("error acquiring lock (%s): %w", l.path, flerr)
+		}
+
+		// Lock is currently held by other entity. Check for exit criteria;
+		// otherwise retry lock acquisition upon next tick.
+
+		if cfg.timeout > 0 && time.Since(t0) > cfg.timeout {
+			f.Close()
+			return nil, fmt.Errorf("timeout acquiring lock (%s)", l.path)
+		}
+
+		select {
+		case <-ctx.Done():
+			f.Close()
+			return nil, fmt.Errorf("error acquiring lock (%s): %w", l.path, ctx.Err())
+		case <-ticker.C:
+			// Retry flock().
+			continue
+		}
+	}
+}
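
A rough usage sketch (not part of the commit) may help make the locking semantics concrete: flock(2) ties the lock to the open file description, and Acquire() opens the lock file anew on each call, so two Flock values for the same path contend even inside one process, much like two driver pods on one node. The path, durations, and ExampleFlock name below are invented for illustration.

package flock_test

import (
	"context"
	"fmt"
	"time"

	"github.com/NVIDIA/k8s-dra-driver-gpu/pkg/flock"
)

func ExampleFlock() {
	ctx := context.Background()
	path := "/tmp/pu.lock" // stand-in for DriverPrepUprepFlockPath

	a := flock.NewFlock(path) // e.g. held by driver pod A
	b := flock.NewFlock(path) // e.g. wanted by driver pod B

	releaseA, err := a.Acquire(ctx, flock.WithTimeout(time.Second))
	if err != nil {
		panic(err)
	}

	// While A holds the lock, B's acquisition keeps polling and then times out.
	_, err = b.Acquire(ctx,
		flock.WithTimeout(200*time.Millisecond),
		flock.WithPollPeriod(50*time.Millisecond))
	fmt.Println(err != nil)

	// Releasing (or any event that closes the holder's file descriptor, such
	// as a crash of the holding process) lets the next acquisition succeed.
	releaseA()
	releaseB, err := b.Acquire(ctx, flock.WithTimeout(time.Second))
	if err != nil {
		panic(err)
	}
	releaseB()

	// Output: true
}

Tying the lock's lifetime to a file descriptor rather than to in-process state is what keeps it safe across driver pod restarts and crashes, a property the in-memory mutex could not provide across pods.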
