Skip to content

Commit 5a3e611

Browse files
committed
adaptation: support concurrent RPCs via per-Pod lock
For users who do not need to perform cross-Pod updates, a new per-Pod lock implementation can be selected via the WithPodLocking() option function. Signed-off-by: JP Phillips <[email protected]>
1 parent 9b8befa commit 5a3e611

File tree

3 files changed

+337
-25
lines changed

3 files changed

+337
-25
lines changed

pkg/adaptation/adaptation.go

Lines changed: 55 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ type UpdateFn func(context.Context, []*ContainerUpdate) ([]*ContainerUpdate, err
5858

5959
// Adaptation is the NRI abstraction for container runtime NRI adaptation/integration.
6060
type Adaptation struct {
61-
sync.Mutex
61+
locker locker
6262
name string
6363
version string
6464
dropinPath string
@@ -144,6 +144,16 @@ func WithDefaultValidator(cfg *validator.DefaultValidatorConfig) Option {
144144
}
145145
}
146146

147+
// WithPodLocking returns an Option that sets the Adaptation's locker to newPodLocker(), i.e. per-Pod locking instead of the default global lock.
148+
// WARNING: This is ONLY safe if the consumer guarantees that no plugin
149+
// will ever request cross-Pod container updates in its responses.
150+
func WithPodLocking() Option {
151+
return func(r *Adaptation) error {
152+
r.locker = newPodLocker()
153+
return nil
154+
}
155+
}
156+
147157
// New creates a new NRI Runtime.
148158
func New(name, version string, syncFn SyncFn, updateFn UpdateFn, opts ...Option) (*Adaptation, error) {
149159
var err error
@@ -175,6 +185,7 @@ func New(name, version string, syncFn SyncFn, updateFn UpdateFn, opts ...Option)
175185
}
176186

177187
r := &Adaptation{
188+
locker: newGlobalLocker(),
178189
name: name,
179190
version: version,
180191
syncFn: syncFn,
@@ -192,7 +203,7 @@ func New(name, version string, syncFn SyncFn, updateFn UpdateFn, opts ...Option)
192203
}
193204
}
194205

195-
log.Infof(noCtx, "runtime interface created")
206+
log.Infof(noCtx, "runtime interface created, using locker: %T", r.locker)
196207

