37 changes: 24 additions & 13 deletions cmd/compute-domain-daemon/computedomain.go
@@ -33,7 +33,10 @@ import (
)

const (
informerResyncPeriod = 10 * time.Minute
	// Detecting when a CD daemon transitions from NotReady to Ready (based on
	// the startup probe) currently sometimes requires an informer resync; see
	// https://github.com/NVIDIA/k8s-dra-driver-gpu/issues/742.
informerResyncPeriod = 4 * time.Minute
mutationCacheTTL = time.Hour
)

@@ -113,12 +116,14 @@ func (m *ComputeDomainManager) Start(ctx context.Context) (rerr error) {

m.podManager = NewPodManager(m.config, m.Get, m.mutationCache)

// Use `WithKey` with hard-coded key, to cancel any previous update task (we
// want to make sure that the latest CD status update wins).
_, err = m.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
m.config.workQueue.Enqueue(obj, m.onAddOrUpdate)
m.config.workQueue.EnqueueWithKey(obj, "cd", m.onAddOrUpdate)
},
UpdateFunc: func(objOld, objNew any) {
m.config.workQueue.Enqueue(objNew, m.onAddOrUpdate)
m.config.workQueue.EnqueueWithKey(objNew, "cd", m.onAddOrUpdate)
},
})
if err != nil {
@@ -213,18 +218,20 @@ func (m *ComputeDomainManager) onAddOrUpdate(ctx context.Context, obj any) error
return nil
}

// Update node info in ComputeDomain.
if err := m.UpdateComputeDomainNodeInfo(ctx, cd); err != nil {
return fmt.Errorf("error updating node info in ComputeDomain: %w", err)
// Update node info in ComputeDomain, if required.
if err := m.EnsureNodeInfoInCD(ctx, cd); err != nil {
return fmt.Errorf("CD update: failed to insert/update node info in CD: %w", err)
}

return nil
}

// UpdateComputeDomainNodeInfo updates the Nodes field in the ComputeDomain with
// info about the ComputeDomain daemon running on this node. Upon success, it
// reflects the mutation in `m.mutationCache`.
func (m *ComputeDomainManager) UpdateComputeDomainNodeInfo(ctx context.Context, cd *nvapi.ComputeDomain) (rerr error) {
Review comment (Collaborator Author):
I felt like renaming this from UpdateComputeDomainNodeInfo to EnsureNodeInfoInCD after I repeatedly found myself slightly confused about the high-level responsibility of this method.

// EnsureNodeInfoInCD makes sure that the current node (by node name) is
// represented in the `Nodes` field in the ComputeDomain object, and that it
// reports the IP address of this current pod running the CD daemon. If mutation
// is needed (first insertion, or IP address update) and successful, it reflects
// the mutation in `m.mutationCache`.
func (m *ComputeDomainManager) EnsureNodeInfoInCD(ctx context.Context, cd *nvapi.ComputeDomain) (rerr error) {
var nodeInfo *nvapi.ComputeDomainNode

// Create a deep copy of the ComputeDomain to avoid modifying the original
@@ -246,6 +253,7 @@ func (m *ComputeDomainManager) UpdateComputeDomainNodeInfo(ctx context.Context,

// If there is one and its IP is the same as this one, we are done
if nodeInfo != nil && nodeInfo.IPAddress == m.config.podIP {
klog.V(6).Infof("EnsureNodeInfoInCD noop: pod IP unchanged (%s)", m.config.podIP)
return nil
}

@@ -261,7 +269,8 @@ func (m *ComputeDomainManager) UpdateComputeDomainNodeInfo(ctx context.Context,
Name: m.config.nodeName,
CliqueID: m.config.cliqueID,
Index: nextIndex,
Status: nvapi.ComputeDomainStatusNotReady,
// This is going to be switched to Ready by podmanager.
Status: nvapi.ComputeDomainStatusNotReady,
}

klog.Infof("CD status does not contain node name '%s' yet, try to insert myself: %v", m.config.nodeName, nodeInfo)
@@ -286,7 +295,7 @@ func (m *ComputeDomainManager) UpdateComputeDomainNodeInfo(ctx context.Context,
}
m.mutationCache.Mutation(newCD)

klog.V(2).Infof("Successfully updated CD")
klog.Infof("Successfully inserted/updated node in CD (nodeinfo: %v)", nodeInfo)
return nil
}

@@ -364,7 +373,9 @@ func (m *ComputeDomainManager) MaybePushNodesUpdate(cd *nvapi.ComputeDomain) {
// perform a stable sort of IP addresses before writing them to the nodes
// config file.
if !maps.Equal(newIPs, previousIPs) {
klog.Infof("IP set changed: previous: %v; new: %v", previousIPs, newIPs)
Review comment (@jgehrcke, Collaborator Author, Nov 20, 2025):
The bulk of the log volume emitted by the CD daemon comes from this message; we must not log all of this at verbosity level zero.

Example: (screenshot of log output omitted)

klog.V(2).Infof("IP set changed")
// This log message gets large for large node numbers
klog.V(6).Infof("previous: %v; new: %v", previousIPs, newIPs)
m.previousNodes = cd.Status.Nodes
m.updatedNodesChan <- cd.Status.Nodes
} else {
2 changes: 1 addition & 1 deletion cmd/compute-domain-daemon/controller.go
@@ -68,7 +68,7 @@ func NewController(config *ControllerConfig) (*Controller, error) {
return nil, fmt.Errorf("failed to create client sets: %v", err)
}

workQueue := workqueue.New(workqueue.DefaultControllerRateLimiter())
workQueue := workqueue.New(workqueue.DefaultCDDaemonRateLimiter())

mc := &ManagerConfig{
workQueue: workQueue,
21 changes: 17 additions & 4 deletions cmd/compute-domain-daemon/dnsnames.go
@@ -17,10 +17,12 @@
package main

import (
"cmp"
"fmt"
"maps"
"os"
"path/filepath"
"slices"
"strings"
"sync"

@@ -32,7 +34,7 @@ import (
const (
hostsFilePath = "/etc/hosts"
dnsNamePrefix = "compute-domain-daemon-"
dnsNameFormat = dnsNamePrefix + "%d"
dnsNameFormat = dnsNamePrefix + "%04d"
)

// IPToDNSNameMap holds a map of IP Addresses to DNS names.
@@ -109,9 +111,20 @@ func (m *DNSNameManager) LogDNSNameMappings() {
return
}

klog.Infof("Current compute-domain-daemon mappings:")
for ip, dnsName := range m.ipToDNSName {
klog.Infof(" %s -> %s", ip, dnsName)
// Sort alphabetically by DNS name (map value) -> sort ips (map keys) based
// on their corresponding values.
var ips []string
for ip := range m.ipToDNSName {
ips = append(ips, ip)
}

slices.SortFunc(ips, func(a, b string) int {
return cmp.Compare(m.ipToDNSName[a], m.ipToDNSName[b])
})

for _, ip := range ips {
dnsname := m.ipToDNSName[ip]
klog.Infof("%s -> %s", dnsname, ip)
}
}
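
The zero-padded format above means that a plain lexicographic sort of the DNS names matches the numeric daemon index, which is what the sorting in LogDNSNameMappings relies on. A minimal, self-contained sketch (not part of this change) illustrating that:

```go
package main

import (
	"fmt"
	"slices"
)

func main() {
	const dnsNameFormat = "compute-domain-daemon-%04d" // mirrors the constant above
	names := []string{fmt.Sprintf(dnsNameFormat, 12), fmt.Sprintf(dnsNameFormat, 2)}
	slices.Sort(names)
	fmt.Println(names) // [compute-domain-daemon-0002 compute-domain-daemon-0012]
	// Without zero padding, "...-12" would sort before "...-2".
}
```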

23 changes: 16 additions & 7 deletions cmd/compute-domain-daemon/podmanager.go
@@ -72,11 +72,13 @@ func (pm *PodManager) Start(ctx context.Context) error {
pm.cancelContext = cancel

_, err := pm.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
// Use `WithKey` with hard-coded key, to cancel any previous update task
// (we want to make sure that the latest pod status update wins).
AddFunc: func(obj any) {
pm.config.workQueue.Enqueue(obj, pm.addOrUpdate)
pm.config.workQueue.EnqueueWithKey(obj, "pod", pm.addOrUpdate)
},
UpdateFunc: func(oldObj, newObj any) {
pm.config.workQueue.Enqueue(newObj, pm.addOrUpdate)
pm.config.workQueue.EnqueueWithKey(newObj, "pod", pm.addOrUpdate)
},
})
if err != nil {
@@ -130,7 +132,7 @@ func (pm *PodManager) addOrUpdate(ctx context.Context, obj any) error {
}

if err := pm.updateNodeStatus(ctx, status); err != nil {
return fmt.Errorf("failed to update node status: %w", err)
return fmt.Errorf("pod update: failed to update note status in CD (%s): %w", status, err)
Review comment (Collaborator Author):
The wrapper (workqueue) does not enrich the error message with meaningful context information, so I added the `pod update:` prefix here; it makes it easier to understand what a log message means. Example:

I1119 22:10:21.531887       1 workqueue.go:197] Reconcile: pod update: failed to update node status in CD (Ready): simulated error 5 (attempt 5)

}

return nil
@@ -153,7 +155,8 @@ func (pm *PodManager) isPodReady(pod *corev1.Pod) bool {
return false
}

// updateNodeStatus updates the status of the current node in the CD status.
// updateNodeStatus updates the status of the current node (the status of the
// pod running the CD daemon) in the CD status.
func (pm *PodManager) updateNodeStatus(ctx context.Context, status string) error {
// Get the current CD using the provided function
cd, err := pm.getComputeDomain(pm.config.computeDomainUUID)
@@ -176,13 +179,19 @@ func (pm *PodManager) updateNodeStatus(ctx context.Context, status string) error
}
}

// If node not found, exit early
	// If the node is not yet listed in the CD, return an explicit error so that
	// this gets retried; rely on the retry chain being canceled by a newer
	// incoming pod update. Here, we could also assert `status == NotReady`, on
	// the assumption that the CD daemon only starts after the CD manager has
	// performed the node info insert (leading to node != nil below), and that
	// the pod can only change its status to Ready after the CD daemon has started.
if node == nil {
return nil
return fmt.Errorf("node not yet listed in CD (waiting for insertion)")
}

// If status hasn't changed, exit early
if node.Status == status {
klog.V(6).Infof("updateNodeStatus noop: status not changed (%s)", status)
return nil
}

@@ -198,6 +207,6 @@ func (pm *PodManager) updateNodeStatus(ctx context.Context, status string) error
}
pm.computeDomainMutationCache.Mutation(newCD)

klog.Infof("Successfully updated node %s status to %s", pm.config.nodeName, status)
klog.Infof("Successfully updated node status in CD (new nodeinfo: %v)", node)
return nil
}
67 changes: 67 additions & 0 deletions pkg/workqueue/jitterlimiter.go
@@ -0,0 +1,67 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package workqueue

import (
"math/rand"
"time"

"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
)

// JitterRL adds jitter relative to the delay yielded by the inner limiter.
// Example: a factor of 0.1 translates to a jitter interval whose width is 10%
// of the inner delay, centered around the inner delay (resulting in a +/- 5%
// deviation from the inner delay).
type JitterRL[T comparable] struct {
inner workqueue.TypedRateLimiter[T]
factor float64
}

func NewJitterRateLimiter[T comparable](inner workqueue.TypedRateLimiter[T], factor float64) workqueue.TypedRateLimiter[T] {
if factor >= 1.0 {
panic("factor must be < 1.0")
}
return &JitterRL[T]{inner: inner, factor: factor}
}

func (j *JitterRL[T]) When(item T) time.Duration {
// Get inner limiter's delay.
d := j.inner.When(item)

// Calculate jitter interval width W_j relative to the delay time given by
// the inner limiter.
jitterWidthSeconds := d.Seconds() * j.factor

// Get random number in the interval [-W_j/2, W_j/2).
jitterSeconds := jitterWidthSeconds * (rand.Float64() - 0.5)

delay := d + time.Duration(jitterSeconds*float64(time.Second))
klog.V(7).Infof("inner: %.5f s, jittered: %.5f s", d.Seconds(), delay.Seconds())

return delay
}

func (j *JitterRL[T]) Forget(item T) {
j.inner.Forget(item)
}

func (j *JitterRL[T]) NumRequeues(item T) int {
return j.inner.NumRequeues(item)
}
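
A minimal usage sketch (hypothetical values, not part of this change, assuming the imports of this file): wrapping an inner limiter that always yields 100ms with a factor of 0.1 produces jittered delays in the interval [95ms, 105ms).

```go
// Sketch: with inner delay d and factor f, When returns a value in
// [d - d*f/2, d + d*f/2). Hypothetical values below.
inner := workqueue.NewTypedItemExponentialFailureRateLimiter[any](100*time.Millisecond, 100*time.Millisecond)
rl := NewJitterRateLimiter(inner, 0.1)
d := rl.When("some-item") // somewhere in [95ms, 105ms)
_ = d
```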
16 changes: 13 additions & 3 deletions pkg/workqueue/workqueue.go
@@ -51,13 +51,17 @@ func DefaultPrepUnprepRateLimiter() workqueue.TypedRateLimiter[any] {
// This is a per-item exponential backoff limiter. Each time an item
// fails and is retried, the delay grows exponentially starting from the
// lower value up to the upper bound.
workqueue.NewTypedItemExponentialFailureRateLimiter[any](250*time.Millisecond, 3000*time.Second),
workqueue.NewTypedItemExponentialFailureRateLimiter[any](250*time.Millisecond, 3000*time.Millisecond),
// Global (not per-item) rate limiter. Allows up to 5 retries per
// second, with bursts of up to 10.
&workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(5), 10)},
)
}

func DefaultCDDaemonRateLimiter() workqueue.TypedRateLimiter[any] {
return NewJitterRateLimiter(workqueue.NewTypedItemExponentialFailureRateLimiter[any](5*time.Millisecond, 6000*time.Millisecond), 0.5)
Review comment (Collaborator Author):
I thought quite a bit about these numbers, but of course these are just an attempt to pick something meaningful -- we will see over time if and how we want to change method and parameters.

}
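
A rough sketch of the resulting per-item retry schedule (hypothetical helper, not part of this change, assuming an additional fmt import): the exponential part yields 5ms, 10ms, 20ms, ... capped at 6s, and the jitter factor of 0.5 then spreads each delay by +/- 25%.

```go
// Sketch: print the approximate delay progression of DefaultCDDaemonRateLimiter
// for a single item that keeps failing.
func printCDDaemonRetryDelays() {
	rl := DefaultCDDaemonRateLimiter()
	for i := 0; i < 12; i++ {
		fmt.Printf("attempt %2d: %v\n", i+1, rl.When("cd"))
	}
	rl.Forget("cd") // reset the per-item failure counter
}
```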

func DefaultControllerRateLimiter() workqueue.TypedRateLimiter[any] {
return workqueue.DefaultTypedControllerRateLimiter[any]()
}
@@ -136,6 +140,11 @@ func (q *WorkQueue) EnqueueWithKey(obj any, key string, callback func(ctx contex

q.Lock()
q.activeOps[key] = workItem
// Do we also want to make sure here that a previously enqueued task for
// this key isn't going to be run anymore, if not yet started? Currently,
// the next-scheduled retry attempt is still executed, and business logic is
// hopefully resilient enough.
klog.V(7).Infof("enqueue with key: %s", key)
q.queue.AddRateLimited(workItem)
q.Unlock()
}
@@ -153,16 +162,17 @@ func (q *WorkQueue) processNextWorkItem(ctx context.Context) {
return
}

attempts := q.queue.NumRequeues(item)
err := q.reconcile(ctx, workItem)
if err != nil {
// Most often, this is an expected, retryable error in the context of an
// eventually consistent system. Hence, do not log on an error level. Rely
// on inner business logic to log unexpected errors on an error level.
klog.V(1).Infof("Reconcile: %v", err)
klog.Infof("Reconcile: %v (attempt %d)", err, attempts)
// Only retry if we're still the current operation for this key
q.Lock()
if q.activeOps[workItem.Key] != nil && q.activeOps[workItem.Key] != workItem {
klog.Errorf("Work item with key '%s' has been replaced with a newer enqueued one, not retrying", workItem.Key)
klog.Infof("Do not re-enqueue failed work item with key '%s': a newer item was enqueued", workItem.Key)
q.queue.Forget(workItem)
} else {
q.queue.AddRateLimited(workItem)
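
A hedged sketch of the "latest wins" behavior that the hard-coded keys ("cd", "pod") rely on; reconcile, podV1, and podV2 are hypothetical stand-ins for the daemon's real wiring, and this repo's pkg/workqueue is assumed to be imported as workqueue.

```go
// Sketch: a newer enqueue under the same key replaces the active op, so only
// the latest status update keeps being retried after failures.
func sketchLatestWins(podV1, podV2 any) {
	wq := workqueue.New(workqueue.DefaultCDDaemonRateLimiter())

	reconcile := func(ctx context.Context, obj any) error {
		return nil // business logic that may return a retryable error
	}

	wq.EnqueueWithKey(podV1, "pod", reconcile) // first update; re-queued on failure
	wq.EnqueueWithKey(podV2, "pod", reconcile) // same key: replaces the active op for "pod",
	// so a later failure of the podV1 item is forgotten instead of re-queued
}
```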
2 changes: 1 addition & 1 deletion tests/bats/test_cd_misc.bats
@@ -69,7 +69,7 @@ bats::on_failure() {
cat "$LOGPATH" | tail -n 50

# Explicitly confirm cleanup-on-shutdown behavior by inspecting CD log.
cat "$LOGPATH" | grep -e "Successfully updated node .* status to NotReady"
cat "$LOGPATH" | tail -n 50 | grep -e "updated node status in CD (new nodeinfo: .* NotReady"
cat "$LOGPATH" | grep "Successfully removed node" | \
grep "from ComputeDomain default/imex-channel-injection"
