Skip to content

Commit 837ac78

Browse files
committed
Track queue pops
1 parent f872e2f commit 837ac78

File tree

13 files changed

+87
-55
lines changed

13 files changed

+87
-55
lines changed

pkg/scheduler/actions/allocate/allocate.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
3333
FilterNonPending: true,
3434
FilterUnready: true,
3535
MaxJobsQueueDepth: ssn.GetJobsDepth(framework.Allocate),
36-
})
36+
}, false)
3737
jobsOrderByQueues.InitializeWithJobs(ssn.PodGroupInfos)
3838

3939
log.InfraLogger.V(2).Infof("There are <%d> PodGroupInfos and <%d> Queues in total for scheduling",

pkg/scheduler/actions/common/action.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func GetJobsToAllocate(ssn *framework.Session, preempteeTasks []*pod_info.PodInf
6666
// add preemptor to allJobsToAllocate if it's not there
6767
allJobsToAllocate[preemptor.UID] = preemptor
6868
jobsToAllocateQueue := utils.NewJobsOrderByQueues(
69-
ssn, utils.JobsOrderInitOptions{MaxJobsQueueDepth: scheduler_util.QueueCapacityInfinite})
69+
ssn, utils.JobsOrderInitOptions{MaxJobsQueueDepth: scheduler_util.QueueCapacityInfinite}, false)
7070
jobsToAllocateQueue.InitializeWithJobs(allJobsToAllocate)
7171
return &jobsToAllocateQueue
7272
}

pkg/scheduler/actions/common/solvers/pod_scenario_builder.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,5 @@ func (asb *PodAccumulatedScenarioBuilder) GetNextScenario() *solverscenario.ByNo
113113
}
114114
}
115115

116-
asb.victimsJobsQueue.ResetQueuePopsMap()
117-
118116
return asb.lastScenario
119117
}

pkg/scheduler/actions/consolidation/consolidation.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func (alloc *consolidationAction) Execute(ssn *framework.Session) {
4242
FilterUnready: true,
4343
FilterNonPreemptible: true,
4444
MaxJobsQueueDepth: ssn.GetJobsDepth(framework.Consolidation),
45-
})
45+
}, false)
4646
jobsOrderByQueues.InitializeWithJobs(ssn.PodGroupInfos)
4747

4848
log.InfraLogger.V(2).Infof("There are <%d> PodGroupInfos and <%d> Queues in total for scheduling",

pkg/scheduler/actions/integration_tests/integration_tests_utils/integration_tests_utils.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func RunTests(t *testing.T, testsMetadata []TestTopologyMetadata) {
4949
}
5050

5151
func RunTest(t *testing.T, testMetadata TestTopologyMetadata, testNumber int, controller *Controller) {
52-
t.Logf("Running test number: %v, test name: %v", testNumber, testMetadata.Name)
52+
t.Logf("Running test number: %v, test name: %v", testNumber, testMetadata.TestTopologyBasic.Name)
5353
ssn := test_utils.BuildSession(testMetadata.TestTopologyBasic, controller)
5454

5555
runRoundsUntilMatch(testMetadata, controller, &ssn)

pkg/scheduler/actions/preempt/preempt.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {
3636
FilterNonPending: true,
3737
FilterUnready: true,
3838
MaxJobsQueueDepth: ssn.GetJobsDepth(framework.Preempt),
39-
})
39+
}, false)
4040
jobsOrderByQueues.InitializeWithJobs(ssn.PodGroupInfos)
4141

4242
log.InfraLogger.V(2).Infof("There are <%d> PodGroupInfos and <%d> Queues in total for scheduling",

pkg/scheduler/actions/reclaim/reclaim.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func (ra *reclaimAction) Execute(ssn *framework.Session) {
3838
FilterNonPending: true,
3939
FilterUnready: true,
4040
MaxJobsQueueDepth: ssn.GetJobsDepth(framework.Reclaim),
41-
})
41+
}, false)
4242
jobsOrderByQueues.InitializeWithJobs(ssn.PodGroupInfos)
4343

