Skip to content

Commit 416c2c6

Browse files
authored
Merge pull request #632 from klueska/single-informer-cd-ds-pods
Use single informer for all ComputeDomain daemonset pods
2 parents 992f893 + 8f3081c commit 416c2c6

File tree

2 files changed

+22
-125
lines changed

2 files changed

+22
-125
lines changed

cmd/compute-domain-controller/daemonset.go

Lines changed: 9 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,9 @@ type DaemonSetManager struct {
6666
informer cache.SharedIndexInformer
6767
mutationCache cache.MutationCache
6868

69+
daemonsetPodManager *DaemonSetPodManager
6970
resourceClaimTemplateManager *DaemonSetResourceClaimTemplateManager
7071
cleanupManager *CleanupManager[*appsv1.DaemonSet]
71-
podManagers map[string]*DaemonSetPodManager
7272
}
7373

7474
func NewDaemonSetManager(config *ManagerConfig, getComputeDomain GetComputeDomainFunc) *DaemonSetManager {
@@ -84,10 +84,10 @@ func NewDaemonSetManager(config *ManagerConfig, getComputeDomain GetComputeDomai
8484
factory := informers.NewSharedInformerFactoryWithOptions(
8585
config.clientsets.Core,
8686
informerResyncPeriod,
87+
informers.WithNamespace(config.driverNamespace),
8788
informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
8889
opts.LabelSelector = metav1.FormatLabelSelector(labelSelector)
8990
}),
90-
informers.WithNamespace(config.driverNamespace),
9191
)
9292

9393
informer := factory.Apps().V1().DaemonSets().Informer()
@@ -97,8 +97,8 @@ func NewDaemonSetManager(config *ManagerConfig, getComputeDomain GetComputeDomai
9797
getComputeDomain: getComputeDomain,
9898
factory: factory,
9999
informer: informer,
100-
podManagers: make(map[string]*DaemonSetPodManager),
101100
}
101+
m.daemonsetPodManager = NewDaemonSetPodManager(config, getComputeDomain)
102102
m.resourceClaimTemplateManager = NewDaemonSetResourceClaimTemplateManager(config, getComputeDomain)
103103
m.cleanupManager = NewCleanupManager[*appsv1.DaemonSet](informer, getComputeDomain, m.cleanup)
104104

@@ -151,6 +151,10 @@ func (m *DaemonSetManager) Start(ctx context.Context) (rerr error) {
151151
return fmt.Errorf("informer cache sync for DaemonSet failed")
152152
}
153153

154+
if err := m.daemonsetPodManager.Start(ctx); err != nil {
155+
return fmt.Errorf("error starting ResourceClaimTemplate manager: %w", err)
156+
}
157+
154158
if err := m.resourceClaimTemplateManager.Start(ctx); err != nil {
155159
return fmt.Errorf("error starting ResourceClaimTemplate manager: %w", err)
156160
}
@@ -163,8 +167,8 @@ func (m *DaemonSetManager) Start(ctx context.Context) (rerr error) {
163167
}
164168

