Skip to content

Commit 40fda1b

Browse files
committed
fix: in the status observer, trigger topology discovery when pods are ready
Signed-off-by: Dmitry Shmulevich <[email protected]>
1 parent 0663765 commit 40fda1b

File tree

5 files changed

+204
-141
lines changed

5 files changed

+204
-141
lines changed

internal/k8s/utils.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,15 @@ func GetPodsByLabels(ctx context.Context, client *kubernetes.Clientset, namespac
3333
return client.CoreV1().Pods(namespace).List(ctx, opt)
3434
}
3535

36+
func IsPodReady(pod *corev1.Pod) bool {
37+
for _, cond := range pod.Status.Conditions {
38+
if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue {
39+
return true
40+
}
41+
}
42+
return false
43+
}
44+
3645
func GetDaemonSetPods(ctx context.Context, client *kubernetes.Clientset, name, namespace, nodename string) (*corev1.PodList, error) {
3746
ds, err := client.AppsV1().DaemonSets(namespace).Get(ctx, name, metav1.GetOptions{})
3847
if err != nil {

pkg/node_observer/controller.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@ import (
3131
)
3232

3333
type Controller struct {
34-
ctx context.Context
35-
client kubernetes.Interface
36-
nodeInformer *NodeInformer
34+
ctx context.Context
35+
client kubernetes.Interface
36+
statusInformer *StatusInformer
3737
}
3838

3939
func NewController(ctx context.Context, client kubernetes.Interface, cfg *Config) (*Controller, error) {
@@ -50,23 +50,23 @@ func NewController(ctx context.Context, client kubernetes.Interface, cfg *Config
5050
req.Header.Set("Content-Type", "application/json")
5151
return req, nil
5252
}
53-
nodeInformer, err := NewNodeInformer(ctx, client, &cfg.Trigger, f)
53+
statusInformer, err := NewStatusInformer(ctx, client, &cfg.Trigger, f)
5454
if err != nil {
5555
return nil, err
5656
}
5757
return &Controller{
58-
ctx: ctx,
59-
client: client,
60-
nodeInformer: nodeInformer,
58+
ctx: ctx,
59+
client: client,
60+
statusInformer: statusInformer,
6161
}, nil
6262
}
6363

6464
func (c *Controller) Start() error {
6565
klog.Infof("Starting state observer")
66-
return c.nodeInformer.Start()
66+
return c.statusInformer.Start()
6767
}
6868

6969
func (c *Controller) Stop(err error) {
7070
klog.Infof("Stopping state observer")
71-
c.nodeInformer.Stop(err)
71+
c.statusInformer.Stop(err)
7272
}

pkg/node_observer/node_informer.go

Lines changed: 0 additions & 127 deletions
This file was deleted.
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
/*
2+
* Copyright 2024-2025 NVIDIA CORPORATION
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package node_observer
7+
8+
import (
9+
"context"
10+
"time"
11+
12+
corev1 "k8s.io/api/core/v1"
13+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14+
"k8s.io/apimachinery/pkg/labels"
15+
"k8s.io/apimachinery/pkg/util/wait"
16+
"k8s.io/client-go/informers"
17+
"k8s.io/client-go/kubernetes"
18+
"k8s.io/client-go/tools/cache"
19+
"k8s.io/client-go/util/workqueue"
20+
"k8s.io/klog/v2"
21+
22+
"github.com/NVIDIA/topograph/internal/httpreq"
23+
"github.com/NVIDIA/topograph/internal/k8s"
24+
)
25+
26+
type StatusInformer struct {
27+
ctx context.Context
28+
client kubernetes.Interface
29+
reqFunc httpreq.RequestFunc
30+
nodeFactory informers.SharedInformerFactory
31+
podFactory informers.SharedInformerFactory
32+
queue workqueue.TypedRateLimitingInterface[any]
33+
}
34+
35+
func NewStatusInformer(ctx context.Context, client kubernetes.Interface, trigger *Trigger, reqFunc httpreq.RequestFunc) (*StatusInformer, error) {
36+
klog.InfoS("Configuring status informer", "trigger", trigger)
37+
38+
statusInformer := &StatusInformer{
39+
ctx: ctx,
40+
client: client,
41+
reqFunc: reqFunc,
42+
}
43+
44+
if len(trigger.NodeSelector) != 0 {
45+
listOptionsFunc := func(options *metav1.ListOptions) {
46+
options.LabelSelector = labels.Set(trigger.NodeSelector).AsSelector().String()
47+
}
48+
statusInformer.nodeFactory = informers.NewSharedInformerFactoryWithOptions(
49+
client, 0, informers.WithTweakListOptions(listOptionsFunc))
50+
}
51+
52+
if trigger.PodSelector != nil {
53+
selector, err := metav1.LabelSelectorAsSelector(trigger.PodSelector)
54+
if err != nil {
55+
return nil, err
56+
}
57+
58+
listOptionsFunc := func(options *metav1.ListOptions) {
59+
options.LabelSelector = selector.String()
60+
}
61+
statusInformer.podFactory = informers.NewSharedInformerFactoryWithOptions(
62+
client, 0, informers.WithTweakListOptions(listOptionsFunc))
63+
}
64+
65+
return statusInformer, nil
66+
}
67+
68+
func (s *StatusInformer) Start() error {
69+
klog.Infof("Starting status informer")
70+
71+
s.queue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]())
72+
73+
if err := s.startNodeInformer(); err != nil {
74+
return err
75+
}
76+
77+
if err := s.startPodInformer(); err != nil {
78+
return err
79+
}
80+
81+
go wait.Until(func() {
82+
for s.processEvent() {
83+
}
84+
}, time.Second, s.ctx.Done())
85+
86+
return nil
87+
}
88+
89+
func (s *StatusInformer) Stop(_ error) {
90+
klog.Infof("Stopping status informer")
91+
if s.nodeFactory != nil {
92+
s.nodeFactory.Shutdown()
93+
}
94+
if s.podFactory != nil {
95+
s.podFactory.Shutdown()
96+
}
97+
if s.queue != nil {
98+
s.queue.ShutDown()
99+
}
100+
}
101+
102+
func (s *StatusInformer) startNodeInformer() error {
103+
if s.nodeFactory != nil {
104+
informer := s.nodeFactory.Core().V1().Nodes().Informer()
105+
_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
106+
AddFunc: func(obj any) {
107+
if node, ok := obj.(*corev1.Node); ok {
108+
klog.V(4).Infof("Informer event:add node:%s", node.Name)
109+
s.queue.Add(struct{}{})
110+
}
111+
},
112+
//UpdateFunc: func(_, obj any) {} // TODO: clarify the change in node that would require topology update
113+
DeleteFunc: func(obj any) {
114+
if node, ok := obj.(*corev1.Node); ok {
115+
klog.V(4).Infof("Informer event:delete node:%s", node.Name)
116+
s.queue.Add(struct{}{})
117+
}
118+
},
119+
})
120+
if err != nil {
121+
return err
122+
}
123+
s.nodeFactory.Start(s.ctx.Done())
124+
s.nodeFactory.WaitForCacheSync(s.ctx.Done())
125+
}
126+
return nil
127+
}
128+
129+
func (s *StatusInformer) startPodInformer() error {
130+
if s.podFactory != nil {
131+
informer := s.podFactory.Core().V1().Pods().Informer()
132+
_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
133+
AddFunc: func(obj any) {
134+
if pod, ok := obj.(*corev1.Pod); ok {
135+
if k8s.IsPodReady(pod) {
136+
klog.V(4).Infof("Informer event:add pod:%s/%s", pod.Namespace, pod.Name)
137+
s.queue.Add(struct{}{})
138+
}
139+
}
140+
},
141+
UpdateFunc: func(oldObj, newObj any) {
142+
oldPod, ok := oldObj.(*corev1.Pod)
143+
if !ok {
144+
return
145+
}
146+
newPod, ok := newObj.(*corev1.Pod)
147+
if !ok {
148+
return
149+
}
150+
if k8s.IsPodReady(oldPod) != k8s.IsPodReady(newPod) {
151+
klog.V(4).Infof("Informer event:update pod:%s/%s", newPod.Namespace, newPod.Name)
152+
s.queue.Add(struct{}{})
153+
}
154+
},
155+
DeleteFunc: func(obj any) {
156+
if pod, ok := obj.(*corev1.Pod); ok {
157+
klog.V(4).Infof("Informer event:delete pod:%s/%s", pod.Namespace, pod.Name)
158+
s.queue.Add(struct{}{})
159+
}
160+
},
161+
})
162+
if err != nil {
163+
return err
164+
}
165+
s.podFactory.Start(s.ctx.Done())
166+
s.podFactory.WaitForCacheSync(s.ctx.Done())
167+
}
168+
return nil
169+
}
170+
171+
func (s *StatusInformer) processEvent() bool {
172+
item, quit := s.queue.Get()
173+
if quit {
174+
return false
175+
}
176+
defer s.queue.Done(item)
177+
178+
_, _, err := httpreq.DoRequestWithRetries(s.reqFunc, false)
179+
if err != nil {
180+
klog.Errorf("failed to send HTTP request: %v", err)
181+
}
182+
s.queue.Forget(item)
183+
return true
184+
}

pkg/node_observer/node_informer_test.go renamed to pkg/node_observer/status_informer_test.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,16 @@ import (
1313
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1414
)
1515

16-
func TestNewNodeInformer(t *testing.T) {
16+
func TestNewStatusInformer(t *testing.T) {
1717
ctx := context.TODO()
1818
trigger := &Trigger{
1919
NodeSelector: map[string]string{"key": "val"},
2020
PodSelector: &metav1.LabelSelector{
2121
MatchLabels: map[string]string{"key": "val"},
2222
},
2323
}
24-
informer, err := NewNodeInformer(ctx, nil, trigger, nil)
24+
informer, err := NewStatusInformer(ctx, nil, trigger, nil)
2525
require.NoError(t, err)
2626
require.NotNil(t, informer.nodeFactory)
2727
require.NotNil(t, informer.podFactory)
28-
29-
f := informer.eventHandler("tested")
30-
require.NotNil(t, f)
3128
}

0 commit comments

Comments
 (0)