Skip to content

Commit 9052f8d

Browse files
committed
fix: in the status observer, trigger topology discovery when pods are ready
Signed-off-by: Dmitry Shmulevich <[email protected]>
1 parent 0663765 commit 9052f8d

File tree

5 files changed

+216
-141
lines changed

5 files changed

+216
-141
lines changed

internal/k8s/utils.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,15 @@ func GetPodsByLabels(ctx context.Context, client *kubernetes.Clientset, namespac
3333
return client.CoreV1().Pods(namespace).List(ctx, opt)
3434
}
3535

36+
func IsPodReady(pod *corev1.Pod) bool {
37+
for _, cond := range pod.Status.Conditions {
38+
if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue {
39+
return true
40+
}
41+
}
42+
return false
43+
}
44+
3645
func GetDaemonSetPods(ctx context.Context, client *kubernetes.Clientset, name, namespace, nodename string) (*corev1.PodList, error) {
3746
ds, err := client.AppsV1().DaemonSets(namespace).Get(ctx, name, metav1.GetOptions{})
3847
if err != nil {

pkg/node_observer/controller.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@ import (
3131
)
3232

3333
// Controller bundles the context, the Kubernetes client and the informer that
// the state observer runs on. In this diff the nodeInformer field (removed
// lines) is replaced by statusInformer (added lines).
type Controller struct {
34-
ctx context.Context
35-
client kubernetes.Interface
36-
nodeInformer *NodeInformer
34+
ctx context.Context
35+
client kubernetes.Interface
36+
statusInformer *StatusInformer
3737
}
3838

3939
func NewController(ctx context.Context, client kubernetes.Interface, cfg *Config) (*Controller, error) {
@@ -50,23 +50,23 @@ func NewController(ctx context.Context, client kubernetes.Interface, cfg *Config
5050
req.Header.Set("Content-Type", "application/json")
5151
return req, nil
5252
}
53-
nodeInformer, err := NewNodeInformer(ctx, client, &cfg.Trigger, f)
53+
statusInformer, err := NewStatusInformer(ctx, client, &cfg.Trigger, f)
5454
if err != nil {
5555
return nil, err
5656
}
5757
return &Controller{
58-
ctx: ctx,
59-
client: client,
60-
nodeInformer: nodeInformer,
58+
ctx: ctx,
59+
client: client,
60+
statusInformer: statusInformer,
6161
}, nil
6262
}
6363

6464
// Start logs that the state observer is starting and delegates to the
// informer's Start, returning whatever error it reports. The removed line is
// the previous delegation to nodeInformer; the added line targets
// statusInformer.
func (c *Controller) Start() error {
6565
klog.Infof("Starting state observer")
66-
return c.nodeInformer.Start()
66+
return c.statusInformer.Start()
6767
}
6868

6969
// Stop logs that the state observer is stopping and forwards err to the
// informer's Stop. The removed line is the previous call on nodeInformer; the
// added line targets statusInformer.
func (c *Controller) Stop(err error) {
7070
klog.Infof("Stopping state observer")
71-
c.nodeInformer.Stop(err)
71+
c.statusInformer.Stop(err)
7272
}

pkg/node_observer/node_informer.go

Lines changed: 0 additions & 127 deletions
This file was deleted.
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
/*
2+
* Copyright 2024-2025 NVIDIA CORPORATION
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package node_observer
7+
8+
import (
9+
"context"
10+
"time"
11+
12+
corev1 "k8s.io/api/core/v1"
13+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14+
"k8s.io/apimachinery/pkg/labels"
15+
"k8s.io/apimachinery/pkg/util/wait"
16+
"k8s.io/client-go/informers"
17+
"k8s.io/client-go/kubernetes"
18+
"k8s.io/client-go/tools/cache"
19+
"k8s.io/client-go/util/workqueue"
20+
"k8s.io/klog/v2"
21+
22+
"github.com/NVIDIA/topograph/internal/httpreq"
23+
"github.com/NVIDIA/topograph/internal/k8s"
24+
)
25+
26+
type StatusInformer struct {
27+
ctx context.Context
28+
client kubernetes.Interface
29+
reqFunc httpreq.RequestFunc
30+
nodeFactory informers.SharedInformerFactory
31+
podFactory informers.SharedInformerFactory
32+
queue workqueue.TypedRateLimitingInterface[any]
33+
}
34+
35+
func NewStatusInformer(ctx context.Context, client kubernetes.Interface, trigger *Trigger, reqFunc httpreq.RequestFunc) (*StatusInformer, error) {
36+
klog.InfoS("Configuring status informer", "trigger", trigger)
37+
38+
statusInformer := &StatusInformer{
39+
ctx: ctx,
40+
client: client,
41+
reqFunc: reqFunc,
42+
}
43+
44+
if len(trigger.NodeSelector) != 0 {
45+
listOptionsFunc := func(options *metav1.ListOptions) {
46+
options.LabelSelector = labels.Set(trigger.NodeSelector).AsSelector().String()
47+
}
48+
statusInformer.nodeFactory = informers.NewSharedInformerFactoryWithOptions(
49+
client, 0, informers.WithTweakListOptions(listOptionsFunc))
50+
}
51+
52+
if trigger.PodSelector != nil {
53+
selector, err := metav1.LabelSelectorAsSelector(trigger.PodSelector)
54+
if err != nil {
55+
return nil, err
56+
}
57+
58+
listOptionsFunc := func(options *metav1.ListOptions) {
59+
options.LabelSelector = selector.String()
60+
}
61+
statusInformer.podFactory = informers.NewSharedInformerFactoryWithOptions(
62+
client, 0, informers.WithTweakListOptions(listOptionsFunc))
63+
}
64+
65+
return statusInformer, nil
66+
}
67+
68+
func (s *StatusInformer) Start() error {
69+
klog.Info("Starting status informer")
70+
71+
s.queue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]())
72+
73+
if err := s.startNodeInformer(); err != nil {
74+
return err
75+
}
76+
77+
if err := s.startPodInformer(); err != nil {
78+
return err
79+
}
80+
81+
go wait.Until(func() {
82+
for s.processEvent() {
83+
}
84+
}, time.Second, s.ctx.Done())
85+
86+
return nil
87+
}
88+
89+
func (s *StatusInformer) Stop(_ error) {
90+
klog.Info("Stopping status informer")
91+
if s.nodeFactory != nil {
92+
s.nodeFactory.Shutdown()
93+
}
94+
if s.podFactory != nil {
95+
s.podFactory.Shutdown()
96+
}
97+
if s.queue != nil {
98+
s.queue.ShutDown()
99+
}
100+
}
101+
102+
func (s *StatusInformer) startNodeInformer() error {
103+
if s.nodeFactory != nil {
104+
informer := s.nodeFactory.Core().V1().Nodes().Informer()
105+
_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
106+
AddFunc: func(obj any) {
107+
if node, ok := obj.(*corev1.Node); ok {
108+
klog.V(4).Infof("Informer added node %s", node.Name)
109+
s.queue.Add(struct{}{})
110+
}
111+
},
112+
//UpdateFunc: func(_, obj any) {} // TODO: clarify the change in node that would require topology update
113+
DeleteFunc: func(obj any) {
114+
switch v := obj.(type) {
115+
case *corev1.Node:
116+
klog.V(4).Infof("Informer deleted node %s", v.Name)
117+
s.queue.Add(struct{}{})
118+
case cache.DeletedFinalStateUnknown:
119+
if node, ok := v.Obj.(*corev1.Node); ok {
120+
klog.V(4).Infof("Informer deleted node %s", node.Name)
121+
s.queue.Add(struct{}{})
122+
}
123+
}
124+
},
125+
})
126+
if err != nil {
127+
return err
128+
}
129+
s.nodeFactory.Start(s.ctx.Done())
130+
s.nodeFactory.WaitForCacheSync(s.ctx.Done())
131+
}
132+
return nil
133+
}
134+
135+
func (s *StatusInformer) startPodInformer() error {
136+
if s.podFactory != nil {
137+
informer := s.podFactory.Core().V1().Pods().Informer()
138+
_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
139+
AddFunc: func(obj any) {
140+
if pod, ok := obj.(*corev1.Pod); ok {
141+
if k8s.IsPodReady(pod) {
142+
klog.V(4).Infof("Informer added pod %s/%s", pod.Namespace, pod.Name)
143+
s.queue.Add(struct{}{})
144+
}
145+
}
146+
},
147+
UpdateFunc: func(oldObj, newObj any) {
148+
oldPod, ok := oldObj.(*corev1.Pod)
149+
if !ok {
150+
return
151+
}
152+
newPod, ok := newObj.(*corev1.Pod)
153+
if !ok {
154+
return
155+
}
156+
if k8s.IsPodReady(oldPod) != k8s.IsPodReady(newPod) {
157+
klog.V(4).Infof("Informer updated pod %s/%s", newPod.Namespace, newPod.Name)
158+
s.queue.Add(struct{}{})
159+
}
160+
},
161+
DeleteFunc: func(obj any) {
162+
switch v := obj.(type) {
163+
case *corev1.Pod:
164+
klog.V(4).Infof("Informer deleted pod %s/%s", v.Namespace, v.Name)
165+
s.queue.Add(struct{}{})
166+
case cache.DeletedFinalStateUnknown:
167+
if pod, ok := v.Obj.(*corev1.Pod); ok {
168+
klog.V(4).Infof("Informer deleted pod %s/%s", pod.Namespace, pod.Name)
169+
s.queue.Add(struct{}{})
170+
}
171+
}
172+
},
173+
})
174+
if err != nil {
175+
return err
176+
}
177+
s.podFactory.Start(s.ctx.Done())
178+
s.podFactory.WaitForCacheSync(s.ctx.Done())
179+
}
180+
return nil
181+
}
182+
183+
func (s *StatusInformer) processEvent() bool {
184+
item, quit := s.queue.Get()
185+
if quit {
186+
return false
187+
}
188+
defer s.queue.Done(item)
189+
190+
_, _, err := httpreq.DoRequestWithRetries(s.reqFunc, false)
191+
if err != nil {
192+
klog.Errorf("failed to send HTTP request: %v", err)
193+
}
194+
s.queue.Forget(item)
195+
return true
196+
}

0 commit comments

Comments
 (0)