Skip to content

Commit 980a6a1

Browse files
committed
CD daemon: pod mngr: store UpdateStatus return value in mutation cache
This makes sure that fast incremental mutations on the same CD object performed during shutdown are done conflict-free (i.e., in an actual, incremental fashion, using the intermediate state returned by the API server).

Without this patch:

    I1007 16:49:01.678050 1 podmanager.go:196] Successfully updated node gb-nvl-043-compute06 status to NotReady
    E1007 16:49:01.681345 1 computedomain.go:161] Failed to remove node from ComputeDomain during shutdown: [...] \
        "the object has been modified" [...]

With this patch:

    I1007 16:59:55.350436 1 podmanager.go:200] Successfully updated node gb-nvl-043-compute07 status to NotReady
    I1007 16:59:55.353551 1 computedomain.go:402] Successfully removed node with IP 192.168.34.153 from ComputeDomain default/imex-channel-injection

Signed-off-by: Dr. Jan-Philip Gehrcke <[email protected]>
1 parent 4b91fce commit 980a6a1

File tree

2 files changed

+41
-35
lines changed

2 files changed

+41
-35
lines changed

cmd/compute-domain-daemon/computedomain.go

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func (m *ComputeDomainManager) Start(ctx context.Context) (rerr error) {
111111
true,
112112
)
113113

114-
m.podManager = NewPodManager(m.config, m.Get, &m.mutationCache)
114+
m.podManager = NewPodManager(m.config, m.Get, m.mutationCache)
115115

116116
_, err = m.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
117117
AddFunc: func(obj any) {
@@ -192,8 +192,9 @@ func (m *ComputeDomainManager) onAddOrUpdate(ctx context.Context, obj any) error
192192
return fmt.Errorf("failed to cast to ComputeDomain")
193193
}
194194

195-
// Get the latest ComputeDomain object from the informer cache since we
196-
// plan to update it later and always *must* have the latest version.
195+
// Get the latest ComputeDomain object from the mutation cache (backed by
196+
// the informer cache) since we plan to update it later and always *must*
197+
// have the latest version.
197198
cd, err := m.Get(string(o.GetUID()))
198199
if err != nil {
199200
return fmt.Errorf("error getting latest ComputeDomain: %w", err)
@@ -216,8 +217,9 @@ func (m *ComputeDomainManager) onAddOrUpdate(ctx context.Context, obj any) error
216217
return nil
217218
}
218219