4444
log.InfraLogger.V(2).Infof("There are <%d> PodGroupInfos and <%d> Queues in total for scheduling",
@@ -149,7 +149,7 @@ func getOrderedVictimsQueue(ssn *framework.Session, evictingQueue common_info.Qu
149149
FilterNonActiveAllocated: true,
150150
ReverseOrder: true,
151151
MaxJobsQueueDepth: scheduler_util.QueueCapacityInfinite,
152-
})
152+
}, true)
153153
jobs := map[common_info.PodGroupID]*podgroup_info.PodGroupInfo{}
154154
for _, job := range ssn.PodGroupInfos {
155155
if job.Queue != evictingQueue {

pkg/scheduler/actions/utils/action.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func GetVictimsQueue(
4141
victimsQueue := NewJobsOrderByQueues(ssn, JobsOrderInitOptions{
4242
ReverseOrder: true,
4343
MaxJobsQueueDepth: scheduler_util.QueueCapacityInfinite,
44-
})
44+
}, true)
4545
victimsQueue.InitializeWithJobs(preemptees)
4646
return &victimsQueue
4747
}

pkg/scheduler/actions/utils/job_order_by_queue.go

Lines changed: 67 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -29,23 +29,21 @@ type JobsOrderByQueues struct {
2929
departmentIdToDepartmentMetadata map[common_info.QueueID]*departmentMetadata
3030
ssn *framework.Session
3131
jobsOrderInitOptions JobsOrderInitOptions
32-
queuePopsMap map[common_info.QueueID]int
32+
queuePopsMap map[common_info.QueueID][]*podgroup_info.PodGroupInfo
33+
isVictimQueue bool
3334
}
3435

