Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 177 additions & 6 deletions pkg/env-tests/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package env_tests
import (
"context"
"fmt"
"math/rand/v2"
"time"

. "github.com/onsi/ginkgo/v2"
Expand All @@ -24,6 +25,7 @@ import (
"github.com/NVIDIA/KAI-scheduler/pkg/common/constants"
"github.com/NVIDIA/KAI-scheduler/pkg/env-tests/dynamicresource"
"github.com/NVIDIA/KAI-scheduler/pkg/env-tests/scheduler"
kueuev1alpha1 "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
)

var _ = Describe("Scheduler", Ordered, func() {
Expand Down Expand Up @@ -107,7 +109,13 @@ var _ = Describe("Scheduler", Ordered, func() {
})
Expect(ctrlClient.Create(ctx, testPod)).To(Succeed(), "Failed to create test pod")

err := GroupPods(ctx, ctrlClient, testQueue.Name, "test-podgroup", []*corev1.Pod{testPod}, 1)
podGroupConfig := podGroupConfig{
queueName: testQueue.Name,
podgroupName: "test-podgroup",
minMember: 1,
topologyConstraint: nil,
}
err := GroupPods(ctx, ctrlClient, podGroupConfig, []*corev1.Pod{testPod})
Expect(err).NotTo(HaveOccurred(), "Failed to group pods")

// Wait for bind request to be created instead of checking pod binding
Expand Down Expand Up @@ -136,8 +144,13 @@ var _ = Describe("Scheduler", Ordered, func() {
}
Expect(ctrlClient.Create(ctx, testPod)).To(Succeed(), "Failed to create test pod")

err := GroupPods(ctx, ctrlClient, testQueue.Name, "test-podgroup", []*corev1.Pod{testPod}, 1)

podGroupConfig := podGroupConfig{
queueName: testQueue.Name,
podgroupName: "test-podgroup",
minMember: 1,
topologyConstraint: nil,
}
err := GroupPods(ctx, ctrlClient, podGroupConfig, []*corev1.Pod{testPod})
Expect(err).NotTo(HaveOccurred(), "Failed to group pods")
err = wait.PollUntilContextTimeout(ctx, interval, defaultTimeout, true, func(ctx context.Context) (bool, error) {
var events corev1.EventList
Expand Down Expand Up @@ -198,7 +211,13 @@ var _ = Describe("Scheduler", Ordered, func() {
}
Expect(ctrlClient.Create(ctx, testPod2)).To(Succeed(), "Failed to create test pod")

err := GroupPods(ctx, ctrlClient, testQueue.Name, "test-podgroup", []*corev1.Pod{testPod1, testPod2}, 1)
podGroupConfig := podGroupConfig{
queueName: testQueue.Name,
podgroupName: "test-podgroup",
minMember: 1,
topologyConstraint: nil,
}
err := GroupPods(ctx, ctrlClient, podGroupConfig, []*corev1.Pod{testPod1, testPod2})
Expect(err).NotTo(HaveOccurred(), "Failed to group pods")

err = wait.PollUntilContextTimeout(ctx, interval, defaultTimeout, true, func(ctx context.Context) (bool, error) {
Expand Down Expand Up @@ -289,7 +308,13 @@ var _ = Describe("Scheduler", Ordered, func() {
dynamicresource.UseClaim(testPod, resourceClaim)
Expect(ctrlClient.Create(ctx, testPod)).To(Succeed(), "Failed to create test pod")

err = GroupPods(ctx, ctrlClient, testQueue.Name, "test-podgroup", []*corev1.Pod{testPod}, 1)
podGroupConfig := podGroupConfig{
queueName: testQueue.Name,
podgroupName: "test-podgroup",
minMember: 1,
topologyConstraint: nil,
}
err = GroupPods(ctx, ctrlClient, podGroupConfig, []*corev1.Pod{testPod})
Expect(err).NotTo(HaveOccurred(), "Failed to group pods")

err = WaitForPodScheduled(ctx, ctrlClient, testPod.Name, testNamespace.Name, defaultTimeout, interval)
Expand Down Expand Up @@ -322,7 +347,13 @@ var _ = Describe("Scheduler", Ordered, func() {
Expect(ctrlClient.Create(ctx, testPod)).To(Succeed(), "Failed to create test pod")
createdPods = append(createdPods, testPod)

err = GroupPods(ctx, ctrlClient, testQueue.Name, fmt.Sprintf("test-podgroup-%d", i), []*corev1.Pod{testPod}, 1)
podGroupConfig := podGroupConfig{
queueName: testQueue.Name,
podgroupName: fmt.Sprintf("test-podgroup-%d", i),
minMember: 1,
topologyConstraint: nil,
}
err = GroupPods(ctx, ctrlClient, podGroupConfig, []*corev1.Pod{testPod})
Expect(err).NotTo(HaveOccurred(), "Failed to group pods")
}

Expand All @@ -338,4 +369,144 @@ var _ = Describe("Scheduler", Ordered, func() {
Expect(podMap).To(HaveLen(deviceNum), "Expected exactly %d pods to be scheduled", deviceNum)
})
})

Context("topology", func() {
const (
// topologyName is the name given to the Topology object created in BeforeAll
// and referenced by the workload pod group's topology constraint.
topologyName = "gb2000-topology"
// rackLabelKey is the node label used as the first (rack) level of the topology.
rackLabelKey = "nvidia.com/gpu.clique"
// hostnameLabelKey is the node label used as the second (host) level of the topology.
hostnameLabelKey = "kubernetes.io/hostname"

// gpusPerNode is the GPU count configured on every topology test node.
gpusPerNode = 4
// numRacks is the number of distinct rack label values spread across the nodes.
numRacks = 2
// numNodesPerRack is the number of nodes assigned to each rack.
numNodesPerRack = 18
// numNodes is the total number of topology test nodes created in BeforeAll.
numNodes = numRacks * numNodesPerRack
)

var (
// topologyNodes collects the node objects created in BeforeAll so that
// AfterAll can delete them.
topologyNodes []*corev1.Node
// topologyTree is the Topology object created in BeforeAll; AfterAll deletes
// it when it is non-nil.
topologyTree *kueuev1alpha1.Topology
)

// Create numNodes GPU nodes split evenly across numRacks racks (in shuffled
// order), then register the two-level rack/hostname topology over them.
BeforeAll(func(ctx context.Context) {
	topologyNodes = make([]*corev1.Node, 0, numNodes)

	// Start from a round-robin rack assignment and shuffle it so that rack
	// membership does not follow node creation order.
	rackOfNode := make([]string, numNodes)
	for idx := range rackOfNode {
		rackOfNode[idx] = fmt.Sprintf("%d", idx%numRacks)
	}
	rand.Shuffle(len(rackOfNode), func(a, b int) {
		rackOfNode[a], rackOfNode[b] = rackOfNode[b], rackOfNode[a]
	})

	for idx, rack := range rackOfNode {
		nodeConfig := DefaultNodeConfig(fmt.Sprintf("t-node-%d", idx))
		nodeConfig.GPUs = gpusPerNode
		// Every topology level needs a matching label on the node.
		nodeConfig.Labels[rackLabelKey] = fmt.Sprintf("clusterUuid.%s", rack)
		nodeConfig.Labels[hostnameLabelKey] = nodeConfig.Name

		createdNode := CreateNodeObject(ctx, ctrlClient, nodeConfig)
		Expect(ctrlClient.Create(ctx, createdNode)).To(Succeed(), "Failed to create topology test node")
		topologyNodes = append(topologyNodes, createdNode)
	}

	// Levels are listed from the widest domain (rack) to the narrowest (host).
	levels := []kueuev1alpha1.TopologyLevel{
		{NodeLabel: rackLabelKey},
		{NodeLabel: hostnameLabelKey},
	}
	topologyTree = &kueuev1alpha1.Topology{
		ObjectMeta: metav1.ObjectMeta{Name: topologyName},
		Spec:       kueuev1alpha1.TopologySpec{Levels: levels},
	}
	Expect(ctrlClient.Create(ctx, topologyTree)).To(Succeed(), "Failed to create topology tree")
})

// Tear down everything the topology tests created: the Topology object, the
// topology nodes, and all namespaced test resources.
AfterAll(func(ctx context.Context) {
	if topologyTree != nil {
		Expect(ctrlClient.Delete(ctx, topologyTree)).To(Succeed(), "Failed to delete topology tree")
	}

	for idx := range topologyNodes {
		Expect(ctrlClient.Delete(ctx, topologyNodes[idx])).To(Succeed(), "Failed to delete topology test node")
	}

	namespace := testNamespace.Name

	// Request deletion of every namespaced resource kind the tests may have created.
	deleteErr := DeleteAllInNamespace(
		ctx, ctrlClient, namespace,
		&corev1.Pod{},
		&schedulingv2alpha2.PodGroup{},
		&resourcev1beta1.ResourceClaim{},
		&kaiv1alpha2.BindRequest{},
	)
	Expect(deleteErr).NotTo(HaveOccurred(), "Failed to delete test resources")

	// Block until the namespace no longer lists any of those resources.
	waitErr := WaitForNoObjectsInNamespace(
		ctx, ctrlClient, namespace, defaultTimeout, interval,
		&corev1.PodList{},
		&schedulingv2alpha2.PodGroupList{},
		&resourcev1beta1.ResourceClaimList{},
		&kaiv1alpha2.BindRequestList{},
	)
	Expect(waitErr).NotTo(HaveOccurred(), "Failed to wait for test resources to be deleted")
})

// Verifies that a pod group with a required rack-level topology constraint has
// all of its pods placed within a single rack, even when another pod makes a
// node outside the topology attractive for bin packing.
It("Schedule pods with rack topology constraints", func(ctx context.Context) {
	// Schedule a single-GPU pod pinned (via node selector) to the node whose
	// hostname label is "test-node", i.e. not one of the t-node-* topology
	// nodes, to try and "pull" the topology-constrained workload pods outside
	// of a valid rack.
	binPackingPullPod := CreatePodObject(testNamespace.Name, "bin-packing-pull-pod", corev1.ResourceRequirements{
		Limits: corev1.ResourceList{constants.GpuResource: resource.MustParse("1")},
	})
	binPackingPullPod.Spec.NodeSelector = map[string]string{
		hostnameLabelKey: "test-node",
	}
	Expect(ctrlClient.Create(ctx, binPackingPullPod)).To(Succeed(), "Failed to create bin-packing-pull-pod")
	err := GroupPods(ctx, ctrlClient,
		podGroupConfig{queueName: testQueue.Name, podgroupName: "bin-packing-pull-podgroup", minMember: 1},
		[]*corev1.Pod{binPackingPullPod})
	Expect(err).NotTo(HaveOccurred(), "Failed to create a pod group for bin-packing-pull-pod")

	// Each workload pod asks for one GPU less than a full node.
	workloadGPUs := fmt.Sprintf("%d", gpusPerNode-1)
	singlePodResourceRequirements := corev1.ResourceRequirements{
		Limits: corev1.ResourceList{
			constants.GpuResource: resource.MustParse(workloadGPUs),
		},
		Requests: corev1.ResourceList{
			constants.GpuResource: resource.MustParse(workloadGPUs),
		},
	}

	numWorkloadPods := 4
	workloadPods := make([]*corev1.Pod, 0, numWorkloadPods)
	for i := range numWorkloadPods {
		testPod := CreatePodObject(testNamespace.Name, fmt.Sprintf("test-pod-%d", i), singlePodResourceRequirements)
		Expect(ctrlClient.Create(ctx, testPod)).To(Succeed(), "Failed to create test pod %s", testPod.Name)
		workloadPods = append(workloadPods, testPod)
	}

	// Named differently from the podGroupConfig type so the type is not shadowed.
	workloadGroupConfig := podGroupConfig{
		queueName:    testQueue.Name,
		podgroupName: "test-podgroup",
		minMember:    1,
		topologyConstraint: &schedulingv2alpha2.TopologyConstraint{
			Topology:              topologyName,
			RequiredTopologyLevel: rackLabelKey,
		},
	}
	err = GroupPods(ctx, ctrlClient, workloadGroupConfig, workloadPods)
	Expect(err).NotTo(HaveOccurred(), "Failed to group pods")

	for _, pod := range workloadPods {
		err = WaitForPodScheduled(ctx, ctrlClient, pod.Name, testNamespace.Name, defaultTimeout, interval)
		Expect(err).NotTo(HaveOccurred(), "Failed to wait for test pod to be scheduled")
	}

	pods := &corev1.PodList{}
	Expect(ctrlClient.List(ctx, pods, client.InNamespace(testNamespace.Name))).
		To(Succeed(), "Failed to list pods")

	// The namespace holds the workload pods plus the bin-packing pull pod.
	expectedPods := numWorkloadPods + 1
	Expect(pods.Items).To(HaveLen(expectedPods), "Expected %d pods to be created in the test", expectedPods)

	// NOTE(review): this reads rackLabelKey from the pods' own labels, while
	// the label is only set on nodes in BeforeAll. Unless CreatePodObject or
	// the scheduler copies it onto pods (not visible here), every pod maps to
	// the empty rack and the assertion below passes vacuously. Confirm, and
	// consider resolving each workload pod's assigned node instead.
	scheduledRacks := map[string]int{}
	for _, pod := range pods.Items {
		scheduledRacks[pod.Labels[rackLabelKey]]++
	}

	Expect(scheduledRacks).To(HaveLen(1), "Expected all pods to be scheduled on the same rack. Got %v", scheduledRacks)
})
})
})
21 changes: 16 additions & 5 deletions pkg/env-tests/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ import (
commonconsts "github.com/NVIDIA/KAI-scheduler/pkg/common/constants"
)

// podGroupConfig bundles the settings GroupPods uses to build a PodGroup.
type podGroupConfig struct {
// queueName is written to the PodGroup's Spec.Queue.
queueName string
// podgroupName is the PodGroup's name and the value of the pod group
// annotation set on each grouped pod.
podgroupName string
// minMember is written to the PodGroup's Spec.MinMember.
minMember int32
// topologyConstraint, when non-nil, is copied into the PodGroup's
// Spec.TopologyConstraint; nil leaves that field at its zero value.
topologyConstraint *schedulingv2alpha2.TopologyConstraint
}

// NodeConfig holds the configuration for a test node
type NodeConfig struct {
Name string
Expand All @@ -36,6 +43,7 @@ type NodeConfig struct {
func DefaultNodeConfig(name string) NodeConfig {
return NodeConfig{
Name: name,
Labels: map[string]string{},
CPUs: "8",
Memory: "16Gi",
GPUs: 4,
Expand Down Expand Up @@ -125,22 +133,25 @@ func CreatePodObject(namespace, name string, resources corev1.ResourceRequiremen
return pod
}

func GroupPods(ctx context.Context, c client.Client, queueName, podgroupName string, pods []*corev1.Pod, minMember int32) error {
func GroupPods(ctx context.Context, c client.Client, podGroupConfig podGroupConfig, pods []*corev1.Pod) error {
if len(pods) == 0 {
return fmt.Errorf("no pods to group")
}

podgroup := &schedulingv2alpha2.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Name: podgroupName,
Name: podGroupConfig.podgroupName,
Namespace: pods[0].Namespace,
},
Spec: schedulingv2alpha2.PodGroupSpec{
Queue: queueName,
MinMember: minMember,
Queue: podGroupConfig.queueName,
MinMember: podGroupConfig.minMember,
MarkUnschedulable: ptr.To(true),
},
}
if podGroupConfig.topologyConstraint != nil {
podgroup.Spec.TopologyConstraint = *podGroupConfig.topologyConstraint
}
err := c.Create(ctx, podgroup, &client.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create podgroup: %w", err)
Expand All @@ -151,7 +162,7 @@ func GroupPods(ctx context.Context, c client.Client, queueName, podgroupName str
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
}
pod.Annotations[commonconsts.PodGroupAnnotationForPod] = podgroupName
pod.Annotations[commonconsts.PodGroupAnnotationForPod] = podGroupConfig.podgroupName
err = c.Patch(ctx, pod, client.MergeFrom(originalPod))
if err != nil {
return fmt.Errorf("failed to patch pod with podgroup annotation: %w", err)
Expand Down
15 changes: 15 additions & 0 deletions pkg/scheduler/plugins/topology/topology_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ func (t *topologyPlugin) initializeTopologyTree(topologies []*kueuev1alpha1.Topo
}

func (*topologyPlugin) addNodeDataToTopology(topologyTree *TopologyInfo, singleTopology *kueuev1alpha1.Topology, nodeInfo *node_info.NodeInfo) {
// Validate that the node is part of the topology
if !isNodePartOfTopology(nodeInfo, singleTopology) {
return
}

var nodeContainingChildDomain *TopologyDomainInfo
for levelIndex := len(singleTopology.Spec.Levels) - 1; levelIndex >= 0; levelIndex-- {
level := singleTopology.Spec.Levels[levelIndex]
Expand Down Expand Up @@ -105,4 +110,14 @@ func (*topologyPlugin) addNodeDataToTopology(topologyTree *TopologyInfo, singleT
topologyTree.Root.AddNode(nodeInfo)
}

// isNodePartOfTopology reports whether the node carries a label for every
// level declared in the topology. Only the presence of each label key is
// checked, not its value. A topology with no levels accepts every node.
func isNodePartOfTopology(nodeInfo *node_info.NodeInfo, singleTopology *kueuev1alpha1.Topology) bool {
	levels := singleTopology.Spec.Levels
	for idx := 0; idx < len(levels); idx++ {
		labelKey := levels[idx].NodeLabel
		if _, present := nodeInfo.Node.Labels[labelKey]; present {
			continue
		}
		// A single missing level label disqualifies the node.
		return false
	}
	return true
}

func (t *topologyPlugin) OnSessionClose(ssn *framework.Session) {}
Loading