Skip to content

Commit 2262c92

Browse files
committed
Replace in-mem (un)prep lock with file-based lock
Signed-off-by: Dr. Jan-Philip Gehrcke <[email protected]>
1 parent 9732ac3 commit 2262c92

File tree

3 files changed

+149
-11
lines changed

3 files changed

+149
-11
lines changed

cmd/compute-domain-kubelet-plugin/driver.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"k8s.io/dynamic-resource-allocation/resourceslice"
3131
"k8s.io/klog/v2"
3232

33+
"github.com/NVIDIA/k8s-dra-driver-gpu/pkg/flock"
3334
"github.com/NVIDIA/k8s-dra-driver-gpu/pkg/workqueue"
3435
)
3536

@@ -47,10 +48,10 @@ func isPermanentError(err error) bool {
4748
}
4849

4950
type driver struct {
50-
sync.Mutex
5151
client coreclientset.Interface
5252
pluginhelper *kubeletplugin.Helper
5353
state *DeviceState
54+
pulock *flock.Flock
5455
}
5556

5657
func NewDriver(ctx context.Context, config *Config) (*driver, error) {
@@ -62,6 +63,7 @@ func NewDriver(ctx context.Context, config *Config) (*driver, error) {
6263
driver := &driver{
6364
client: config.clientsets.Core,
6465
state: state,
66+
pulock: flock.NewFlock(DriverPluginPath + "/pu.lock"),
6567
}
6668

6769
helper, err := kubeletplugin.Start(
@@ -184,8 +186,14 @@ func (d *driver) UnprepareResourceClaims(ctx context.Context, claimRefs []kubele
184186
}
185187

186188
func (d *driver) nodePrepareResource(ctx context.Context, claim *resourceapi.ResourceClaim) (bool, kubeletplugin.PrepareResult) {
187-
d.Lock()
188-
defer d.Unlock()
189+
release, err := d.pulock.Acquire(ctx, flock.WithTimeout(10*time.Second))
190+
if err != nil {
191+
res := kubeletplugin.PrepareResult{
192+
Err: fmt.Errorf("error acquiring prep/unprep lock: %w", err),
193+
}
194+
return false, res
195+
}
196+
defer release()
189197

190198
if claim.Status.Allocation == nil {
191199
res := kubeletplugin.PrepareResult{
@@ -207,8 +215,11 @@ func (d *driver) nodePrepareResource(ctx context.Context, claim *resourceapi.Res
207215
}
208216

209217
func (d *driver) nodeUnprepareResource(ctx context.Context, claimRef kubeletplugin.NamespacedObject) (bool, error) {
210-
d.Lock()
211-
defer d.Unlock()
218+
release, err := d.pulock.Acquire(ctx, flock.WithTimeout(10*time.Second))
219+
if err != nil {
220+
return false, fmt.Errorf("error acquiring prep/unprep lock: %w", err)
221+
}
222+
defer release()
212223

213224
if err := d.state.Unprepare(ctx, claimRef); err != nil {
214225
return isPermanentError(err), fmt.Errorf("error unpreparing devices for claim '%v': %w", claimRef.String(), err)

cmd/gpu-kubelet-plugin/driver.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,23 @@ package main
1919
import (
2020
"context"
2121
"fmt"
22-
"sync"
22+
"time"
2323

2424
resourceapi "k8s.io/api/resource/v1beta1"
2525
"k8s.io/apimachinery/pkg/types"
2626
coreclientset "k8s.io/client-go/kubernetes"
2727
"k8s.io/dynamic-resource-allocation/kubeletplugin"
2828
"k8s.io/dynamic-resource-allocation/resourceslice"
2929
"k8s.io/klog/v2"
30+
31+
"github.com/NVIDIA/k8s-dra-driver-gpu/pkg/flock"
3032
)
3133

3234
type driver struct {
33-
sync.Mutex
3435
client coreclientset.Interface
3536
pluginhelper *kubeletplugin.Helper
3637
state *DeviceState
38+
pulock *flock.Flock
3739
}
3840

3941
func NewDriver(ctx context.Context, config *Config) (*driver, error) {
@@ -44,6 +46,7 @@ func NewDriver(ctx context.Context, config *Config) (*driver, error) {
4446
driver := &driver{
4547
client: config.clientsets.Core,
4648
state: state,
49+
pulock: flock.NewFlock(DriverPluginPath + "/pu.lock"),
4750
}
4851

4952
helper, err := kubeletplugin.Start(
@@ -110,8 +113,13 @@ func (d *driver) UnprepareResourceClaims(ctx context.Context, claimRefs []kubele
110113
}
111114

112115
func (d *driver) nodePrepareResource(ctx context.Context, claim *resourceapi.ResourceClaim) kubeletplugin.PrepareResult {
113-
d.Lock()
114-
defer d.Unlock()
116+
release, err := d.pulock.Acquire(ctx, flock.WithTimeout(10*time.Second))
117+
if err != nil {
118+
return kubeletplugin.PrepareResult{
119+
Err: fmt.Errorf("error acquiring prep/unprep lock: %w", err),
120+
}
121+
}
122+
defer release()
115123

116124
devs, err := d.state.Prepare(ctx, claim)
117125

@@ -126,8 +134,11 @@ func (d *driver) nodePrepareResource(ctx context.Context, claim *resourceapi.Res
126134
}
127135

128136
func (d *driver) nodeUnprepareResource(ctx context.Context, claimNs kubeletplugin.NamespacedObject) error {
129-
d.Lock()
130-
defer d.Unlock()
137+
release, err := d.pulock.Acquire(ctx, flock.WithTimeout(10*time.Second))
138+
if err != nil {
139+
return fmt.Errorf("error acquiring prep/unprep lock: %w", err)
140+
}
141+
defer release()
131142

132143
if err := d.state.Unprepare(ctx, string(claimNs.UID)); err != nil {
133144
return fmt.Errorf("error unpreparing devices for claim %v: %w", claimNs.UID, err)

pkg/flock/flock.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package flock
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"syscall"
8+
"time"
9+
)
10+
11+
// Flock is a handle for a lock that is backed by a file on disk. It only
// records where the lock file lives; the file itself is opened (and created
// if necessary) each time Acquire is called.
type Flock struct {
	// path is the filesystem location of the lock file.
	path string
}
14+
15+
// AcquireOption customizes a single call to Acquire by mutating the settings
// that call runs with. Options are created with helpers such as WithTimeout
// and WithPollPeriod.
type AcquireOption func(*acquireConfig)
16+
17+
// acquireConfig collects the tunables for one lock acquisition attempt. It is
// populated with defaults by Acquire and then adjusted by AcquireOption values.
type acquireConfig struct {
	// pollPeriod is the interval between successive attempts to take the lock.
	pollPeriod time.Duration
	// timeout is the maximum time to keep retrying; values <= 0 disable it.
	timeout time.Duration
}
21+
22+
func WithTimeout(timeout time.Duration) AcquireOption {
23+
return func(cfg *acquireConfig) {
24+
cfg.timeout = timeout
25+
}
26+
}
27+
28+
func WithPollPeriod(period time.Duration) AcquireOption {
29+
return func(cfg *acquireConfig) {
30+
cfg.pollPeriod = period
31+
}
32+
}
33+
34+
func NewFlock(path string) *Flock {
35+
return &Flock{
36+
path: path,
37+
}
38+
}
39+
40+
// Acquire attempts to acquire an exclusive file lock, polling with the configured
41+
// PollPeriod until whichever comes first:
42+
//
43+
// - lock successfully acquired
44+
// - timeout (if provided)
45+
// - external cancellation of context
46+
//
47+
// Returns a release function that must be called to unlock the file, typically
48+
// with defer().
49+
//
50+
// Introduced to protect the work in nodePrepareResource() and
51+
// nodeUnprepareResource() under a file-based lock because more than one driver
52+
// pod may be running on a node, but at most one such function must execute at
53+
// any given time.
54+
func (l *Flock) Acquire(ctx context.Context, opts ...AcquireOption) (func(), error) {
55+
cfg := &acquireConfig{
56+
// Default: short period to keep lock acquisition rather responsive
57+
pollPeriod: 200 * time.Millisecond,
58+
// Default: timeout disabled
59+
timeout: 0,
60+
}
61+
for _, opt := range opts {
62+
opt(cfg)
63+
}
64+
65+
f, oerr := os.OpenFile(l.path, os.O_RDWR|os.O_CREATE, 0644)
66+
if oerr != nil {
67+
return nil, fmt.Errorf("error opening lock file (%s): %w", l.path, oerr)
68+
}
69+
70+
t0 := time.Now()
71+
ticker := time.NewTicker(cfg.pollPeriod)
72+
defer ticker.Stop()
73+
74+
// Use non-blocking peek with LOCK_NB flag and polling; trade-off:
75+
//
76+
// - pro: no need for having to reliably cancel a potentially long-blocking
77+
// flock() system call (can only be done with signals).
78+
// - con: lock acquisition time after a release is not immediate, but may
79+
// take up to PollPeriod amount of time. Not an issue in this context.
80+
for {
81+
flerr := syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
82+
if flerr == nil {
83+
// Lock acquired. Return release function. An exclusive flock() lock
84+
// gets released when its file descriptor gets closed (also true
85+
// when the lock-holding process crashes).
86+
return func() {
87+
f.Close()
88+
}, nil
89+
}
90+
91+
if flerr != syscall.EWOULDBLOCK {
92+
// May be EBADF, EINTR, EINVAl, ENOLCK, and
93+
// in general we want an outer retry mechanism
94+
// to retry in view of any of those.
95+
f.Close()
96+
return nil, fmt.Errorf("error acquiring lock (%s): %w", l.path, flerr)
97+
}
98+
99+
// Lock is currently held by other entity. Check for exit criteria;
100+
// otherwise retry lock acquisition upon next tick.
101+
102+
if cfg.timeout > 0 && time.Since(t0) > cfg.timeout {
103+
f.Close()
104+
return nil, fmt.Errorf("timeout acquiring lock (%s)", l.path)
105+
}
106+
107+
select {
108+
case <-ctx.Done():
109+
f.Close()
110+
return nil, fmt.Errorf("context done before lock acquired (%s)", l.path)
111+
case <-ticker.C:
112+
// Retry flock().
113+
continue
114+
}
115+
}
116+
}

0 commit comments

Comments
 (0)