219-
// UpdateComputeDomainNodeInfo updates the Nodes field in the ComputeDomain
220-
// with info about the ComputeDomain daemon running on this node.
220+
// UpdateComputeDomainNodeInfo updates the Nodes field in the ComputeDomain with
221+
// info about the ComputeDomain daemon running on this node. Upon success, it
222+
// reflects the mutation in `m.mutationCache`.
221223
func (m *ComputeDomainManager) UpdateComputeDomainNodeInfo(ctx context.Context, cd *nvapi.ComputeDomain) (rerr error) {
222224
var nodeInfo *nvapi.ComputeDomainNode
223225

@@ -269,12 +271,12 @@ func (m *ComputeDomainManager) UpdateComputeDomainNodeInfo(ctx context.Context,
269271
newCD.Status.Status = nvapi.ComputeDomainStatusNotReady
270272
}
271273

272-
// Update the status
273-
if _, err := m.config.clientsets.Nvidia.ResourceV1beta1().ComputeDomains(newCD.Namespace).UpdateStatus(ctx, newCD, metav1.UpdateOptions{}); err != nil {
274-
return fmt.Errorf("error updating nodes in ComputeDomain status: %w", err)
274+
// Update status and (upon success) store the latest version of the object
275+
// (as returned by the API server) in the mutation cache.
276+
newCD, err := m.config.clientsets.Nvidia.ResourceV1beta1().ComputeDomains(newCD.Namespace).UpdateStatus(ctx, newCD, metav1.UpdateOptions{})
277+
if err != nil {
278+
return fmt.Errorf("error updating ComputeDomain status: %w", err)
275279
}
276-
277-
// Add the updated ComputeDomain to the mutation cache
278280
m.mutationCache.Mutation(newCD)
279281

280282
return nil
@@ -361,15 +363,13 @@ func (m *ComputeDomainManager) GetNodesUpdateChan() chan []*nvapi.ComputeDomainN
361363

362364
// removeNodeFromComputeDomain removes the current node's entry from the ComputeDomain status.
363365
func (m *ComputeDomainManager) removeNodeFromComputeDomain(ctx context.Context) error {
364-
objs := m.informer.GetIndexer().List()
365-
if len(objs) == 0 {
366-
klog.Infof("No ComputeDomain objects found in informer cache during cleanup")
367-
return nil
366+
cd, err := m.Get(m.config.computeDomainUUID)
367+
if err != nil {
368+
return fmt.Errorf("error getting ComputeDomain from mutation cache: %w", err)
368369
}
369-
370-
cd, ok := objs[0].(*nvapi.ComputeDomain)
371-
if !ok {
372-
return fmt.Errorf("failed to cast object to ComputeDomain")
370+
if cd == nil {
371+
klog.Infof("No ComputeDomain object found in mutation cache during cleanup")
372+
return nil
373373
}
374374

375375
newCD := cd.DeepCopy()
@@ -397,6 +397,9 @@ func (m *ComputeDomainManager) removeNodeFromComputeDomain(ctx context.Context)
397397
return fmt.Errorf("error removing node from ComputeDomain status: %w", err)
398398
}
399399

400+
// Add the updated ComputeDomain to the mutation cache
401+
m.mutationCache.Mutation(newCD)
402+
400403
klog.Infof("Successfully removed node with IP %s from ComputeDomain %s/%s", m.config.podIP, newCD.Namespace, newCD.Name)
401404
return nil
402405
}

cmd/compute-domain-daemon/podmanager.go

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,17 @@ import (
3333

3434
// PodManager watches for changes to its own pod and updates the CD status accordingly.
3535
type PodManager struct {
36-
config *ManagerConfig
37-
getComputeDomain GetComputeDomainFunc
38-
waitGroup sync.WaitGroup
39-
cancelContext context.CancelFunc
40-
podInformer cache.SharedIndexInformer
41-
informerFactory informers.SharedInformerFactory
42-
mutationCache cache.MutationCache
36+
config *ManagerConfig
37+
getComputeDomain GetComputeDomainFunc
38+
waitGroup sync.WaitGroup
39+
cancelContext context.CancelFunc
40+
podInformer cache.SharedIndexInformer
41+
informerFactory informers.SharedInformerFactory
42+
computeDomainMutationCache cache.MutationCache
4343
}
4444

4545
// NewPodManager creates a new PodManager instance.
46-
func NewPodManager(config *ManagerConfig, getComputeDomain GetComputeDomainFunc, mutationCache *cache.MutationCache) *PodManager {
46+
func NewPodManager(config *ManagerConfig, getComputeDomain GetComputeDomainFunc, mutationCache cache.MutationCache) *PodManager {
4747
informerFactory := informers.NewSharedInformerFactoryWithOptions(
4848
config.clientsets.Core,
4949
informerResyncPeriod,
@@ -56,11 +56,11 @@ func NewPodManager(config *ManagerConfig, getComputeDomain GetComputeDomainFunc,
5656
podInformer := informerFactory.Core().V1().Pods().Informer()
5757

5858
p := &PodManager{
59-
config: config,
60-
getComputeDomain: getComputeDomain,
61-
podInformer: podInformer,
62-
informerFactory: informerFactory,
63-
mutationCache: *mutationCache,
59+
config: config,
60+
getComputeDomain: getComputeDomain,
61+
podInformer: podInformer,
62+
informerFactory: informerFactory,
63+
computeDomainMutationCache: mutationCache,
6464
}
6565

6666
return p
@@ -113,6 +113,7 @@ func (pm *PodManager) Stop() error {
113113
}
114114

115115
pm.waitGroup.Wait()
116+
klog.Infof("Terminating: pod manager")
116117
return nil
117118
}
118119

@@ -188,13 +189,15 @@ func (pm *PodManager) updateNodeStatus(ctx context.Context, status string) error
188189
// Update the node status to the new value
189190
node.Status = status
190191

191-
// Update the CD status
192-
if _, err := pm.config.clientsets.Nvidia.ResourceV1beta1().ComputeDomains(newCD.Namespace).UpdateStatus(ctx, newCD, metav1.UpdateOptions{}); err != nil {
192+
// Update the CD status. Store the CD object returned by the API server in
193+
// the mutationCache (it now has a newer ResourceVersion than `newCD` and
194+
// hence any further mutations should be performed based on that).
195+
newCD, err = pm.config.clientsets.Nvidia.ResourceV1beta1().ComputeDomains(newCD.Namespace).UpdateStatus(ctx, newCD, metav1.UpdateOptions{})
196+
if err != nil {
193197
return fmt.Errorf("error updating node status in ComputeDomain: %w", err)
194198
}
199+
pm.computeDomainMutationCache.Mutation(newCD)
195200

196201
klog.Infof("Successfully updated node %s status to %s", pm.config.nodeName, status)
197-
198-
pm.mutationCache.Mutation(newCD)
199202
return nil
200203
}

0 commit comments

Comments
 (0)