Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/compute-domain-controller/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (m *CleanupManager[T]) periodicCleanup(ctx context.Context) {
return
case <-ticker.C:
if m.EnqueueCleanup() {
klog.V(6).Infof("Periodoc cleanup requested for %T objects", *new(T))
klog.V(6).Infof("Periodic cleanup requested for %T objects", *new(T))
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/compute-domain-kubelet-plugin/cdi.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (cdi *CDIHandler) CreateClaimSpecFile(claimUID string, preparedDevices Prep
return cdi.cache.WriteSpec(spec.Raw(), specName)
}

func (cdi *CDIHandler) DeleteClaimSpecFile(claimUID string) error {
func (cdi *CDIHandler) DeleteClaimSpecFileIfExists(claimUID string) error {
specName := cdiapi.GenerateTransientSpecName(cdi.vendor, cdi.claimClass, claimUID)
return cdi.cache.RemoveSpec(specName)
}
Expand Down
2 changes: 2 additions & 0 deletions cmd/compute-domain-kubelet-plugin/checkpointv.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type PreparedClaimV2 struct {
CheckpointState ClaimCheckpointState `json:"checkpointState"`
Status resourceapi.ResourceClaimStatus `json:"status,omitempty"`
PreparedDevices PreparedDevices `json:"preparedDevices,omitempty"`
Name string `json:"name,omitempty"`
Namespace string `json:"namespace,omitempty"`
}

// V1 types
Expand Down
271 changes: 271 additions & 0 deletions cmd/compute-domain-kubelet-plugin/cleanup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"context"
"fmt"
"sync"
"time"

resourcev1 "k8s.io/api/resource/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
draclient "k8s.io/dynamic-resource-allocation/client"
"k8s.io/dynamic-resource-allocation/kubeletplugin"
"k8s.io/klog/v2"
)

const (
ResourceClaimCleanupInterval = 10 * time.Minute
)

type TypeUnprepCallable = func(ctx context.Context, claimRef kubeletplugin.NamespacedObject) (bool, error)

type CheckpointCleanupManager struct {
waitGroup sync.WaitGroup
cancelContext context.CancelFunc
queue chan struct{}
devicestate *DeviceState
draclient *draclient.Client

unprepfunc TypeUnprepCallable
}

func NewCheckpointCleanupManager(s *DeviceState, client *draclient.Client) *CheckpointCleanupManager {
// `queue`: buffered channel to implement a pragmatic fixed-size queue; to
// ensure at most one cleanup operation gets enqueued. TODO: review
// replacing this with a condition variable.
return &CheckpointCleanupManager{
devicestate: s,
draclient: client,
queue: make(chan struct{}, 1),
}
}

func (m *CheckpointCleanupManager) Start(ctx context.Context, unprepfunc TypeUnprepCallable) error {
ctx, cancel := context.WithCancel(ctx)
m.cancelContext = cancel
m.unprepfunc = unprepfunc

m.waitGroup.Add(1)
go func() {
defer m.waitGroup.Done()
// Start producer: periodically submit cleanup task.
m.triggerPeriodically(ctx)
}()

m.waitGroup.Add(1)
go func() {
defer m.waitGroup.Done()
// Start consumer
m.worker(ctx)
}()

klog.V(6).Infof("CheckpointCleanupManager started")
return nil
}

func (m *CheckpointCleanupManager) Stop() error {
if m.cancelContext != nil {
m.cancelContext()
}
m.waitGroup.Wait()
return nil
}

// cleanup() is the high-level cleanup routine run once upon plugin startup and
// then periodically. It gets all claims in PrepareStarted state from the
// current checkpoint, and runs `unprepareIfStale()` for each of them. Each
// invocation of `cleanup()` and each invocation of `unprepareIfStale()` is
// best-effort: errors do not need to be propagated (but are expected to be
// properly logged).
func (m *CheckpointCleanupManager) cleanup(ctx context.Context) {
cp, err := m.devicestate.getCheckpoint()
if err != nil {
klog.Errorf("Checkpointed RC cleanup: unable to get checkpoint: %s", err)
return
}

// Get checkpointed claims in PrepareStarted state.
filtered := make(PreparedClaimsByUIDV2)
for uid, claim := range cp.V2.PreparedClaims {
if claim.CheckpointState == ClaimCheckpointStatePrepareStarted {
filtered[uid] = claim
}
}

klog.V(4).Infof("Checkpointed RC cleanup: claims in PrepareStarted state: %d (of %d)", len(filtered), len(cp.V2.PreparedClaims))

for cpuid, cpclaim := range filtered {
m.unprepareIfStale(ctx, cpuid, cpclaim)
}
}

// Detect if claim is stale (not known to the API server). Call unprepare() if
// it is stale.
//
// There are two options to look up a claim with a specific UID from the API
// server:
//
// 1. List(), with `FieldSelector: "metadata.uid=<uid>"`. Especially
// across all namespaces (but also within one namespace) this is
// irresponsibly expensive lookup, cannot be done on a routine
// basis.
//
// 2. Get(), using a specific name/namespace + subsequent UID
// comparison. This is a cheap lookup for the API server.
//
// For (2), name and namespace must be stored in the checkpoint. That is not
// true for legacy deployments with checkpoint data created by version 25.3.x of
// this driver. Detect that situation by looking for an empty `Name`.
func (m *CheckpointCleanupManager) unprepareIfStale(ctx context.Context, cpuid string, cpclaim PreparedClaim) {
if cpclaim.Name == "" {
klog.V(4).Infof("Checkpointed RC cleanup: skip checkpointed claim '%s': RC name not in checkpoint", cpuid)
// Consider fallback: expensive lookup by UID across all namespaces.
return
}

claim, err := m.getClaimByName(ctx, cpclaim.Name, cpclaim.Namespace)
if err != nil && errors.IsNotFound(err) {
klog.V(4).Infof(
"Checkpointed RC cleanup: partially prepared claim '%s/%s:%s' is stale: not found in API server",
cpclaim.Namespace,
cpclaim.Name,
cpuid)
m.unprepare(ctx, cpuid, cpclaim)
return
}

// A transient error during API server lookup. No explicit retry required.
// The next periodic cleanup invocation will implicitly retry.
if err != nil {
klog.Infof("Checkpointed RC cleanup: skip for checkpointed claim %s: getClaimByName failed (retry later): %s", cpuid, err)
return
}

if string(claim.UID) != cpuid {
// There cannot be two ResourceClaim objects with the same name in
// the same namespace at the same time. It is possible for a
// ResourceClaim with the same name to have a different UID if the
// original object was deleted and a new one with the same name was
// created. Hence, this checkpointed claim is stale.
klog.V(4).Infof("Checkpointed RC cleanup: partially prepared claim '%s/%s' is stale: UID changed (checkpoint: %s, API server: %s)", cpclaim.Namespace, cpclaim.Name, cpuid, claim.UID)
m.unprepare(ctx, cpuid, cpclaim)
return
}

klog.V(4).Infof("Checkpointed RC cleanup: partially prepared claim not stale: %s", ResourceClaimToString(claim))
}

// unprepare() attempts to unprepare devices for the provided claim
// ('self-initiated unprepare'). Expected side effect: removal of the
// corresponding claim from the checkpoint.
func (m *CheckpointCleanupManager) unprepare(ctx context.Context, uid string, claim PreparedClaim) {
claimRef := kubeletplugin.NamespacedObject{
UID: types.UID(uid),
NamespacedName: types.NamespacedName{
Name: claim.Name,
Namespace: claim.Namespace,
},
}

// Perform one Unprepare attempt. Implicit retrying across periodic cleanup
// invocations is sufficient. Rely on Unprepare() to delete claim from
// checkpoint (upon success). TODO: review `Unprepare()` for code paths that
// allow for this claim never to be dropped from the checkpoint (resulting
// in infinite periodic cleanup attempts for this claim).
_, err := m.unprepfunc(ctx, claimRef)
if err != nil {
klog.Warningf("Checkpointed RC cleanup: error during unprepare for %s (retried later): %s", claimRef.String(), err)
return
}

klog.Infof("Checkpointed RC cleanup: unprepared stale claim: %s", claimRef.String())
}

// getClaimByName() attempts to fetch a ResourceClaim object directly from the
// API server.
func (m *CheckpointCleanupManager) getClaimByName(ctx context.Context, name string, ns string) (*resourcev1.ResourceClaim, error) {
// The API call below should be responded to with low latency. Choose a
// timeout constant here that reflects a pathological state if met; in this
// case give up.
childctx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()

claim, err := m.draclient.ResourceClaims(ns).Get(childctx, name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error getting resource claim %s/%s: %w", name, ns, err)
}

return claim, nil
}

// enqueueCleanup() submits a cleanup task if the queue is currently empty.
// Return a Boolean indicating whether the task was submitted or not.
func (m *CheckpointCleanupManager) enqueueCleanup() bool {
select {
case m.queue <- struct{}{}:
return true
default:
// Channel full: one task already lined up, did not submit more.
return false
}
}

// Run forever until context is canceled.
func (m *CheckpointCleanupManager) worker(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-m.queue:
// Do we want to timeout-control this cleanup run? What may take
// unexpectedly long: lock acquisition (if we do any, e.g. around
// checkpoint file mutation), API server interaction.
m.cleanup(ctx)
}
}
}

// Immediately submit a cleanup task; then periodically submit cleanup tasks
// forever.
func (m *CheckpointCleanupManager) triggerPeriodically(ctx context.Context) {
// Maybe add jitter. Or delay first cleanup by a somewhat random amount.
// After all, this periodic cleanup runs in N kubelet plugins and upon
// driver upgrade they might restart at roughly the same time -- it makes
// sense to smear the API server load out over time.
ticker := time.NewTicker(ResourceClaimCleanupInterval)
defer ticker.Stop()

m.cleanup(ctx)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if m.enqueueCleanup() {
klog.V(6).Infof("Checkpointed RC cleanup: task submitted")
} else {
// A previous cleanup is taking long; that may not be normal.
klog.Warningf("Checkpointed RC cleanup: ongoing, skipped")
}
}
}
}
Loading