Skip to content

Commit 565ccc8

Browse files
committed
fix: in the status observer, trigger topology discovery when pods are ready
Signed-off-by: Dmitry Shmulevich <[email protected]>
1 parent 0663765 commit 565ccc8

File tree

5 files changed

+178
-141
lines changed

5 files changed

+178
-141
lines changed

internal/k8s/utils.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,15 @@ func GetPodsByLabels(ctx context.Context, client *kubernetes.Clientset, namespac
3333
return client.CoreV1().Pods(namespace).List(ctx, opt)
3434
}
3535

36+
func IsPodReady(pod *corev1.Pod) bool {
37+
for _, cond := range pod.Status.Conditions {
38+
if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue {
39+
return true
40+
}
41+
}
42+
return false
43+
}
44+
3645
func GetDaemonSetPods(ctx context.Context, client *kubernetes.Clientset, name, namespace, nodename string) (*corev1.PodList, error) {
3746
ds, err := client.AppsV1().DaemonSets(namespace).Get(ctx, name, metav1.GetOptions{})
3847
if err != nil {

pkg/node_observer/controller.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@ import (
3131
)
3232

3333
type Controller struct {
34-
ctx context.Context
35-
client kubernetes.Interface
36-
nodeInformer *NodeInformer
34+
ctx context.Context
35+
client kubernetes.Interface
36+
statusInformer *StatusInformer
3737
}
3838

3939
func NewController(ctx context.Context, client kubernetes.Interface, cfg *Config) (*Controller, error) {
@@ -50,23 +50,23 @@ func NewController(ctx context.Context, client kubernetes.Interface, cfg *Config
5050
req.Header.Set("Content-Type", "application/json")
5151
return req, nil
5252
}
53-
nodeInformer, err := NewNodeInformer(ctx, client, &cfg.Trigger, f)
53+
statusInformer, err := NewStatusInformer(ctx, client, &cfg.Trigger, f)
5454
if err != nil {
5555
return nil, err
5656
}
5757
return &Controller{
58-
ctx: ctx,
59-
client: client,
60-
nodeInformer: nodeInformer,
58+
ctx: ctx,
59+
client: client,
60+
statusInformer: statusInformer,
6161
}, nil
6262
}
6363

6464
func (c *Controller) Start() error {
6565
klog.Infof("Starting state observer")
66-
return c.nodeInformer.Start()
66+
return c.statusInformer.Start()
6767
}
6868

6969
func (c *Controller) Stop(err error) {
7070
klog.Infof("Stopping state observer")
71-
c.nodeInformer.Stop(err)
71+
c.statusInformer.Stop(err)
7272
}

pkg/node_observer/node_informer.go

Lines changed: 0 additions & 127 deletions
This file was deleted.
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Copyright 2024-2025 NVIDIA CORPORATION
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package node_observer
7+
8+
import (
9+
"context"
10+
"time"
11+
12+
corev1 "k8s.io/api/core/v1"
13+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14+
"k8s.io/apimachinery/pkg/labels"
15+
"k8s.io/apimachinery/pkg/util/wait"
16+
"k8s.io/client-go/informers"
17+
"k8s.io/client-go/kubernetes"
18+
"k8s.io/client-go/tools/cache"
19+
"k8s.io/client-go/util/workqueue"
20+
"k8s.io/klog/v2"
21+
22+
"github.com/NVIDIA/topograph/internal/httpreq"
23+
"github.com/NVIDIA/topograph/internal/k8s"
24+
)
25+
26+
type StatusInformer struct {
27+
ctx context.Context
28+
client kubernetes.Interface
29+
reqFunc httpreq.RequestFunc
30+
nodeFactory informers.SharedInformerFactory
31+
podFactory informers.SharedInformerFactory
32+
queue workqueue.TypedRateLimitingInterface[any]
33+
}
34+
35+
func NewStatusInformer(ctx context.Context, client kubernetes.Interface, trigger *Trigger, reqFunc httpreq.RequestFunc) (*StatusInformer, error) {
36+
klog.InfoS("Configuring node informer", "trigger", trigger)
37+
38+
statusInformer := &StatusInformer{
39+
ctx: ctx,
40+
client: client,
41+
reqFunc: reqFunc,
42+
}
43+
44+
if len(trigger.NodeSelector) != 0 {
45+
listOptionsFunc := func(options *metav1.ListOptions) {
46+
options.LabelSelector = labels.Set(trigger.NodeSelector).AsSelector().String()
47+
}
48+
statusInformer.nodeFactory = informers.NewSharedInformerFactoryWithOptions(
49+
client, 0, informers.WithTweakListOptions(listOptionsFunc))
50+
}
51+
52+
if trigger.PodSelector != nil {
53+
selector, err := metav1.LabelSelectorAsSelector(trigger.PodSelector)
54+
if err != nil {
55+
return nil, err
56+
}
57+
58+
listOptionsFunc := func(options *metav1.ListOptions) {
59+
options.LabelSelector = selector.String()
60+
}
61+
statusInformer.podFactory = informers.NewSharedInformerFactoryWithOptions(
62+
client, 0, informers.WithTweakListOptions(listOptionsFunc))
63+
}
64+
65+
return statusInformer, nil
66+
}
67+
68+
func (s *StatusInformer) Start() error {
69+
klog.Infof("Starting node informer")
70+
71+
s.queue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]())
72+
73+
if s.nodeFactory != nil {
74+
nodeInformer := s.nodeFactory.Core().V1().Nodes().Informer()
75+
_, err := nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
76+
AddFunc: func(obj any) {
77+
node := obj.(*corev1.Node)
78+
klog.V(4).Infof("Informer event:add node:%s", node.Name)
79+
s.queue.Add(struct{}{})
80+
},
81+
//UpdateFunc: func(_, obj any) {} // TODO: clarify the change in node that would require topology update
82+
DeleteFunc: func(obj any) {
83+
node := obj.(*corev1.Node)
84+
klog.V(4).Infof("Informer event:delete node:%s", node.Name)
85+
s.queue.Add(struct{}{})
86+
},
87+
})
88+
if err != nil {
89+
return err
90+
}
91+
s.nodeFactory.Start(s.ctx.Done())
92+
s.nodeFactory.WaitForCacheSync(s.ctx.Done())
93+
}
94+
95+
if s.podFactory != nil {
96+
podInformer := s.podFactory.Core().V1().Pods().Informer()
97+
_, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
98+
AddFunc: func(obj any) {
99+
pod := obj.(*corev1.Pod)
100+
if k8s.IsPodReady(pod) {
101+
klog.V(4).Infof("Informer event:add pod:%s/%s", pod.Namespace, pod.Name)
102+
s.queue.Add(struct{}{})
103+
}
104+
},
105+
UpdateFunc: func(oldObj, newObj any) {
106+
oldPod := oldObj.(*corev1.Pod)
107+
newPod := newObj.(*corev1.Pod)
108+
if !k8s.IsPodReady(oldPod) && k8s.IsPodReady(newPod) {
109+
klog.V(4).Infof("Informer event:update pod:%s/%s", newPod.Namespace, newPod.Name)
110+
s.queue.Add(struct{}{})
111+
}
112+
},
113+
DeleteFunc: func(obj any) {
114+
pod := obj.(*corev1.Pod)
115+
klog.V(4).Infof("Informer event:delete pod:%s/%s", pod.Namespace, pod.Name)
116+
s.queue.Add(struct{}{})
117+
},
118+
})
119+
if err != nil {
120+
return err
121+
}
122+
s.podFactory.Start(s.ctx.Done())
123+
s.podFactory.WaitForCacheSync(s.ctx.Done())
124+
}
125+
126+
go wait.Until(func() {
127+
for s.processEvent() {
128+
}
129+
}, time.Second, s.ctx.Done())
130+
131+
return nil
132+
}
133+
134+
func (s *StatusInformer) Stop(_ error) {
135+
klog.Infof("Stopping node informer")
136+
if s.nodeFactory != nil {
137+
s.nodeFactory.Shutdown()
138+
}
139+
if s.podFactory != nil {
140+
s.podFactory.Shutdown()
141+
}
142+
s.queue.ShutDown()
143+
}
144+
145+
func (s *StatusInformer) processEvent() bool {
146+
item, quit := s.queue.Get()
147+
if quit {
148+
return false
149+
}
150+
defer s.queue.Done(item)
151+
152+
_, _, err := httpreq.DoRequestWithRetries(s.reqFunc, false)
153+
if err != nil {
154+
klog.Errorf("failed to send HTTP request: %v", err)
155+
}
156+
s.queue.Forget(item)
157+
return true
158+
}

pkg/node_observer/node_informer_test.go renamed to pkg/node_observer/status_informer_test.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,16 @@ import (
1313
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1414
)
1515

16-
func TestNewNodeInformer(t *testing.T) {
16+
func TestNewStatusInformer(t *testing.T) {
1717
ctx := context.TODO()
1818
trigger := &Trigger{
1919
NodeSelector: map[string]string{"key": "val"},
2020
PodSelector: &metav1.LabelSelector{
2121
MatchLabels: map[string]string{"key": "val"},
2222
},
2323
}
24-
informer, err := NewNodeInformer(ctx, nil, trigger, nil)
24+
informer, err := NewStatusInformer(ctx, nil, trigger, nil)
2525
require.NoError(t, err)
2626
require.NotNil(t, informer.nodeFactory)
2727
require.NotNil(t, informer.podFactory)
28-
29-
f := informer.eventHandler("tested")
30-
require.NotNil(t, f)
3128
}

0 commit comments

Comments
 (0)