Skip to content

Commit f872e2f

Browse files
committed
Use peek to consider multiple jobs
1 parent 1518e6d commit f872e2f

File tree

3 files changed

+38
-4
lines changed

3 files changed

+38
-4
lines changed

pkg/scheduler/actions/common/solvers/pod_scenario_builder.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,5 +113,7 @@ func (asb *PodAccumulatedScenarioBuilder) GetNextScenario() *solverscenario.ByNo
113113
}
114114
}
115115

116+
asb.victimsJobsQueue.ResetQueuePopsMap()
117+
116118
return asb.lastScenario
117119
}

pkg/scheduler/actions/utils/job_order_by_queue.go

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type JobsOrderByQueues struct {
2929
departmentIdToDepartmentMetadata map[common_info.QueueID]*departmentMetadata
3030
ssn *framework.Session
3131
jobsOrderInitOptions JobsOrderInitOptions
32+
queuePopsMap map[common_info.QueueID]int
3233
}
3334

3435
func NewJobsOrderByQueues(ssn *framework.Session, options JobsOrderInitOptions) JobsOrderByQueues {
@@ -37,9 +38,14 @@ func NewJobsOrderByQueues(ssn *framework.Session, options JobsOrderInitOptions)
3738
queueIdToQueueMetadata: map[common_info.QueueID]*jobsQueueMetadata{},
3839
departmentIdToDepartmentMetadata: map[common_info.QueueID]*departmentMetadata{},
3940
jobsOrderInitOptions: options,
41+
queuePopsMap: map[common_info.QueueID]int{},
4042
}
4143
}
4244

45+
func (jobsOrder *JobsOrderByQueues) ResetQueuePopsMap() {
46+
jobsOrder.queuePopsMap = map[common_info.QueueID]int{}
47+
}
48+
4349
func (jobsOrder *JobsOrderByQueues) IsEmpty() bool {
4450
return jobsOrder.activeDepartments == nil || jobsOrder.activeDepartments.Empty()
4551
}
@@ -71,6 +77,12 @@ func (jobsOrder *JobsOrderByQueues) PopNextJob() *podgroup_info.PodGroupInfo {
7177
job := jobsOrder.queueIdToQueueMetadata[queue.UID].jobsInQueue.Pop().(*podgroup_info.PodGroupInfo)
7278
jobsOrder.updateQueuePriorityQueue(queue, department)
7379
jobsOrder.updateDepartmentPriorityQueue(department)
80+
81+
if _, found := jobsOrder.queuePopsMap[queue.UID]; !found {
82+
jobsOrder.queuePopsMap[queue.UID] = 0
83+
}
84+
jobsOrder.queuePopsMap[queue.UID]++
85+
7486
log.InfraLogger.V(7).Infof("Popped job: %v", job.Name)
7587
return job
7688
}
@@ -237,8 +249,18 @@ func (jobsOrder *JobsOrderByQueues) buildFuncOrderBetweenQueuesWithJobs(jobsQueu
237249
return reverseOrder // When l has higher priority, return false
238250
}
239251

240-
lJobInfosRaw := jobsQueueMetadataPerQueue[lQueue.UID].jobsInQueue.Peek(1)
241-
rJobInfosRaw := jobsQueueMetadataPerQueue[rQueue.UID].jobsInQueue.Peek(1)
252+
lJobsToPop := 1
253+
if jobsOrder.queuePopsMap[lQueue.UID] > 0 {
254+
lJobsToPop = jobsOrder.queuePopsMap[lQueue.UID]
255+
}
256+
257+
rJobsToPop := 1
258+
if jobsOrder.queuePopsMap[rQueue.UID] > 0 {
259+
rJobsToPop = jobsOrder.queuePopsMap[rQueue.UID]
260+
}
261+
262+
lJobInfosRaw := jobsQueueMetadataPerQueue[lQueue.UID].jobsInQueue.Peek(lJobsToPop)
263+
rJobInfosRaw := jobsQueueMetadataPerQueue[rQueue.UID].jobsInQueue.Peek(rJobsToPop)
242264

243265
lJobInfos := podgroup_info.NewPodGroupInfos(lJobInfosRaw)
244266
rJobInfos := podgroup_info.NewPodGroupInfos(rJobInfosRaw)
@@ -266,9 +288,17 @@ func (jobsOrder *JobsOrderByQueues) buildFuncOrderBetweenDepartmentsWithJobs(rev
266288
lBestQueue := jobsOrder.departmentIdToDepartmentMetadata[lDepartment.UID].queuesPriorityQueue.Pop().(*queue_info.QueueInfo)
267289
rBestQueue := jobsOrder.departmentIdToDepartmentMetadata[rDepartment.UID].queuesPriorityQueue.Pop().(*queue_info.QueueInfo)
268290

269-
lJobInfosRaw := jobsOrder.queueIdToQueueMetadata[lBestQueue.UID].jobsInQueue.Peek(1)
270-
rJobInfosRaw := jobsOrder.queueIdToQueueMetadata[rBestQueue.UID].jobsInQueue.Peek(1)
291+
lJobsToPop := 1
292+
if jobsOrder.queuePopsMap[lBestQueue.UID] > 0 {
293+
lJobsToPop = jobsOrder.queuePopsMap[lBestQueue.UID]
294+
}
271295

296+
rJobsToPop := 1
297+
if jobsOrder.queuePopsMap[rBestQueue.UID] > 0 {
298+
rJobsToPop = jobsOrder.queuePopsMap[rBestQueue.UID]
299+
}
300+
lJobInfosRaw := jobsOrder.queueIdToQueueMetadata[lBestQueue.UID].jobsInQueue.Peek(lJobsToPop)
301+
rJobInfosRaw := jobsOrder.queueIdToQueueMetadata[rBestQueue.UID].jobsInQueue.Peek(rJobsToPop)
272302
lJobInfos := podgroup_info.NewPodGroupInfos(lJobInfosRaw)
273303
rJobInfos := podgroup_info.NewPodGroupInfos(rJobInfosRaw)
274304

pkg/scheduler/actions/utils/job_order_by_queue_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,7 @@ func TestJobsOrderByQueues_PushJob(t *testing.T) {
360360
departmentIdToDepartmentMetadata: tt.fields.departmentIdToDepartmentMetadata,
361361
ssn: ssn,
362362
jobsOrderInitOptions: tt.fields.jobsOrderInitOptions,
363+
queuePopsMap: map[common_info.QueueID]int{},
363364
}
364365
jobsOrder.InitializeWithJobs(tt.fields.InsertedJob)
365366
jobsOrder.PushJob(tt.args.job)
@@ -457,6 +458,7 @@ func TestJobsOrderByQueues_RequeueJob(t *testing.T) {
457458
departmentIdToDepartmentMetadata: tt.fields.departmentIdToDepartmentMetadata,
458459
ssn: ssn,
459460
jobsOrderInitOptions: tt.fields.jobsOrderInitOptions,
461+
queuePopsMap: map[common_info.QueueID]int{},
460462
}
461463
jobsOrder.InitializeWithJobs(tt.fields.InsertedJob)
462464

0 commit comments

Comments
 (0)