@@ -27,6 +27,7 @@ import (
2727 corev1 "k8s.io/api/core/v1"
2828 "k8s.io/apimachinery/pkg/api/resource"
2929 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30+ "sigs.k8s.io/kueue/apis/kueue/v1beta1"
3031
3132 . "github.com/opendatahub-io/distributed-workloads/tests/common"
3233 . "github.com/opendatahub-io/distributed-workloads/tests/common/support"
@@ -82,10 +83,65 @@ func runKFTOPyTorchMnistJob(t *testing.T, accelerator Accelerator, image string,
8283 "requirements.txt" : requirementsFileName ,
8384 })
8485
86+ // Create Kueue resources
87+ resourceFlavor := CreateKueueResourceFlavor (test , v1beta1.ResourceFlavorSpec {})
88+ defer test .Client ().Kueue ().KueueV1beta1 ().ResourceFlavors ().Delete (test .Ctx (), resourceFlavor .Name , metav1.DeleteOptions {})
89+ cqSpec := v1beta1.ClusterQueueSpec {
90+ NamespaceSelector : & metav1.LabelSelector {},
91+ ResourceGroups : []v1beta1.ResourceGroup {
92+ {
93+ CoveredResources : []corev1.ResourceName {corev1 .ResourceName ("cpu" ), corev1 .ResourceName ("memory" )},
94+ Flavors : []v1beta1.FlavorQuotas {
95+ {
96+ Name : v1beta1 .ResourceFlavorReference (resourceFlavor .Name ),
97+ Resources : []v1beta1.ResourceQuota {
98+ {
99+ Name : corev1 .ResourceCPU ,
100+ NominalQuota : resource .MustParse ("8" ),
101+ },
102+ {
103+ Name : corev1 .ResourceMemory ,
104+ NominalQuota : resource .MustParse ("18Gi" ),
105+ },
106+ },
107+ },
108+ },
109+ },
110+ },
111+ }
112+
113+ if accelerator .IsGpu () {
114+ numGpus := (workerReplicas + 1 ) * numProcPerNode
115+ cqSpec .ResourceGroups [0 ].CoveredResources = append (
116+ cqSpec .ResourceGroups [0 ].CoveredResources ,
117+ corev1 .ResourceName (accelerator .ResourceLabel ),
118+ )
119+ cqSpec .ResourceGroups [0 ].Flavors [0 ].Resources = append (
120+ cqSpec .ResourceGroups [0 ].Flavors [0 ].Resources ,
121+ v1beta1.ResourceQuota {
122+ Name : corev1 .ResourceName (accelerator .ResourceLabel ),
123+ NominalQuota : resource .MustParse (fmt .Sprint (numGpus )),
124+ },
125+ )
126+ }
127+
128+ clusterQueue := CreateKueueClusterQueue (test , cqSpec )
129+ defer test .Client ().Kueue ().KueueV1beta1 ().ClusterQueues ().Delete (test .Ctx (), clusterQueue .Name , metav1.DeleteOptions {})
130+ localQueue := CreateKueueLocalQueue (test , namespace .Name , clusterQueue .Name , AsDefaultQueue )
131+
85132 // Create training PyTorch job
86- tuningJob := createKFTOPyTorchMnistJob (test , namespace .Name , * config , accelerator , workerReplicas , numProcPerNode , image )
133+ tuningJob := createKFTOPyTorchMnistJob (test , namespace .Name , * config , accelerator , workerReplicas , numProcPerNode , image , localQueue )
87134 defer test .Client ().Kubeflow ().KubeflowV1 ().PyTorchJobs (namespace .Name ).Delete (test .Ctx (), tuningJob .Name , * metav1 .NewDeleteOptions (0 ))
88135
136+ // Make sure the Workload is created and running
137+ test .Eventually (GetKueueWorkloads (test , namespace .Name ), TestTimeoutMedium ).
138+ Should (
139+ And (
140+ HaveLen (1 ),
141+ ContainElement (WithTransform (KueueWorkloadAdmitted , BeTrueBecause ("Workload failed to be admitted" ))),
142+ ),
143+ )
144+
89145 // Make sure the PyTorch job is running
90146 test .Eventually (PyTorchJob (test , namespace .Name , tuningJob .Name ), TestTimeoutDouble ).
91147 Should (WithTransform (PyTorchJobConditionRunning , Equal (corev1 .ConditionTrue )))
@@ -96,7 +152,7 @@ func runKFTOPyTorchMnistJob(t *testing.T, accelerator Accelerator, image string,
96152
97153}
98154
99- func createKFTOPyTorchMnistJob (test Test , namespace string , config corev1.ConfigMap , accelerator Accelerator , workerReplicas int , numProcPerNode int , baseImage string ) * kftov1.PyTorchJob {
155+ func createKFTOPyTorchMnistJob (test Test , namespace string , config corev1.ConfigMap , accelerator Accelerator , workerReplicas int , numProcPerNode int , baseImage string , localQueue * v1beta1. LocalQueue ) * kftov1.PyTorchJob {
100156 var backend string
101157 if accelerator .IsGpu () {
102158 backend = "nccl"
@@ -117,6 +173,9 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
117173 },
118174 ObjectMeta : metav1.ObjectMeta {
119175 GenerateName : "kfto-mnist-" ,
176+ Labels : map [string ]string {
177+ "kueue.x-k8s.io/queue-name" : localQueue .Name ,
178+ },
120179 },
121180 Spec : kftov1.PyTorchJobSpec {
122181 PyTorchReplicaSpecs : map [kftov1.ReplicaType ]* kftov1.ReplicaSpec {
@@ -177,11 +236,11 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
177236 Resources : corev1.ResourceRequirements {
178237 Requests : corev1.ResourceList {
179238 corev1 .ResourceCPU : resource .MustParse (fmt .Sprintf ("%d" , numProcPerNode )),
180- corev1 .ResourceMemory : resource .MustParse ("6Gi " ),
239+ corev1 .ResourceMemory : resource .MustParse ("4Gi " ),
181240 },
182241 Limits : corev1.ResourceList {
183242 corev1 .ResourceCPU : resource .MustParse (fmt .Sprintf ("%d" , numProcPerNode )),
184- corev1 .ResourceMemory : resource .MustParse ("6Gi " ),
243+ corev1 .ResourceMemory : resource .MustParse ("4Gi " ),
185244 },
186245 },
187246 },
@@ -273,11 +332,11 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
273332 Resources : corev1.ResourceRequirements {
274333 Requests : corev1.ResourceList {
275334 corev1 .ResourceCPU : resource .MustParse (fmt .Sprintf ("%d" , numProcPerNode )),
276- corev1 .ResourceMemory : resource .MustParse ("6Gi " ),
335+ corev1 .ResourceMemory : resource .MustParse ("4Gi " ),
277336 },
278337 Limits : corev1.ResourceList {
279338 corev1 .ResourceCPU : resource .MustParse (fmt .Sprintf ("%d" , numProcPerNode )),
280- corev1 .ResourceMemory : resource .MustParse ("6Gi " ),
339+ corev1 .ResourceMemory : resource .MustParse ("4Gi " ),
281340 },
282341 },
283342 },
0 commit comments