diff --git a/cmd/compute-domain-controller/cleanup.go b/cmd/compute-domain-controller/cleanup.go
index c3bf6b85a..7291f5523 100644
--- a/cmd/compute-domain-controller/cleanup.go
+++ b/cmd/compute-domain-controller/cleanup.go
@@ -155,7 +155,7 @@ func (m *CleanupManager[T]) periodicCleanup(ctx context.Context) {
 			return
 		case <-ticker.C:
 			if m.EnqueueCleanup() {
-				klog.V(6).Infof("Periodoc cleanup requested for %T objects", *new(T))
+				klog.V(6).Infof("Periodic cleanup requested for %T objects", *new(T))
 			}
 		}
 	}
diff --git a/cmd/compute-domain-kubelet-plugin/cdi.go b/cmd/compute-domain-kubelet-plugin/cdi.go
index dde65b425..95be15a72 100644
--- a/cmd/compute-domain-kubelet-plugin/cdi.go
+++ b/cmd/compute-domain-kubelet-plugin/cdi.go
@@ -259,7 +259,7 @@ func (cdi *CDIHandler) CreateClaimSpecFile(claimUID string, preparedDevices Prep
 	return cdi.cache.WriteSpec(spec.Raw(), specName)
 }
 
-func (cdi *CDIHandler) DeleteClaimSpecFile(claimUID string) error {
+func (cdi *CDIHandler) DeleteClaimSpecFileIfExists(claimUID string) error {
 	specName := cdiapi.GenerateTransientSpecName(cdi.vendor, cdi.claimClass, claimUID)
 	return cdi.cache.RemoveSpec(specName)
 }
diff --git a/cmd/compute-domain-kubelet-plugin/checkpointv.go b/cmd/compute-domain-kubelet-plugin/checkpointv.go
index 80315e2e0..32c57f20f 100644
--- a/cmd/compute-domain-kubelet-plugin/checkpointv.go
+++ b/cmd/compute-domain-kubelet-plugin/checkpointv.go
@@ -32,6 +32,8 @@ type PreparedClaimV2 struct {
 	CheckpointState ClaimCheckpointState            `json:"checkpointState"`
 	Status          resourceapi.ResourceClaimStatus `json:"status,omitempty"`
 	PreparedDevices PreparedDevices                 `json:"preparedDevices,omitempty"`
+	Name            string                          `json:"name,omitempty"`
+	Namespace       string                          `json:"namespace,omitempty"`
 }
 
 // V1 types
diff --git a/cmd/compute-domain-kubelet-plugin/cleanup.go b/cmd/compute-domain-kubelet-plugin/cleanup.go
new file mode 100644
index 000000000..3580be0b9
--- /dev/null
+++ b/cmd/compute-domain-kubelet-plugin/cleanup.go
@@ -0,0 +1,271 @@
+/*
+ * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	resourcev1 "k8s.io/api/resource/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	draclient "k8s.io/dynamic-resource-allocation/client"
+	"k8s.io/dynamic-resource-allocation/kubeletplugin"
+	"k8s.io/klog/v2"
+)
+
+const (
+	ResourceClaimCleanupInterval = 10 * time.Minute
+)
+
+type TypeUnprepCallable = func(ctx context.Context, claimRef kubeletplugin.NamespacedObject) (bool, error)
+
+type CheckpointCleanupManager struct {
+	waitGroup     sync.WaitGroup
+	cancelContext context.CancelFunc
+	queue         chan struct{}
+	devicestate   *DeviceState
+	draclient     *draclient.Client
+
+	unprepfunc TypeUnprepCallable
+}
+
+func NewCheckpointCleanupManager(s *DeviceState, client *draclient.Client) *CheckpointCleanupManager {
+	// `queue`: buffered channel implementing a pragmatic fixed-size queue, to
+	// ensure that at most one cleanup operation gets enqueued. TODO: review
+	// replacing this with a condition variable.
+	return &CheckpointCleanupManager{
+		devicestate: s,
+		draclient:   client,
+		queue:       make(chan struct{}, 1),
+	}
+}
+
+func (m *CheckpointCleanupManager) Start(ctx context.Context, unprepfunc TypeUnprepCallable) error {
+	ctx, cancel := context.WithCancel(ctx)
+	m.cancelContext = cancel
+	m.unprepfunc = unprepfunc
+
+	m.waitGroup.Add(1)
+	go func() {
+		defer m.waitGroup.Done()
+		// Start producer: periodically submit cleanup task.
+		m.triggerPeriodically(ctx)
+	}()
+
+	m.waitGroup.Add(1)
+	go func() {
+		defer m.waitGroup.Done()
+		// Start consumer.
+		m.worker(ctx)
+	}()
+
+	klog.V(6).Infof("CheckpointCleanupManager started")
+	return nil
+}
+
+func (m *CheckpointCleanupManager) Stop() error {
+	if m.cancelContext != nil {
+		m.cancelContext()
+	}
+	m.waitGroup.Wait()
+	return nil
+}
+
+// cleanup() is the high-level cleanup routine run once upon plugin startup and
+// then periodically. It gets all claims in PrepareStarted state from the
+// current checkpoint, and runs `unprepareIfStale()` for each of them. Each
+// invocation of `cleanup()` and each invocation of `unprepareIfStale()` is
+// best-effort: errors do not need to be propagated (but are expected to be
+// properly logged).
+func (m *CheckpointCleanupManager) cleanup(ctx context.Context) {
+	cp, err := m.devicestate.getCheckpoint()
+	if err != nil {
+		klog.Errorf("Checkpointed RC cleanup: unable to get checkpoint: %s", err)
+		return
+	}
+
+	// Get checkpointed claims in PrepareStarted state.
+	filtered := make(PreparedClaimsByUIDV2)
+	for uid, claim := range cp.V2.PreparedClaims {
+		if claim.CheckpointState == ClaimCheckpointStatePrepareStarted {
+			filtered[uid] = claim
+		}
+	}
+
+	klog.V(4).Infof("Checkpointed RC cleanup: claims in PrepareStarted state: %d (of %d)", len(filtered), len(cp.V2.PreparedClaims))
+
+	for cpuid, cpclaim := range filtered {
+		m.unprepareIfStale(ctx, cpuid, cpclaim)
+	}
+}
+
+// Detect if the claim is stale (not known to the API server). Call unprepare()
+// if it is stale.
+//
+// There are two options to look up a claim with a specific UID from the API
+// server:
+//
+//  1. List(), with `FieldSelector: "metadata.uid="`. Especially
+//     across all namespaces (but also within one namespace) this is
+//     an irresponsibly expensive lookup and cannot be done on a routine
+//     basis.
+//
+//  2. Get(), using a specific name/namespace + subsequent UID
+//     comparison. This is a cheap lookup for the API server.
+//
+// For (2), name and namespace must be stored in the checkpoint. That is not
+// true for legacy deployments with checkpoint data created by version 25.3.x of
+// this driver. Detect that situation by looking for an empty `Name`.
+func (m *CheckpointCleanupManager) unprepareIfStale(ctx context.Context, cpuid string, cpclaim PreparedClaim) {
+	if cpclaim.Name == "" {
+		klog.V(4).Infof("Checkpointed RC cleanup: skip checkpointed claim '%s': RC name not in checkpoint", cpuid)
+		// Consider fallback: expensive lookup by UID across all namespaces.
+		return
+	}
+
+	claim, err := m.getClaimByName(ctx, cpclaim.Name, cpclaim.Namespace)
+	if err != nil && errors.IsNotFound(err) {
+		klog.V(4).Infof(
+			"Checkpointed RC cleanup: partially prepared claim '%s/%s:%s' is stale: not found in API server",
+			cpclaim.Namespace,
+			cpclaim.Name,
+			cpuid)
+		m.unprepare(ctx, cpuid, cpclaim)
+		return
+	}
+
+	// A transient error during API server lookup. No explicit retry required.
+	// The next periodic cleanup invocation will implicitly retry.
+	if err != nil {
+		klog.Infof("Checkpointed RC cleanup: skip for checkpointed claim %s: getClaimByName failed (retry later): %s", cpuid, err)
+		return
+	}
+
+	if string(claim.UID) != cpuid {
+		// There cannot be two ResourceClaim objects with the same name in
+		// the same namespace at the same time. It is possible for a
+		// ResourceClaim with the same name to have a different UID if the
+		// original object was deleted and a new one with the same name was
+		// created. Hence, this checkpointed claim is stale.
+		klog.V(4).Infof("Checkpointed RC cleanup: partially prepared claim '%s/%s' is stale: UID changed (checkpoint: %s, API server: %s)", cpclaim.Namespace, cpclaim.Name, cpuid, claim.UID)
+		m.unprepare(ctx, cpuid, cpclaim)
+		return
+	}
+
+	klog.V(4).Infof("Checkpointed RC cleanup: partially prepared claim not stale: %s", ResourceClaimToString(claim))
+}
+
+// unprepare() attempts to unprepare devices for the provided claim
+// ('self-initiated unprepare'). Expected side effect: removal of the
+// corresponding claim from the checkpoint.
+func (m *CheckpointCleanupManager) unprepare(ctx context.Context, uid string, claim PreparedClaim) {
+	claimRef := kubeletplugin.NamespacedObject{
+		UID: types.UID(uid),
+		NamespacedName: types.NamespacedName{
+			Name:      claim.Name,
+			Namespace: claim.Namespace,
+		},
+	}
+
+	// Perform one Unprepare attempt. Implicit retrying across periodic cleanup
+	// invocations is sufficient. Rely on Unprepare() to delete claim from
+	// checkpoint (upon success). TODO: review `Unprepare()` for code paths that
+	// allow for this claim never to be dropped from the checkpoint (resulting
+	// in infinite periodic cleanup attempts for this claim).
+	_, err := m.unprepfunc(ctx, claimRef)
+	if err != nil {
+		klog.Warningf("Checkpointed RC cleanup: error during unprepare for %s (retried later): %s", claimRef.String(), err)
+		return
+	}
+
+	klog.Infof("Checkpointed RC cleanup: unprepared stale claim: %s", claimRef.String())
+}
+
+// getClaimByName() attempts to fetch a ResourceClaim object directly from the
+// API server.
+func (m *CheckpointCleanupManager) getClaimByName(ctx context.Context, name string, ns string) (*resourcev1.ResourceClaim, error) {
+	// The API call below should be responded to with low latency. Choose a
+	// timeout constant here that reflects a pathological state if met; in this
+	// case give up.
+	childctx, cancel := context.WithTimeout(ctx, 20*time.Second)
+	defer cancel()
+
+	claim, err := m.draclient.ResourceClaims(ns).Get(childctx, name, metav1.GetOptions{})
+	if err != nil {
+		return nil, fmt.Errorf("error getting resource claim %s/%s: %w", ns, name, err)
+	}
+
+	return claim, nil
+}
+
+// enqueueCleanup() submits a cleanup task if the queue is currently empty.
+// Return a Boolean indicating whether the task was submitted or not.
+func (m *CheckpointCleanupManager) enqueueCleanup() bool {
+	select {
+	case m.queue <- struct{}{}:
+		return true
+	default:
+		// Channel full: one task already lined up, did not submit more.
+		return false
+	}
+}
+
+// Run forever until context is canceled.
+func (m *CheckpointCleanupManager) worker(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-m.queue:
+			// Do we want to timeout-control this cleanup run? What may take
+			// unexpectedly long: lock acquisition (if we do any, e.g. around
+			// checkpoint file mutation), API server interaction.
+			m.cleanup(ctx)
+		}
+	}
+}
+
+// Immediately submit a cleanup task; then periodically submit cleanup tasks
+// forever.
+func (m *CheckpointCleanupManager) triggerPeriodically(ctx context.Context) {
+	// Maybe add jitter. Or delay first cleanup by a somewhat random amount.
+	// After all, this periodic cleanup runs in N kubelet plugins and upon
+	// driver upgrade they might restart at roughly the same time -- it makes
+	// sense to smear the API server load out over time.
+	ticker := time.NewTicker(ResourceClaimCleanupInterval)
+	defer ticker.Stop()
+
+	m.cleanup(ctx)
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			if m.enqueueCleanup() {
+				klog.V(6).Infof("Checkpointed RC cleanup: task submitted")
+			} else {
+				// A previous cleanup is taking long; that may not be normal.
+				klog.Warningf("Checkpointed RC cleanup: ongoing, skipped")
+			}
+		}
+	}
+}
diff --git a/cmd/compute-domain-kubelet-plugin/device_state.go b/cmd/compute-domain-kubelet-plugin/device_state.go
index 2707c9aea..4736747a9 100644
--- a/cmd/compute-domain-kubelet-plugin/device_state.go
+++ b/cmd/compute-domain-kubelet-plugin/device_state.go
@@ -48,12 +48,13 @@ type DeviceConfigState struct {
 type DeviceState struct {
 	sync.Mutex
-	cdi                  *CDIHandler
-	computeDomainManager *ComputeDomainManager
-	allocatable          AllocatableDevices
-	config               *Config
+	cdi                      *CDIHandler
+	computeDomainManager     *ComputeDomainManager
+	checkpointCleanupManager *CheckpointCleanupManager
+	allocatable              AllocatableDevices
+	config                   *Config
+	nvdevlib                 *deviceLib
 
-	nvdevlib          *deviceLib
 	checkpointManager checkpointmanager.CheckpointManager
 }
@@ -120,6 +121,7 @@ func NewDeviceState(ctx context.Context, config *Config) (*DeviceState, error) {
 		nvdevlib:          nvdevlib,
 		checkpointManager: checkpointManager,
 	}
+	state.checkpointCleanupManager = NewCheckpointCleanupManager(state, config.clientsets.Resource)
 
 	checkpoints, err := state.checkpointManager.ListCheckpoints()
 	if err != nil {
@@ -128,10 +130,12 @@
 
 	for _, c := range checkpoints {
 		if c == DriverPluginCheckpointFileBasename {
+			klog.Infof("Found previous checkpoint: %s", c)
 			return state, nil
 		}
 	}
 
+	klog.Infof("Create empty checkpoint")
 	if err := state.createCheckpoint(&Checkpoint{}); err != nil {
 		return nil, fmt.Errorf("unable to create checkpoint: %w", err)
 	}
@@ -155,7 +159,7 @@ func (s *DeviceState) Prepare(ctx context.Context, claim *resourceapi.ResourceCl
 		// Make this a noop. Associated device(s) has/ave been prepared by us.
 		// Prepare() must be idempotent, as it may be invoked more than once per
 		// claim (and actual device preparation must happen at most once).
-		klog.V(6).Infof("skip prepare: claim %v found in checkpoint", claimUID)
+		klog.V(4).Infof("Skip prepare: claim %v found in checkpoint", claimUID)
 		return preparedClaim.PreparedDevices.GetDevices(), nil
 	}
 
@@ -163,6 +167,8 @@ func (s *DeviceState) Prepare(ctx context.Context, claim *resourceapi.ResourceCl
 		checkpoint.V2.PreparedClaims[claimUID] = PreparedClaim{
 			CheckpointState: ClaimCheckpointStatePrepareStarted,
 			Status:          claim.Status,
+			Name:            claim.Name,
+			Namespace:       claim.Namespace,
 		}
 	})
 	if err != nil {
@@ -179,6 +185,10 @@ func (s *DeviceState) Prepare(ctx context.Context, claim *resourceapi.ResourceCl
 		return nil, fmt.Errorf("unable to create CDI spec file for claim: %w", err)
 	}
 
+	// Some errors above along the Prepare() path leave the claim in the
+	// checkpoint, in the 'PrepareStarted' state. That's deliberate, to annotate
+	// potentially partially prepared claims. There is an asynchronous
+	// checkpoint cleanup procedure that identifies when such an entry goes stale.
 	err = s.updateCheckpoint(func(checkpoint *Checkpoint) {
 		checkpoint.V2.PreparedClaims[claimUID] = PreparedClaim{
 			CheckpointState: ClaimCheckpointStatePrepareCompleted,
@@ -198,7 +208,7 @@ func (s *DeviceState) Unprepare(ctx context.Context, claimRef kubeletplugin.Name
 	s.Lock()
 	defer s.Unlock()
 
-	claimUID := string(claimRef.UID)
+	klog.V(6).Infof("Unprepare() for claim '%s'", claimRef.String())
 
 	// Rely on local checkpoint state for ability to clean up.
 	checkpoint, err := s.getCheckpoint()
@@ -206,6 +216,7 @@ func (s *DeviceState) Unprepare(ctx context.Context, claimRef kubeletplugin.Name
 		return fmt.Errorf("unable to get checkpoint: %w", err)
 	}
 
+	claimUID := string(claimRef.UID)
 	pc, exists := checkpoint.V2.PreparedClaims[claimUID]
 	if !exists {
 		// Not an error: if this claim UID is not in the checkpoint then this
@@ -222,16 +233,20 @@ func (s *DeviceState) Unprepare(ctx context.Context, claimRef kubeletplugin.Name
 	//
 	// TODO: Remove this one release cycle following the v25.3.0 release
 	if pc.Status.Allocation == nil {
-		klog.Infof("PreparedClaim status was unset in Checkpoint for ResourceClaim %s: attempting to pull it from API server", claimRef.String())
+		klog.Infof("PreparedClaim Status not set in Checkpoint for claim '%s': attempting to pull it from API server", claimRef.String())
 		claim, err := s.config.clientsets.Resource.ResourceClaims(claimRef.Namespace).Get(
 			ctx,
 			claimRef.Name,
 			metav1.GetOptions{})
+		// TODO: distinguish errors -- if this is a 'not found' error then this
+		// is permanent and we may want to drop the claim from the checkpoint.
+		// Otherwise, this might be worth retrying?
 		if err != nil {
 			return permanentError{fmt.Errorf("failed to fetch ResourceClaim %s: %w", claimRef.String(), err)}
 		}
 		if claim.Status.Allocation == nil {
+			// TODO: drop claim from checkpoint?
 			return permanentError{fmt.Errorf("no allocation set in ResourceClaim %s", claim.String())}
 		}
 		pc.Status = claim.Status
@@ -246,17 +261,18 @@ func (s *DeviceState) Unprepare(ctx context.Context, claimRef kubeletplugin.Name
 		return fmt.Errorf("unsupported ClaimCheckpointState: %v", pc.CheckpointState)
 	}
 
-	if err := s.cdi.DeleteClaimSpecFile(claimUID); err != nil {
+	// Assume that this is a retryable error. If there is any chance for this
+	// error to be permanent: should we then drop the claim from the checkpoint regardless?
+	if err := s.cdi.DeleteClaimSpecFileIfExists(claimUID); err != nil {
 		return fmt.Errorf("unable to delete CDI spec file for claim: %w", err)
 	}
 
-	// Write new checkpoint reflecting that all devices for this claim have been
-	// unprepared (by virtue of removing its UID from all mappings).
-	delete(checkpoint.V2.PreparedClaims, claimUID)
-	if err := s.createCheckpoint(checkpoint); err != nil {
-		return fmt.Errorf("create checkpoint failed: %w", err)
+	// Mutate checkpoint reflecting that all devices for this claim have been
+	// unprepared, by virtue of removing its UID from the PreparedClaims map.
+	err = s.deleteClaimFromCheckpoint(claimRef)
+	if err != nil {
+		return fmt.Errorf("error deleting claim from checkpoint: %w", err)
 	}
-
 	return nil
 }
@@ -272,13 +288,19 @@ func (s *DeviceState) getCheckpoint() (*Checkpoint, error) {
 	return checkpoint.ToLatestVersion(), nil
 }
 
-func (s *DeviceState) updateCheckpoint(f func(*Checkpoint)) error {
+// Read checkpoint from store, perform mutation, and write checkpoint back. Any
+// mutation of the checkpoint must go through this function. The
+// read-mutate-write sequence must be performed under a lock: we must be
+// conceptually certain that multiple read-mutate-write actions never overlap.
+// Currently, it is assumed that any caller gets here by first acquiring the
+// driver's `pulock`.
+func (s *DeviceState) updateCheckpoint(mutate func(*Checkpoint)) error {
 	checkpoint, err := s.getCheckpoint()
 	if err != nil {
 		return fmt.Errorf("unable to get checkpoint: %w", err)
 	}
 
-	f(checkpoint)
+	mutate(checkpoint)
 
 	if err := s.checkpointManager.CreateCheckpoint(DriverPluginCheckpointFileBasename, checkpoint); err != nil {
 		return fmt.Errorf("unable to create checkpoint: %w", err)
@@ -287,6 +309,17 @@ func (s *DeviceState) updateCheckpoint(f func(*Checkpoint)) error {
 	return nil
 }
 
+func (s *DeviceState) deleteClaimFromCheckpoint(claimRef kubeletplugin.NamespacedObject) error {
+	err := s.updateCheckpoint(func(cp *Checkpoint) {
+		delete(cp.V2.PreparedClaims, string(claimRef.UID))
+	})
+	if err != nil {
+		return fmt.Errorf("unable to update checkpoint: %w", err)
+	}
+	klog.V(6).Infof("Deleted claim from checkpoint: %s", claimRef.String())
+	return nil
+}
+
 func (s *DeviceState) prepareDevices(ctx context.Context, claim *resourceapi.ResourceClaim) (PreparedDevices, error) {
 	// Generate a mapping of each OpaqueDeviceConfigs to the Device.Results it
 	// applies to. Strict-decode: data is provided by user and may be completely
@@ -612,7 +645,7 @@ func (s *DeviceState) assertImexChannelNotAllocated(id int) error {
 		// of being prepared, but either retried soon (in which case we are
 		// faster and win over it) or never retried (in which case we can also
 		// safely allocate).
-		if claim.CheckpointState != "PrepareCompleted" {
+		if claim.CheckpointState != ClaimCheckpointStatePrepareCompleted {
 			continue
 		}
 
diff --git a/cmd/compute-domain-kubelet-plugin/driver.go b/cmd/compute-domain-kubelet-plugin/driver.go
index 053e99ee8..cc0e4be24 100644
--- a/cmd/compute-domain-kubelet-plugin/driver.go
+++ b/cmd/compute-domain-kubelet-plugin/driver.go
@@ -119,7 +119,12 @@ func NewDriver(ctx context.Context, config *Config) (*driver, error) {
 	}
 
 	if err := state.computeDomainManager.Start(ctx); err != nil {
-		return nil, err
+		return nil, fmt.Errorf("error starting ComputeDomain manager: %w", err)
+	}
+
+	// Pass the `nodeUnprepareResource` function into the cleanup manager.
+	if err := state.checkpointCleanupManager.Start(ctx, driver.nodeUnprepareResource); err != nil {
+		return nil, fmt.Errorf("error starting CheckpointCleanupManager: %w", err)
 	}
 
 	healthcheck, err := startHealthcheck(ctx, config)
@@ -129,7 +134,7 @@ func NewDriver(ctx context.Context, config *Config) (*driver, error) {
 	driver.healthcheck = healthcheck
 
 	if err := driver.pluginhelper.PublishResources(ctx, resources); err != nil {
-		return nil, err
+		return nil, fmt.Errorf("error in PublishResources(): %w", err)
 	}
 
 	return driver, nil
@@ -144,6 +149,10 @@ func (d *driver) Shutdown() error {
 		return fmt.Errorf("error stopping ComputeDomainManager: %w", err)
 	}
 
+	if err := d.state.checkpointCleanupManager.Stop(); err != nil {
+		return fmt.Errorf("error stopping CheckpointCleanupManager: %w", err)
+	}
+
 	if d.healthcheck != nil {
 		d.healthcheck.Stop()
 	}
@@ -251,7 +260,7 @@ func (d *driver) nodePrepareResource(ctx context.Context, claim *resourceapi.Res
 	devs, err := d.state.Prepare(ctx, claim)
 	if err != nil {
 		res := kubeletplugin.PrepareResult{
-			Err: fmt.Errorf("error preparing devices for claim %s/%s:%s: %w", claim.Namespace, claim.Name, claim.UID, err),
+			Err: fmt.Errorf("error preparing devices for claim '%s': %w", ResourceClaimToString(claim), err),
 		}
 		if isPermanentError(err) {
 			klog.Infof("Permanent error preparing devices for claim %v: %v", claim.UID, err)
@@ -260,11 +269,13 @@ func (d *driver) nodePrepareResource(ctx context.Context, claim *resourceapi.Res
 		return false, res
 	}
 
-	klog.V(1).Infof("prepared devices for claim '%s/%s:%s': %v", claim.Namespace, claim.Name, claim.UID, devs)
+	klog.V(1).Infof("Prepared devices for claim '%s': %v", ResourceClaimToString(claim), devs)
 
 	return true, kubeletplugin.PrepareResult{Devices: devs}
 }
 
+// Return a 2-tuple: the first value is a Boolean indicating to the retry logic
+// whether the work is 'done'.
 func (d *driver) nodeUnprepareResource(ctx context.Context, claimRef kubeletplugin.NamespacedObject) (bool, error) {
 	release, err := d.pulock.Acquire(ctx, flock.WithTimeout(10*time.Second))
 	if err != nil {
diff --git a/cmd/compute-domain-kubelet-plugin/types.go b/cmd/compute-domain-kubelet-plugin/types.go
index f5f09b456..136402ea9 100644
--- a/cmd/compute-domain-kubelet-plugin/types.go
+++ b/cmd/compute-domain-kubelet-plugin/types.go
@@ -16,8 +16,18 @@
 
 package main
 
+import (
+	"fmt"
+
+	resourcev1 "k8s.io/api/resource/v1"
+)
+
 const (
 	ComputeDomainChannelType = "channel"
 	ComputeDomainDaemonType  = "daemon"
 	UnknownDeviceType        = "unknown"
 )
+
+func ResourceClaimToString(rc *resourcev1.ResourceClaim) string {
+	return fmt.Sprintf("%s/%s:%s", rc.Namespace, rc.Name, rc.UID)
+}
diff --git a/pkg/workqueue/workqueue.go b/pkg/workqueue/workqueue.go
index 0a39f34ca..ec2d26760 100644
--- a/pkg/workqueue/workqueue.go
+++ b/pkg/workqueue/workqueue.go
@@ -156,8 +156,8 @@ func (q *WorkQueue) processNextWorkItem(ctx context.Context) {
 	err := q.reconcile(ctx, workItem)
 	if err != nil {
 		// Most often, this is an expected, retryable error in the context of an
-		// eventually consistent system. Hence, do not log an error level. Rely
-		// on inner business logic to log unexpected errors on error level.
+		// eventually consistent system. Hence, do not log on an error level. Rely
+		// on inner business logic to log unexpected errors on an error level.
klog.V(1).Infof("Reconcile: %v", err) // Only retry if we're still the current operation for this key q.Lock() diff --git a/tests/bats/tests.bats b/tests/bats/tests.bats index 498f94e85..36bcd8293 100644 --- a/tests/bats/tests.bats +++ b/tests/bats/tests.bats @@ -193,6 +193,7 @@ log_objects() { @test "NodePrepareResources: catch unknown field in opaque cfg in ResourceClaim" { log_objects + iupgrade_wait "${TEST_CHART_REPO}" "${TEST_CHART_VERSION}" NOARGS envsubst < tests/bats/specs/rc-opaque-cfg-unknown-field.yaml.tmpl > \ "${BATS_TEST_TMPDIR}"/rc-opaque-cfg-unknown-field.yaml @@ -236,6 +237,74 @@ log_objects() { kubectl wait --for=delete "${POD}" --timeout=10s } +@test "Self-initiated unprepare of stale RCs in PrepareStarted" { + log_objects + iupgrade_wait "${TEST_CHART_REPO}" "${TEST_CHART_VERSION}" NOARGS + + # Stage 1: provoke partially prepared claim. + # + # Based on the "catch unknown field in opaque cfg in ResourceClaim" test + # above: Provoke a permanent Prepare() error, leaving behind a partially + # prepared claim in the checkpoint. + envsubst < tests/bats/specs/rc-opaque-cfg-unknown-field.yaml.tmpl > \ + "${BATS_TEST_TMPDIR}"/rc-opaque-cfg-unknown-field.yaml + local SPEC="${BATS_TEST_TMPDIR}/rc-opaque-cfg-unknown-field.yaml" + local POD + POD=$(kubectl create -f "${SPEC}" | grep pod | awk '{print $1;}') + kubectl wait \ + --for=jsonpath='{.status.containerStatuses[0].state.waiting.reason}'=ContainerCreating \ + --timeout=10s \ + "${POD}" + wait_for_pod_event "${POD}" FailedPrepareDynamicResources 10 + run kubectl logs \ + -l nvidia-dra-driver-gpu-component=kubelet-plugin \ + -n nvidia-dra-driver-gpu \ + --prefix --tail=-1 + assert_output --partial 'strict decoding error: unknown field "unexpectedField"' + + # Stage 2: test that cleanup routine leaves this claim alone ('not stale') + # + # Re-install, flip log verbosity just to enforce container restart. This + # ensures that the cleanup runs immediately (it runs upon startup, and then + # only N minutes later again). + local _iargs=("--set" "logVerbosity=5") + iupgrade_wait "${TEST_CHART_REPO}" "${TEST_CHART_VERSION}" _iargs + sleep 1 # give the on-startup cleanup a chance to run. + run kubectl logs \ + -l nvidia-dra-driver-gpu-component=kubelet-plugin \ + -n nvidia-dra-driver-gpu \ + --prefix --tail=-1 + assert_output --partial "partially prepared claim not stale: default/batssuite-rc-bad-opaque-config" + + # Stage 3: simulate stale claim, test cleanup. + # + # To that end, uninstall the driver and then remove both pod and RC from the API server. + # Then, re-install DRA driver and confirm detection and removal of stale claim. + helm uninstall -n nvidia-dra-driver-gpu nvidia-dra-driver-gpu-batssuite --wait + kubectl delete "${POD}" --force + kubectl delete resourceclaim batssuite-rc-bad-opaque-config + local _iargs=("--set" "logVerbosity=6") + iupgrade_wait "${TEST_CHART_REPO}" "${TEST_CHART_VERSION}" _iargs + sleep 1 # give the on-startup cleanup a chance to run. + + run kubectl logs \ + -l nvidia-dra-driver-gpu-component=kubelet-plugin \ + -n nvidia-dra-driver-gpu \ + --prefix --tail=-1 + assert_output --partial "Deleted claim from checkpoint: default/batssuite-rc-bad-opaque-config" + assert_output --partial "Checkpointed RC cleanup: unprepared stale claim: default/batssuite-rc-bad-opaque-config" + + # Stage 4: appendix -- happens shortly thereafter: we do get a + # UnprepareResourceClaims() call for this claim. Why? It's a noop because the + # cleanup above was faster. 
+    sleep 4
+    run kubectl logs \
+        -l nvidia-dra-driver-gpu-component=kubelet-plugin \
+        -n nvidia-dra-driver-gpu \
+        --prefix --tail=-1
+    assert_output --partial "Unprepare noop: claim not found in checkpoint data"
+}
+
 @test "nickelpie (NCCL send/recv/broadcast, 2 pods, 2 nodes, small payload)" {
     log_objects