Skip to content

Commit 7886910

Browse files
Add new job-id annotation to assign globally unique job index to each job (#650)
* add job-id annotation * update unit tests * change name to job global index
1 parent ec39730 commit 7886910

File tree

3 files changed

+130
-2
lines changed

3 files changed

+130
-2
lines changed

api/jobset/v1alpha2/jobset_types.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,15 @@ const (
2424
ReplicatedJobReplicas string = "jobset.sigs.k8s.io/replicatedjob-replicas"
2525
// ReplicatedJobNameKey is used to index into a Jobs labels and retrieve the name of the parent ReplicatedJob
2626
ReplicatedJobNameKey string = "jobset.sigs.k8s.io/replicatedjob-name"
27-
JobIndexKey string = "jobset.sigs.k8s.io/job-index"
28-
JobKey string = "jobset.sigs.k8s.io/job-key"
27+
// JobIndexKey is a label/annotation set to the index of the Job replica within its parent replicatedJob.
28+
// For each replicatedJob, this value will range from 0 to replicas-1, where `replicas`
29+
// is equal to jobset.spec.replicatedJobs[*].replicas.
30+
JobIndexKey string = "jobset.sigs.k8s.io/job-index"
31+
// JobGlobalIndexKey is a label/annotation set to an integer that is unique across the entire JobSet.
32+
// For each JobSet, this value will range from 0 to N-1, where N=total number of jobs in the jobset.
33+
JobGlobalIndexKey string = "jobset.sigs.k8s.io/job-global-index"
34+
// JobKey holds the SHA256 hash of the namespaced job name, which can be used to uniquely identify the job.
35+
JobKey string = "jobset.sigs.k8s.io/job-key"
2936
// ExclusiveKey is an annotation that can be set on the JobSet or on a ReplicatedJob template.
3037
// If set at the JobSet level, all child jobs from all ReplicatedJobs will be scheduled using exclusive
3138
// job placement per topology group (defined as the label value).

pkg/controllers/jobset_controller.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -730,6 +730,7 @@ func labelAndAnnotateObject(obj metav1.Object, js *jobset.JobSet, rjob *jobset.R
730730
labels[jobset.ReplicatedJobReplicas] = strconv.Itoa(int(rjob.Replicas))
731731
labels[jobset.JobIndexKey] = strconv.Itoa(jobIdx)
732732
labels[jobset.JobKey] = jobHashKey(js.Namespace, jobName)
733+
labels[jobset.JobGlobalIndexKey] = globalJobIndex(js, rjob.Name, jobIdx)
733734

734735
// Set annotations on the object.
735736
annotations := collections.CloneMap(obj.GetAnnotations())
@@ -739,6 +740,7 @@ func labelAndAnnotateObject(obj metav1.Object, js *jobset.JobSet, rjob *jobset.R
739740
annotations[jobset.ReplicatedJobReplicas] = strconv.Itoa(int(rjob.Replicas))
740741
annotations[jobset.JobIndexKey] = strconv.Itoa(jobIdx)
741742
annotations[jobset.JobKey] = jobHashKey(js.Namespace, jobName)
743+
annotations[jobset.JobGlobalIndexKey] = globalJobIndex(js, rjob.Name, jobIdx)
742744

743745
// Apply coordinator annotation/label if a coordinator is defined in the JobSet spec.
744746
if js.Spec.Coordinator != nil {
@@ -1032,3 +1034,32 @@ func exclusiveConditions(cond1, cond2 metav1.Condition) bool {
10321034
func coordinatorEndpoint(js *jobset.JobSet) string {
10331035
return fmt.Sprintf("%s-%s-%d-%d.%s", js.Name, js.Spec.Coordinator.ReplicatedJob, js.Spec.Coordinator.JobIndex, js.Spec.Coordinator.PodIndex, GetSubdomain(js))
10341036
}
1037+
1038+
// globalJobIndex determines the job global index for a given job. The job global index is a unique
1039+
// global index for the job, with values ranging from 0 to N-1,
1040+
// where N=total number of jobs in the jobset. The job global index is calculated by
1041+
// iterating through the replicatedJobs in the order, as defined in the JobSet
1042+
// spec, keeping a cumulative sum of total replicas seen so far, then when we
1043+
// arrive at the parent replicatedJob of the target job, we add the local job
1044+
// index to our running sum of total jobs seen so far, in order to arrive at
1045+
// the final job global index value.
1046+
//
1047+
// Below is a diagram illustrating how job global indexs differ from job indexes.
1048+
//
1049+
// | my-jobset |
1050+
// | replicated job A | replicated job B |
1051+
// | job index 0 | job index 1 | job index 0 | job index 1 |
1052+
// | global index 0 | global index 2 | global index 3 | global index 4 |
1053+
//
1054+
// Returns an empty string if the parent replicated Job does not exist,
1055+
// although this should never happen in practice.
1056+
func globalJobIndex(js *jobset.JobSet, replicatedJobName string, jobIdx int) string {
1057+
currTotalJobs := 0
1058+
for _, rjob := range js.Spec.ReplicatedJobs {
1059+
if rjob.Name == replicatedJobName {
1060+
return strconv.Itoa(currTotalJobs + jobIdx)
1061+
}
1062+
currTotalJobs += int(rjob.Replicas)
1063+
}
1064+
return ""
1065+
}

pkg/controllers/jobset_controller_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -686,6 +686,15 @@ func TestConstructJobsFromTemplate(t *testing.T) {
686686

687687
for _, tc := range tests {
688688
t.Run(tc.name, func(t *testing.T) {
689+
// Here we update the expected Jobs with certain features which require
690+
// direct access to the JobSet object itself to calculate. For example,
691+
// the `jobset.sigs.k8s.io/job-global-index` annotation requires access to the
692+
// full JobSet spec to calculate a unique ID for each Job.
693+
for _, expectedJob := range tc.want {
694+
addJobGlobalIndex(t, tc.js, expectedJob)
695+
}
696+
697+
// Now get the actual output of constructJobsFromTemplate, and diff the results.
689698
var got []*batchv1.Job
690699
for _, rjob := range tc.js.Spec.ReplicatedJobs {
691700
jobs := constructJobsFromTemplate(tc.js, &rjob, tc.ownedJobs)
@@ -699,6 +708,26 @@ func TestConstructJobsFromTemplate(t *testing.T) {
699708
}
700709
}
701710

711+
// addJobGlobalIndex modifies the Job object in place by adding
712+
// the `jobset.sigs.k8s.io/job-global-index` label/annotation to both the
713+
// Job itself and the Job template spec.`
714+
func addJobGlobalIndex(t *testing.T, js *jobset.JobSet, job *batchv1.Job) {
715+
t.Helper()
716+
717+
rjobName := job.Annotations[jobset.ReplicatedJobNameKey]
718+
jobIdx, err := strconv.Atoi(job.Annotations[jobset.JobIndexKey])
719+
if err != nil {
720+
t.Fatalf("invalid test case: %v", err)
721+
}
722+
// Job label/annotation
723+
job.Labels[jobset.JobGlobalIndexKey] = globalJobIndex(js, rjobName, jobIdx)
724+
job.Annotations[jobset.JobGlobalIndexKey] = globalJobIndex(js, rjobName, jobIdx)
725+
726+
// Job template spec label/annotation
727+
job.Spec.Template.Labels[jobset.JobGlobalIndexKey] = globalJobIndex(js, rjobName, jobIdx)
728+
job.Spec.Template.Annotations[jobset.JobGlobalIndexKey] = globalJobIndex(js, rjobName, jobIdx)
729+
}
730+
702731
func TestUpdateConditions(t *testing.T) {
703732
var (
704733
jobSetName = "test-jobset"
@@ -1381,3 +1410,64 @@ func TestCreateHeadlessSvcIfNecessary(t *testing.T) {
13811410
})
13821411
}
13831412
}
1413+
1414+
func TestGlobalJobIndex(t *testing.T) {
1415+
tests := []struct {
1416+
name string
1417+
jobSet *jobset.JobSet
1418+
replicatedJob string
1419+
jobIdx int
1420+
expectedJobGlobalIndex string
1421+
}{
1422+
{
1423+
name: "single replicated job",
1424+
jobSet: &jobset.JobSet{
1425+
Spec: jobset.JobSetSpec{
1426+
ReplicatedJobs: []jobset.ReplicatedJob{
1427+
{Name: "rjob", Replicas: 3},
1428+
},
1429+
},
1430+
},
1431+
replicatedJob: "rjob",
1432+
jobIdx: 1,
1433+
expectedJobGlobalIndex: "1",
1434+
},
1435+
{
1436+
name: "multiple replicated jobs",
1437+
jobSet: &jobset.JobSet{
1438+
Spec: jobset.JobSetSpec{
1439+
ReplicatedJobs: []jobset.ReplicatedJob{
1440+
{Name: "rjob1", Replicas: 2},
1441+
{Name: "rjob2", Replicas: 4},
1442+
{Name: "rjob3", Replicas: 1},
1443+
},
1444+
},
1445+
},
1446+
replicatedJob: "rjob2",
1447+
jobIdx: 3,
1448+
expectedJobGlobalIndex: "5",
1449+
},
1450+
{
1451+
name: "replicated job not found",
1452+
jobSet: &jobset.JobSet{
1453+
Spec: jobset.JobSetSpec{
1454+
ReplicatedJobs: []jobset.ReplicatedJob{
1455+
{Name: "rjob1", Replicas: 2},
1456+
},
1457+
},
1458+
},
1459+
replicatedJob: "rjob2",
1460+
jobIdx: 0,
1461+
expectedJobGlobalIndex: "",
1462+
},
1463+
}
1464+
1465+
for _, tc := range tests {
1466+
t.Run(tc.name, func(t *testing.T) {
1467+
actualJobGlobalIndex := globalJobIndex(tc.jobSet, tc.replicatedJob, tc.jobIdx)
1468+
if diff := cmp.Diff(tc.expectedJobGlobalIndex, actualJobGlobalIndex); diff != "" {
1469+
t.Errorf("unexpected global job index (-want/+got): %s", diff)
1470+
}
1471+
})
1472+
}
1473+
}

0 commit comments

Comments
 (0)