197208
return r, nil
198209
}
@@ -201,8 +212,8 @@ func New(name, version string, syncFn SyncFn, updateFn UpdateFn, opts ...Option)
201212
func (r *Adaptation) Start() error {
202213
log.Infof(noCtx, "runtime interface starting up...")
203214

204-
r.Lock()
205-
defer r.Unlock()
215+
unlock := r.locker.Lock(noCtx)
216+
defer unlock()
206217

207218
if err := r.startPlugins(); err != nil {
208219
return err
@@ -219,8 +230,8 @@ func (r *Adaptation) Start() error {
219230
func (r *Adaptation) Stop() {
220231
log.Infof(noCtx, "runtime interface shutting down...")
221232

222-
r.Lock()
223-
defer r.Unlock()
233+
unlock := r.locker.Lock(noCtx)
234+
defer unlock()
224235

225236
r.stopListener()
226237
r.stopPlugins()
@@ -234,8 +245,9 @@ func (r *Adaptation) RunPodSandbox(ctx context.Context, evt *StateChangeEvent) e
234245

235246
// UpdatePodSandbox relays the corresponding CRI request to plugins.
236247
func (r *Adaptation) UpdatePodSandbox(ctx context.Context, req *UpdatePodSandboxRequest) (*UpdatePodSandboxResponse, error) {
237-
r.Lock()
238-
defer r.Unlock()
248+
podUID := req.GetPod().GetId()
249+
unlock := r.locker.LockPod(ctx, podUID)
250+
defer unlock()
239251
defer r.removeClosedPlugins()
240252

241253
for _, plugin := range r.plugins {
@@ -268,8 +280,9 @@ func (r *Adaptation) RemovePodSandbox(ctx context.Context, evt *StateChangeEvent
268280

269281
// CreateContainer relays the corresponding CRI request to plugins.
270282
func (r *Adaptation) CreateContainer(ctx context.Context, req *CreateContainerRequest) (*CreateContainerResponse, error) {
271-
r.Lock()
272-
defer r.Unlock()
283+
podUID := req.GetPod().GetId()
284+
unlock := r.locker.LockPod(ctx, podUID)
285+
defer unlock()
273286
defer r.removeClosedPlugins()
274287

275288
var (
@@ -321,8 +334,9 @@ func (r *Adaptation) PostStartContainer(ctx context.Context, evt *StateChangeEve
321334

322335
// UpdateContainer relays the corresponding CRI request to plugins.
323336
func (r *Adaptation) UpdateContainer(ctx context.Context, req *UpdateContainerRequest) (*UpdateContainerResponse, error) {
324-
r.Lock()
325-
defer r.Unlock()
337+
podUID := req.GetPod().GetId()
338+
unlock := r.locker.LockPod(ctx, podUID)
339+
defer unlock()
326340
defer r.removeClosedPlugins()
327341

328342
result := collectUpdateContainerResult(req)
@@ -348,8 +362,9 @@ func (r *Adaptation) PostUpdateContainer(ctx context.Context, evt *StateChangeEv
348362

349363
// StopContainer relays the corresponding CRI request to plugins.
350364
func (r *Adaptation) StopContainer(ctx context.Context, req *StopContainerRequest) (*StopContainerResponse, error) {
351-
r.Lock()
352-
defer r.Unlock()
365+
podUID := req.GetPod().GetId()
366+
unlock := r.locker.LockPod(ctx, podUID)
367+
defer unlock()
353368
defer r.removeClosedPlugins()
354369

355370
result := collectStopContainerResult()
@@ -379,24 +394,39 @@ func (r *Adaptation) StateChange(ctx context.Context, evt *StateChangeEvent) err
379394
return errors.New("invalid (unset) event in state change notification")
380395
}
381396

382-
r.Lock()
383-
defer r.Unlock()
384-
defer r.removeClosedPlugins()
397+
defer func() {
398+
unlock := r.locker.Lock(ctx)
399+
r.removeClosedPlugins()
400+
unlock()
401+
}()
385402

386-
for _, plugin := range r.plugins {
387-
err := plugin.StateChange(ctx, evt)
388-
if err != nil {
389-
return err
403+
podUID := evt.GetPod().GetId()
404+
if err := func() error {
405+
unlock := r.locker.LockPod(ctx, podUID)
406+
defer unlock()
407+
for _, plugin := range r.plugins {
408+
err := plugin.StateChange(ctx, evt)
409+
if err != nil {
410+
return err
411+
}
390412
}
413+
return nil
414+
}(); err != nil {
415+
return err
416+
}
417+
418+
// Cleanup pod lock resources if this is the removal event
419+
if evt.Event == Event_REMOVE_POD_SANDBOX {
420+
r.locker.CleanupPod(ctx, podUID)
391421
}
392422

393423
return nil
394424
}
395425

396426
// updateContainers performs a set of unsolicited container updates requested by a plugin; it holds the global lock (r.locker.Lock) for the duration of the r.updateFn call.
397427
func (r *Adaptation) updateContainers(ctx context.Context, req []*ContainerUpdate) ([]*ContainerUpdate, error) {
398-
r.Lock()
399-
defer r.Unlock()
428+
unlock := r.locker.Lock(ctx)
429+
defer unlock()
400430

401431
return r.updateFn(ctx, req)
402432
}
@@ -608,13 +638,13 @@ func (r *Adaptation) acceptPluginConnections(l net.Listener) error {
608638
if err != nil {
609639
log.Infof(ctx, "failed to synchronize plugin: %v", err)
610640
} else {
611-
r.Lock()
641+
unlock := r.locker.Lock(ctx)
612642
r.plugins = append(r.plugins, p)
613643
if p.isContainerAdjustmentValidator() {
614644
r.validators = append(r.validators, p)
615645
}
616646
r.sortPlugins()
617-
r.Unlock()
647+
unlock()
618648
log.Infof(ctx, "plugin %q connected and synchronized", p.name())
619649
}
620650

pkg/adaptation/lock.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package adaptation
18+
19+
import (
20+
"context"
21+
"sync"
22+
)
23+
// UnlockFunc is a function returned by Lock methods to release the acquired lock.
// Call it exactly once: every implementation in this file ends up calling
// sync.Mutex.Unlock, which is a run-time error on a mutex that is not locked.
type UnlockFunc func()

// locker defines the internal interface for locking strategies.
type locker interface {
	// Lock acquires the global lock and returns the function that releases it.
	Lock(ctx context.Context) UnlockFunc
	// LockPod acquires the lock for a specific pod and returns the function
	// that releases it.
	LockPod(ctx context.Context, podUID string) UnlockFunc
	// CleanupPod releases the resources kept for the lock of the given pod.
	CleanupPod(ctx context.Context, podUID string)
}

var _ locker = &globalLocker{}

// globalLocker implements locker using a single global mutex.
type globalLocker struct {
	mu sync.Mutex
}

// newGlobalLocker creates the default global locker.
func newGlobalLocker() locker {
	return &globalLocker{}
}

// Lock acquires the single mutex; the returned function releases it.
func (m *globalLocker) Lock(context.Context) UnlockFunc {
	m.mu.Lock()
	return m.mu.Unlock
}

// LockPod ignores the pod UID and acquires the same single mutex as Lock, so
// with this locker every pod operation is serialized against all others.
func (m *globalLocker) LockPod(context.Context, string) UnlockFunc {
	m.mu.Lock()
	return m.mu.Unlock
}

// CleanupPod is a no-op: the global locker keeps no per-pod state.
func (m *globalLocker) CleanupPod(context.Context, string) {
}

var _ locker = &podLocker{}

// podLockEntry is the lock of a single pod together with the bookkeeping
// needed to remove it from the map without breaking mutual exclusion.
type podLockEntry struct {
	mu sync.Mutex // The per-pod lock itself.

	// refs and retired are guarded by podLocker.mapMu.
	refs    int  // Number of goroutines holding or waiting for mu.
	retired bool // CleanupPod was requested while refs was non-zero.
}

// podLocker implements locker using a separate mutex for each Pod UID
// and using its own main mutex 'mu' as the global lock.
//
// Lock ordering is mu -> mapMu. Releasing a pod lock only takes mapMu, which
// is never held across a blocking call, so a pod lock can always be released
// promptly even while another goroutine holds the global lock.
type podLocker struct {
	mu    sync.Mutex               // Acts as the global lock; LockPod passes through it briefly.
	mapMu sync.Mutex               // Protects locks and the refs/retired fields of its entries.
	locks map[string]*podLockEntry // Map from Pod UID to its lock entry.
}

// newPodLocker creates the per-pod locker.
func newPodLocker() locker {
	return &podLocker{
		locks: make(map[string]*podLockEntry),
	}
}

// Lock acquires the main mutex, acting as the global lock. While it is held
// no LockPod call can get past its map lookup, but pod locks that are already
// held are not waited for.
func (m *podLocker) Lock(context.Context) UnlockFunc {
	m.mu.Lock()
	return m.mu.Unlock
}

// LockPod acquires the lock for a specific Pod UID, creating it on first use.
// The returned function releases the lock and, if CleanupPod was called in
// the meantime and nobody else is using the entry, removes it from the map.
func (m *podLocker) LockPod(_ context.Context, podUID string) UnlockFunc {
	// Passing through the main mutex keeps LockPod gated by the global lock.
	m.mu.Lock()
	m.mapMu.Lock()
	entry, ok := m.locks[podUID]
	if !ok {
		entry = &podLockEntry{}
		m.locks[podUID] = entry
	}
	// Register before blocking on entry.mu so that a concurrent CleanupPod
	// sees this goroutine as a user of the entry and does not drop it.
	entry.refs++
	m.mapMu.Unlock()
	m.mu.Unlock()

	entry.mu.Lock()
	return func() {
		entry.mu.Unlock()
		m.release(podUID, entry)
	}
}

// release drops one reference to entry and performs a deferred cleanup once
// the last user of a retired entry is gone.
func (m *podLocker) release(podUID string, entry *podLockEntry) {
	m.mapMu.Lock()
	defer m.mapMu.Unlock()

	entry.refs--
	if entry.refs == 0 && entry.retired && m.locks[podUID] == entry {
		delete(m.locks, podUID)
	}
}

// CleanupPod removes the lock for a specific Pod UID from the map. If the
// lock is currently held or waited for, removal is postponed until the last
// user releases it; deleting it right away would let a later LockPod create a
// second mutex for the same pod while the first one is still in use.
func (m *podLocker) CleanupPod(_ context.Context, podUID string) {
	m.mapMu.Lock()
	defer m.mapMu.Unlock()

	entry, ok := m.locks[podUID]
	if !ok {
		return
	}
	if entry.refs == 0 {
		delete(m.locks, podUID)
		return
	}
	entry.retired = true
}

0 commit comments

Comments
 (0)