
Commit dbe5618

Fixed getNumOfTasksToAllocatePerSubGroup so it will account for tasks with virtualStatus (#403)
1 parent 2145d06 commit dbe5618
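
At its core, the change makes getNumTasksToAllocatePerSubGroup count the remaining work for a sub-group that has already met its minAvailable by asking each task whether it should be allocated, rather than counting only Pending tasks. That way, tasks whose status was only changed virtually during a simulated scheduling cycle (for example a pod virtually evicted to Releasing by the consolidation action) still count as allocatable. Below is a minimal, self-contained sketch of that counting idea; podInfo, shouldAllocate and countAllocatable are simplified stand-ins written for this note, not the scheduler's real types, and the exact semantics of ShouldAllocate are assumed from the tests added in this commit.

package main

import "fmt"

// Simplified stand-ins for the scheduler's pod_info types; the real definitions
// live in pkg/scheduler/api/pod_info and carry much more state.
type podStatus string

const (
	pending   podStatus = "Pending"
	releasing podStatus = "Releasing"
)

type podInfo struct {
	status          podStatus
	isVirtualStatus bool // status was only changed by a simulated (non-real) scheduling step
}

// shouldAllocate approximates the intent of PodInfo.ShouldAllocate as exercised by
// the tests in this commit: a Pending task is always a candidate, and a task whose
// status is only virtual still counts when the allocation itself is simulated.
func (p podInfo) shouldAllocate(isRealAllocation bool) bool {
	if p.status == pending {
		return true
	}
	return !isRealAllocation && p.isVirtualStatus
}

// countAllocatable mirrors the idea of the new getNumAllocatableTasks: count every
// task the sub-group could still place, instead of only the Pending ones.
func countAllocatable(tasks []podInfo, isRealAllocation bool) int {
	n := 0
	for _, t := range tasks {
		if t.shouldAllocate(isRealAllocation) {
			n++
		}
	}
	return n
}

func main() {
	tasks := []podInfo{
		{status: pending},
		{status: releasing, isVirtualStatus: true}, // virtually evicted during a simulation
	}
	fmt.Println(countAllocatable(tasks, true))  // 1: only the Pending task counts
	fmt.Println(countAllocatable(tasks, false)) // 2: the virtual-status task counts as well
}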

File tree

3 files changed: +176 −11 lines


pkg/scheduler/actions/consolidation/consolidation_subgroups_test.go

Lines changed: 79 additions & 1 deletion
@@ -439,7 +439,7 @@ func getSubGroupsConsolidationTestsMetadata() []integration_tests_utils.TestTopo
 								RequiredGPUs: ptr.To(int64(2)),
 							},
 						},
-						MinAvailable: ptr.To(int32(2)),
+						MinAvailable: ptr.To(int32(1)),
 					},
 					{
 						Name: "running_job1",
@@ -532,5 +532,83 @@ func getSubGroupsConsolidationTestsMetadata() []integration_tests_utils.TestTopo
 			},
 		},
 	},
+	{
+		TestTopologyBasic: test_utils.TestTopologyBasic{
+			Name: "job with sub groups above minAvailable consolidated by a pending job - complete eviction",
+			Jobs: []*jobs_fake.TestJobBasic{
+				{
+					Name: "running_job0",
+					Priority: constants.PriorityTrainNumber,
+					QueueName: "queue0",
+					SubGroups: map[string]*podgroup_info.SubGroupInfo{
+						"sub-0": podgroup_info.NewSubGroupInfo("sub-0", 1),
+					},
+					Tasks: []*tasks_fake.TestTaskBasic{
+						{
+							State: pod_status.Running,
+							NodeName: "node0",
+							SubGroupName: "sub-0",
+							RequiredGPUs: ptr.To(int64(1)),
+						},
+						{
+							State: pod_status.Running,
+							NodeName: "node1",
+							SubGroupName: "sub-0",
+							RequiredGPUs: ptr.To(int64(1)),
+						},
+					},
+					MinAvailable: ptr.To(int32(1)),
+				},
+				{
+					Name: "pending_job",
+					RequiredGPUsPerTask: 4,
+					Priority: constants.PriorityTrainNumber,
+					QueueName: "queue0",
+					Tasks: []*tasks_fake.TestTaskBasic{
+						{
+							State: pod_status.Pending,
+						},
+					},
+				},
+			},
+			Nodes: map[string]nodes_fake.TestNodeBasic{
+				"node0": {
+					GPUs: 4,
+				},
+				"node1": {
+					GPUs: 4,
+				},
+			},
+			Queues: []test_utils.TestQueueBasic{
+				{
+					Name: "queue0",
+					DeservedGPUs: 2,
+				},
+			},
+			TaskExpectedResults: map[string]test_utils.TestExpectedResultBasic{
+				"running_job0-0": {
+					NodeName: "node0",
+					GPUsRequired: 1,
+					Status: pod_status.Running,
+				},
+				"running_job0-1": {
+					NodeName: "node0",
+					GPUsRequired: 1,
+					Status: pod_status.Pipelined,
+				},
+				"pending_job-0": {
+					GPUsRequired: 4,
+					NodeName: "node1",
+					Status: pod_status.Pipelined,
+				},
+			},
+			Mocks: &test_utils.TestMock{
+				CacheRequirements: &test_utils.CacheMocking{
+					NumberOfCacheEvictions: 1,
+					NumberOfPipelineActions: 2,
+				},
+			},
+		},
+	},
 }
}

pkg/scheduler/api/podgroup_info/allocation_info.go

Lines changed: 16 additions & 5 deletions
@@ -34,7 +34,7 @@ func GetTasksToAllocate(
 	var tasksToAllocate []*pod_info.PodInfo
 	if len(podGroupInfo.SubGroups) > 0 {
 		priorityQueueMap := getTasksPriorityQueuePerSubGroup(podGroupInfo, taskOrderFn, isRealAllocation)
-		maxNumOfTasksToAllocateMap := getNumOfTasksToAllocatePerSubGroup(podGroupInfo)
+		maxNumOfTasksToAllocateMap := getNumTasksToAllocatePerSubGroup(podGroupInfo, isRealAllocation)
 
 		subGroupPriorityQueue := getSubGroupsPriorityQueue(podGroupInfo.SubGroups, subGroupOrderFn)
 		maxNumOfSubGroups := getNumOfSubGroupsToAllocate(podGroupInfo)
@@ -51,7 +51,7 @@ GetTasksToAllocate
 
 	} else {
 		taskPriorityQueue := getTasksPriorityQueue(podGroupInfo, taskOrderFn, isRealAllocation)
-		maxNumOfTasksToAllocate := getNumOfTasksToAllocate(podGroupInfo)
+		maxNumOfTasksToAllocate := getNumTasksToAllocate(podGroupInfo)
 		tasksToAllocate = getTasksFromQueue(taskPriorityQueue, maxNumOfTasksToAllocate)
 	}
 
@@ -151,27 +151,38 @@ func getSubGroupsPriorityQueue(subGroups map[string]*SubGroupInfo,
 	return priorityQueue
 }
 
-func getNumOfTasksToAllocate(podGroupInfo *PodGroupInfo) int {
+func getNumTasksToAllocate(podGroupInfo *PodGroupInfo) int {
 	numAllocatedTasks := podGroupInfo.GetActiveAllocatedTasksCount()
 	if numAllocatedTasks >= int(podGroupInfo.MinAvailable) {
 		return 1
 	}
 	return int(podGroupInfo.MinAvailable) - numAllocatedTasks
 }
 
-func getNumOfTasksToAllocatePerSubGroup(podGroupInfo *PodGroupInfo) map[string]int {
+func getNumTasksToAllocatePerSubGroup(podGroupInfo *PodGroupInfo, isRealAllocation bool) map[string]int {
 	maxTasksToAllocate := map[string]int{}
 	for name, subGroup := range podGroupInfo.SubGroups {
 		numAllocatedTasks := subGroup.GetNumActiveAllocatedTasks()
 		if numAllocatedTasks >= int(subGroup.minAvailable) {
-			maxTasksToAllocate[name] = int(math.Min(float64(subGroup.GetNumPendingTasks()), 1))
+			numTasksToAllocate := getNumAllocatableTasks(subGroup, isRealAllocation)
+			maxTasksToAllocate[name] = int(math.Min(float64(numTasksToAllocate), 1))
 		} else {
 			maxTasksToAllocate[name] = int(subGroup.minAvailable) - numAllocatedTasks
 		}
 	}
 	return maxTasksToAllocate
 }
 
+func getNumAllocatableTasks(subGroup *SubGroupInfo, isRealAllocation bool) int {
+	numTasksToAllocate := 0
+	for _, task := range subGroup.GetPodInfos() {
+		if task.ShouldAllocate(isRealAllocation) {
+			numTasksToAllocate += 1
+		}
+	}
+	return numTasksToAllocate
+}
+
 func getNumOfSubGroupsToAllocate(podGroupInfo *PodGroupInfo) int {
 	for _, subGroup := range podGroupInfo.SubGroups {
 		allocatedTasks := subGroup.GetNumActiveAllocatedTasks()
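
Read together with the new consolidation test above, this appears to be the scenario the old code got wrong: once a sub-group had minAvailable active tasks, the extra-task cap was min(GetNumPendingTasks(), 1), and a task that had only been virtually evicted (Releasing with virtual status) is not Pending, so the cap collapsed to 0 and the task could not be pipelined back onto another node. A small illustrative sketch of the cap arithmetic follows; subGroupCap is a hypothetical helper written for this note, not a function in the scheduler.

package main

import "fmt"

// subGroupCap restates the branch changed in getNumTasksToAllocatePerSubGroup:
// once a sub-group already has minAvailable active tasks, it is handed at most
// one more task per round, and only if something is still allocatable.
func subGroupCap(minAvailable, activeAllocated, allocatable int) int {
	if activeAllocated >= minAvailable {
		if allocatable < 1 {
			return 0
		}
		return 1
	}
	return minAvailable - activeAllocated
}

func main() {
	// "sub-0" from the consolidation test: minAvailable=1, one task still Running,
	// one task virtually evicted by the consolidation simulation.
	// Counting only Pending tasks (old behaviour) gives allocatable=0:
	fmt.Println(subGroupCap(1, 1, 0)) // 0 - the evicted task can never be pipelined back
	// Counting tasks via ShouldAllocate (new behaviour) gives allocatable=1:
	fmt.Println(subGroupCap(1, 1, 1)) // 1 - running_job0-1 can be re-placed on node0
}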

pkg/scheduler/api/podgroup_info/allocation_info_test.go

Lines changed: 81 additions & 5 deletions
@@ -4,9 +4,11 @@
 package podgroup_info
 
 import (
+	"fmt"
 	"testing"
 
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/types"
 
 	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/common_info"
 	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_info"
@@ -162,20 +164,20 @@ func Test_getNumOfTasksToAllocate(t *testing.T) {
 	pg.AddTaskInfo(simpleTask("p1", "", pod_status.Pending))
 	pg.AddTaskInfo(simpleTask("p2", "", pod_status.Allocated))
 	want := 1
-	got := getNumOfTasksToAllocate(pg)
+	got := getNumTasksToAllocate(pg)
 	if got != want {
 		t.Errorf("got %d want %d", got, want)
 	}
 }
 
-func Test_getNumOfTasksToAllocatePerSubGroup(t *testing.T) {
+func Test_getNumTasksToAllocatePerSubGroup(t *testing.T) {
 	pg := NewPodGroupInfo("pg")
 	sg := NewSubGroupInfo("sg", 1)
 	pg.SubGroups["sg"] = sg
 
 	pg.AddTaskInfo(simpleTask("p1", "sg", pod_status.Pending))
 	pg.AddTaskInfo(simpleTask("p2", "sg", pod_status.Allocated))
-	m := getNumOfTasksToAllocatePerSubGroup(pg)
+	m := getNumTasksToAllocatePerSubGroup(pg, true)
 	if m["sg"] != 1 {
 		t.Errorf("want 1, got %v", m["sg"])
 	}
@@ -279,8 +281,82 @@ func Test_getMaxNumOfTasksToAllocate(t *testing.T) {
 				pg.AddTaskInfo(pi)
 			}
 
-			if got := getNumOfTasksToAllocate(pg); got != tt.want {
-				t.Errorf("getNumOfTasksToAllocate() = %v, want %v", got, tt.want)
+			if got := getNumTasksToAllocate(pg); got != tt.want {
+				t.Errorf("getNumTasksToAllocate() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func Test_getNumAllocatableTasks(t *testing.T) {
+	tests := []struct {
+		name string
+		taskStatuses []pod_status.PodStatus
+		isRealAllocation bool
+		want int
+	}{
+		{
+			name: "no tasks",
+			taskStatuses: nil,
+			isRealAllocation: true,
+			want: 0,
+		},
+		{
+			name: "all pending",
+			taskStatuses: []pod_status.PodStatus{pod_status.Pending, pod_status.Pending},
+			isRealAllocation: true,
+			want: 2,
+		},
+		{
+			name: "pending and running",
+			taskStatuses: []pod_status.PodStatus{pod_status.Pending, pod_status.Running},
+			isRealAllocation: true,
+			want: 1, // assuming only Pending is allocatable
+		},
+		{
+			name: "pending and releasing - real allocation",
+			taskStatuses: []pod_status.PodStatus{pod_status.Pending, pod_status.Releasing},
+			isRealAllocation: true,
+			want: 1, // only Pending is allocatable with real allocation
+		},
+		{
+			name: "pending and releasing - non-real allocation",
+			taskStatuses: []pod_status.PodStatus{pod_status.Pending, pod_status.Releasing},
+			isRealAllocation: false,
+			want: 2, // assuming both Pending and Releasing are allocatable
+		},
+		{
+			name: "allocated and succeeded",
+			taskStatuses: []pod_status.PodStatus{pod_status.Allocated, pod_status.Succeeded},
+			isRealAllocation: true,
+			want: 0,
+		},
+		{
+			name: "all succeeded",
+			taskStatuses: []pod_status.PodStatus{pod_status.Succeeded, pod_status.Succeeded},
+			isRealAllocation: true,
+			want: 0,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			sg := NewSubGroupInfo("test-subgroup", 1)
+			for i, st := range tt.taskStatuses {
+				p := simpleTask(
+					fmt.Sprintf("test-task-%d", i),
+					"test-subgroup",
+					st,
+				)
+				p.Pod.UID = types.UID(fmt.Sprintf("test-pod-%d", i))
+				if p.Status == pod_status.Releasing && !tt.isRealAllocation {
+					p.IsVirtualStatus = true
+				}
+				sg.AssignTask(p)
+			}
+			got := getNumAllocatableTasks(sg, tt.isRealAllocation)
+			if got != tt.want {
+				t.Errorf("getNumAllocatableTasks() = %d, want %d", got, tt.want)
 			}
 		})
 	}
