Skip to content

Commit e089759

Browse files
authored
Merge pull request #651 from jgehrcke/jp/issue-694
CD daemon: coordinate CD updates on shutdown via mutation cache
2 parents 765892d + e9f647e commit e089759

File tree

3 files changed

+85
-31
lines changed

3 files changed

+85
-31
lines changed

cmd/compute-domain-daemon/computedomain.go

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ func NewComputeDomainManager(config *ManagerConfig) *ComputeDomainManager {
7878
previousNodes: []*nvapi.ComputeDomainNode{},
7979
updatedNodesChan: make(chan []*nvapi.ComputeDomainNode),
8080
}
81-
m.podManager = NewPodManager(config, m.Get)
8281

8382
return m
8483
}
@@ -112,6 +111,8 @@ func (m *ComputeDomainManager) Start(ctx context.Context) (rerr error) {
112111
true,
113112
)
114113

114+
m.podManager = NewPodManager(m.config, m.Get, m.mutationCache)
115+
115116
_, err = m.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
116117
AddFunc: func(obj any) {
117118
m.config.workQueue.Enqueue(obj, m.onAddOrUpdate)
@@ -191,8 +192,9 @@ func (m *ComputeDomainManager) onAddOrUpdate(ctx context.Context, obj any) error
191192
return fmt.Errorf("failed to cast to ComputeDomain")
192193
}
193194

194-
// Get the latest ComputeDomain object from the informer cache since we
195-
// plan to update it later and always *must* have the latest version.
195+
// Get the latest ComputeDomain object from the mutation cache (backed by
196+
// the informer cache) since we plan to update it later and always *must*
197+
// have the latest version.
196198
cd, err := m.Get(string(o.GetUID()))
197199
if err != nil {
198200
return fmt.Errorf("error getting latest ComputeDomain: %w", err)
@@ -215,8 +217,9 @@ func (m *ComputeDomainManager) onAddOrUpdate(ctx context.Context, obj any) error
215217
return nil
216218
}
217219

