Skip to content

Commit 3c5f3b6

Browse files
committed
Fix Prepare() idempotency by preventing reverse state transition and atomically detecting already-prepared claims.
* Avoid overwriting PrepareCompleted with PrepareStarted on subsequent Prepare() calls. * Move claim prepared state checks into a single updateCheckpoint call for atomicity. * Ensure checkpoint state does not transition from completed to started state again. Signed-off-by: Shiva Krishna, Merla <[email protected]>
1 parent 11606fd commit 3c5f3b6

File tree

2 files changed

+40
-28
lines changed

2 files changed

+40
-28
lines changed

cmd/compute-domain-kubelet-plugin/device_state.go

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -149,34 +149,40 @@ func (s *DeviceState) Prepare(ctx context.Context, claim *resourceapi.ResourceCl
149149

150150
claimUID := string(claim.UID)
151151

152-
checkpoint, err := s.getCheckpoint()
153-
if err != nil {
154-
return nil, fmt.Errorf("unable to get checkpoint: %w", err)
155-
}
152+
var preparedDevices PreparedDevices
153+
var alreadyPrepared bool
156154

157-
preparedClaim, exists := checkpoint.V2.PreparedClaims[claimUID]
158-
if exists && preparedClaim.CheckpointState == ClaimCheckpointStatePrepareCompleted {
159-
// Make this a noop. Associated device(s) has/ave been prepared by us.
160-
// Prepare() must be idempotent, as it may be invoked more than once per
161-
// claim (and actual device preparation must happen at most once).
162-
klog.V(4).Infof("Skip prepare: claim %v found in checkpoint", claimUID)
163-
return preparedClaim.PreparedDevices.GetDevices(), nil
164-
}
155+
// Atomically check if prepare for this claim is already completed, otherwise mark as started.
156+
err := s.updateCheckpoint(func(cp *Checkpoint) {
157+
if pc, ok := cp.V2.PreparedClaims[claimUID]; ok &&
158+
pc.CheckpointState == ClaimCheckpointStatePrepareCompleted {
165159

166-
err = s.updateCheckpoint(func(checkpoint *Checkpoint) {
167-
checkpoint.V2.PreparedClaims[claimUID] = PreparedClaim{
160+
alreadyPrepared = true
161+
preparedDevices = pc.PreparedDevices
162+
return
163+
}
164+
165+
cp.V2.PreparedClaims[claimUID] = PreparedClaim{
168166
CheckpointState: ClaimCheckpointStatePrepareStarted,
169167
Status: claim.Status,
170168
Name: claim.Name,
171169
Namespace: claim.Namespace,
172170
}
171+
klog.V(6).Infof("checkpoint updated for claim %v", claimUID)
173172
})
174173
if err != nil {
175174
return nil, fmt.Errorf("unable to update checkpoint: %w", err)
176175
}
177-
klog.V(6).Infof("checkpoint updated for claim %v", claimUID)
178176

179-
preparedDevices, err := s.prepareDevices(ctx, claim)
177+
if alreadyPrepared {
178+
// Make this a noop. Associated device(s) has/ave been prepared by us.
179+
// Prepare() must be idempotent, as it may be invoked more than once per
180+
// claim (and actual device preparation must happen at most once).
181+
klog.V(4).Infof("skip prepare: claim %v found in checkpoint", claimUID)
182+
return preparedDevices.GetDevices(), nil
183+
}
184+
185+
preparedDevices, err = s.prepareDevices(ctx, claim)
180186
if err != nil {
181187
return nil, fmt.Errorf("prepare devices failed: %w", err)
182188
}

cmd/gpu-kubelet-plugin/device_state.go

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -150,32 +150,38 @@ func (s *DeviceState) Prepare(ctx context.Context, claim *resourceapi.ResourceCl
150150

151151
claimUID := string(claim.UID)
152152

153-
checkpoint, err := s.getCheckpoint()
154-
if err != nil {
155-
return nil, fmt.Errorf("unable to get checkpoint: %v", err)
156-
}
153+
var preparedDevices PreparedDevices
154+
var alreadyPrepared bool
157155

158-
err = s.updateCheckpoint(func(checkpoint *Checkpoint) {
159-
checkpoint.V2.PreparedClaims[claimUID] = PreparedClaim{
156+
// Atomically check if prepare for this claim is already completed, otherwise mark as started.
157+
err := s.updateCheckpoint(func(cp *Checkpoint) {
158+
if pc, ok := cp.V2.PreparedClaims[claimUID]; ok &&
159+
pc.CheckpointState == ClaimCheckpointStatePrepareCompleted {
160+
161+
alreadyPrepared = true
162+
preparedDevices = pc.PreparedDevices
163+
return
164+
}
165+
166+
cp.V2.PreparedClaims[claimUID] = PreparedClaim{
160167
CheckpointState: ClaimCheckpointStatePrepareStarted,
161168
Status: claim.Status,
162169
}
170+
klog.V(6).Infof("checkpoint updated for claim %v", claimUID)
163171
})
164172
if err != nil {
165173
return nil, fmt.Errorf("unable to update checkpoint: %w", err)
166174
}
167-
klog.V(6).Infof("checkpoint updated for claim %v", claimUID)
168175

169-
preparedClaim, exists := checkpoint.V2.PreparedClaims[claimUID]
170-
if exists && preparedClaim.CheckpointState == ClaimCheckpointStatePrepareCompleted {
176+
if alreadyPrepared {
171177
// Make this a noop. Associated device(s) has/ave been prepared by us.
172178
// Prepare() must be idempotent, as it may be invoked more than once per
173179
// claim (and actual device preparation must happen at most once).
174-
klog.V(6).Infof("skip prepare: claim %v found in checkpoint", claimUID)
175-
return preparedClaim.PreparedDevices.GetDevices(), nil
180+
klog.V(4).Infof("skip prepare: claim %v found in checkpoint", claimUID)
181+
return preparedDevices.GetDevices(), nil
176182
}
177183

178-
preparedDevices, err := s.prepareDevices(ctx, claim)
184+
preparedDevices, err = s.prepareDevices(ctx, claim)
179185
if err != nil {
180186
return nil, fmt.Errorf("prepare devices failed: %w", err)
181187
}

0 commit comments

Comments
 (0)