diff --git a/internal/k8s/utils.go b/internal/k8s/utils.go index 22b64e0..b8a5c52 100644 --- a/internal/k8s/utils.go +++ b/internal/k8s/utils.go @@ -33,6 +33,15 @@ func GetPodsByLabels(ctx context.Context, client *kubernetes.Clientset, namespac return client.CoreV1().Pods(namespace).List(ctx, opt) } +func IsPodReady(pod *corev1.Pod) bool { + for _, cond := range pod.Status.Conditions { + if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue { + return true + } + } + return false +} + func GetDaemonSetPods(ctx context.Context, client *kubernetes.Clientset, name, namespace, nodename string) (*corev1.PodList, error) { ds, err := client.AppsV1().DaemonSets(namespace).Get(ctx, name, metav1.GetOptions{}) if err != nil { diff --git a/internal/k8s/utils_test.go b/internal/k8s/utils_test.go new file mode 100644 index 0000000..ea39dda --- /dev/null +++ b/internal/k8s/utils_test.go @@ -0,0 +1,82 @@ +/* + * Copyright 2025 NVIDIA CORPORATION + * SPDX-License-Identifier: Apache-2.0 + */ + +package k8s + +import ( + "testing" + + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" +) + +func TestIsPodReady(t *testing.T) { + testCases := []struct { + name string + pod *corev1.Pod + ready bool + }{ + { + name: "Case 1: ready", + pod: &corev1.Pod{ + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.DisruptionTarget, + Status: corev1.ConditionUnknown, + }, + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + ready: true, + }, + { + name: "Case 2: implicit not ready", + pod: &corev1.Pod{ + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.DisruptionTarget, + Status: corev1.ConditionUnknown, + }, + { + Type: corev1.ContainersReady, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + ready: false, + }, + { + name: "Case 3: explicit not ready", + pod: &corev1.Pod{ + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.DisruptionTarget, + Status: corev1.ConditionUnknown, + }, + { + Type: corev1.PodReady, + Status: corev1.ConditionFalse, + }, + }, + }, + }, + ready: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.ready, IsPodReady(tc.pod)) + }) + } +} diff --git a/pkg/engines/slinky/engine.go b/pkg/engines/slinky/engine.go index 4433989..19494ba 100644 --- a/pkg/engines/slinky/engine.go +++ b/pkg/engines/slinky/engine.go @@ -133,6 +133,9 @@ func (eng *SlinkyEngine) GetComputeInstances(ctx context.Context, _ engines.Envi // map k8s host name to SLURM host name nodeMap := make(map[string]string) for _, pod := range pods.Items { + if !k8s.IsPodReady(&pod) { + continue + } host, ok := pod.Labels["slurm.node.name"] if !ok { host = pod.Spec.Hostname diff --git a/pkg/node_observer/controller.go b/pkg/node_observer/controller.go index 2abe2bc..cf0a7ea 100644 --- a/pkg/node_observer/controller.go +++ b/pkg/node_observer/controller.go @@ -31,9 +31,9 @@ import ( ) type Controller struct { - ctx context.Context - client kubernetes.Interface - nodeInformer *NodeInformer + ctx context.Context + client kubernetes.Interface + statusInformer *StatusInformer } func NewController(ctx context.Context, client kubernetes.Interface, cfg *Config) (*Controller, error) { @@ -50,23 +50,23 @@ func NewController(ctx context.Context, client kubernetes.Interface, cfg *Config req.Header.Set("Content-Type", "application/json") return req, nil } - nodeInformer, err := NewNodeInformer(ctx, client, &cfg.Trigger, f) + statusInformer, err := NewStatusInformer(ctx, client, &cfg.Trigger, f) if err != nil { return nil, err } return &Controller{ - ctx: ctx, - client: client, - nodeInformer: nodeInformer, + ctx: ctx, + client: client, + statusInformer: statusInformer, }, nil } func (c *Controller) Start() error { klog.Infof("Starting state observer") - return c.nodeInformer.Start() + return c.statusInformer.Start() } func (c *Controller) Stop(err error) { klog.Infof("Stopping state observer") - c.nodeInformer.Stop(err) + c.statusInformer.Stop(err) } diff --git a/pkg/node_observer/node_informer.go b/pkg/node_observer/node_informer.go deleted file mode 100644 index bf75461..0000000 --- a/pkg/node_observer/node_informer.go +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright 2024-2025 NVIDIA CORPORATION - * SPDX-License-Identifier: Apache-2.0 - */ - -package node_observer - -import ( - "context" - - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" - "k8s.io/klog/v2" - - "github.com/NVIDIA/topograph/internal/httpreq" -) - -type NodeInformer struct { - ctx context.Context - client kubernetes.Interface - reqFunc httpreq.RequestFunc - nodeFactory informers.SharedInformerFactory - podFactory informers.SharedInformerFactory -} - -func NewNodeInformer(ctx context.Context, client kubernetes.Interface, trigger *Trigger, reqFunc httpreq.RequestFunc) (*NodeInformer, error) { - klog.InfoS("Configuring node informer", "trigger", trigger) - - informer := &NodeInformer{ - ctx: ctx, - client: client, - reqFunc: reqFunc, - } - - if len(trigger.NodeSelector) != 0 { - listOptionsFunc := func(options *metav1.ListOptions) { - options.LabelSelector = labels.Set(trigger.NodeSelector).AsSelector().String() - } - informer.nodeFactory = informers.NewSharedInformerFactoryWithOptions( - client, 0, informers.WithTweakListOptions(listOptionsFunc)) - } - - if trigger.PodSelector != nil { - selector, err := metav1.LabelSelectorAsSelector(trigger.PodSelector) - if err != nil { - return nil, err - } - - listOptionsFunc := func(options *metav1.ListOptions) { - options.LabelSelector = selector.String() - } - informer.podFactory = informers.NewSharedInformerFactoryWithOptions( - client, 0, informers.WithTweakListOptions(listOptionsFunc)) - } - - return informer, nil -} - -func (n *NodeInformer) Start() error { - klog.Infof("Starting node informer") - - if n.nodeFactory != nil { - err := n.startInformer(n.nodeFactory.Core().V1().Nodes().Informer()) - if err != nil { - return err - } - } - - if n.podFactory != nil { - err := n.startInformer(n.podFactory.Core().V1().Pods().Informer()) - if err != nil { - return err - } - } - - return nil -} - -func (n *NodeInformer) Stop(_ error) { - klog.Infof("Stopping node informer") - if n.nodeFactory != nil { - n.nodeFactory.Shutdown() - } - if n.podFactory != nil { - n.podFactory.Shutdown() - } -} - -func (n *NodeInformer) startInformer(informer cache.SharedIndexInformer) error { - _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: n.eventHandler("added"), - // TODO: clarify the change in pod/node that would require topology update - //UpdateFunc: func(_, obj any) {} - DeleteFunc: n.eventHandler("deleted"), - }) - if err != nil { - return err - } - - informer.Run(n.ctx.Done()) - return nil -} - -func (n *NodeInformer) eventHandler(action string) func(obj any) { - return func(obj any) { - switch v := obj.(type) { - case *corev1.Pod: - klog.V(4).Infof("Informer %s pod %s/%s", action, v.Namespace, v.Name) - case *corev1.Node: - klog.V(4).Infof("Informer %s node %s", action, v.Name) - default: - klog.V(4).Infof("Informer %s %T %v", action, obj, obj) - } - n.sendRequest() - } -} - -func (n *NodeInformer) sendRequest() { - _, _, err := httpreq.DoRequestWithRetries(n.reqFunc, false) - if err != nil { - klog.Errorf("failed to send HTTP request: %v", err) - } -} diff --git a/pkg/node_observer/status_informer.go b/pkg/node_observer/status_informer.go new file mode 100644 index 0000000..7660b85 --- /dev/null +++ b/pkg/node_observer/status_informer.go @@ -0,0 +1,194 @@ +/* + * Copyright 2024-2025 NVIDIA CORPORATION + * SPDX-License-Identifier: Apache-2.0 + */ + +package node_observer + +import ( + "context" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + "github.com/NVIDIA/topograph/internal/httpreq" + "github.com/NVIDIA/topograph/internal/k8s" +) + +type StatusInformer struct { + ctx context.Context + client kubernetes.Interface + reqFunc httpreq.RequestFunc + nodeFactory informers.SharedInformerFactory + podFactory informers.SharedInformerFactory + queue workqueue.TypedRateLimitingInterface[any] +} + +func NewStatusInformer(ctx context.Context, client kubernetes.Interface, trigger *Trigger, reqFunc httpreq.RequestFunc) (*StatusInformer, error) { + klog.InfoS("Configuring status informer", "trigger", trigger) + + statusInformer := &StatusInformer{ + ctx: ctx, + client: client, + reqFunc: reqFunc, + } + + if len(trigger.NodeSelector) != 0 { + listOptionsFunc := func(options *metav1.ListOptions) { + options.LabelSelector = labels.Set(trigger.NodeSelector).AsSelector().String() + } + statusInformer.nodeFactory = informers.NewSharedInformerFactoryWithOptions( + client, 0, informers.WithTweakListOptions(listOptionsFunc)) + } + + if trigger.PodSelector != nil { + selector, err := metav1.LabelSelectorAsSelector(trigger.PodSelector) + if err != nil { + return nil, err + } + + listOptionsFunc := func(options *metav1.ListOptions) { + options.LabelSelector = selector.String() + } + statusInformer.podFactory = informers.NewSharedInformerFactoryWithOptions( + client, 0, informers.WithTweakListOptions(listOptionsFunc)) + } + + return statusInformer, nil +} + +func (s *StatusInformer) Start() error { + klog.Info("Starting status informer") + + s.queue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]()) + + if err := s.startNodeInformer(); err != nil { + return err + } + + if err := s.startPodInformer(); err != nil { + return err + } + + go func() { + for s.processEvent() { + } + }() + + return nil +} + +func (s *StatusInformer) Stop(_ error) { + klog.Info("Stopping status informer") + if s.nodeFactory != nil { + s.nodeFactory.Shutdown() + } + if s.podFactory != nil { + s.podFactory.Shutdown() + } + if s.queue != nil { + s.queue.ShutDown() + } +} + +func (s *StatusInformer) startNodeInformer() error { + if s.nodeFactory != nil { + informer := s.nodeFactory.Core().V1().Nodes().Informer() + _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + if node, ok := obj.(*corev1.Node); ok { + klog.V(4).Infof("Informer added node %s", node.Name) + s.queue.Add(struct{}{}) + } + }, + //UpdateFunc: func(_, obj any) {} // TODO: clarify the change in node that would require topology update + DeleteFunc: func(obj any) { + switch v := obj.(type) { + case *corev1.Node: + klog.V(4).Infof("Informer deleted node %s", v.Name) + s.queue.Add(struct{}{}) + case cache.DeletedFinalStateUnknown: + if node, ok := v.Obj.(*corev1.Node); ok { + klog.V(4).Infof("Informer deleted node %s", node.Name) + s.queue.Add(struct{}{}) + } + } + }, + }) + if err != nil { + return err + } + s.nodeFactory.Start(s.ctx.Done()) + s.nodeFactory.WaitForCacheSync(s.ctx.Done()) + } + return nil +} + +func (s *StatusInformer) startPodInformer() error { + if s.podFactory != nil { + informer := s.podFactory.Core().V1().Pods().Informer() + _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + if pod, ok := obj.(*corev1.Pod); ok { + if k8s.IsPodReady(pod) { + klog.V(4).Infof("Informer added pod %s/%s", pod.Namespace, pod.Name) + s.queue.Add(struct{}{}) + } + } + }, + UpdateFunc: func(oldObj, newObj any) { + oldPod, ok := oldObj.(*corev1.Pod) + if !ok { + return + } + newPod, ok := newObj.(*corev1.Pod) + if !ok { + return + } + if k8s.IsPodReady(oldPod) != k8s.IsPodReady(newPod) { + klog.V(4).Infof("Informer updated pod %s/%s", newPod.Namespace, newPod.Name) + s.queue.Add(struct{}{}) + } + }, + DeleteFunc: func(obj any) { + switch v := obj.(type) { + case *corev1.Pod: + klog.V(4).Infof("Informer deleted pod %s/%s", v.Namespace, v.Name) + s.queue.Add(struct{}{}) + case cache.DeletedFinalStateUnknown: + if pod, ok := v.Obj.(*corev1.Pod); ok { + klog.V(4).Infof("Informer deleted pod %s/%s", pod.Namespace, pod.Name) + s.queue.Add(struct{}{}) + } + } + }, + }) + if err != nil { + return err + } + s.podFactory.Start(s.ctx.Done()) + s.podFactory.WaitForCacheSync(s.ctx.Done()) + } + return nil +} + +func (s *StatusInformer) processEvent() bool { + item, shutdown := s.queue.Get() + if shutdown { + return false + } + defer s.queue.Done(item) + + _, _, err := httpreq.DoRequestWithRetries(s.reqFunc, false) + if err != nil { + klog.Errorf("failed to send HTTP request: %v", err) + } + s.queue.Forget(item) + return true +} diff --git a/pkg/node_observer/node_informer_test.go b/pkg/node_observer/status_informer_test.go similarity index 76% rename from pkg/node_observer/node_informer_test.go rename to pkg/node_observer/status_informer_test.go index 927bad7..10f6081 100644 --- a/pkg/node_observer/node_informer_test.go +++ b/pkg/node_observer/status_informer_test.go @@ -13,7 +13,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func TestNewNodeInformer(t *testing.T) { +func TestNewStatusInformer(t *testing.T) { ctx := context.TODO() trigger := &Trigger{ NodeSelector: map[string]string{"key": "val"}, @@ -21,11 +21,8 @@ func TestNewNodeInformer(t *testing.T) { MatchLabels: map[string]string{"key": "val"}, }, } - informer, err := NewNodeInformer(ctx, nil, trigger, nil) + informer, err := NewStatusInformer(ctx, nil, trigger, nil) require.NoError(t, err) require.NotNil(t, informer.nodeFactory) require.NotNil(t, informer.podFactory) - - f := informer.eventHandler("tested") - require.NotNil(t, f) }