Commit 0a0b84f

Added SubGroupOrder plugin (#357)
* Added SubGroupOrder plugin
* Added prioritization consts to subgroup order plugin
1 parent 448da4f commit 0a0b84f
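Across the diff below, the task-selection helpers in podgroup_info (GetTasksToAllocate, GetTasksToAllocateInitResource, GetTasksToAllocateRequestedGPUs, GetTasksToEvict) gain a new ssn.SubGroupOrderFn argument ahead of the existing ssn.TaskOrderFn, so tasks are picked first by sub-group order and only then by task order within each sub-group. A minimal, self-contained sketch of that two-level ordering idea (all types and comparators here are illustrative stand-ins, not the scheduler's real ones):

// Illustrative only: none of these types are kai-scheduler's real ones.
package main

import (
	"fmt"
	"sort"
)

type task struct {
	subGroup string // stand-in for the task's sub-group
	priority int
	name     string
}

// subGroupOrderFn: hypothetical comparator in the spirit of ssn.SubGroupOrderFn.
func subGroupOrderFn(l, r string) bool { return l < r }

// taskOrderFn: hypothetical comparator in the spirit of ssn.TaskOrderFn.
func taskOrderFn(l, r task) bool {
	if l.priority != r.priority {
		return l.priority > r.priority // higher priority first
	}
	return l.name < r.name // stable tie-break
}

func main() {
	tasks := []task{
		{"sg-b", 1, "t3"},
		{"sg-a", 2, "t1"},
		{"sg-a", 1, "t2"},
	}
	// Two-level ordering: compare sub-groups first, then tasks within one.
	sort.SliceStable(tasks, func(i, j int) bool {
		if tasks[i].subGroup != tasks[j].subGroup {
			return subGroupOrderFn(tasks[i].subGroup, tasks[j].subGroup)
		}
		return taskOrderFn(tasks[i], tasks[j])
	})
	fmt.Println(tasks) // [{sg-a 2 t1} {sg-a 1 t2} {sg-b 1 t3}]
}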

29 files changed: +341 −84 lines


deployments/kai-scheduler/templates/services/scheduler-configmap.yaml

Lines changed: 1 addition & 0 deletions

@@ -16,6 +16,7 @@ data:
 - name: elastic
 - name: kubeflow
 - name: ray
+- name: subgrouporder
 - name: taskorder
 - name: nominatednode
 - name: dynamicresources
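For reference, the rendered plugin list after this change would read as below. This is a hedged sketch that assumes the entries sit under a plugins: list in the embedded scheduler config, as the surrounding diff lines suggest:

plugins:
  - name: elastic
  - name: kubeflow
  - name: ray
  - name: subgrouporder   # added by this commit
  - name: taskorder
  - name: nominatednode
  - name: dynamicresources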

pkg/scheduler/actions/allocate/allocate.go

Lines changed: 1 addition & 1 deletion

@@ -79,7 +79,7 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
 func attemptToAllocateJob(ssn *framework.Session, stmt *framework.Statement, job *podgroup_info.PodGroupInfo) (allocated, pipelined bool) {
 	queue := ssn.Queues[job.Queue]
 
-	resReq := podgroup_info.GetTasksToAllocateInitResource(job, ssn.TaskOrderFn, true)
+	resReq := podgroup_info.GetTasksToAllocateInitResource(job, ssn.SubGroupOrderFn, ssn.TaskOrderFn, true)
 	log.InfraLogger.V(3).Infof("Attempting to allocate job: <%v/%v> of queue <%v>, resources: <%v>",
 		job.Namespace, job.Name, queue.Name, resReq)

pkg/scheduler/actions/common/action.go

Lines changed: 3 additions & 2 deletions

@@ -94,13 +94,14 @@ func TryToVirtuallyAllocatePreemptorAndGetVictims(
 	}
 
 	resReq := podgroup_info.GetTasksToAllocateInitResource(
-		jobToAllocate, ssn.TaskOrderFn, false)
+		jobToAllocate, ssn.SubGroupOrderFn, ssn.TaskOrderFn, false)
 	log.InfraLogger.V(6).Infof("Trying to pipeline job: <%s/%s>. resources required: %v",
 		jobToAllocate.Namespace, jobToAllocate.Name, resReq)
 
 	if jobToAllocate.UID != preemptor.UID {
 		if !AllocateJob(ssn, stmt, nodes, jobToAllocate, true) {
-			tasksToAllocate := podgroup_info.GetTasksToAllocate(jobToAllocate, ssn.TaskOrderFn, false)
+			tasksToAllocate := podgroup_info.GetTasksToAllocate(jobToAllocate, ssn.SubGroupOrderFn,
+				ssn.TaskOrderFn, false)
 			newVictims = append(newVictims, tasksToAllocate...)
 		}
 		continue

pkg/scheduler/actions/common/allocate.go

Lines changed: 1 addition & 1 deletion

@@ -17,7 +17,7 @@ import (
 
 func AllocateJob(ssn *framework.Session, stmt *framework.Statement, nodes []*node_info.NodeInfo,
 	job *podgroup_info.PodGroupInfo, isPipelineOnly bool) bool {
-	tasksToAllocate := podgroup_info.GetTasksToAllocate(job, ssn.TaskOrderFn, !isPipelineOnly)
+	tasksToAllocate := podgroup_info.GetTasksToAllocate(job, ssn.SubGroupOrderFn, ssn.TaskOrderFn, !isPipelineOnly)
 
 	result := ssn.IsJobOverQueueCapacityFn(job, tasksToAllocate)
 	if !result.IsSchedulable {

pkg/scheduler/actions/common/solvers/job_solver.go

Lines changed: 1 addition & 1 deletion

@@ -51,7 +51,7 @@ func (s *JobSolver) Solve(
 
 	var statement *framework.Statement
 	var pendingTasks []*pod_info.PodInfo
-	tasksToAllocate := podgroup_info.GetTasksToAllocate(pendingJob, ssn.TaskOrderFn, false)
+	tasksToAllocate := podgroup_info.GetTasksToAllocate(pendingJob, ssn.SubGroupOrderFn, ssn.TaskOrderFn, false)
 	for _, nextTaskToSolve := range tasksToAllocate {
 		nextTasksToSolve := []*pod_info.PodInfo{nextTaskToSolve}
 		pendingTasks = append(pendingTasks, nextTasksToSolve...)

pkg/scheduler/actions/common/solvers/pod_scenario_builder.go

Lines changed: 2 additions & 2 deletions

@@ -34,7 +34,7 @@ func NewPodAccumulatedScenarioBuilder(
 
 	var scenario *solverscenario.ByNodeScenario = nil
 	recordedVictimsTasks := make(map[common_info.PodID]*pod_info.PodInfo)
-	tasksToAllocate := podgroup_info.GetTasksToAllocate(pendingJob, session.TaskOrderFn, false)
+	tasksToAllocate := podgroup_info.GetTasksToAllocate(pendingJob, session.SubGroupOrderFn, session.TaskOrderFn, false)
 	if len(tasksToAllocate) != 0 {
 		scenario = solverscenario.NewByNodeScenario(session, pendingJob, pendingJob, nil, recordedVictimsJobs)
 		for _, job := range recordedVictimsJobs {

@@ -76,7 +76,7 @@ func (asb *PodAccumulatedScenarioBuilder) addNextPotentialVictims() bool {
 	nextVictimJob := asb.victimsJobsQueue.PopNextJob()
 
 	potentialVictimTasks, jobHasMoreTasks := podgroup_info.GetTasksToEvict(
-		nextVictimJob, asb.session.TaskOrderFn,
+		nextVictimJob, asb.session.SubGroupOrderFn, asb.session.TaskOrderFn,
 	)
 
 	// Jump over recorded victims in potential victims generation

pkg/scheduler/actions/consolidation/consolidation.go

Lines changed: 1 addition & 1 deletion

@@ -79,7 +79,7 @@ func (alloc *consolidationAction) Execute(ssn *framework.Session) {
 
 func attemptToConsolidateForPreemptor(
 	ssn *framework.Session, job *podgroup_info.PodGroupInfo) (bool, *framework.Statement) {
-	resReq := podgroup_info.GetTasksToAllocateInitResource(job, ssn.TaskOrderFn, false)
+	resReq := podgroup_info.GetTasksToAllocateInitResource(job, ssn.SubGroupOrderFn, ssn.TaskOrderFn, false)
 	log.InfraLogger.V(3).Infof(
 		"Attempting to consolidate running jobs in order to make room for job: <%s/%s>, resources: <%v>",
 		job.Namespace, job.Name, resReq)

pkg/scheduler/actions/preempt/preempt.go

Lines changed: 2 additions & 2 deletions

@@ -99,12 +99,12 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {
 func attemptToPreemptForPreemptor(
 	ssn *framework.Session, preemptor *podgroup_info.PodGroupInfo,
 ) (bool, *framework.Statement, []string) {
-	resReq := podgroup_info.GetTasksToAllocateInitResource(preemptor, ssn.TaskOrderFn, false)
+	resReq := podgroup_info.GetTasksToAllocateInitResource(preemptor, ssn.SubGroupOrderFn, ssn.TaskOrderFn, false)
 	log.InfraLogger.V(3).Infof(
 		"Attempting to preempt for job: <%v/%v>, priority: <%v>, queue: <%v>, resources: <%v>",
 		preemptor.Namespace, preemptor.Name, preemptor.Priority, preemptor.Queue, resReq)
 
-	preemptorTasks := podgroup_info.GetTasksToAllocate(preemptor, ssn.TaskOrderFn, false)
+	preemptorTasks := podgroup_info.GetTasksToAllocate(preemptor, ssn.SubGroupOrderFn, ssn.TaskOrderFn, false)
 	if result := ssn.IsNonPreemptibleJobOverQueueQuotaFn(preemptor, preemptorTasks); !result.IsSchedulable {
 		log.InfraLogger.V(3).Infof("Job <%v/%v> would have placed the queue resources over quota",
 			preemptor.Namespace, preemptor.Name)

pkg/scheduler/actions/reclaim/reclaim.go

Lines changed: 1 addition & 1 deletion

@@ -103,7 +103,7 @@ func (ra *reclaimAction) attemptToReclaimForSpecificJob(
 	ssn *framework.Session, reclaimer *podgroup_info.PodGroupInfo,
 ) (bool, *framework.Statement, []string) {
 	queue := ssn.Queues[reclaimer.Queue]
-	resReq := podgroup_info.GetTasksToAllocateInitResource(reclaimer, ssn.TaskOrderFn, false)
+	resReq := podgroup_info.GetTasksToAllocateInitResource(reclaimer, ssn.SubGroupOrderFn, ssn.TaskOrderFn, false)
 	log.InfraLogger.V(3).Infof("Attempting to reclaim for job: <%v/%v> of queue <%v>, resources: <%v>",
 		reclaimer.Namespace, reclaimer.Name, queue.Name, resReq)

pkg/scheduler/actions/utils/action.go

Lines changed: 3 additions & 3 deletions

@@ -130,9 +130,9 @@ func GetAllPendingJobs(ssn *framework.Session) map[common_info.PodGroupID]*podgr
 
 func IsEnoughGPUsAllocatableForJob(job *podgroup_info.PodGroupInfo, ssn *framework.Session, isRealAllocation bool) bool {
 	sumOfAllAllocatableGPUs, sumOfAllAllocatableGPUsMemory := getSumOfAvailableGPUs(ssn)
-	requestedGPUs, requestedGpuMemory := podgroup_info.GetTasksToAllocateRequestedGPUs(job, ssn.TaskOrderFn,
-		isRealAllocation)
-	resReq := podgroup_info.GetTasksToAllocateInitResource(job, ssn.TaskOrderFn, isRealAllocation)
+	requestedGPUs, requestedGpuMemory := podgroup_info.GetTasksToAllocateRequestedGPUs(job, ssn.SubGroupOrderFn,
+		ssn.TaskOrderFn, isRealAllocation)
+	resReq := podgroup_info.GetTasksToAllocateInitResource(job, ssn.SubGroupOrderFn, ssn.TaskOrderFn, isRealAllocation)
 	log.InfraLogger.V(7).Infof(
 		"Task: <%v/%v> resources requires: <%v>, sumOfAllAllocatableGPUs: <%v, %v mb>",
 		job.Namespace, job.Name, resReq, sumOfAllAllocatableGPUs, sumOfAllAllocatableGPUsMemory)
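Taken together, the call sites in this commit imply the podgroup_info helpers now share the rough shape sketched below. This is inferred from the diff only; the placeholder types stand in for the scheduler's real comparator and info types:

// Sketch inferred from the call sites above; not the project's actual code.
package podgroupinfo

type (
	SubGroupOrderFn func(l, r interface{}) bool // placeholder comparator type
	TaskOrderFn     func(l, r interface{}) bool // placeholder comparator type
	PodGroupInfo    struct{}                    // details elided
	PodInfo         struct{}                    // details elided
	Resource        struct{}                    // details elided
)

// The sub-group comparator now precedes the task comparator everywhere.
func GetTasksToAllocate(job *PodGroupInfo, subGroupOrderFn SubGroupOrderFn,
	taskOrderFn TaskOrderFn, isRealAllocation bool) []*PodInfo {
	return nil // body elided in this sketch
}

func GetTasksToAllocateInitResource(job *PodGroupInfo, subGroupOrderFn SubGroupOrderFn,
	taskOrderFn TaskOrderFn, isRealAllocation bool) *Resource {
	return nil // body elided in this sketch
}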
