Skip to content

Commit 121a713

Browse files
committed
1- Add scheduler topology envtest
2- Bug fix: filter nodes that don't have the required topology labels
1 parent 847bc71 commit 121a713

File tree

3 files changed

+195
-11
lines changed

3 files changed

+195
-11
lines changed

pkg/env-tests/scheduler_test.go

Lines changed: 164 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package env_tests
66
import (
77
"context"
88
"fmt"
9+
"math/rand/v2"
910
"time"
1011

1112
. "github.com/onsi/ginkgo/v2"
@@ -24,6 +25,7 @@ import (
2425
"github.com/NVIDIA/KAI-scheduler/pkg/common/constants"
2526
"github.com/NVIDIA/KAI-scheduler/pkg/env-tests/dynamicresource"
2627
"github.com/NVIDIA/KAI-scheduler/pkg/env-tests/scheduler"
28+
kueuev1alpha1 "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
2729
)
2830

2931
var _ = Describe("Scheduler", Ordered, func() {
@@ -107,7 +109,13 @@ var _ = Describe("Scheduler", Ordered, func() {
107109
})
108110
Expect(ctrlClient.Create(ctx, testPod)).To(Succeed(), "Failed to create test pod")
109111

110-
err := GroupPods(ctx, ctrlClient, testQueue.Name, "test-podgroup", []*corev1.Pod{testPod}, 1)
112+
podGroupConfig := podGroupConfig{
113+
queueName: testQueue.Name,
114+
podgroupName: "test-podgroup",
115+
minMember: 1,
116+
topologyConstraint: nil,
117+
}
118+
err := GroupPods(ctx, ctrlClient, podGroupConfig, []*corev1.Pod{testPod})
111119
Expect(err).NotTo(HaveOccurred(), "Failed to group pods")
112120

113121
// Wait for bind request to be created instead of checking pod binding
@@ -136,8 +144,13 @@ var _ = Describe("Scheduler", Ordered, func() {
136144
}
137145
Expect(ctrlClient.Create(ctx, testPod)).To(Succeed(), "Failed to create test pod")
138146

139-
err := GroupPods(ctx, ctrlClient, testQueue.Name, "test-podgroup", []*corev1.Pod{testPod}, 1)
140-
147+
podGroupConfig := podGroupConfig{
148+
queueName: testQueue.Name,
149+
podgroupName: "test-podgroup",
150+
minMember: 1,
151+
topologyConstraint: nil,
152+
}
153+
err := GroupPods(ctx, ctrlClient, podGroupConfig, []*corev1.Pod{testPod})
141154
Expect(err).NotTo(HaveOccurred(), "Failed to group pods")
142155
err = wait.PollUntilContextTimeout(ctx, interval, defaultTimeout, true, func(ctx context.Context) (bool, error) {
143156
var events corev1.EventList
@@ -198,7 +211,13 @@ var _ = Describe("Scheduler", Ordered, func() {
198211
}
199212
Expect(ctrlClient.Create(ctx, testPod2)).To(Succeed(), "Failed to create test pod")
200213

201-
err := GroupPods(ctx, ctrlClient, testQueue.Name, "test-podgroup", []*corev1.Pod{testPod1, testPod2}, 1)
214+
podGroupConfig := podGroupConfig{
215+
queueName: testQueue.Name,
216+
podgroupName: "test-podgroup",
217+
minMember: 1,
218+
topologyConstraint: nil,
219+
}
220+
err := GroupPods(ctx, ctrlClient, podGroupConfig, []*corev1.Pod{testPod1, testPod2})
202221
Expect(err).NotTo(HaveOccurred(), "Failed to group pods")
203222

204223
err = wait.PollUntilContextTimeout(ctx, interval, defaultTimeout, true, func(ctx context.Context) (bool, error) {
@@ -289,7 +308,13 @@ var _ = Describe("Scheduler", Ordered, func() {
289308
dynamicresource.UseClaim(testPod, resourceClaim)
290309
Expect(ctrlClient.Create(ctx, testPod)).To(Succeed(), "Failed to create test pod")
291310

292-
err = GroupPods(ctx, ctrlClient, testQueue.Name, "test-podgroup", []*corev1.Pod{testPod}, 1)
311+
podGroupConfig := podGroupConfig{
312+
queueName: testQueue.Name,
313+
podgroupName: "test-podgroup",
314+
minMember: 1,
315+
topologyConstraint: nil,
316+
}
317+
err = GroupPods(ctx, ctrlClient, podGroupConfig, []*corev1.Pod{testPod})
293318
Expect(err).NotTo(HaveOccurred(), "Failed to group pods")
294319

295320
err = WaitForPodScheduled(ctx, ctrlClient, testPod.Name, testNamespace.Name, defaultTimeout, interval)
@@ -322,7 +347,13 @@ var _ = Describe("Scheduler", Ordered, func() {
322347
Expect(ctrlClient.Create(ctx, testPod)).To(Succeed(), "Failed to create test pod")
323348
createdPods = append(createdPods, testPod)
324349

325-
err = GroupPods(ctx, ctrlClient, testQueue.Name, fmt.Sprintf("test-podgroup-%d", i), []*corev1.Pod{testPod}, 1)
350+
podGroupConfig := podGroupConfig{
351+
queueName: testQueue.Name,
352+
podgroupName: fmt.Sprintf("test-podgroup-%d", i),
353+
minMember: 1,
354+
topologyConstraint: nil,
355+
}
356+
err = GroupPods(ctx, ctrlClient, podGroupConfig, []*corev1.Pod{testPod})
326357
Expect(err).NotTo(HaveOccurred(), "Failed to group pods")
327358
}
328359

@@ -338,4 +369,131 @@ var _ = Describe("Scheduler", Ordered, func() {
338369
Expect(podMap).To(HaveLen(deviceNum), "Expected exactly %d pods to be scheduled", deviceNum)
339370
})
340371
})
372+
373+
Context("topology", func() {
374+
const (
375+
topologyName = "gb2000-topology"
376+
rackLabelKey = "nvidia.com/gpu.clique"
377+
hostnameLabelKey = "kubernetes.io/hostname"
378+
379+
gpusPerNode = 4
380+
numRacks = 2
381+
numNodesPerRack = 18
382+
numNodes = numRacks * numNodesPerRack
383+
)
384+
385+
var (
386+
topologyNodes []*corev1.Node
387+
topologyTree *kueuev1alpha1.Topology
388+
)
389+
390+
BeforeAll(func(ctx context.Context) {
391+
topologyNodes = []*corev1.Node{}
392+
393+
nodeToRack := map[int]string{}
394+
for i := range numNodes {
395+
nodeToRack[i] = fmt.Sprintf("%d", i%numRacks)
396+
}
397+
rand.Shuffle(len(nodeToRack), func(i, j int) {
398+
nodeToRack[i], nodeToRack[j] = nodeToRack[j], nodeToRack[i]
399+
})
400+
401+
for i := 0; i < numNodes; i++ {
402+
topologyNode := DefaultNodeConfig(fmt.Sprintf("t-node-%d", i))
403+
topologyNode.GPUs = gpusPerNode
404+
topologyNode.Labels[rackLabelKey] = fmt.Sprintf("clusterUuid.%s", nodeToRack[i])
405+
topologyNode.Labels[hostnameLabelKey] = topologyNode.Name
406+
nodeObj := CreateNodeObject(ctx, ctrlClient, topologyNode)
407+
Expect(ctrlClient.Create(ctx, nodeObj)).To(Succeed(), "Failed to create topology test node")
408+
topologyNodes = append(topologyNodes, nodeObj)
409+
}
410+
411+
topologyTree = &kueuev1alpha1.Topology{
412+
ObjectMeta: metav1.ObjectMeta{
413+
Name: topologyName,
414+
},
415+
Spec: kueuev1alpha1.TopologySpec{
416+
Levels: []kueuev1alpha1.TopologyLevel{
417+
{NodeLabel: rackLabelKey},
418+
{NodeLabel: hostnameLabelKey},
419+
},
420+
},
421+
}
422+
Expect(ctrlClient.Create(ctx, topologyTree)).To(Succeed(), "Failed to create topology tree")
423+
})
424+
425+
AfterAll(func(ctx context.Context) {
426+
if topologyTree != nil {
427+
Expect(ctrlClient.Delete(ctx, topologyTree)).To(Succeed(), "Failed to delete topology tree")
428+
}
429+
430+
for _, node := range topologyNodes {
431+
Expect(ctrlClient.Delete(ctx, node)).To(Succeed(), "Failed to delete topology test node")
432+
}
433+
434+
err := DeleteAllInNamespace(ctx, ctrlClient, testNamespace.Name,
435+
&corev1.Pod{},
436+
&schedulingv2alpha2.PodGroup{},
437+
&resourcev1beta1.ResourceClaim{},
438+
&kaiv1alpha2.BindRequest{},
439+
)
440+
Expect(err).NotTo(HaveOccurred(), "Failed to delete test resources")
441+
442+
err = WaitForNoObjectsInNamespace(ctx, ctrlClient, testNamespace.Name, defaultTimeout, interval,
443+
&corev1.PodList{},
444+
&schedulingv2alpha2.PodGroupList{},
445+
&resourcev1beta1.ResourceClaimList{},
446+
&kaiv1alpha2.BindRequestList{},
447+
)
448+
Expect(err).NotTo(HaveOccurred(), "Failed to wait for test resources to be deleted")
449+
})
450+
451+
It("Schedule pods with rack topology constraints", func(ctx context.Context) {
452+
singlePodResourceRequirements := corev1.ResourceRequirements{
453+
Limits: corev1.ResourceList{
454+
constants.GpuResource: resource.MustParse(fmt.Sprintf("%d", gpusPerNode)),
455+
},
456+
Requests: corev1.ResourceList{
457+
constants.GpuResource: resource.MustParse(fmt.Sprintf("%d", gpusPerNode)),
458+
},
459+
}
460+
461+
workloadPods := []*corev1.Pod{}
462+
for i := 0; i < 4; i++ {
463+
testPod := CreatePodObject(testNamespace.Name, fmt.Sprintf("test-pod-%d", i), singlePodResourceRequirements)
464+
Expect(ctrlClient.Create(ctx, testPod)).To(Succeed(), "Failed to create test pod %s", testPod.Name)
465+
workloadPods = append(workloadPods, testPod)
466+
}
467+
468+
podGroupConfig := podGroupConfig{
469+
queueName: testQueue.Name,
470+
podgroupName: "test-podgroup",
471+
minMember: 1,
472+
topologyConstraint: &schedulingv2alpha2.TopologyConstraint{
473+
Topology: topologyName,
474+
RequiredTopologyLevel: rackLabelKey,
475+
},
476+
}
477+
err := GroupPods(ctx, ctrlClient, podGroupConfig, workloadPods)
478+
Expect(err).NotTo(HaveOccurred(), "Failed to group pods")
479+
480+
for _, pod := range workloadPods {
481+
err = WaitForPodScheduled(ctx, ctrlClient, pod.Name, testNamespace.Name, defaultTimeout, interval)
482+
Expect(err).NotTo(HaveOccurred(), "Failed to wait for test pod to be scheduled")
483+
}
484+
485+
pods := &corev1.PodList{}
486+
Expect(ctrlClient.List(ctx, pods, client.InNamespace(testNamespace.Name))).
487+
To(Succeed(), "Failed to list pods")
488+
489+
Expect(len(pods.Items)).To(Equal(4), "Expected 4 pods to be created in the test")
490+
491+
scheduledRacks := map[string]int{}
492+
for _, pod := range pods.Items {
493+
scheduledRacks[pod.Labels[rackLabelKey]]++
494+
}
495+
496+
Expect(scheduledRacks).To(HaveLen(1), "Expected all pods to be scheduled on the same rack. Got %v", scheduledRacks)
497+
})
498+
})
341499
})

pkg/env-tests/utils.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,13 @@ import (
2222
commonconsts "github.com/NVIDIA/KAI-scheduler/pkg/common/constants"
2323
)
2424

25+
type podGroupConfig struct {
26+
queueName string
27+
podgroupName string
28+
minMember int32
29+
topologyConstraint *schedulingv2alpha2.TopologyConstraint
30+
}
31+
2532
// NodeConfig holds the configuration for a test node
2633
type NodeConfig struct {
2734
Name string
@@ -36,6 +43,7 @@ type NodeConfig struct {
3643
func DefaultNodeConfig(name string) NodeConfig {
3744
return NodeConfig{
3845
Name: name,
46+
Labels: map[string]string{},
3947
CPUs: "8",
4048
Memory: "16Gi",
4149
GPUs: 4,
@@ -125,22 +133,25 @@ func CreatePodObject(namespace, name string, resources corev1.ResourceRequiremen
125133
return pod
126134
}
127135

128-
func GroupPods(ctx context.Context, c client.Client, queueName, podgroupName string, pods []*corev1.Pod, minMember int32) error {
136+
func GroupPods(ctx context.Context, c client.Client, podGroupConfig podGroupConfig, pods []*corev1.Pod) error {
129137
if len(pods) == 0 {
130138
return fmt.Errorf("no pods to group")
131139
}
132140

133141
podgroup := &schedulingv2alpha2.PodGroup{
134142
ObjectMeta: metav1.ObjectMeta{
135-
Name: podgroupName,
143+
Name: podGroupConfig.podgroupName,
136144
Namespace: pods[0].Namespace,
137145
},
138146
Spec: schedulingv2alpha2.PodGroupSpec{
139-
Queue: queueName,
140-
MinMember: minMember,
147+
Queue: podGroupConfig.queueName,
148+
MinMember: podGroupConfig.minMember,
141149
MarkUnschedulable: ptr.To(true),
142150
},
143151
}
152+
if podGroupConfig.topologyConstraint != nil {
153+
podgroup.Spec.TopologyConstraint = *podGroupConfig.topologyConstraint
154+
}
144155
err := c.Create(ctx, podgroup, &client.CreateOptions{})
145156
if err != nil {
146157
return fmt.Errorf("failed to create podgroup: %w", err)
@@ -151,7 +162,7 @@ func GroupPods(ctx context.Context, c client.Client, queueName, podgroupName str
151162
if pod.Annotations == nil {
152163
pod.Annotations = make(map[string]string)
153164
}
154-
pod.Annotations[commonconsts.PodGroupAnnotationForPod] = podgroupName
165+
pod.Annotations[commonconsts.PodGroupAnnotationForPod] = podGroupConfig.podgroupName
155166
err = c.Patch(ctx, pod, client.MergeFrom(originalPod))
156167
if err != nil {
157168
return fmt.Errorf("failed to patch pod with podgroup annotation: %w", err)

pkg/scheduler/plugins/topology/topology_plugin.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,11 @@ func (t *topologyPlugin) initializeTopologyTree(topologies []*kueuev1alpha1.Topo
7272
}
7373

7474
func (*topologyPlugin) addNodeDataToTopology(topologyTree *TopologyInfo, singleTopology *kueuev1alpha1.Topology, nodeInfo *node_info.NodeInfo) {
75+
// Validate that the node is part of the topology
76+
if !isNodePartOfTopology(nodeInfo, singleTopology) {
77+
return
78+
}
79+
7580
var nodeContainingChildDomain *TopologyDomainInfo
7681
for levelIndex := len(singleTopology.Spec.Levels) - 1; levelIndex >= 0; levelIndex-- {
7782
level := singleTopology.Spec.Levels[levelIndex]
@@ -105,4 +110,14 @@ func (*topologyPlugin) addNodeDataToTopology(topologyTree *TopologyInfo, singleT
105110
topologyTree.Root.AddNode(nodeInfo)
106111
}
107112

113+
// For a given node to be part of the topology correctly, it must have a label for each level of the topology
114+
func isNodePartOfTopology(nodeInfo *node_info.NodeInfo, singleTopology *kueuev1alpha1.Topology) bool {
115+
for _, level := range singleTopology.Spec.Levels {
116+
if _, found := nodeInfo.Node.Labels[level.NodeLabel]; !found {
117+
return false
118+
}
119+
}
120+
return true
121+
}
122+
108123
func (t *topologyPlugin) OnSessionClose(ssn *framework.Session) {}

0 commit comments

Comments
 (0)