
Commit fa7b160

Stop using the summarized MinAvailable for the whole PodGroup; evaluate gang constraints per SubGroup instead (#443)

Affected logic:

* IsElastic
* IsGangSatisfied
* ShouldPipeline
* OrderJob Elastic
* SplitVictimTasks
* MinRuntime
1 parent 847bc71 commit fa7b160

File tree

7 files changed: +177 −41 lines

pkg/scheduler/api/podgroup_info/job_info.go

Lines changed: 24 additions & 17 deletions
@@ -403,7 +403,12 @@ func (pgi *PodGroupInfo) IsReadyForScheduling() bool {
 }
 
 func (pgi *PodGroupInfo) IsElastic() bool {
-	return pgi.GetDefaultMinAvailable() < int32(len(pgi.GetAllPodsMap()))
+	for _, subGroup := range pgi.GetSubGroups() {
+		if subGroup.IsElastic() {
+			return true
+		}
+	}
+	return false
 }
 
 func (pgi *PodGroupInfo) IsStale() bool {
@@ -424,10 +429,6 @@ func (pgi *PodGroupInfo) IsStale() bool {
 }
 
 func (pgi *PodGroupInfo) IsGangSatisfied() bool {
-	numActiveTasks := pgi.GetNumActiveUsedTasks()
-	if numActiveTasks < int(pgi.GetDefaultMinAvailable()) {
-		return false
-	}
 	for _, subGroup := range pgi.SubGroups {
 		if !subGroup.IsGangSatisfied() {
 			return false
@@ -437,20 +438,26 @@ func (pgi *PodGroupInfo) IsGangSatisfied() bool {
 }
 
 func (pgi *PodGroupInfo) ShouldPipelineJob() bool {
-	hasPipelinedTask := false
-	activeAllocatedTasksCount := 0
-	for _, task := range pgi.GetAllPodsMap() {
-		if task.Status == pod_status.Pipelined {
-			log.InfraLogger.V(7).Infof("task: <%v/%v> was pipelined to node: <%v>",
-				task.Namespace, task.Name, task.NodeName)
-			hasPipelinedTask = true
-		} else if pod_status.IsActiveAllocatedStatus(task.Status) {
-			activeAllocatedTasksCount += 1
+	for _, subGroup := range pgi.SubGroups {
+		hasPipelinedTask := false
+		activeAllocatedTasksCount := 0
+		for _, task := range subGroup.GetPodInfos() {
+			if task.Status == pod_status.Pipelined {
+				log.InfraLogger.V(7).Infof("task: <%v/%v> was pipelined to node: <%v>",
+					task.Namespace, task.Name, task.NodeName)
+				hasPipelinedTask = true
+			} else if pod_status.IsActiveAllocatedStatus(task.Status) {
+				activeAllocatedTasksCount += 1
+			}
+		}
+
+		if hasPipelinedTask && activeAllocatedTasksCount < int(subGroup.GetMinAvailable()) {
+			log.InfraLogger.V(7).Infof("Subgroup: <%v/%v> has pipelined tasks, and not enough allocated pods for minAvailable <%v>. Pipeline all.",
+				pgi.UID, subGroup.GetName(), subGroup.GetMinAvailable())
+			return true
 		}
 	}
-	// If the job has already MinAvailable tasks active allocated (but not pipelined),
-	// then we shouldn't convert non-pipelined tasks to pipeline.
-	return hasPipelinedTask && activeAllocatedTasksCount < int(pgi.GetDefaultMinAvailable())
+	return false
 }
 
 func (pgi *PodGroupInfo) Clone() *PodGroupInfo {
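
Taken together, these job_info.go changes replace the summarized, job-level MinAvailable gate with per-SubGroup aggregation: a job is elastic if any subgroup is elastic, and gang-satisfied only if every subgroup is. The standalone sketch below illustrates why the summarized check was too coarse; its SubGroup type with plain int fields is a simplified stand-in for illustration, not the repository's PodGroupInfo/SubGroupInfo API.

package main

import "fmt"

// SubGroup is a simplified stand-in: counts only, no pod objects.
type SubGroup struct {
	MinAvailable int
	ActivePods   int
	TotalPods    int
}

// isElastic: the job is elastic if any subgroup has more pods than it minimally needs.
func isElastic(subGroups []SubGroup) bool {
	for _, sg := range subGroups {
		if sg.MinAvailable < sg.TotalPods {
			return true
		}
	}
	return false
}

// isGangSatisfied: every subgroup must independently reach its own minAvailable;
// a summarized count over the whole job can mask a starved subgroup.
func isGangSatisfied(subGroups []SubGroup) bool {
	for _, sg := range subGroups {
		if sg.ActivePods < sg.MinAvailable {
			return false
		}
	}
	return true
}

func main() {
	// Two subgroups, 3 active pods in total. A summarized minAvailable of
	// 1+2=3 would report the gang as satisfied, yet the second subgroup
	// has none of its 2 required pods.
	subGroups := []SubGroup{
		{MinAvailable: 1, ActivePods: 3, TotalPods: 3},
		{MinAvailable: 2, ActivePods: 0, TotalPods: 2},
	}
	fmt.Println(isElastic(subGroups))       // true: first subgroup is over its minimum
	fmt.Println(isGangSatisfied(subGroups)) // false: second subgroup is below its minimum
}

With the old summarized check, the example above would have counted as satisfied; the per-subgroup loop catches the starved subgroup.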

pkg/scheduler/api/podgroup_info/subgroup_info.go

Lines changed: 4 additions & 0 deletions
@@ -117,6 +117,10 @@ func (sgi *SubGroupInfo) IsGangSatisfied() bool {
 	return numActiveTasks >= int(sgi.minAvailable)
 }
 
+func (sgi *SubGroupInfo) IsElastic() bool {
+	return sgi.GetMinAvailable() < int32(len(sgi.GetPodInfos()))
+}
+
 func (sgi *SubGroupInfo) GetNumActiveAllocatedTasks() int {
 	return sgi.numActiveAllocatedTasks
 }
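
One design note on the new predicate: IsElastic compares minAvailable against the total number of pods assigned to the subgroup (len(GetPodInfos())), while IsGangSatisfied counts only active tasks, so a pending or gated pod above the minimum still makes a subgroup elastic. The test file below exercises exactly that. A minimal sketch of the contrast, again with stand-in types rather than the real SubGroupInfo:

package main

import "fmt"

type Pod struct{ Active bool }

// SubGroup is a simplified stand-in for SubGroupInfo.
type SubGroup struct {
	MinAvailable int
	Pods         []Pod
}

// Gang satisfaction counts only active pods.
func (sg SubGroup) IsGangSatisfied() bool {
	active := 0
	for _, p := range sg.Pods {
		if p.Active {
			active++
		}
	}
	return active >= sg.MinAvailable
}

// Elasticity counts every assigned pod, regardless of status: a pending
// pod above the minimum still makes the subgroup elastic.
func (sg SubGroup) IsElastic() bool {
	return sg.MinAvailable < len(sg.Pods)
}

func main() {
	sg := SubGroup{MinAvailable: 2, Pods: []Pod{{true}, {true}, {false}}}
	fmt.Println(sg.IsGangSatisfied(), sg.IsElastic()) // true true
}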

pkg/scheduler/api/podgroup_info/subgroup_info_test.go

Lines changed: 46 additions & 0 deletions
@@ -332,6 +332,52 @@ func TestGetNumPendingTasks(t *testing.T) {
 	}
 }
 
+func TestIsElastic(t *testing.T) {
+	tests := []struct {
+		name     string
+		pods     []*pod_info.PodInfo
+		expected bool
+	}{
+		{
+			name: "satisfied with exact minimum",
+			pods: []*pod_info.PodInfo{
+				{UID: "1", Status: pod_status.Running},
+				{UID: "2", Status: pod_status.Running},
+			},
+			expected: false,
+		},
+		{
+			name: "not satisfied with insufficient pods",
+			pods: []*pod_info.PodInfo{
+				{UID: "1", Status: pod_status.Pending},
+			},
+			expected: false,
+		},
+		{
+			name: "satisfied with above minimum pods",
+			pods: []*pod_info.PodInfo{
+				{UID: "1", Status: pod_status.Pending},
+				{UID: "2", Status: pod_status.Running},
+				{UID: "3", Status: pod_status.Pending},
+				{UID: "4", Status: pod_status.Gated},
+				{UID: "5", Status: pod_status.Pending},
+			},
+			expected: true,
+		},
+	}
+
+	for _, test := range tests {
+		sgi := NewSubGroupInfo("test", 2)
+		for _, pod := range test.pods {
+			sgi.AssignTask(pod)
+		}
+
+		if got := sgi.IsElastic(); got != test.expected {
+			t.Errorf("Name: %v, IsElastic() = %v, want %v", test.name, got, test.expected)
+		}
+	}
+}
+
 func TestGetNumActiveAllocatedTasks(t *testing.T) {
 	tests := []struct {
 		name     string

pkg/scheduler/plugins/elastic/elastic.go

Lines changed: 20 additions & 6 deletions
@@ -29,22 +29,22 @@ func JobOrderFn(l, r interface{}) int {
 	lv := l.(*podgroup_info.PodGroupInfo)
 	rv := r.(*podgroup_info.PodGroupInfo)
 
-	lvAllocatedCount := int32(lv.GetActiveAllocatedTasksCount())
-	rvAllocatedCount := int32(rv.GetActiveAllocatedTasksCount())
+	lvBelowMinAvailable, lvAboveMinAvailable, lvExactlyAtMinAvailable := minAvailableState(lv)
+	rvBelowMinAvailable, rvAboveMinAvailable, rvExactlyAtMinAvailable := minAvailableState(rv)
 
-	if lvAllocatedCount < lv.GetDefaultMinAvailable() && rvAllocatedCount >= rv.GetDefaultMinAvailable() {
+	if lvBelowMinAvailable && !rvBelowMinAvailable {
 		return -1
 	}
 
-	if lvAllocatedCount == lv.GetDefaultMinAvailable() && rvAllocatedCount > rv.GetDefaultMinAvailable() {
+	if lvExactlyAtMinAvailable && rvAboveMinAvailable {
 		return -1
 	}
 
-	if lvAllocatedCount >= lv.GetDefaultMinAvailable() && rvAllocatedCount < rv.GetDefaultMinAvailable() {
+	if !lvBelowMinAvailable && rvBelowMinAvailable {
 		return 1
 	}
 
-	if lvAllocatedCount > lv.GetDefaultMinAvailable() && rvAllocatedCount == rv.GetDefaultMinAvailable() {
+	if lvAboveMinAvailable && rvExactlyAtMinAvailable {
 		return 1
 	}
 
@@ -53,4 +53,18 @@ func JobOrderFn(l, r interface{}) int {
 	return 0
 }
 
+func minAvailableState(pgi *podgroup_info.PodGroupInfo) (bool, bool, bool) {
+	exactlyAtMinAvailable := true
+	for _, subGroup := range pgi.SubGroups {
+		numAllocatedTasks := int32(subGroup.GetNumActiveAllocatedTasks())
+		if numAllocatedTasks < subGroup.GetMinAvailable() {
+			return true, false, false
+		}
+		if numAllocatedTasks > subGroup.GetMinAvailable() {
+			exactlyAtMinAvailable = false
+		}
+	}
+	return false, !exactlyAtMinAvailable, exactlyAtMinAvailable
+}
+
 func (pp *elasticPlugin) OnSessionClose(_ *framework.Session) {}
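
The new minAvailableState helper collapses a job's subgroups into three mutually exclusive flags: below (any subgroup under its own minAvailable), exactly at (every subgroup precisely at its minimum), and above (no subgroup below and at least one over). JobOrderFn then orders jobs on those flags instead of comparing summarized counts. A standalone sketch of the same classification, using a simplified SubGroup stand-in rather than the scheduler's types:

package main

import "fmt"

// SubGroup is a simplified stand-in for SubGroupInfo.
type SubGroup struct {
	MinAvailable int
	Allocated    int
}

// minAvailableState mirrors the helper added in this commit: "below" wins
// as soon as any subgroup is under its own minimum; "exactly at" holds only
// if every subgroup sits precisely at its minimum; otherwise "above".
func minAvailableState(subGroups []SubGroup) (below, above, exactlyAt bool) {
	exactlyAt = true
	for _, sg := range subGroups {
		if sg.Allocated < sg.MinAvailable {
			return true, false, false
		}
		if sg.Allocated > sg.MinAvailable {
			exactlyAt = false
		}
	}
	return false, !exactlyAt, exactlyAt
}

func main() {
	// A single under-allocated subgroup dominates: the job classifies as
	// "below" even though the other subgroup is over-allocated.
	below, above, exactlyAt := minAvailableState([]SubGroup{
		{MinAvailable: 2, Allocated: 5},
		{MinAvailable: 3, Allocated: 1},
	})
	fmt.Println(below, above, exactlyAt) // true false false
}

That dominance is exactly what the old summarized count could not express: 6 allocated against a summarized minimum of 5 would have looked healthy.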

pkg/scheduler/plugins/elastic/elastic_test.go

Lines changed: 28 additions & 0 deletions
@@ -60,6 +60,7 @@ func TestJobOrderFn(t *testing.T) {
 			},
 			lPods: []*pod_info.PodInfo{
 				{
+					UID:    "pod-a-1",
 					Name:   "pod-a-1",
 					Status: pod_status.Running,
 					ResReq: resource_info.EmptyResourceRequirements(),
@@ -74,6 +75,7 @@ func TestJobOrderFn(t *testing.T) {
 			},
 			rPods: []*pod_info.PodInfo{
 				{
+					UID:    "pod-a-2",
 					Name:   "pod-a-2",
 					Status: pod_status.Running,
 					ResReq: resource_info.EmptyResourceRequirements(),
@@ -94,6 +96,7 @@ func TestJobOrderFn(t *testing.T) {
 			},
 			lPods: []*pod_info.PodInfo{
 				{
+					UID:    "pod-a-1",
 					Name:   "pod-a-1",
 					Status: pod_status.Allocated,
 					ResReq: resource_info.EmptyResourceRequirements(),
@@ -108,6 +111,7 @@ func TestJobOrderFn(t *testing.T) {
 			},
 			rPods: []*pod_info.PodInfo{
 				{
+					UID:    "pod-a-2",
 					Name:   "pod-a-2",
 					Status: pod_status.Running,
 					ResReq: resource_info.EmptyResourceRequirements(),
@@ -128,6 +132,7 @@ func TestJobOrderFn(t *testing.T) {
 			},
 			lPods: []*pod_info.PodInfo{
 				{
+					UID:    "pod-a-1",
 					Name:   "pod-a-1",
 					Status: pod_status.Bound,
 					ResReq: resource_info.EmptyResourceRequirements(),
@@ -142,6 +147,7 @@ func TestJobOrderFn(t *testing.T) {
 			},
 			rPods: []*pod_info.PodInfo{
 				{
+					UID:    "pod-a-2",
 					Name:   "pod-a-2",
 					Status: pod_status.Running,
 					ResReq: resource_info.EmptyResourceRequirements(),
@@ -162,6 +168,7 @@ func TestJobOrderFn(t *testing.T) {
 			},
 			lPods: []*pod_info.PodInfo{
 				{
+					UID:    "pod-a-1",
 					Name:   "pod-a-1",
 					Status: pod_status.Releasing,
 					ResReq: resource_info.EmptyResourceRequirements(),
@@ -176,6 +183,7 @@ func TestJobOrderFn(t *testing.T) {
 			},
 			rPods: []*pod_info.PodInfo{
 				{
+					UID:    "pod-a-2",
 					Name:   "pod-a-2",
 					Status: pod_status.Running,
 					ResReq: resource_info.EmptyResourceRequirements(),
@@ -196,6 +204,7 @@ func TestJobOrderFn(t *testing.T) {
 			},
 			lPods: []*pod_info.PodInfo{
 				{
+					UID:    "pod-a-1",
 					Name:   "pod-a-1",
 					Status: pod_status.Running,
 					ResReq: resource_info.EmptyResourceRequirements(),
@@ -222,6 +231,7 @@ func TestJobOrderFn(t *testing.T) {
 			},
 			lPods: []*pod_info.PodInfo{
 				{
+					UID:    "pod-a-1",
 					Name:   "pod-a-1",
 					Status: pod_status.Running,
 					ResReq: resource_info.EmptyResourceRequirements(),
@@ -236,11 +246,13 @@ func TestJobOrderFn(t *testing.T) {
 			},
 			rPods: []*pod_info.PodInfo{
 				{
+					UID:    "pod-a-2",
 					Name:   "pod-a-2",
 					Status: pod_status.Running,
 					ResReq: resource_info.EmptyResourceRequirements(),
 				},
 				{
+					UID:    "pod-b-2",
 					Name:   "pod-b-2",
 					Status: pod_status.Running,
 					ResReq: resource_info.EmptyResourceRequirements(),
@@ -261,6 +273,7 @@ func TestJobOrderFn(t *testing.T) {
 			},
 			lPods: []*pod_info.PodInfo{
 				{
+					UID:    "pod-a-1",
 					Name:   "pod-a-1",
 					Status: pod_status.Running,
 					ResReq: resource_info.EmptyResourceRequirements(),
@@ -275,16 +288,19 @@ func TestJobOrderFn(t *testing.T) {
 			},
 			rPods: []*pod_info.PodInfo{
 				{
+					UID:    "pod-a-2",
 					Name:   "pod-a-2",
 					Status: pod_status.Running,
 					ResReq: resource_info.EmptyResourceRequirements(),
 				},
 				{
+					UID:    "pod-b-2",
 					Name:   "pod-b-2",
 					Status: pod_status.Running,
 					ResReq: resource_info.EmptyResourceRequirements(),
 				},
 				{
+					UID:    "pod-b-3",
 					Name:   "pod-b-3",
 					Status: pod_status.Running,
 					ResReq: resource_info.EmptyResourceRequirements(),
@@ -305,6 +321,7 @@ func TestJobOrderFn(t *testing.T) {
 			},
 			lPods: []*pod_info.PodInfo{
 				{
+					UID:    "pod-a-1",
 					Name:   "pod-a-1",
 					Status: pod_status.Running,
 					ResReq: resource_info.EmptyResourceRequirements(),
@@ -319,11 +336,13 @@ func TestJobOrderFn(t *testing.T) {
 			},
 			rPods: []*pod_info.PodInfo{
 				{
+					UID:    "pod-a-2",
 					Name:   "pod-a-2",
 					Status: pod_status.Running,
 					ResReq: resource_info.EmptyResourceRequirements(),
 				},
 				{
+					UID:    "pod-b-2",
 					Name:   "pod-b-2",
 					Status: pod_status.Running,
 					ResReq: resource_info.EmptyResourceRequirements(),
@@ -344,6 +363,7 @@ func TestJobOrderFn(t *testing.T) {
 			},
 			lPods: []*pod_info.PodInfo{
 				{
+					UID:    "pod-a-1",
 					Name:   "pod-a-1",
 					Status: pod_status.Running,
 					ResReq: resource_info.EmptyResourceRequirements(),
@@ -358,11 +378,13 @@ func TestJobOrderFn(t *testing.T) {
 			},
 			rPods: []*pod_info.PodInfo{
 				{
+					UID:    "pod-a-2",
 					Name:   "pod-a-2",
 					Status: pod_status.Running,
 					ResReq: resource_info.EmptyResourceRequirements(),
 				},
 				{
+					UID:    "pod-b-2",
 					Name:   "pod-b-2",
 					Status: pod_status.Running,
 					ResReq: resource_info.EmptyResourceRequirements(),
@@ -383,11 +405,13 @@ func TestJobOrderFn(t *testing.T) {
 			},
 			lPods: []*pod_info.PodInfo{
 				{
+					UID:    "pod-a-1",
 					Name:   "pod-a-1",
 					Status: pod_status.Running,
 					ResReq: resource_info.EmptyResourceRequirements(),
 				},
 				{
+					UID:    "pod-a-2",
 					Name:   "pod-a-2",
 					Status: pod_status.Running,
 					ResReq: resource_info.EmptyResourceRequirements(),
@@ -402,6 +426,7 @@ func TestJobOrderFn(t *testing.T) {
 			},
 			rPods: []*pod_info.PodInfo{
 				{
+					UID:    "pod-a-2",
 					Name:   "pod-a-2",
 					Status: pod_status.Running,
 					ResReq: resource_info.EmptyResourceRequirements(),
@@ -422,11 +447,13 @@ func TestJobOrderFn(t *testing.T) {
 			},
 			lPods: []*pod_info.PodInfo{
 				{
+					UID:    "pod-a-1",
 					Name:   "pod-a-1",
 					Status: pod_status.Running,
 					ResReq: resource_info.EmptyResourceRequirements(),
 				},
 				{
+					UID:    "pod-a-2",
 					Name:   "pod-a-2",
 					Status: pod_status.Running,
 					ResReq: resource_info.EmptyResourceRequirements(),
@@ -441,6 +468,7 @@ func TestJobOrderFn(t *testing.T) {
 			},
 			rPods: []*pod_info.PodInfo{
 				{
+					UID:    "pod-a-2",
 					Name:   "pod-a-2",
 					Status: pod_status.Running,
 					ResReq: resource_info.EmptyResourceRequirements(),
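
The elastic_test.go changes only add distinct UID values to the test pods. A plausible reason (an inference, not stated in this diff): if PodInfo objects are stored in a map keyed by UID, as the per-subgroup iteration over GetPodInfos() suggests, test pods that all share the zero-value UID would silently overwrite one another. A toy illustration of that collision, using a hypothetical map keyed by UID:

package main

import "fmt"

// PodInfo here is a stand-in with only the fields needed for the demo.
type PodInfo struct {
	UID  string
	Name string
}

func main() {
	pods := map[string]*PodInfo{}
	for _, p := range []*PodInfo{{Name: "pod-a-1"}, {Name: "pod-a-2"}} {
		pods[p.UID] = p // both share UID "", so the second insert replaces the first
	}
	fmt.Println(len(pods)) // 1, not 2: one pod was silently dropped
}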
