Skip to content

Commit de73e87

Browse files
authored
[Bugfix] Prep for queue comparison changes (#114)
* Prep for queue comparison changes
1 parent 2a0bacd commit de73e87

File tree

12 files changed

+59
-71
lines changed

12 files changed

+59
-71
lines changed

pkg/scheduler/actions/integration_tests/integration_tests_utils/integration_tests_utils.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func RunTests(t *testing.T, testsMetadata []TestTopologyMetadata) {
4949
}
5050

5151
func RunTest(t *testing.T, testMetadata TestTopologyMetadata, testNumber int, controller *Controller) {
52-
t.Logf("Running test number: %v, test name: %v", testNumber, testMetadata.Name)
52+
t.Logf("Running test number: %v, test name: %v", testNumber, testMetadata.TestTopologyBasic.Name)
5353
ssn := test_utils.BuildSession(testMetadata.TestTopologyBasic, controller)
5454

5555
runRoundsUntilMatch(testMetadata, controller, &ssn)

pkg/scheduler/actions/reclaim/reclaim.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ func getOrderedVictimsQueue(ssn *framework.Session, evictingQueue common_info.Qu
147147
jobsOrderedByQueue := utils.NewJobsOrderByQueues(ssn, utils.JobsOrderInitOptions{
148148
FilterNonPreemptible: true,
149149
FilterNonActiveAllocated: true,
150-
ReverseOrder: true,
150+
VictimQueue: true,
151151
MaxJobsQueueDepth: scheduler_util.QueueCapacityInfinite,
152152
})
153153
jobs := map[common_info.PodGroupID]*podgroup_info.PodGroupInfo{}

pkg/scheduler/actions/reclaim/reclaim_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func TestHandleReclaim(t *testing.T) {
2828
testsMetadata := getTestsMetadata()
2929

3030
for testNumber, testMetadata := range testsMetadata {
31-
t.Logf("Running test number: %v, test name: %v,", testNumber, testMetadata.Name)
31+
t.Logf("Running test number: %v, test name: %v,", testNumber, testMetadata.TestTopologyBasic.Name)
3232
ssn := test_utils.BuildSession(testMetadata.TestTopologyBasic, controller)
3333
reclaimAction := reclaim.New()
3434
reclaimAction.Execute(ssn)

pkg/scheduler/actions/utils/action.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func GetVictimsQueue(
3939
}
4040
}
4141
victimsQueue := NewJobsOrderByQueues(ssn, JobsOrderInitOptions{
42-
ReverseOrder: true,
42+
VictimQueue: true,
4343
MaxJobsQueueDepth: scheduler_util.QueueCapacityInfinite,
4444
})
4545
victimsQueue.InitializeWithJobs(preemptees)

pkg/scheduler/actions/utils/input_jobs.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ type JobsOrderInitOptions struct {
1414
FilterNonPending bool
1515
FilterNonPreemptible bool
1616
FilterNonActiveAllocated bool
17-
ReverseOrder bool
17+
VictimQueue bool
1818
MaxJobsQueueDepth int
1919
}
2020

@@ -44,8 +44,8 @@ func (jobsOrder *JobsOrderByQueues) InitializeWithJobs(
4444
continue
4545
}
4646

47-
jobsOrder.addJobToQueue(job, jobsOrder.jobsOrderInitOptions.ReverseOrder)
47+
jobsOrder.addJobToQueue(job, jobsOrder.jobsOrderInitOptions.VictimQueue)
4848
}
4949

50-
jobsOrder.buildActiveJobOrderPriorityQueues(jobsOrder.jobsOrderInitOptions.ReverseOrder)
50+
jobsOrder.buildActiveJobOrderPriorityQueues(jobsOrder.jobsOrderInitOptions.VictimQueue)
5151
}

pkg/scheduler/actions/utils/job_order_by_queue.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ func (jobsOrder *JobsOrderByQueues) PopNextJob() *podgroup_info.PodGroupInfo {
7171
job := jobsOrder.queueIdToQueueMetadata[queue.UID].jobsInQueue.Pop().(*podgroup_info.PodGroupInfo)
7272
jobsOrder.updateQueuePriorityQueue(queue, department)
7373
jobsOrder.updateDepartmentPriorityQueue(department)
74+
7475
log.InfraLogger.V(7).Infof("Popped job: %v", job.Name)
7576
return job
7677
}
@@ -80,11 +81,11 @@ func (jobsOrder *JobsOrderByQueues) PushJob(job *podgroup_info.PodGroupInfo) {
8081
department := jobsOrder.ssn.Queues[queue.ParentQueue]
8182

8283
if _, found := jobsOrder.departmentIdToDepartmentMetadata[department.UID]; !found {
83-
jobsOrder.initializePriorityQueueForDepartment(department, jobsOrder.jobsOrderInitOptions.ReverseOrder)
84+
jobsOrder.initializePriorityQueueForDepartment(department, jobsOrder.jobsOrderInitOptions.VictimQueue)
8485
jobsOrder.activeDepartments.Push(department)
8586
}
8687
if _, found := jobsOrder.queueIdToQueueMetadata[job.Queue]; !found {
87-
jobsOrder.initializePriorityQueue(job, jobsOrder.jobsOrderInitOptions.ReverseOrder)
88+
jobsOrder.initializePriorityQueue(job, jobsOrder.jobsOrderInitOptions.VictimQueue)
8889
jobsOrder.departmentIdToDepartmentMetadata[department.UID].queuesPriorityQueue.Push(queue)
8990
}
9091

@@ -243,10 +244,10 @@ func (jobsOrder *JobsOrderByQueues) buildFuncOrderBetweenQueuesWithJobs(jobsQueu
243244
jobsQueueMetadataPerQueue[rQueue.UID].jobsInQueue.Push(rJobInfo)
244245

245246
if reverseOrder {
246-
return !jobsOrder.ssn.QueueOrderFn(lQueue, rQueue, lJobInfo, rJobInfo)
247+
return !jobsOrder.ssn.QueueOrderFn(lQueue, rQueue, lJobInfo, rJobInfo, nil, nil)
247248
}
248249

249-
return jobsOrder.ssn.QueueOrderFn(lQueue, rQueue, lJobInfo, rJobInfo)
250+
return jobsOrder.ssn.QueueOrderFn(lQueue, rQueue, lJobInfo, rJobInfo, nil, nil)
250251
}
251252
}
252253

@@ -274,9 +275,9 @@ func (jobsOrder *JobsOrderByQueues) buildFuncOrderBetweenDepartmentsWithJobs(rev
274275
jobsOrder.departmentIdToDepartmentMetadata[rDepartment.UID].queuesPriorityQueue.Push(rBestQueue)
275276

276277
if reverseOrder {
277-
return !jobsOrder.ssn.QueueOrderFn(lDepartment, rDepartment, lJobInfo, rJobInfo)
278+
return !jobsOrder.ssn.QueueOrderFn(lDepartment, rDepartment, lJobInfo, rJobInfo, nil, nil)
278279
}
279280

280-
return jobsOrder.ssn.QueueOrderFn(lDepartment, rDepartment, lJobInfo, rJobInfo)
281+
return jobsOrder.ssn.QueueOrderFn(lDepartment, rDepartment, lJobInfo, rJobInfo, nil, nil)
281282
}
282283
}

pkg/scheduler/actions/utils/job_order_by_queue_test.go

Lines changed: 28 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func TestNumericalPriorityWithinSameQueue(t *testing.T) {
4949
testPod: {},
5050
},
5151
},
52-
PodInfos: map[common_info.PodID]*pod_info.PodInfo{
52+
PodInfos: pod_info.PodsMap{
5353
testPod: {},
5454
},
5555
},
@@ -62,7 +62,7 @@ func TestNumericalPriorityWithinSameQueue(t *testing.T) {
6262
testPod: {},
6363
},
6464
},
65-
PodInfos: map[common_info.PodID]*pod_info.PodInfo{
65+
PodInfos: pod_info.PodsMap{
6666
testPod: {},
6767
},
6868
},
@@ -75,7 +75,7 @@ func TestNumericalPriorityWithinSameQueue(t *testing.T) {
7575
testPod: {},
7676
},
7777
},
78-
PodInfos: map[common_info.PodID]*pod_info.PodInfo{
78+
PodInfos: pod_info.PodsMap{
7979
testPod: {},
8080
},
8181
},
@@ -88,7 +88,7 @@ func TestNumericalPriorityWithinSameQueue(t *testing.T) {
8888
testPod: {},
8989
},
9090
},
91-
PodInfos: map[common_info.PodID]*pod_info.PodInfo{
91+
PodInfos: pod_info.PodsMap{
9292
testPod: {},
9393
},
9494
},
@@ -112,11 +112,9 @@ func TestNumericalPriorityWithinSameQueue(t *testing.T) {
112112

113113
func TestJobsOrderByQueues_PushJob(t *testing.T) {
114114
type fields struct {
115-
queueIdToQueueMetadata map[common_info.QueueID]*jobsQueueMetadata
116-
departmentIdToDepartmentMetadata map[common_info.QueueID]*departmentMetadata
117-
jobsOrderInitOptions JobsOrderInitOptions
118-
Queues map[common_info.QueueID]*queue_info.QueueInfo
119-
InsertedJob map[common_info.PodGroupID]*podgroup_info.PodGroupInfo
115+
jobsOrderInitOptions JobsOrderInitOptions
116+
Queues map[common_info.QueueID]*queue_info.QueueInfo
117+
InsertedJob map[common_info.PodGroupID]*podgroup_info.PodGroupInfo
120118
}
121119
type args struct {
122120
job *podgroup_info.PodGroupInfo
@@ -133,10 +131,8 @@ func TestJobsOrderByQueues_PushJob(t *testing.T) {
133131
{
134132
name: "single podgroup insert - empty queue",
135133
fields: fields{
136-
queueIdToQueueMetadata: map[common_info.QueueID]*jobsQueueMetadata{},
137-
departmentIdToDepartmentMetadata: map[common_info.QueueID]*departmentMetadata{},
138134
jobsOrderInitOptions: JobsOrderInitOptions{
139-
ReverseOrder: false,
135+
VictimQueue: false,
140136
FilterNonPending: true,
141137
FilterUnready: true,
142138
MaxJobsQueueDepth: scheduler_util.QueueCapacityInfinite,
@@ -157,7 +153,7 @@ func TestJobsOrderByQueues_PushJob(t *testing.T) {
157153
"p1": {},
158154
},
159155
},
160-
PodInfos: map[common_info.PodID]*pod_info.PodInfo{
156+
PodInfos: pod_info.PodsMap{
161157
"p1": {},
162158
},
163159
},
@@ -173,7 +169,7 @@ func TestJobsOrderByQueues_PushJob(t *testing.T) {
173169
"p1": {},
174170
},
175171
},
176-
PodInfos: map[common_info.PodID]*pod_info.PodInfo{
172+
PodInfos: pod_info.PodsMap{
177173
"p1": {},
178174
},
179175
},
@@ -183,10 +179,8 @@ func TestJobsOrderByQueues_PushJob(t *testing.T) {
183179
{
184180
name: "single podgroup insert - one in queue. On pop comes second",
185181
fields: fields{
186-
queueIdToQueueMetadata: map[common_info.QueueID]*jobsQueueMetadata{},
187-
departmentIdToDepartmentMetadata: map[common_info.QueueID]*departmentMetadata{},
188182
jobsOrderInitOptions: JobsOrderInitOptions{
189-
ReverseOrder: false,
183+
VictimQueue: false,
190184
FilterNonPending: true,
191185
FilterUnready: true,
192186
MaxJobsQueueDepth: scheduler_util.QueueCapacityInfinite,
@@ -206,7 +200,7 @@ func TestJobsOrderByQueues_PushJob(t *testing.T) {
206200
"p1": {},
207201
},
208202
},
209-
PodInfos: map[common_info.PodID]*pod_info.PodInfo{
203+
PodInfos: pod_info.PodsMap{
210204
"p1": {},
211205
},
212206
},
@@ -223,7 +217,7 @@ func TestJobsOrderByQueues_PushJob(t *testing.T) {
223217
"p1": {},
224218
},
225219
},
226-
PodInfos: map[common_info.PodID]*pod_info.PodInfo{
220+
PodInfos: pod_info.PodsMap{
227221
"p1": {},
228222
},
229223
},
@@ -240,7 +234,7 @@ func TestJobsOrderByQueues_PushJob(t *testing.T) {
240234
"p1": {},
241235
},
242236
},
243-
PodInfos: map[common_info.PodID]*pod_info.PodInfo{
237+
PodInfos: pod_info.PodsMap{
244238
"p1": {},
245239
},
246240
},
@@ -254,7 +248,7 @@ func TestJobsOrderByQueues_PushJob(t *testing.T) {
254248
"p1": {},
255249
},
256250
},
257-
PodInfos: map[common_info.PodID]*pod_info.PodInfo{
251+
PodInfos: pod_info.PodsMap{
258252
"p1": {},
259253
},
260254
},
@@ -264,10 +258,8 @@ func TestJobsOrderByQueues_PushJob(t *testing.T) {
264258
{
265259
name: "single podgroup insert - one in queue. On pop comes first",
266260
fields: fields{
267-
queueIdToQueueMetadata: map[common_info.QueueID]*jobsQueueMetadata{},
268-
departmentIdToDepartmentMetadata: map[common_info.QueueID]*departmentMetadata{},
269261
jobsOrderInitOptions: JobsOrderInitOptions{
270-
ReverseOrder: false,
262+
VictimQueue: false,
271263
FilterNonPending: true,
272264
FilterUnready: true,
273265
MaxJobsQueueDepth: scheduler_util.QueueCapacityInfinite,
@@ -287,7 +279,7 @@ func TestJobsOrderByQueues_PushJob(t *testing.T) {
287279
"p1": {},
288280
},
289281
},
290-
PodInfos: map[common_info.PodID]*pod_info.PodInfo{
282+
PodInfos: pod_info.PodsMap{
291283
"p1": {},
292284
},
293285
},
@@ -304,7 +296,7 @@ func TestJobsOrderByQueues_PushJob(t *testing.T) {
304296
"p1": {},
305297
},
306298
},
307-
PodInfos: map[common_info.PodID]*pod_info.PodInfo{
299+
PodInfos: pod_info.PodsMap{
308300
"p1": {},
309301
},
310302
},
@@ -321,7 +313,7 @@ func TestJobsOrderByQueues_PushJob(t *testing.T) {
321313
"p1": {},
322314
},
323315
},
324-
PodInfos: map[common_info.PodID]*pod_info.PodInfo{
316+
PodInfos: pod_info.PodsMap{
325317
"p1": {},
326318
},
327319
},
@@ -335,7 +327,7 @@ func TestJobsOrderByQueues_PushJob(t *testing.T) {
335327
"p1": {},
336328
},
337329
},
338-
PodInfos: map[common_info.PodID]*pod_info.PodInfo{
330+
PodInfos: pod_info.PodsMap{
339331
"p1": {},
340332
},
341333
},
@@ -348,16 +340,16 @@ func TestJobsOrderByQueues_PushJob(t *testing.T) {
348340
ssn := newPrioritySession()
349341
ssn.Queues = tt.fields.Queues
350342
activeDepartments := scheduler_util.NewPriorityQueue(func(l, r interface{}) bool {
351-
if tt.fields.jobsOrderInitOptions.ReverseOrder {
343+
if tt.fields.jobsOrderInitOptions.VictimQueue {
352344
return !ssn.JobOrderFn(l, r)
353345
}
354346
return ssn.JobOrderFn(l, r)
355347
}, scheduler_util.QueueCapacityInfinite)
356348

357349
jobsOrder := &JobsOrderByQueues{
358350
activeDepartments: activeDepartments,
359-
queueIdToQueueMetadata: tt.fields.queueIdToQueueMetadata,
360-
departmentIdToDepartmentMetadata: tt.fields.departmentIdToDepartmentMetadata,
351+
queueIdToQueueMetadata: map[common_info.QueueID]*jobsQueueMetadata{},
352+
departmentIdToDepartmentMetadata: map[common_info.QueueID]*departmentMetadata{},
361353
ssn: ssn,
362354
jobsOrderInitOptions: tt.fields.jobsOrderInitOptions,
363355
}
@@ -394,7 +386,7 @@ func TestJobsOrderByQueues_RequeueJob(t *testing.T) {
394386
queueIdToQueueMetadata: map[common_info.QueueID]*jobsQueueMetadata{},
395387
departmentIdToDepartmentMetadata: map[common_info.QueueID]*departmentMetadata{},
396388
jobsOrderInitOptions: JobsOrderInitOptions{
397-
ReverseOrder: false,
389+
VictimQueue: false,
398390
FilterNonPending: true,
399391
FilterUnready: true,
400392
MaxJobsQueueDepth: scheduler_util.QueueCapacityInfinite,
@@ -414,7 +406,7 @@ func TestJobsOrderByQueues_RequeueJob(t *testing.T) {
414406
"p1": {},
415407
},
416408
},
417-
PodInfos: map[common_info.PodID]*pod_info.PodInfo{
409+
PodInfos: pod_info.PodsMap{
418410
"p1": {},
419411
},
420412
},
@@ -432,7 +424,7 @@ func TestJobsOrderByQueues_RequeueJob(t *testing.T) {
432424
"p1": {},
433425
},
434426
},
435-
PodInfos: map[common_info.PodID]*pod_info.PodInfo{
427+
PodInfos: pod_info.PodsMap{
436428
"p1": {},
437429
},
438430
},
@@ -445,7 +437,7 @@ func TestJobsOrderByQueues_RequeueJob(t *testing.T) {
445437
ssn := newPrioritySession()
446438
ssn.Queues = tt.fields.Queues
447439
activeDepartments := scheduler_util.NewPriorityQueue(func(l, r interface{}) bool {
448-
if tt.fields.jobsOrderInitOptions.ReverseOrder {
440+
if tt.fields.jobsOrderInitOptions.VictimQueue {
449441
return !ssn.JobOrderFn(l, r)
450442
}
451443
return ssn.JobOrderFn(l, r)
@@ -483,6 +475,7 @@ func newPrioritySession() *framework.Session {
483475
Plugins: []conf.PluginOption{
484476
{Name: "Priority"},
485477
{Name: "Elastic"},
478+
{Name: "Proportion"},
486479
},
487480
},
488481
},

pkg/scheduler/api/common_info/comparison.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,3 @@ type LessFn func(interface{}, interface{}) bool
88

99
// CompareFn is the func declaration used by sort or priority queue.
1010
type CompareFn func(interface{}, interface{}) int
11-
12-
// CompareTwoFactors is the func declaration used by sort or priority queue for 2 factors.
13-
type CompareTwoFactors func(interface{}, interface{}, interface{}, interface{}) int

pkg/scheduler/api/podgroup_info/job_info.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010

1111
"golang.org/x/exp/maps"
1212
"golang.org/x/exp/slices"
13-
"k8s.io/api/core/v1"
13+
v1 "k8s.io/api/core/v1"
1414
policyv1 "k8s.io/api/policy/v1"
1515
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1616
"k8s.io/apimachinery/pkg/types"

pkg/scheduler/framework/session.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type Session struct {
4646
NodeOrderFns []api.NodeOrderFn
4747
TaskOrderFns []common_info.CompareFn
4848
JobOrderFns []common_info.CompareFn
49-
QueueOrderFns []common_info.CompareTwoFactors
49+
QueueOrderFns []CompareQueueFn
5050
CanReclaimResourcesFns []api.CanReclaimResourcesFn
5151
ReclaimableFns []api.EvictableFn
5252
OnJobSolutionStartFns []api.OnJobSolutionStartFn

0 commit comments

Comments
 (0)