
Commit 9119740

Replace in-mem (un)prep lock with file-based lock
Signed-off-by: Dr. Jan-Philip Gehrcke <[email protected]>
1 parent 9732ac3 commit 9119740

3 files changed: +167, -14 lines changed

cmd/compute-domain-kubelet-plugin/driver.go

Lines changed: 28 additions & 8 deletions
@@ -30,12 +30,22 @@ import (
 	"k8s.io/dynamic-resource-allocation/resourceslice"
 	"k8s.io/klog/v2"

+	"github.com/NVIDIA/k8s-dra-driver-gpu/pkg/flock"
 	"github.com/NVIDIA/k8s-dra-driver-gpu/pkg/workqueue"
 )

-// ErrorRetryMaxTimeout is the max amount of time we retry when errors are
-// returned before giving up.
-const ErrorRetryMaxTimeout = 45 * time.Second
+const (
+	// ErrorRetryMaxTimeout limits the amount of time spent in the request
+	// handlers UnprepareResourceClaims() and PrepareResourceClaims(), so that
+	// we send a response to the kubelet in a predictable amount of time. Within
+	// that deadline, retryable errors are retried (with backoff) via the
+	// workqueue abstraction.
+	ErrorRetryMaxTimeout = 45 * time.Second
+	// DriverPrepUprepFlockPath is the path to a lock file used to make sure
+	// that calls to nodePrepareResource() / nodeUnprepareResource() never
+	// interleave, node-globally.
+	DriverPrepUprepFlockPath = DriverPluginPath + "/pu.lock"
+)

 // permanentError defines an error indicating that it is permanent.
 // By default, every error will be retried up to ErrorRetryMaxTimeout.
@@ -47,10 +57,10 @@ func isPermanentError(err error) bool {
 }

 type driver struct {
-	sync.Mutex
 	client       coreclientset.Interface
 	pluginhelper *kubeletplugin.Helper
 	state        *DeviceState
+	pulock       *flock.Flock
 }

 func NewDriver(ctx context.Context, config *Config) (*driver, error) {
@@ -62,6 +72,7 @@ func NewDriver(ctx context.Context, config *Config) (*driver, error) {
 	driver := &driver{
 		client: config.clientsets.Core,
 		state:  state,
+		pulock: flock.NewFlock(DriverPrepUprepFlockPath),
 	}

 	helper, err := kubeletplugin.Start(
@@ -184,8 +195,14 @@ func (d *driver) UnprepareResourceClaims(ctx context.Context, claimRefs []kubeletplugin.NamespacedObject)
 }

 func (d *driver) nodePrepareResource(ctx context.Context, claim *resourceapi.ResourceClaim) (bool, kubeletplugin.PrepareResult) {
-	d.Lock()
-	defer d.Unlock()
+	release, err := d.pulock.Acquire(ctx, flock.WithTimeout(10*time.Second))
+	if err != nil {
+		res := kubeletplugin.PrepareResult{
+			Err: fmt.Errorf("error acquiring prep/unprep lock: %w", err),
+		}
+		return false, res
+	}
+	defer release()

 	if claim.Status.Allocation == nil {
 		res := kubeletplugin.PrepareResult{
@@ -207,8 +224,11 @@ func (d *driver) nodePrepareResource(ctx context.Context, claim *resourceapi.ResourceClaim) (bool, kubeletplugin.PrepareResult) {
 }

 func (d *driver) nodeUnprepareResource(ctx context.Context, claimRef kubeletplugin.NamespacedObject) (bool, error) {
-	d.Lock()
-	defer d.Unlock()
+	release, err := d.pulock.Acquire(ctx, flock.WithTimeout(10*time.Second))
+	if err != nil {
+		return false, fmt.Errorf("error acquiring prep/unprep lock: %w", err)
+	}
+	defer release()

 	if err := d.state.Unprepare(ctx, claimRef); err != nil {
 		return isPermanentError(err), fmt.Errorf("error unpreparing devices for claim '%v': %w", claimRef.String(), err)

cmd/gpu-kubelet-plugin/driver.go

Lines changed: 22 additions & 6 deletions
@@ -19,21 +19,28 @@ package main
 import (
 	"context"
 	"fmt"
-	"sync"
+	"time"

 	resourceapi "k8s.io/api/resource/v1beta1"
 	"k8s.io/apimachinery/pkg/types"
 	coreclientset "k8s.io/client-go/kubernetes"
 	"k8s.io/dynamic-resource-allocation/kubeletplugin"
 	"k8s.io/dynamic-resource-allocation/resourceslice"
 	"k8s.io/klog/v2"
+
+	"github.com/NVIDIA/k8s-dra-driver-gpu/pkg/flock"
 )

+// DriverPrepUprepFlockPath is the path to a lock file used to make sure
+// that calls to nodePrepareResource() / nodeUnprepareResource() never
+// interleave, node-globally.
+const DriverPrepUprepFlockPath = DriverPluginPath + "/pu.lock"
+
 type driver struct {
-	sync.Mutex
 	client       coreclientset.Interface
 	pluginhelper *kubeletplugin.Helper
 	state        *DeviceState
+	pulock       *flock.Flock
 }

 func NewDriver(ctx context.Context, config *Config) (*driver, error) {
@@ -44,6 +51,7 @@ func NewDriver(ctx context.Context, config *Config) (*driver, error) {
 	driver := &driver{
 		client: config.clientsets.Core,
 		state:  state,
+		pulock: flock.NewFlock(DriverPrepUprepFlockPath),
 	}

 	helper, err := kubeletplugin.Start(
@@ -110,8 +118,13 @@ func (d *driver) UnprepareResourceClaims(ctx context.Context, claimRefs []kubeletplugin.NamespacedObject)
 }

 func (d *driver) nodePrepareResource(ctx context.Context, claim *resourceapi.ResourceClaim) kubeletplugin.PrepareResult {
-	d.Lock()
-	defer d.Unlock()
+	release, err := d.pulock.Acquire(ctx, flock.WithTimeout(10*time.Second))
+	if err != nil {
+		return kubeletplugin.PrepareResult{
+			Err: fmt.Errorf("error acquiring prep/unprep lock: %w", err),
+		}
+	}
+	defer release()

 	devs, err := d.state.Prepare(ctx, claim)

@@ -126,8 +139,11 @@ func (d *driver) nodePrepareResource(ctx context.Context, claim *resourceapi.ResourceClaim) kubeletplugin.PrepareResult {
 }

 func (d *driver) nodeUnprepareResource(ctx context.Context, claimNs kubeletplugin.NamespacedObject) error {
-	d.Lock()
-	defer d.Unlock()
+	release, err := d.pulock.Acquire(ctx, flock.WithTimeout(10*time.Second))
+	if err != nil {
+		return fmt.Errorf("error acquiring prep/unprep lock: %w", err)
+	}
+	defer release()

 	if err := d.state.Unprepare(ctx, string(claimNs.UID)); err != nil {
 		return fmt.Errorf("error unpreparing devices for claim %v: %w", claimNs.UID, err)

pkg/flock/flock.go

Lines changed: 117 additions & 0 deletions
@@ -0,0 +1,117 @@
+package flock
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"syscall"
+	"time"
+)
+
+type Flock struct {
+	path string
+}
+
+type AcquireOption func(*acquireConfig)
+
+type acquireConfig struct {
+	timeout    time.Duration
+	pollPeriod time.Duration
+}
+
+func WithTimeout(timeout time.Duration) AcquireOption {
+	return func(cfg *acquireConfig) {
+		cfg.timeout = timeout
+	}
+}
+
+func WithPollPeriod(period time.Duration) AcquireOption {
+	return func(cfg *acquireConfig) {
+		cfg.pollPeriod = period
+	}
+}
+
+func NewFlock(path string) *Flock {
+	return &Flock{
+		path: path,
+	}
+}
+
+// Acquire attempts to acquire an exclusive file lock, polling with the
+// configured poll period until whichever comes first:
+//
+//   - lock successfully acquired
+//   - timeout (if provided)
+//   - external cancellation of context
+//
+// Returns a release function that must be called to unlock the file,
+// typically with defer.
+//
+// Introduced to protect the work in nodePrepareResource() and
+// nodeUnprepareResource() with a file-based lock: more than one driver
+// pod may be running on a node, but at most one such function must
+// execute at any given time.
+func (l *Flock) Acquire(ctx context.Context, opts ...AcquireOption) (func(), error) {
+	cfg := &acquireConfig{
+		// Default: short period to keep lock acquisition rather responsive.
+		pollPeriod: 200 * time.Millisecond,
+		// Default: timeout disabled.
+		timeout: 0,
+	}
+	for _, opt := range opts {
+		opt(cfg)
+	}
+
+	f, oerr := os.OpenFile(l.path, os.O_RDWR|os.O_CREATE, 0644)
+	if oerr != nil {
+		return nil, fmt.Errorf("error opening lock file (%s): %w", l.path, oerr)
+	}
+
+	t0 := time.Now()
+	ticker := time.NewTicker(cfg.pollPeriod)
+	defer ticker.Stop()
+
+	// Use a non-blocking attempt with the LOCK_NB flag, and poll; trade-off:
+	//
+	//  - pro: no need to reliably cancel a potentially long-blocking flock()
+	//    system call (which can only be done with signals).
+	//  - con: lock acquisition after a release is not immediate, but may
+	//    take up to one poll period. Not an issue in this context.
+	for {
+		flerr := syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
+		if flerr == nil {
+			// Lock acquired. Return release function. An exclusive flock()
+			// lock gets released when its file descriptor gets closed (also
+			// true when the lock-holding process crashes).
+			release := func() {
+				f.Close()
+			}
+			return release, nil
+		}
+
+		if flerr != syscall.EWOULDBLOCK {
+			// May be EBADF, EINTR, EINVAL, or ENOLCK; in general we want an
+			// outer retry mechanism to retry in view of any of those.
+			f.Close()
+			return nil, fmt.Errorf("error acquiring lock (%s): %w", l.path, flerr)
+		}
+
+		// Lock is currently held by another entity. Check for exit
+		// criteria; otherwise retry lock acquisition upon the next tick.
+
+		if cfg.timeout > 0 && time.Since(t0) > cfg.timeout {
+			f.Close()
+			return nil, fmt.Errorf("timeout acquiring lock (%s)", l.path)
+		}
+
+		select {
+		case <-ctx.Done():
+			f.Close()
+			return nil, fmt.Errorf("error acquiring lock (%s): %w", l.path, ctx.Err())
+		case <-ticker.C:
+			// Retry flock().
+			continue
+		}
+	}
+}
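
For reference, a minimal usage sketch of the new package; the lock path and the surrounding main() are illustrative only, while the Acquire() call mirrors how the two drivers above use it.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/NVIDIA/k8s-dra-driver-gpu/pkg/flock"
)

func main() {
	// Hypothetical lock path, for illustration only.
	lock := flock.NewFlock("/tmp/pu-demo.lock")

	// Give up after 10 seconds (as the drivers do), and poll a bit more
	// frequently than the 200 ms default.
	release, err := lock.Acquire(context.Background(),
		flock.WithTimeout(10*time.Second),
		flock.WithPollPeriod(50*time.Millisecond),
	)
	if err != nil {
		fmt.Printf("could not acquire lock: %v\n", err)
		return
	}
	defer release()

	// Critical section: node-globally, at most one process executes this
	// at any given time.
	fmt.Println("lock held; doing prep/unprep work")
}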