35-
func NewJobsOrderByQueues(ssn *framework.Session, options JobsOrderInitOptions) JobsOrderByQueues {
36+
func NewJobsOrderByQueues(ssn *framework.Session, options JobsOrderInitOptions, isVictimQueue bool) JobsOrderByQueues {
3637
return JobsOrderByQueues{
3738
ssn: ssn,
3839
queueIdToQueueMetadata: map[common_info.QueueID]*jobsQueueMetadata{},
3940
departmentIdToDepartmentMetadata: map[common_info.QueueID]*departmentMetadata{},
4041
jobsOrderInitOptions: options,
41-
queuePopsMap: map[common_info.QueueID]int{},
42+
queuePopsMap: map[common_info.QueueID][]*podgroup_info.PodGroupInfo{},
43+
isVictimQueue: isVictimQueue,
4244
}
4345
}
4446

45-
func (jobsOrder *JobsOrderByQueues) ResetQueuePopsMap() {
46-
jobsOrder.queuePopsMap = map[common_info.QueueID]int{}
47-
}
48-
4947
func (jobsOrder *JobsOrderByQueues) IsEmpty() bool {
5048
return jobsOrder.activeDepartments == nil || jobsOrder.activeDepartments.Empty()
5149
}
@@ -75,13 +73,13 @@ func (jobsOrder *JobsOrderByQueues) PopNextJob() *podgroup_info.PodGroupInfo {
7573
}
7674

7775
job := jobsOrder.queueIdToQueueMetadata[queue.UID].jobsInQueue.Pop().(*podgroup_info.PodGroupInfo)
78-
jobsOrder.updateQueuePriorityQueue(queue, department)
79-
jobsOrder.updateDepartmentPriorityQueue(department)
80-
8176
if _, found := jobsOrder.queuePopsMap[queue.UID]; !found {
82-
jobsOrder.queuePopsMap[queue.UID] = 0
77+
jobsOrder.queuePopsMap[queue.UID] = []*podgroup_info.PodGroupInfo{}
8378
}
84-
jobsOrder.queuePopsMap[queue.UID]++
79+
jobsOrder.queuePopsMap[queue.UID] = append(jobsOrder.queuePopsMap[queue.UID], job)
80+
81+
jobsOrder.updateQueuePriorityQueue(queue, department)
82+
jobsOrder.updateDepartmentPriorityQueue(department)
8583

8684
log.InfraLogger.V(7).Infof("Popped job: %v", job.Name)
8785
return job
@@ -249,27 +247,40 @@ func (jobsOrder *JobsOrderByQueues) buildFuncOrderBetweenQueuesWithJobs(jobsQueu
249247
return reverseOrder // When l has higher priority, return false
250248
}
251249

252-
lJobsToPop := 1
253-
if jobsOrder.queuePopsMap[lQueue.UID] > 0 {
254-
lJobsToPop = jobsOrder.queuePopsMap[lQueue.UID]
250+
var lPending, rPending podgroup_info.PodGroupInfos
251+
if !jobsOrder.isVictimQueue {
252+
lPendingRaw := jobsQueueMetadataPerQueue[lQueue.UID].jobsInQueue.Peek(1)
253+
rPendingRaw := jobsQueueMetadataPerQueue[rQueue.UID].jobsInQueue.Peek(1)
254+
lPending = podgroup_info.NewPodGroupInfos(lPendingRaw)
255+
rPending = podgroup_info.NewPodGroupInfos(rPendingRaw)
255256
}
256257

257-
rJobsToPop := 1
258-
if jobsOrder.queuePopsMap[rQueue.UID] > 0 {
259-
rJobsToPop = jobsOrder.queuePopsMap[rQueue.UID]
260-
}
261-
262-
lJobInfosRaw := jobsQueueMetadataPerQueue[lQueue.UID].jobsInQueue.Peek(lJobsToPop)
263-
rJobInfosRaw := jobsQueueMetadataPerQueue[rQueue.UID].jobsInQueue.Peek(rJobsToPop)
258+
var lVictims, rVictims podgroup_info.PodGroupInfos
259+
if jobsOrder.isVictimQueue {
260+
var lPoppedJobs []*podgroup_info.PodGroupInfo
261+
if len(jobsOrder.queuePopsMap[lQueue.UID]) > 0 {
262+
lPoppedJobs = append(lPoppedJobs, jobsOrder.queuePopsMap[lQueue.UID]...)
263+
}
264+
lPoppedJobs = append(lPoppedJobs, jobsQueueMetadataPerQueue[lQueue.UID].jobsInQueue.Peek(1)[0].(*podgroup_info.PodGroupInfo))
265+
lVictims = podgroup_info.PodGroupInfos{
266+
PodGroupInfos: lPoppedJobs,
267+
}
264268

265-
lJobInfos := podgroup_info.NewPodGroupInfos(lJobInfosRaw)
266-
rJobInfos := podgroup_info.NewPodGroupInfos(rJobInfosRaw)
269+
var rPoppedJobs []*podgroup_info.PodGroupInfo
270+
if len(jobsOrder.queuePopsMap[rQueue.UID]) > 0 {
271+
rPoppedJobs = append(rPoppedJobs, jobsOrder.queuePopsMap[rQueue.UID]...)
272+
}
273+
rPoppedJobs = append(rPoppedJobs, jobsQueueMetadataPerQueue[rQueue.UID].jobsInQueue.Peek(1)[0].(*podgroup_info.PodGroupInfo))
274+
rVictims = podgroup_info.PodGroupInfos{
275+
PodGroupInfos: rPoppedJobs,
276+
}
277+
}
267278

268279
if reverseOrder {
269-
return !jobsOrder.ssn.QueueOrderFn(lQueue, rQueue, lJobInfos, rJobInfos)
280+
return !jobsOrder.ssn.QueueOrderFn(lQueue, rQueue, lPending, rPending, lVictims, rVictims)
270281
}
271282

272-
return jobsOrder.ssn.QueueOrderFn(lQueue, rQueue, lJobInfos, rJobInfos)
283+
return jobsOrder.ssn.QueueOrderFn(lQueue, rQueue, lPending, rPending, lVictims, rVictims)
273284
}
274285
}
275286