218-
// UpdateComputeDomainNodeInfo updates the Nodes field in the ComputeDomain
219-
// with info about the ComputeDomain daemon running on this node.
220+
// UpdateComputeDomainNodeInfo updates the Nodes field in the ComputeDomain with
221+
// info about the ComputeDomain daemon running on this node. Upon success, it
222+
// reflects the mutation in `m.mutationCache`.
220223
func (m *ComputeDomainManager) UpdateComputeDomainNodeInfo(ctx context.Context, cd *nvapi.ComputeDomain) (rerr error) {
221224
var nodeInfo *nvapi.ComputeDomainNode
222225

@@ -268,12 +271,12 @@ func (m *ComputeDomainManager) UpdateComputeDomainNodeInfo(ctx context.Context,
268271
newCD.Status.Status = nvapi.ComputeDomainStatusNotReady
269272
}
270273

271-
// Update the status
272-
if _, err := m.config.clientsets.Nvidia.ResourceV1beta1().ComputeDomains(newCD.Namespace).UpdateStatus(ctx, newCD, metav1.UpdateOptions{}); err != nil {
273-
return fmt.Errorf("error updating nodes in ComputeDomain status: %w", err)
274+
// Update status and (upon success) store the latest version of the object
275+
// (as returned by the API server) in the mutation cache.
276+
newCD, err := m.config.clientsets.Nvidia.ResourceV1beta1().ComputeDomains(newCD.Namespace).UpdateStatus(ctx, newCD, metav1.UpdateOptions{})
277+
if err != nil {
278+
return fmt.Errorf("error updating ComputeDomain status: %w", err)
274279
}
275-
276-
// Add the updated ComputeDomain to the mutation cache
277280
m.mutationCache.Mutation(newCD)
278281

279282
return nil
@@ -360,15 +363,13 @@ func (m *ComputeDomainManager) GetNodesUpdateChan() chan []*nvapi.ComputeDomainN
360363

361364
// removeNodeFromComputeDomain removes the current node's entry from the ComputeDomain status.
362365
func (m *ComputeDomainManager) removeNodeFromComputeDomain(ctx context.Context) error {
363-
objs := m.informer.GetIndexer().List()
364-
if len(objs) == 0 {
365-
klog.Infof("No ComputeDomain objects found in informer cache during cleanup")
366-
return nil
366+
cd, err := m.Get(m.config.computeDomainUUID)
367+
if err != nil {
368+
return fmt.Errorf("error getting ComputeDomain from mutation cache: %w", err)
367369
}
368-
369-
cd, ok := objs[0].(*nvapi.ComputeDomain)
370-
if !ok {
371-
return fmt.Errorf("failed to cast object to ComputeDomain")
370+
if cd == nil {
371+
klog.Infof("No ComputeDomain object found in mutation cache during cleanup")
372+
return nil
372373
}
373374

374375
newCD := cd.DeepCopy()
@@ -391,10 +392,13 @@ func (m *ComputeDomainManager) removeNodeFromComputeDomain(ctx context.Context)
391392
newCD.Status.Status = nvapi.ComputeDomainStatusNotReady
392393
}
393394

395+
// Update status and (upon success) store the latest version of the object
396+
// (as returned by the API server) in the mutation cache.
394397
newCD.Status.Nodes = updatedNodes
395398
if _, err := m.config.clientsets.Nvidia.ResourceV1beta1().ComputeDomains(newCD.Namespace).UpdateStatus(ctx, newCD, metav1.UpdateOptions{}); err != nil {
396399
return fmt.Errorf("error removing node from ComputeDomain status: %w", err)
397400
}
401+
m.mutationCache.Mutation(newCD)
398402

399403
klog.Infof("Successfully removed node with IP %s from ComputeDomain %s/%s", m.config.podIP, newCD.Namespace, newCD.Name)
400404
return nil

cmd/compute-domain-daemon/podmanager.go

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,17 @@ import (
3333

3434
// PodManager watches for changes to its own pod and updates the CD status accordingly.
3535
type PodManager struct {
36-
config *ManagerConfig
37-
getComputeDomain GetComputeDomainFunc
38-
waitGroup sync.WaitGroup
39-
cancelContext context.CancelFunc
40-
podInformer cache.SharedIndexInformer
41-
informerFactory informers.SharedInformerFactory
36+
config *ManagerConfig
37+
getComputeDomain GetComputeDomainFunc
38+
waitGroup sync.WaitGroup
39+
cancelContext context.CancelFunc
40+
podInformer cache.SharedIndexInformer
41+
informerFactory informers.SharedInformerFactory
42+
computeDomainMutationCache cache.MutationCache
4243
}
4344

4445
// NewPodManager creates a new PodManager instance.
45-
func NewPodManager(config *ManagerConfig, getComputeDomain GetComputeDomainFunc) *PodManager {
46+
func NewPodManager(config *ManagerConfig, getComputeDomain GetComputeDomainFunc, mutationCache cache.MutationCache) *PodManager {
4647
informerFactory := informers.NewSharedInformerFactoryWithOptions(
4748
config.clientsets.Core,
4849
informerResyncPeriod,
@@ -55,10 +56,11 @@ func NewPodManager(config *ManagerConfig, getComputeDomain GetComputeDomainFunc)
5556
podInformer := informerFactory.Core().V1().Pods().Informer()
5657

5758
p := &PodManager{
58-
config: config,
59-
getComputeDomain: getComputeDomain,
60-
podInformer: podInformer,
61-
informerFactory: informerFactory,
59+
config: config,
60+
getComputeDomain: getComputeDomain,
61+
podInformer: podInformer,
62+
informerFactory: informerFactory,
63+
computeDomainMutationCache: mutationCache,
6264
}
6365

6466
return p
@@ -111,6 +113,7 @@ func (pm *PodManager) Stop() error {
111113
}
112114

113115
pm.waitGroup.Wait()
116+
klog.Infof("Terminating: pod manager")
114117
return nil
115118
}
116119

@@ -186,10 +189,14 @@ func (pm *PodManager) updateNodeStatus(ctx context.Context, status string) error
186189
// Update the node status to the new value
187190
node.Status = status
188191

189-
// Update the CD status
190-
if _, err := pm.config.clientsets.Nvidia.ResourceV1beta1().ComputeDomains(newCD.Namespace).UpdateStatus(ctx, newCD, metav1.UpdateOptions{}); err != nil {
192+
// Update the CD status. Store the CD object returned by the API server in
193+
// the mutationCache (it now has a newer ResourceVersion than `newCD` and
194+
// hence any further mutations should be performed based on that).
195+
newCD, err = pm.config.clientsets.Nvidia.ResourceV1beta1().ComputeDomains(newCD.Namespace).UpdateStatus(ctx, newCD, metav1.UpdateOptions{})
196+
if err != nil {
191197
return fmt.Errorf("error updating node status in ComputeDomain: %w", err)
192198
}
199+
pm.computeDomainMutationCache.Mutation(newCD)
193200

194201
klog.Infof("Successfully updated node %s status to %s", pm.config.nodeName, status)
195202
return nil

tests/bats/tests.bats

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,49 @@ log_objects() {
143143
kubectl delete -f demo/specs/imex/channel-injection-all.yaml
144144
}
145145

146+
@test "CD daemon shutdown: confirm CD status cleanup" {
147+
log_objects
148+
149+
kubectl apply -f demo/specs/imex/channel-injection.yaml
150+
kubectl wait --for=condition=READY pods imex-channel-injection --timeout=100s
151+
run kubectl logs imex-channel-injection
152+
assert_output --partial "channel0"
153+
154+
local LOGPATH="${BATS_TEST_TMPDIR}/cd-daemon.log"
155+
local PNAME
156+
PNAME=$( \
157+
kubectl get pods -n nvidia-dra-driver-gpu | \
158+
grep imex-channel-injection | \
159+
awk '{print $1}'
160+
)
161+
162+
# Expect `nodes` key to be present in CD status.
163+
run bats_pipe kubectl get computedomain imex-channel-injection -o json \| jq '.status'
164+
assert_output --partial 'nodes'
165+
166+
echo "attach background log follower to daemon pod: $PNAME"
167+
kubectl logs -n nvidia-dra-driver-gpu --follow "$PNAME" > "$LOGPATH" 2>&1 &
168+
kubectl delete pods imex-channel-injection
169+
170+
# Note: the log follower child process terminates when the pod terminates.
171+
kubectl wait --for=delete pods imex-channel-injection --timeout=10s
172+
173+
# Expect `nodes` key to not be be present (single-node CD).
174+
run bats_pipe kubectl get computedomain imex-channel-injection -o json \| jq '.status'
175+
refute_output --partial 'nodes'
176+
177+
# Inspect CD daemon log, dump tail for easier debug-on-failure.
178+
cat "$LOGPATH" | tail -n 50
179+
180+
# Explicitly confirm cleanup-on-shutdown behavior by inspecting CD log.
181+
cat "$LOGPATH" | grep -e "Successfully updated node .* status to NotReady"
182+
cat "$LOGPATH" | grep "Successfully removed node" | \
183+
grep "from ComputeDomain default/imex-channel-injection"
184+
185+
# Delete CD.
186+
kubectl delete computedomain imex-channel-injection
187+
}
188+
146189
@test "NodePrepareResources: catch unknown field in opaque cfg in ResourceClaim" {
147190
log_objects
148191

0 commit comments

Comments
 (0)