Skip to content

Commit 9cb030b

Browse files
authored
Propagate schedulingGates set on PodTemplate when resuming JobSet (#706)
1 parent 886b1a6 commit 9cb030b

File tree

2 files changed

+76
-0
lines changed

2 files changed

+76
-0
lines changed

pkg/controllers/jobset_controller.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,10 @@ func (r *JobSetReconciler) resumeJob(ctx context.Context, job *batchv1.Job, repl
478478
job.Spec.Template.Spec.Tolerations,
479479
replicatedJobPodTemplate.Spec.Tolerations,
480480
)
481+
job.Spec.Template.Spec.SchedulingGates = collections.MergeSlices(
482+
job.Spec.Template.Spec.SchedulingGates,
483+
replicatedJobPodTemplate.Spec.SchedulingGates,
484+
)
481485
} else {
482486
log.Error(nil, "job missing ReplicatedJobName label")
483487
}

test/e2e/e2e_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2626
"k8s.io/apimachinery/pkg/types"
2727
"k8s.io/utils/ptr"
28+
"sigs.k8s.io/controller-runtime/pkg/client"
2829

2930
jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
3031
"sigs.k8s.io/jobset/pkg/util/testing"
@@ -198,6 +199,77 @@ var _ = ginkgo.Describe("JobSet", func() {
198199
})
199200
})
200201

202+
// This test shows that when a JobSet is resumed it allows to add a
203+
// scheduling gate and propagates it down to Pods. This scenario is needed
204+
// for the integration with Kueue, to support TopologyAwareScheduling (TAS),
205+
// which adds the kueue.x-k8s.io/topology scheduling gate to control
206+
// assignment of Pods to the topology domains.
207+
ginkgo.When("JobSet is resumed is propagates scheduling gates to Pods", func() {
208+
209+
ginkgo.It("should allow to add schedulingGates to PodTemplate while resuming", func() {
210+
ctx := context.Background()
211+
js := sleepTestJobSet(ns, 1).Obj()
212+
jsKey := types.NamespacedName{Name: js.Name, Namespace: js.Namespace}
213+
const (
214+
schedulingGateName = "example.com/gate"
215+
)
216+
217+
ginkgo.By("Create a suspended JobSet", func() {
218+
js.Spec.Suspend = ptr.To(true)
219+
gomega.Expect(k8sClient.Create(ctx, js)).Should(gomega.Succeed())
220+
})
221+
222+
ginkgo.By("Resume the JobSet and set schedulingGates", func() {
223+
gomega.Eventually(func(g gomega.Gomega) {
224+
g.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed())
225+
js.Spec.Suspend = ptr.To(false)
226+
podTemplate := &js.Spec.ReplicatedJobs[0].Template.Spec.Template
227+
podTemplate.Spec.SchedulingGates = append(podTemplate.Spec.SchedulingGates, corev1.PodSchedulingGate{
228+
Name: schedulingGateName,
229+
})
230+
g.Expect(k8sClient.Update(ctx, js)).Should(gomega.Succeed())
231+
}, timeout, interval).Should(gomega.Succeed())
232+
})
233+
234+
// In this test the number of expected Pods equals the number of
235+
// expected Jobs as the Jobs don't set completions or parallelism,
236+
// so 1 Pod per Job is implied.
237+
expectedPods := util.NumExpectedJobs(js)
238+
ginkgo.By("Await for the expected number of gated pods created", func() {
239+
gomega.Eventually(func(g gomega.Gomega) {
240+
list := &corev1.PodList{}
241+
g.Expect(k8sClient.List(ctx, list, client.InNamespace(js.Namespace))).Should(gomega.Succeed())
242+
gatedCount := 0
243+
for _, p := range list.Items {
244+
if len(p.Spec.SchedulingGates) == 1 && p.Spec.SchedulingGates[0].Name == schedulingGateName {
245+
gatedCount++
246+
}
247+
}
248+
g.Expect(gatedCount).Should(gomega.Equal(expectedPods),
249+
fmt.Sprintf("expected %v gated pods, got: %v, found items: %v", expectedPods, gatedCount, list.Items))
250+
}, timeout, interval).Should(gomega.Succeed())
251+
})
252+
253+
ginkgo.By("Ungate all of the pods to let the Job run and complete", func() {
254+
gomega.Eventually(func(g gomega.Gomega) {
255+
list := &corev1.PodList{}
256+
g.Expect(k8sClient.List(ctx, list, client.InNamespace(js.Namespace))).Should(gomega.Succeed())
257+
for i := range list.Items {
258+
p := &list.Items[i]
259+
if len(p.Spec.SchedulingGates) == 1 && p.Spec.SchedulingGates[0].Name == schedulingGateName {
260+
p.Spec.SchedulingGates = nil
261+
g.Expect(k8sClient.Update(ctx, p)).Should(gomega.Succeed())
262+
}
263+
}
264+
}, timeout, interval).Should(gomega.Succeed())
265+
})
266+
267+
ginkgo.By("Await for the JobSet to complete successfully", func() {
268+
util.JobSetCompleted(ctx, k8sClient, js, timeout)
269+
})
270+
})
271+
})
272+
201273
}) // end of Describe
202274

203275
// getPingCommand returns ping command for 4 hostnames

0 commit comments

Comments
 (0)