@@ -288,27 +299,44 @@ func (jobsOrder *JobsOrderByQueues) buildFuncOrderBetweenDepartmentsWithJobs(rev
288299
lBestQueue := jobsOrder.departmentIdToDepartmentMetadata[lDepartment.UID].queuesPriorityQueue.Pop().(*queue_info.QueueInfo)
289300
rBestQueue := jobsOrder.departmentIdToDepartmentMetadata[rDepartment.UID].queuesPriorityQueue.Pop().(*queue_info.QueueInfo)
290301

291-
lJobsToPop := 1
292-
if jobsOrder.queuePopsMap[lBestQueue.UID] > 0 {
293-
lJobsToPop = jobsOrder.queuePopsMap[lBestQueue.UID]
294-
}
302+
lJobsInQueue := jobsOrder.queueIdToQueueMetadata[lBestQueue.UID].jobsInQueue
303+
rJobsInQueue := jobsOrder.queueIdToQueueMetadata[rBestQueue.UID].jobsInQueue
295304

296-
rJobsToPop := 1
297-
if jobsOrder.queuePopsMap[rBestQueue.UID] > 0 {
298-
rJobsToPop = jobsOrder.queuePopsMap[rBestQueue.UID]
305+
var lPending, rPending podgroup_info.PodGroupInfos
306+
if !jobsOrder.isVictimQueue {
307+
lPendingRaw := lJobsInQueue.Peek(1)
308+
rPendingRaw := rJobsInQueue.Peek(1)
309+
lPending = podgroup_info.NewPodGroupInfos(lPendingRaw)
310+
rPending = podgroup_info.NewPodGroupInfos(rPendingRaw)
299311
}
300-
lJobInfosRaw := jobsOrder.queueIdToQueueMetadata[lBestQueue.UID].jobsInQueue.Peek(lJobsToPop)
301-
rJobInfosRaw := jobsOrder.queueIdToQueueMetadata[rBestQueue.UID].jobsInQueue.Peek(rJobsToPop)
302-
lJobInfos := podgroup_info.NewPodGroupInfos(lJobInfosRaw)
303-
rJobInfos := podgroup_info.NewPodGroupInfos(rJobInfosRaw)
304312

313+
var lVictims, rVictims podgroup_info.PodGroupInfos
314+
if jobsOrder.isVictimQueue {
315+
var lPoppedJobs []*podgroup_info.PodGroupInfo
316+
if len(jobsOrder.queuePopsMap[lBestQueue.UID]) > 0 {
317+
lPoppedJobs = append(lPoppedJobs, jobsOrder.queuePopsMap[lBestQueue.UID]...)
318+
}
319+
lPoppedJobs = append(lPoppedJobs, lJobsInQueue.Peek(1)[0].(*podgroup_info.PodGroupInfo))
320+
lVictims = podgroup_info.PodGroupInfos{
321+
PodGroupInfos: lPoppedJobs,
322+
}
323+
324+
var rPoppedJobs []*podgroup_info.PodGroupInfo
325+
if len(jobsOrder.queuePopsMap[rBestQueue.UID]) > 0 {
326+
rPoppedJobs = append(rPoppedJobs, jobsOrder.queuePopsMap[rBestQueue.UID]...)
327+
}
328+
rPoppedJobs = append(rPoppedJobs, rJobsInQueue.Peek(1)[0].(*podgroup_info.PodGroupInfo))
329+
rVictims = podgroup_info.PodGroupInfos{
330+
PodGroupInfos: rPoppedJobs,
331+
}
332+
}
305333
jobsOrder.departmentIdToDepartmentMetadata[lDepartment.UID].queuesPriorityQueue.Push(lBestQueue)
306334
jobsOrder.departmentIdToDepartmentMetadata[rDepartment.UID].queuesPriorityQueue.Push(rBestQueue)
307335

308336
if reverseOrder {
309-
return !jobsOrder.ssn.QueueOrderFn(lDepartment, rDepartment, lJobInfos, rJobInfos)
337+
return !jobsOrder.ssn.QueueOrderFn(lDepartment, rDepartment, lPending, rPending, lVictims, rVictims)
310338
}
311339

312-
return jobsOrder.ssn.QueueOrderFn(lDepartment, rDepartment, lJobInfos, rJobInfos)
340+
return jobsOrder.ssn.QueueOrderFn(lDepartment, rDepartment, lPending, rPending, lVictims, rVictims)
313341
}
314342
}

pkg/scheduler/api/common_info/comparison.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,6 @@ type CompareFn func(interface{}, interface{}) int
1111

1212
// CompareTwoFactors is the func declaration used by sort or priority queue for 2 factors.
1313
type CompareTwoFactors func(interface{}, interface{}, interface{}, interface{}) int
14+
15+
// CompareThreeFactors is the func declaration used by sort or priority queue for 3 factors.
16+
type CompareThreeFactors func(interface{}, interface{}, interface{}, interface{}, interface{}, interface{}) int

0 commit comments

Comments
 (0)