@@ -23,6 +23,7 @@ import (
2323 . "github.com/onsi/ginkgo/v2"
2424 . "github.com/onsi/gomega"
2525 v1 "k8s.io/api/core/v1"
26+ "k8s.io/apimachinery/pkg/api/resource"
2627 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2728)
2829
@@ -70,36 +71,47 @@ var _ = Describe("Topology", Ordered, func() {
 
 		It("required only - rack level", func(ctx context.Context) {
 			namespace := queue.GetConnectedNamespaceToQueue(testCtx.Queues[0])
-			queueName := testCtx.Queues[0].Name
+			topologyConstraint := v2alpha2.TopologyConstraint{
+				RequiredTopologyLevel: rd.TestRackLabelKey,
+				Topology: "e2e-topology-tree",
+			}
 
-			podCount := 2
 			gpusPerNode := testTopologyData.TopologyNodes[gpuNodesNames[0]].
 				Status.Allocatable[v1.ResourceName(constants.GpuResource)]
 			podResource := v1.ResourceList{
 				v1.ResourceName(constants.GpuResource): gpusPerNode,
 			}
 
-			podGroup := pod_group.Create(
-				namespace, "distributed-pod-group" + utils.GenerateRandomK8sName(10), queueName)
-			podGroup.Spec.MinMember = int32(podCount)
-			podGroup.Spec.TopologyConstraint = v2alpha2.TopologyConstraint{
-				RequiredTopologyLevel: rd.TestRackLabelKey,
-				Topology: "e2e-topology-tree",
+			pods := createDistributedWorkload(ctx, testCtx, 2, podResource, topologyConstraint)
+			wait.ForPodsScheduled(ctx, testCtx.ControllerClient, namespace, pods)
+
+			// Validate that all the pods have been scheduled to the same rack
+			podList, err := testCtx.KubeClientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
+			Expect(err).NotTo(HaveOccurred(), "Failed to list pods")
+
+			scheduledRacks := map[string][]string{}
+			for _, pod := range podList.Items {
+				podRack := testTopologyData.TopologyNodes[pod.Spec.NodeName].Labels[rd.TestRackLabelKey]
+				scheduledRacks[podRack] = append(scheduledRacks[podRack], pod.Name)
+			}
+
+			Expect(len(scheduledRacks)).To(Equal(1), "Expected all pods scheduled to one rack, got %v", scheduledRacks)
+		})
+
+		It("preferred only - rack level", func(ctx context.Context) {
+			namespace := queue.GetConnectedNamespaceToQueue(testCtx.Queues[0])
+			topologyConstraint := v2alpha2.TopologyConstraint{
+				PreferredTopologyLevel: rd.TestRackLabelKey,
+				Topology: "e2e-topology-tree",
 			}
 
-			pods := []*v1.Pod{}
-
-			Expect(testCtx.ControllerClient.Create(ctx, podGroup)).To(Succeed())
-			for i := 0; i < podCount; i++ {
-				pod := rd.CreatePodObject(testCtx.Queues[0], v1.ResourceRequirements{Requests: podResource, Limits: podResource})
-				pod.Name = "distributed-pod-" + utils.GenerateRandomK8sName(10)
-				pod.Annotations[pod_group.PodGroupNameAnnotation] = podGroup.Name
-				pod.Labels[pod_group.PodGroupNameAnnotation] = podGroup.Name
-				_, err := rd.CreatePod(ctx, testCtx.KubeClientset, pod)
-				Expect(err).To(Succeed())
-				pods = append(pods, pod)
+			gpusPerNode := testTopologyData.TopologyNodes[gpuNodesNames[0]].
+				Status.Allocatable[v1.ResourceName(constants.GpuResource)]
+			podResource := v1.ResourceList{
+				v1.ResourceName(constants.GpuResource): gpusPerNode,
 			}
 
+			pods := createDistributedWorkload(ctx, testCtx, 2, podResource, topologyConstraint)
 			wait.ForPodsScheduled(ctx, testCtx.ControllerClient, namespace, pods)
 
 			// Validate that all the pods have been scheduled to the same rack
@@ -114,6 +126,69 @@ var _ = Describe("Topology", Ordered, func() {
 
 			Expect(len(scheduledRacks)).To(Equal(1), "Expected all pods scheduled to one rack, got %v", scheduledRacks)
 		})
+
+		It("required rack and preferred node - all pods in a single node", func(ctx context.Context) {
+			namespace := queue.GetConnectedNamespaceToQueue(testCtx.Queues[0])
+			topologyConstraint := v2alpha2.TopologyConstraint{
+				RequiredTopologyLevel: rd.TestRackLabelKey,
+				PreferredTopologyLevel: rd.NodeNameLabelKey,
+				Topology: "e2e-topology-tree",
+			}
+
+			gpusPerNode := testTopologyData.TopologyNodes[gpuNodesNames[0]].
+				Status.Allocatable[v1.ResourceName(constants.GpuResource)]
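+			// Each pod requests half of a node's allocatable GPUs, so both pods can fit
+			// on a single node and the preferred node-level placement can be satisfied.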
+			halfGpusPerNode := int64(gpusPerNode.AsFloat64Slow() / 2)
+			podResource := v1.ResourceList{
+				v1.ResourceName(constants.GpuResource): *resource.NewQuantity(halfGpusPerNode, resource.DecimalSI),
+			}
+
+			pods := createDistributedWorkload(ctx, testCtx, 2, podResource, topologyConstraint)
+			wait.ForPodsScheduled(ctx, testCtx.ControllerClient, namespace, pods)
+
+			// Validate that all the pods have been scheduled to the same node
+			podList, err := testCtx.KubeClientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
+			Expect(err).NotTo(HaveOccurred(), "Failed to list pods")
+
+			scheduledNodes := map[string][]string{}
+			for _, pod := range podList.Items {
+				scheduledNodes[pod.Spec.NodeName] = append(scheduledNodes[pod.Spec.NodeName], pod.Name)
+			}
+
+			Expect(len(scheduledNodes)).To(Equal(1), "Expected all pods scheduled to one node, got %v", scheduledNodes)
+		})
+
+		It("required rack and preferred node - all pods in a rack", func(ctx context.Context) {
+			namespace := queue.GetConnectedNamespaceToQueue(testCtx.Queues[0])
+			topologyConstraint := v2alpha2.TopologyConstraint{
+				RequiredTopologyLevel: rd.TestRackLabelKey,
+				PreferredTopologyLevel: rd.NodeNameLabelKey,
+				Topology: "e2e-topology-tree",
+			}
+
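+			// Each pod requests a full node's worth of GPUs, so the pods cannot share a node
+			// and only the required rack-level constraint can keep them co-located.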
+			gpusPerNode := testTopologyData.TopologyNodes[gpuNodesNames[0]].
+				Status.Allocatable[v1.ResourceName(constants.GpuResource)]
+			podResource := v1.ResourceList{
+				v1.ResourceName(constants.GpuResource): gpusPerNode,
+			}
+
+			pods := createDistributedWorkload(ctx, testCtx, 2, podResource, topologyConstraint)
+			wait.ForPodsScheduled(ctx, testCtx.ControllerClient, namespace, pods)
+
+			// Validate that the pods were spread across multiple nodes but all landed in the same rack
+			podList, err := testCtx.KubeClientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
+			Expect(err).NotTo(HaveOccurred(), "Failed to list pods")
+
+			scheduledNodes := map[string][]string{}
+			scheduledRacks := map[string][]string{}
+			for _, pod := range podList.Items {
+				scheduledNodes[pod.Spec.NodeName] = append(scheduledNodes[pod.Spec.NodeName], pod.Name)
+				podRack := testTopologyData.TopologyNodes[pod.Spec.NodeName].Labels[rd.TestRackLabelKey]
+				scheduledRacks[podRack] = append(scheduledRacks[podRack], pod.Name)
+			}
+
+			Expect(len(scheduledNodes)).To(BeNumerically(">", 1), "Expected pods to be scheduled to more than one node, got %v", scheduledNodes)
+			Expect(len(scheduledRacks)).To(Equal(1), "Expected all pods scheduled to the same rack, got %v", scheduledRacks)
+		})
 	}, MustPassRepeatedly(3))
 
 	Context("Empty context to jump over ginkgo bug", func() {
@@ -122,3 +197,27 @@ var _ = Describe("Topology", Ordered, func() {
 	})
 })
 }, Ordered)
+
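+// createDistributedWorkload creates a PodGroup with the given topology constraint and a
+// MinMember of podCount, then creates podCount pods that join the group, each requesting
+// podResource. It returns the created pods so callers can wait for them to be scheduled.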
+func createDistributedWorkload(ctx context.Context, testCtx *testcontext.TestContext,
+	podCount int, podResource v1.ResourceList, topologyConstraint v2alpha2.TopologyConstraint) []*v1.Pod {
+	namespace := queue.GetConnectedNamespaceToQueue(testCtx.Queues[0])
+	queueName := testCtx.Queues[0].Name
+
+	podGroup := pod_group.Create(namespace, "distributed-pod-group" + utils.GenerateRandomK8sName(10), queueName)
+	podGroup.Spec.MinMember = int32(podCount)
+	podGroup.Spec.TopologyConstraint = topologyConstraint
+
+	pods := []*v1.Pod{}
+	Expect(testCtx.ControllerClient.Create(ctx, podGroup)).To(Succeed())
+	for i := 0; i < podCount; i++ {
+		pod := rd.CreatePodObject(testCtx.Queues[0], v1.ResourceRequirements{Requests: podResource, Limits: podResource})
+		pod.Name = "distributed-pod-" + utils.GenerateRandomK8sName(10)
+		pod.Annotations[pod_group.PodGroupNameAnnotation] = podGroup.Name
+		pod.Labels[pod_group.PodGroupNameAnnotation] = podGroup.Name
+		_, err := rd.CreatePod(ctx, testCtx.KubeClientset, pod)
+		Expect(err).To(Succeed())
+		pods = append(pods, pod)
+	}
+
+	return pods
+}