
Commit 2f1fe78

CD plugin: handle stale partially prepared claims
Add a fundamentally required state reconciliation: periodically, perform a self-initiated Unprepare() of previously partially prepared claims.

Perform periodically:
- Read the checkpoint.
- Iterate through RCs in PrepareStarted state.
- For each: is the RC still known to the API server? If not:
  1) Initiate an Unprepare.
  2) Remove the claim from the checkpoint file if the unprepare was successful.

Relevance: unpreparing any partially performed claim preparation might revert a state mutation that would otherwise be permanently inconsistent with API server state (e.g., this could remove a node label).

Signed-off-by: Dr. Jan-Philip Gehrcke <[email protected]>
1 parent 7b5e2cd commit 2f1fe78

File tree: 7 files changed, +342 / -34 lines changed

cmd/compute-domain-controller/cleanup.go

Lines changed: 1 addition & 1 deletion

@@ -155,7 +155,7 @@ func (m *CleanupManager[T]) periodicCleanup(ctx context.Context) {
 			return
 		case <-ticker.C:
 			if m.EnqueueCleanup() {
-				klog.V(6).Infof("Periodoc cleanup requested for %T objects", *new(T))
+				klog.V(6).Infof("Periodic cleanup requested for %T objects", *new(T))
 			}
 		}
 	}

cmd/compute-domain-kubelet-plugin/cdi.go

Lines changed: 1 addition & 1 deletion

@@ -259,7 +259,7 @@ func (cdi *CDIHandler) CreateClaimSpecFile(claimUID string, preparedDevices Prep
 	return cdi.cache.WriteSpec(spec.Raw(), specName)
 }
 
-func (cdi *CDIHandler) DeleteClaimSpecFile(claimUID string) error {
+func (cdi *CDIHandler) DeleteClaimSpecFileIfExists(claimUID string) error {
 	specName := cdiapi.GenerateTransientSpecName(cdi.vendor, cdi.claimClass, claimUID)
 	return cdi.cache.RemoveSpec(specName)
 }
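
The rename matters for the new call sites introduced by this commit: during a self-initiated unprepare of a stale, partially prepared claim, the transient CDI spec file may never have been written. The "IfExists" suffix reads as a promise that deleting an absent spec counts as success. A hedged caller sketch (illustrative only, not this driver's actual unprepare code; claimUID is a placeholder):

	// Illustrative call site: during cleanup, a spec file that was never
	// created for this partially prepared claim is not treated as an error
	// (an assumption based on the function's new name).
	if err := cdi.DeleteClaimSpecFileIfExists(claimUID); err != nil {
		return fmt.Errorf("error deleting CDI spec file for claim %s: %w", claimUID, err)
	}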

cmd/compute-domain-kubelet-plugin/checkpointv.go

Lines changed: 2 additions & 0 deletions

@@ -32,6 +32,8 @@ type PreparedClaimV2 struct {
 	CheckpointState ClaimCheckpointState            `json:"checkpointState"`
 	Status          resourceapi.ResourceClaimStatus `json:"status,omitempty"`
 	PreparedDevices PreparedDevices                 `json:"preparedDevices,omitempty"`
+	Name            string                          `json:"name,omitempty"`
+	Namespace       string                          `json:"namespace,omitempty"`
 }
 
 // V1 types
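
With name and namespace persisted per claim, the cleanup pass added below can perform a cheap Get() by name and compare UIDs, instead of an expensive List() filtered by metadata.uid. A minimal, runnable sketch of the checkpoint JSON shape these fields produce, using a simplified stand-in struct and made-up values (the UID, claim name, and namespace are all hypothetical):

package main

import (
	"encoding/json"
	"fmt"
)

// Simplified stand-in for PreparedClaimV2: only the fields relevant to the
// stale-claim lookup are modeled here.
type preparedClaimSketch struct {
	CheckpointState string `json:"checkpointState"`
	Name            string `json:"name,omitempty"`
	Namespace       string `json:"namespace,omitempty"`
}

func main() {
	// Keyed by claim UID, mirroring the checkpoint's PreparedClaims map.
	claims := map[string]preparedClaimSketch{
		"11111111-2222-3333-4444-555555555555": {
			CheckpointState: "PrepareStarted",
			Name:            "my-claim",
			Namespace:       "default",
		},
	}
	out, _ := json.MarshalIndent(claims, "", "  ")
	fmt.Println(string(out))
}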
Lines changed: 258 additions & 0 deletions (new file)

/*
 * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	resourcev1 "k8s.io/api/resource/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	draclient "k8s.io/dynamic-resource-allocation/client"
	"k8s.io/dynamic-resource-allocation/kubeletplugin"
	"k8s.io/klog/v2"
)

const (
	cleanupIntervalRC = 10 * time.Minute
)

type CheckpointCleanupManager struct {
	waitGroup     sync.WaitGroup
	cancelContext context.CancelFunc
	queue         chan struct{}
	s             *DeviceState
	draclient     *draclient.Client
}

func NewCheckpointCleanupManager(s *DeviceState, client *draclient.Client) *CheckpointCleanupManager {
	// `queue`: buffered channel implementing a pragmatic fixed-size queue,
	// ensuring that at most one cleanup task is enqueued at any time.
	return &CheckpointCleanupManager{
		s:         s,
		draclient: client,
		queue:     make(chan struct{}, 1),
	}
}

// Log relevant error detail, but do not propagate errors anywhere.
func (m *CheckpointCleanupManager) cleanup(ctx context.Context) {
	cp, err := m.s.getCheckpoint()
	if err != nil {
		klog.Errorf("Checkpointed RC cleanup: unable to get checkpoint: %s", err)
		return
	}

	// Get all checkpointed claims in PrepareStarted state.
	filtered := make(PreparedClaimsByUIDV2)
	for uid, claim := range cp.V2.PreparedClaims {
		if claim.CheckpointState == ClaimCheckpointStatePrepareStarted {
			filtered[uid] = claim
		}
	}

	klog.V(4).Infof("Checkpointed ResourceClaims in PrepareStarted state: %d found (of total: %d)", len(filtered), len(cp.V2.PreparedClaims))

	for cpuid, cpclaim := range filtered {
		// Drop the claim from the checkpoint if it is no longer present
		// according to the API server. There are two options for looking up
		// a claim with a specific UID from the API server:
		//
		// 1) List(), with `FieldSelector: "metadata.uid=your-uid"`. Especially
		// across all namespaces (but also within one namespace) this can be
		// considered an irresponsibly expensive lookup.
		//
		// 2) Get(), using a specific name/namespace + subsequent UID
		// comparison. This is a cheap lookup for the API server.

		// For (2), name and namespace must be stored in the checkpoint. That
		// is not true for legacy deployments with checkpoint data created by
		// version 25.3.x of this driver. Detect that situation by looking for
		// an empty `Name`.
		if cpclaim.Name == "" {
			klog.V(4).Infof("Checkpointed RC cleanup: skip checkpointed claim '%s': RC name not in checkpoint", cpuid)
			// Consider fallback: expensive lookup by UID across all namespaces.
			continue
		}

		claim, err := m.getClaimByName(ctx, cpclaim.Name, cpclaim.Namespace)
		if err != nil {
			if errors.IsNotFound(err) {
				klog.V(4).Infof(
					"Checkpointed RC cleanup: partially prepared claim '%s/%s:%s' is stale: not found in API server",
					cpclaim.Namespace,
					cpclaim.Name,
					cpuid)
				m.unprepare(ctx, cpuid, cpclaim)
			} else {
				// A transient error during API server lookup. No explicit
				// retry required: the next periodic cleanup invocation will
				// implicitly retry.
				klog.Infof("Checkpointed RC cleanup: skip for checkpointed claim %s: getClaimByName failed (retry later): %s", cpuid, err)
			}
			continue
		}

		if string(claim.UID) != cpuid {
			// There cannot be two ResourceClaim objects with the same name in
			// the same namespace at the same time. It is possible for a
			// ResourceClaim with the same name to have a different UID if the
			// original object was deleted and a new one with the same name was
			// created. Hence, this checkpointed claim is stale.
			klog.V(4).Infof("Checkpointed RC cleanup: partially prepared claim '%s/%s' is stale: UID changed (checkpoint: %s, API server: %s)", cpclaim.Namespace, cpclaim.Name, cpuid, claim.UID)
			m.unprepare(ctx, cpuid, cpclaim)
			continue
		}

		klog.V(4).Infof("Checkpointed RC cleanup: partially prepared claim not stale: %s", RCToString(claim))
	}
}

// Do not propagate errors back (but log them).
func (m *CheckpointCleanupManager) unprepare(ctx context.Context, uid string, claim PreparedClaim) {
	claimRef := kubeletplugin.NamespacedObject{
		UID: types.UID(uid),
		NamespacedName: types.NamespacedName{
			Name:      claim.Name,
			Namespace: claim.Namespace,
		},
	}

	// Perform one Unprepare attempt. Implicit retrying across periodic
	// cleanup invocations is sufficient. Rely on Unprepare() to delete the
	// claim from the checkpoint (upon success). TODO: review `Unprepare()`
	// for code paths that allow for this claim never to be dropped from the
	// checkpoint (resulting in infinite periodic cleanup attempts for this
	// claim).
	err := m.s.Unprepare(ctx, claimRef)
	if err != nil {
		klog.Warningf("Error during self-initiated unprepare for %s (retried later): %s", claimRef.String(), err)
		return
	}

	klog.Infof("Checkpointed RC cleanup: unprepared stale claim: %s", claimRef.String())
}

func (m *CheckpointCleanupManager) getClaimByName(ctx context.Context, name string, ns string) (*resourcev1.ResourceClaim, error) {
	// The API call below should be responded to with low latency. Choose a
	// timeout constant here that reflects a pathological state if met; in
	// that case, give up.
	childctx, cancel := context.WithTimeout(ctx, 20*time.Second)
	defer cancel()

	// Works across DRA API versions -- but how can we not hardcode the return
	// type version? Or does this always automatically convert to a v1 type?
	claim, err := m.draclient.ResourceClaims(ns).Get(childctx, name, metav1.GetOptions{})
	if err != nil {
		return nil, fmt.Errorf("error getting resource claim %s/%s: %w", ns, name, err)
	}

	return claim, nil
}

func (m *CheckpointCleanupManager) Start(ctx context.Context) error {
	ctx, cancel := context.WithCancel(ctx)
	m.cancelContext = cancel

	m.waitGroup.Add(1)
	go func() {
		defer m.waitGroup.Done()
		// Start producer: periodically submit cleanup tasks.
		m.triggerPeriodically(ctx)
	}()

	m.waitGroup.Add(1)
	go func() {
		defer m.waitGroup.Done()
		// Start consumer.
		m.worker(ctx)
	}()

	klog.V(6).Infof("CheckpointCleanupManager started")
	return nil
}

func (m *CheckpointCleanupManager) Stop() error {
	if m.cancelContext != nil {
		m.cancelContext()
	}
	m.waitGroup.Wait()
	return nil
}

// enqueueCleanup() submits a cleanup task if the queue is currently empty.
// Returns a Boolean indicating whether the task was submitted or not.
func (m *CheckpointCleanupManager) enqueueCleanup() bool {
	select {
	case m.queue <- struct{}{}:
		return true
	default:
		// Channel full: one task already lined up, did not submit more.
		return false
	}
}

// Run forever until the context is canceled.
func (m *CheckpointCleanupManager) worker(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-m.queue:
			// Do we want to timeout-control this cleanup run? What may take
			// unexpectedly long: lock acquisition (if we do any, e.g. around
			// checkpoint file mutation), API server interaction.
			m.cleanup(ctx)
		}
	}
}

// Immediately run one cleanup; then periodically submit cleanup tasks
// forever.
func (m *CheckpointCleanupManager) triggerPeriodically(ctx context.Context) {
	// Maybe add jitter, or delay the first cleanup by a somewhat random
	// amount. After all, this periodic cleanup runs in N kubelet plugins and
	// upon driver upgrade they might restart at roughly the same time -- it
	// makes sense to smear the API server load out over time.
	ticker := time.NewTicker(cleanupIntervalRC)
	defer ticker.Stop()

	m.cleanup(ctx)
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if m.enqueueCleanup() {
				klog.V(6).Infof("Cleanup for checkpointed ResourceClaims in PrepareStarted state: task submitted")
			} else {
				// A previous cleanup is taking long; that may not be normal.
				klog.Warningf("Cleanup for checkpointed ResourceClaims in PrepareStarted state: ongoing, skipped")
			}
		}
	}
}

func RCToString(rc *resourcev1.ResourceClaim) string {
	return fmt.Sprintf("%s/%s:%s", rc.Namespace, rc.Name, rc.UID)
}
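
A hypothetical wiring sketch for the plugin's startup path, written as a helper in the same package (the helper name and the shutdown handling are assumptions, not part of this commit):

// startCheckpointCleanup shows one way the plugin could wire up the manager
// at startup. DeviceState and the DRA client are created elsewhere by the
// plugin; the returned stop function is meant to be called during shutdown.
func startCheckpointCleanup(ctx context.Context, state *DeviceState, client *draclient.Client) (func() error, error) {
	mgr := NewCheckpointCleanupManager(state, client)
	if err := mgr.Start(ctx); err != nil {
		return nil, err
	}
	// Stop() cancels the manager's internal context and waits for both the
	// periodic producer and the worker goroutine to exit.
	return mgr.Stop, nil
}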