165169
func (m *DaemonSetManager) Stop() error {
166-
if err := m.removeAllPodManagers(); err != nil {
167-
return fmt.Errorf("error removing all Pod managers: %w", err)
170+
if err := m.daemonsetPodManager.Stop(); err != nil {
171+
return fmt.Errorf("error removing daemonset Pod manager: %w", err)
168172
}
169173
if err := m.resourceClaimTemplateManager.Stop(); err != nil {
170174
return fmt.Errorf("error stopping ResourceClaimTemplate manager: %w", err)
@@ -266,16 +270,11 @@ func (m *DaemonSetManager) Delete(ctx context.Context, cdUID string) error {
266270
}
267271

268272
d := ds[0]
269-
key := d.Spec.Selector.MatchLabels[computeDomainLabelKey]
270273

271274
if err := m.resourceClaimTemplateManager.Delete(ctx, cdUID); err != nil {
272275
return fmt.Errorf("error deleting ResourceClaimTemplate: %w", err)
273276
}
274277

275-
if err := m.removePodManager(key); err != nil {
276-
return fmt.Errorf("error removing Pod manager: %w", err)
277-
}
278-
279278
if d.GetDeletionTimestamp() != nil {
280279
return nil
281280
}
@@ -374,10 +373,6 @@ func (m *DaemonSetManager) onAddOrUpdate(ctx context.Context, obj any) error {
374373
return nil
375374
}
376375

377-
if err := m.addPodManager(ctx, d.Spec.Selector, string(cd.UID)); err != nil {
378-
return fmt.Errorf("error adding Pod manager '%s/%s': %w", d.Namespace, d.Name, err)
379-
}
380-
381376
if int(d.Status.NumberReady) != cd.Spec.NumNodes {
382377
return nil
383378
}
@@ -400,57 +395,3 @@ func (m *DaemonSetManager) cleanup(ctx context.Context, cdUID string) error {
400395
}
401396
return nil
402397
}
403-
404-
func (m *DaemonSetManager) addPodManager(ctx context.Context, labelSelector *metav1.LabelSelector, computeDomainUID string) error {
405-
key := labelSelector.MatchLabels[computeDomainLabelKey]
406-
407-
if _, exists := m.podManagers[key]; exists {
408-
return nil
409-
}
410-
411-
podManager := NewDaemonSetPodManager(m.config, labelSelector, m.getComputeDomain, computeDomainUID)
412-
413-
if err := podManager.Start(ctx); err != nil {
414-
return fmt.Errorf("error creating Pod manager: %w", err)
415-
}
416-
417-
m.Lock()
418-
m.podManagers[key] = podManager
419-
m.Unlock()
420-
421-
return nil
422-
}
423-
424-
func (m *DaemonSetManager) removePodManager(key string) error {
425-
if _, exists := m.podManagers[key]; !exists {
426-
return nil
427-
}
428-
429-
m.Lock()
430-
podManager := m.podManagers[key]
431-
m.Unlock()
432-
433-
if err := podManager.Stop(); err != nil {
434-
return fmt.Errorf("error stopping Pod manager: %w", err)
435-
}
436-
437-
m.Lock()
438-
delete(m.podManagers, key)
439-
m.Unlock()
440-
441-
return nil
442-
}
443-
444-
func (m *DaemonSetManager) removeAllPodManagers() error {
445-
m.Lock()
446-
for key, pm := range m.podManagers {
447-
m.Unlock()
448-
if err := pm.Stop(); err != nil {
449-
return fmt.Errorf("error stopping Pod manager: %w", err)
450-
}
451-
m.Lock()
452-
delete(m.podManagers, key)
453-
}
454-
m.Unlock()
455-
return nil
456-
}

cmd/compute-domain-controller/daemonsetpods.go

Lines changed: 13 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,18 @@ type DaemonSetPodManager struct {
4141
lister corev1listers.PodLister
4242

4343
getComputeDomain GetComputeDomainFunc
44-
computeDomainUID string
45-
46-
cleanupManager *CleanupManager[*corev1.Pod]
4744
}
4845

49-
func NewDaemonSetPodManager(config *ManagerConfig, labelSelector *metav1.LabelSelector, getComputeDomain GetComputeDomainFunc, computeDomainUID string) *DaemonSetPodManager {
46+
func NewDaemonSetPodManager(config *ManagerConfig, getComputeDomain GetComputeDomainFunc) *DaemonSetPodManager {
47+
labelSelector := &metav1.LabelSelector{
48+
MatchExpressions: []metav1.LabelSelectorRequirement{
49+
{
50+
Key: computeDomainLabelKey,
51+
Operator: metav1.LabelSelectorOpExists,
52+
},
53+
},
54+
}
55+
5056
factory := informers.NewSharedInformerFactoryWithOptions(
5157
config.clientsets.Core,
5258
informerResyncPeriod,
@@ -65,11 +71,8 @@ func NewDaemonSetPodManager(config *ManagerConfig, labelSelector *metav1.LabelSe
6571
informer: informer,
6672
lister: lister,
6773
getComputeDomain: getComputeDomain,
68-
computeDomainUID: computeDomainUID,
6974
}
7075

71-
m.cleanupManager = NewCleanupManager[*corev1.Pod](informer, getComputeDomain, m.cleanup)
72-
7376
return m
7477
}
7578

@@ -124,71 +127,24 @@ func (m *DaemonSetPodManager) Start(ctx context.Context) (rerr error) {
124127
return fmt.Errorf("error syncing pod informer: %w", err)
125128
}
126129

127-
if err := m.cleanupManager.Start(ctx); err != nil {
128-
return fmt.Errorf("error starting cleanup manager: %w", err)
129-
}
130-
131130
return nil
132131
}
133132

134133
func (m *DaemonSetPodManager) Stop() error {
135-
if err := m.cleanupManager.Stop(); err != nil {
136-
return fmt.Errorf("error stopping cleanup manager: %w", err)
134+
if m.cancelContext != nil {
135+
m.cancelContext()
137136
}
138-
m.cancelContext()
139137
m.waitGroup.Wait()
140138
return nil
141139
}
142140

143-
// cleanup removes nodes from the ComputeDomain status that no longer have backing pods.
144-
func (m *DaemonSetPodManager) cleanup(ctx context.Context, cdUID string) error {
145-
cd, err := m.getComputeDomain(cdUID)
146-
if err != nil {
147-
return fmt.Errorf("error getting ComputeDomain: %w", err)
148-
}
149-
if cd == nil {
150-
return nil
151-
}
152-
153-
pods, err := m.lister.List(nil)
154-
if err != nil {
155-
return fmt.Errorf("error listing pods: %w", err)
156-
}
157-
158-
newCD := cd.DeepCopy()
159-
160-
// Create a set of pod IPs for efficient lookup
161-
podIPs := make(map[string]struct{})
162-
for _, pod := range pods {
163-
if pod.Status.PodIP != "" {
164-
podIPs[pod.Status.PodIP] = struct{}{}
165-
}
166-
}
167-
168-
// Check if any nodes in the ComputeDomain status don't have backing pods
169-
var updatedNodes []*nvapi.ComputeDomainNode
170-
for _, node := range newCD.Status.Nodes {
171-
if _, exists := podIPs[node.IPAddress]; exists {
172-
updatedNodes = append(updatedNodes, node)
173-
}
174-
}
175-
176-
// Update the ComputeDomain status
177-
if err := m.updateComputeDomainNodes(ctx, newCD, updatedNodes); err != nil {
178-
return fmt.Errorf("error updating ComputeDomain status after removing stale nodes: %w", err)
179-
}
180-
181-
klog.Infof("Successfully cleaned up stale nodes from ComputeDomain %s/%s", cd.Namespace, cd.Name)
182-
return nil
183-
}
184-
185141
func (m *DaemonSetPodManager) onPodDelete(ctx context.Context, obj any) error {
186142
p, ok := obj.(*corev1.Pod)
187143
if !ok {
188144
return fmt.Errorf("failed to cast to Pod")
189145
}
190146

191-
cd, err := m.getComputeDomain(m.computeDomainUID)
147+
cd, err := m.getComputeDomain(p.Labels[computeDomainLabelKey])
192148
if err != nil {
193149
return fmt.Errorf("error getting ComputeDomain: %w", err)
194150
}

0 commit comments

Comments
 (0)