From bf137a274903a68b3654412d533e08b6e4e07fbf Mon Sep 17 00:00:00 2001 From: "Omer Dayan (SW-GPU)" Date: Tue, 26 Aug 2025 14:59:28 +0300 Subject: [PATCH 1/6] IsElastic --- pkg/scheduler/api/podgroup_info/job_info.go | 7 ++- .../api/podgroup_info/subgroup_info.go | 4 ++ .../api/podgroup_info/subgroup_info_test.go | 46 +++++++++++++++++++ 3 files changed, 56 insertions(+), 1 deletion(-) diff --git a/pkg/scheduler/api/podgroup_info/job_info.go b/pkg/scheduler/api/podgroup_info/job_info.go index ae6cc5d2a..eab24b721 100644 --- a/pkg/scheduler/api/podgroup_info/job_info.go +++ b/pkg/scheduler/api/podgroup_info/job_info.go @@ -403,7 +403,12 @@ func (pgi *PodGroupInfo) IsReadyForScheduling() bool { } func (pgi *PodGroupInfo) IsElastic() bool { - return pgi.GetDefaultMinAvailable() < int32(len(pgi.GetAllPodsMap())) + for _, subGroup := range pgi.GetSubGroups() { + if subGroup.IsElastic() { + return true + } + } + return false } func (pgi *PodGroupInfo) IsStale() bool { diff --git a/pkg/scheduler/api/podgroup_info/subgroup_info.go b/pkg/scheduler/api/podgroup_info/subgroup_info.go index 2314f3d12..b227c60a2 100644 --- a/pkg/scheduler/api/podgroup_info/subgroup_info.go +++ b/pkg/scheduler/api/podgroup_info/subgroup_info.go @@ -117,6 +117,10 @@ func (sgi *SubGroupInfo) IsGangSatisfied() bool { return numActiveTasks >= int(sgi.minAvailable) } +func (sgi *SubGroupInfo) IsElastic() bool { + return sgi.GetMinAvailable() < int32(len(sgi.GetPodInfos())) +} + func (sgi *SubGroupInfo) GetNumActiveAllocatedTasks() int { return sgi.numActiveAllocatedTasks } diff --git a/pkg/scheduler/api/podgroup_info/subgroup_info_test.go b/pkg/scheduler/api/podgroup_info/subgroup_info_test.go index e9f621d48..90add672b 100644 --- a/pkg/scheduler/api/podgroup_info/subgroup_info_test.go +++ b/pkg/scheduler/api/podgroup_info/subgroup_info_test.go @@ -332,6 +332,52 @@ func TestGetNumPendingTasks(t *testing.T) { } } +func TestIsElastic(t *testing.T) { + tests := []struct { + name string + pods []*pod_info.PodInfo + expected bool + }{ + { + name: "satisfied with exact minimum", + pods: []*pod_info.PodInfo{ + {UID: "1", Status: pod_status.Running}, + {UID: "2", Status: pod_status.Running}, + }, + expected: false, + }, + { + name: "not satisfied with insufficient pods", + pods: []*pod_info.PodInfo{ + {UID: "1", Status: pod_status.Pending}, + }, + expected: false, + }, + { + name: "satisfied with above minimum pods", + pods: []*pod_info.PodInfo{ + {UID: "1", Status: pod_status.Pending}, + {UID: "2", Status: pod_status.Running}, + {UID: "3", Status: pod_status.Pending}, + {UID: "4", Status: pod_status.Gated}, + {UID: "5", Status: pod_status.Pending}, + }, + expected: true, + }, + } + + for _, test := range tests { + sgi := NewSubGroupInfo("test", 2) + for _, pod := range test.pods { + sgi.AssignTask(pod) + } + + if got := sgi.IsElastic(); got != test.expected { + t.Errorf("Name: %v, IsElastic() = %v, want %v", test.name, got, test.expected) + } + } +} + func TestGetNumActiveAllocatedTasks(t *testing.T) { tests := []struct { name string From 284f0b852928052a53a9aa6cb1ecea28c1be75b0 Mon Sep 17 00:00:00 2001 From: "Omer Dayan (SW-GPU)" Date: Tue, 26 Aug 2025 15:00:57 +0300 Subject: [PATCH 2/6] IsGangSatisfied --- pkg/scheduler/api/podgroup_info/job_info.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/scheduler/api/podgroup_info/job_info.go b/pkg/scheduler/api/podgroup_info/job_info.go index eab24b721..a158f0439 100644 --- a/pkg/scheduler/api/podgroup_info/job_info.go +++ b/pkg/scheduler/api/podgroup_info/job_info.go @@ -429,10 +429,6 @@ func (pgi *PodGroupInfo) IsStale() bool { } func (pgi *PodGroupInfo) IsGangSatisfied() bool { - numActiveTasks := pgi.GetNumActiveUsedTasks() - if numActiveTasks < int(pgi.GetDefaultMinAvailable()) { - return false - } for _, subGroup := range pgi.SubGroups { if !subGroup.IsGangSatisfied() { return false From d921718b09f6a304759664a5f18ea83761a26282 Mon Sep 17 00:00:00 2001 From: "Omer Dayan (SW-GPU)" Date: Tue, 26 Aug 2025 15:08:40 +0300 Subject: [PATCH 3/6] ShouldPipeline --- pkg/scheduler/api/podgroup_info/job_info.go | 30 ++++++++++++--------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/pkg/scheduler/api/podgroup_info/job_info.go b/pkg/scheduler/api/podgroup_info/job_info.go index a158f0439..a78d9c1f2 100644 --- a/pkg/scheduler/api/podgroup_info/job_info.go +++ b/pkg/scheduler/api/podgroup_info/job_info.go @@ -438,20 +438,26 @@ func (pgi *PodGroupInfo) IsGangSatisfied() bool { } func (pgi *PodGroupInfo) ShouldPipelineJob() bool { - hasPipelinedTask := false - activeAllocatedTasksCount := 0 - for _, task := range pgi.GetAllPodsMap() { - if task.Status == pod_status.Pipelined { - log.InfraLogger.V(7).Infof("task: <%v/%v> was pipelined to node: <%v>", - task.Namespace, task.Name, task.NodeName) - hasPipelinedTask = true - } else if pod_status.IsActiveAllocatedStatus(task.Status) { - activeAllocatedTasksCount += 1 + for _, subGroup := range pgi.SubGroups { + hasPipelinedTask := false + activeAllocatedTasksCount := 0 + for _, task := range subGroup.GetPodInfos() { + if task.Status == pod_status.Pipelined { + log.InfraLogger.V(7).Infof("task: <%v/%v> was pipelined to node: <%v>", + task.Namespace, task.Name, task.NodeName) + hasPipelinedTask = true + } else if pod_status.IsActiveAllocatedStatus(task.Status) { + activeAllocatedTasksCount += 1 + } + } + + if hasPipelinedTask && activeAllocatedTasksCount < int(subGroup.GetMinAvailable()) { + log.InfraLogger.V(7).Infof("Subgroup: <%v/%v> has pipelined tasks, and not enough allocated pods for minAvailable <%v>. Pipeline all.", + pgi.UID, subGroup.GetName(), subGroup.GetMinAvailable()) + return true } } - // If the job has already MinAvailable tasks active allocated (but not pipelined), - // then we shouldn't convert non-pipelined tasks to pipeline. - return hasPipelinedTask && activeAllocatedTasksCount < int(pgi.GetDefaultMinAvailable()) + return false } func (pgi *PodGroupInfo) Clone() *PodGroupInfo { From 6a07ee0fb892cc7d27beb4e32e7f7a646096915c Mon Sep 17 00:00:00 2001 From: "Omer Dayan (SW-GPU)" Date: Tue, 26 Aug 2025 15:41:26 +0300 Subject: [PATCH 4/6] OrderJob Elastic --- pkg/scheduler/plugins/elastic/elastic.go | 26 +++++++++++++---- pkg/scheduler/plugins/elastic/elastic_test.go | 28 +++++++++++++++++++ 2 files changed, 48 insertions(+), 6 deletions(-) diff --git a/pkg/scheduler/plugins/elastic/elastic.go b/pkg/scheduler/plugins/elastic/elastic.go index c2414b900..22c282b82 100644 --- a/pkg/scheduler/plugins/elastic/elastic.go +++ b/pkg/scheduler/plugins/elastic/elastic.go @@ -29,22 +29,22 @@ func JobOrderFn(l, r interface{}) int { lv := l.(*podgroup_info.PodGroupInfo) rv := r.(*podgroup_info.PodGroupInfo) - lvAllocatedCount := int32(lv.GetActiveAllocatedTasksCount()) - rvAllocatedCount := int32(rv.GetActiveAllocatedTasksCount()) + lvBelowMinAvailable, lvAboveMinAvailable, lvExactlyAtMinAvailable := minAvailableState(lv) + rvBelowMinAvailable, rvAboveMinAvailable, rvExactlyAtMinAvailable := minAvailableState(rv) - if lvAllocatedCount < lv.GetDefaultMinAvailable() && rvAllocatedCount >= rv.GetDefaultMinAvailable() { + if lvBelowMinAvailable && !rvBelowMinAvailable { return -1 } - if lvAllocatedCount == lv.GetDefaultMinAvailable() && rvAllocatedCount > rv.GetDefaultMinAvailable() { + if lvExactlyAtMinAvailable && rvAboveMinAvailable { return -1 } - if lvAllocatedCount >= lv.GetDefaultMinAvailable() && rvAllocatedCount < rv.GetDefaultMinAvailable() { + if !lvBelowMinAvailable && rvBelowMinAvailable { return 1 } - if lvAllocatedCount > lv.GetDefaultMinAvailable() && rvAllocatedCount == rv.GetDefaultMinAvailable() { + if lvAboveMinAvailable && rvExactlyAtMinAvailable { return 1 } @@ -53,4 +53,18 @@ func JobOrderFn(l, r interface{}) int { return 0 } +func minAvailableState(pgi *podgroup_info.PodGroupInfo) (bool, bool, bool) { + exactlyAtMinAvailable := true + for _, subGroup := range pgi.SubGroups { + numAllocatedTasks := int32(subGroup.GetNumActiveAllocatedTasks()) + if numAllocatedTasks < subGroup.GetMinAvailable() { + return true, false, false + } + if numAllocatedTasks > subGroup.GetMinAvailable() { + exactlyAtMinAvailable = false + } + } + return false, !exactlyAtMinAvailable, exactlyAtMinAvailable +} + func (pp *elasticPlugin) OnSessionClose(_ *framework.Session) {} diff --git a/pkg/scheduler/plugins/elastic/elastic_test.go b/pkg/scheduler/plugins/elastic/elastic_test.go index 813573f34..438f101d1 100644 --- a/pkg/scheduler/plugins/elastic/elastic_test.go +++ b/pkg/scheduler/plugins/elastic/elastic_test.go @@ -60,6 +60,7 @@ func TestJobOrderFn(t *testing.T) { }, lPods: []*pod_info.PodInfo{ { + UID: "pod-a-1", Name: "pod-a-1", Status: pod_status.Running, ResReq: resource_info.EmptyResourceRequirements(), @@ -74,6 +75,7 @@ func TestJobOrderFn(t *testing.T) { }, rPods: []*pod_info.PodInfo{ { + UID: "pod-a-2", Name: "pod-a-2", Status: pod_status.Running, ResReq: resource_info.EmptyResourceRequirements(), @@ -94,6 +96,7 @@ func TestJobOrderFn(t *testing.T) { }, lPods: []*pod_info.PodInfo{ { + UID: "pod-a-1", Name: "pod-a-1", Status: pod_status.Allocated, ResReq: resource_info.EmptyResourceRequirements(), @@ -108,6 +111,7 @@ func TestJobOrderFn(t *testing.T) { }, rPods: []*pod_info.PodInfo{ { + UID: "pod-a-2", Name: "pod-a-2", Status: pod_status.Running, ResReq: resource_info.EmptyResourceRequirements(), @@ -128,6 +132,7 @@ func TestJobOrderFn(t *testing.T) { }, lPods: []*pod_info.PodInfo{ { + UID: "pod-a-1", Name: "pod-a-1", Status: pod_status.Bound, ResReq: resource_info.EmptyResourceRequirements(), @@ -142,6 +147,7 @@ func TestJobOrderFn(t *testing.T) { }, rPods: []*pod_info.PodInfo{ { + UID: "pod-a-2", Name: "pod-a-2", Status: pod_status.Running, ResReq: resource_info.EmptyResourceRequirements(), @@ -162,6 +168,7 @@ func TestJobOrderFn(t *testing.T) { }, lPods: []*pod_info.PodInfo{ { + UID: "pod-a-1", Name: "pod-a-1", Status: pod_status.Releasing, ResReq: resource_info.EmptyResourceRequirements(), @@ -176,6 +183,7 @@ func TestJobOrderFn(t *testing.T) { }, rPods: []*pod_info.PodInfo{ { + UID: "pod-a-2", Name: "pod-a-2", Status: pod_status.Running, ResReq: resource_info.EmptyResourceRequirements(), @@ -196,6 +204,7 @@ func TestJobOrderFn(t *testing.T) { }, lPods: []*pod_info.PodInfo{ { + UID: "pod-a-1", Name: "pod-a-1", Status: pod_status.Running, ResReq: resource_info.EmptyResourceRequirements(), @@ -222,6 +231,7 @@ func TestJobOrderFn(t *testing.T) { }, lPods: []*pod_info.PodInfo{ { + UID: "pod-a-1", Name: "pod-a-1", Status: pod_status.Running, ResReq: resource_info.EmptyResourceRequirements(), @@ -236,11 +246,13 @@ func TestJobOrderFn(t *testing.T) { }, rPods: []*pod_info.PodInfo{ { + UID: "pod-a-2", Name: "pod-a-2", Status: pod_status.Running, ResReq: resource_info.EmptyResourceRequirements(), }, { + UID: "pod-b-2", Name: "pod-b-2", Status: pod_status.Running, ResReq: resource_info.EmptyResourceRequirements(), @@ -261,6 +273,7 @@ func TestJobOrderFn(t *testing.T) { }, lPods: []*pod_info.PodInfo{ { + UID: "pod-a-1", Name: "pod-a-1", Status: pod_status.Running, ResReq: resource_info.EmptyResourceRequirements(), @@ -275,16 +288,19 @@ func TestJobOrderFn(t *testing.T) { }, rPods: []*pod_info.PodInfo{ { + UID: "pod-a-2", Name: "pod-a-2", Status: pod_status.Running, ResReq: resource_info.EmptyResourceRequirements(), }, { + UID: "pod-b-2", Name: "pod-b-2", Status: pod_status.Running, ResReq: resource_info.EmptyResourceRequirements(), }, { + UID: "pod-b-3", Name: "pod-b-3", Status: pod_status.Running, ResReq: resource_info.EmptyResourceRequirements(), @@ -305,6 +321,7 @@ func TestJobOrderFn(t *testing.T) { }, lPods: []*pod_info.PodInfo{ { + UID: "pod-a-1", Name: "pod-a-1", Status: pod_status.Running, ResReq: resource_info.EmptyResourceRequirements(), @@ -319,11 +336,13 @@ func TestJobOrderFn(t *testing.T) { }, rPods: []*pod_info.PodInfo{ { + UID: "pod-a-2", Name: "pod-a-2", Status: pod_status.Running, ResReq: resource_info.EmptyResourceRequirements(), }, { + UID: "pod-b-2", Name: "pod-b-2", Status: pod_status.Running, ResReq: resource_info.EmptyResourceRequirements(), @@ -344,6 +363,7 @@ func TestJobOrderFn(t *testing.T) { }, lPods: []*pod_info.PodInfo{ { + UID: "pod-a-1", Name: "pod-a-1", Status: pod_status.Running, ResReq: resource_info.EmptyResourceRequirements(), @@ -358,11 +378,13 @@ func TestJobOrderFn(t *testing.T) { }, rPods: []*pod_info.PodInfo{ { + UID: "pod-a-2", Name: "pod-a-2", Status: pod_status.Running, ResReq: resource_info.EmptyResourceRequirements(), }, { + UID: "pod-b-2", Name: "pod-b-2", Status: pod_status.Running, ResReq: resource_info.EmptyResourceRequirements(), @@ -383,11 +405,13 @@ func TestJobOrderFn(t *testing.T) { }, lPods: []*pod_info.PodInfo{ { + UID: "pod-a-1", Name: "pod-a-1", Status: pod_status.Running, ResReq: resource_info.EmptyResourceRequirements(), }, { + UID: "pod-a-2", Name: "pod-a-2", Status: pod_status.Running, ResReq: resource_info.EmptyResourceRequirements(), @@ -402,6 +426,7 @@ func TestJobOrderFn(t *testing.T) { }, rPods: []*pod_info.PodInfo{ { + UID: "pod-a-2", Name: "pod-a-2", Status: pod_status.Running, ResReq: resource_info.EmptyResourceRequirements(), @@ -422,11 +447,13 @@ func TestJobOrderFn(t *testing.T) { }, lPods: []*pod_info.PodInfo{ { + UID: "pod-a-1", Name: "pod-a-1", Status: pod_status.Running, ResReq: resource_info.EmptyResourceRequirements(), }, { + UID: "pod-a-2", Name: "pod-a-2", Status: pod_status.Running, ResReq: resource_info.EmptyResourceRequirements(), @@ -441,6 +468,7 @@ func TestJobOrderFn(t *testing.T) { }, rPods: []*pod_info.PodInfo{ { + UID: "pod-a-2", Name: "pod-a-2", Status: pod_status.Running, ResReq: resource_info.EmptyResourceRequirements(), From 7cf50beaeda0fa83d8e138e225c1590fc9ad56a2 Mon Sep 17 00:00:00 2001 From: "Omer Dayan (SW-GPU)" Date: Tue, 26 Aug 2025 17:34:51 +0300 Subject: [PATCH 5/6] SplitVictimTasks --- .../plugins/proportion/proportion.go | 39 +++++++++++++------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/pkg/scheduler/plugins/proportion/proportion.go b/pkg/scheduler/plugins/proportion/proportion.go index 7640cf04b..bcfdc87fc 100644 --- a/pkg/scheduler/plugins/proportion/proportion.go +++ b/pkg/scheduler/plugins/proportion/proportion.go @@ -155,7 +155,7 @@ func (pp *proportionPlugin) reclaimableFn( func (pp *proportionPlugin) getVictimResources(victim *api.VictimInfo) []*resource_info.Resource { var victimResources []*resource_info.Resource - elasticTasks, coreTasks := splitVictimTasks(victim.Tasks, victim.Job.GetDefaultMinAvailable()) + elasticTasks, coreTasks := splitVictimTasks(victim.Tasks, victim.Job.GetSubGroups()) // Process elastic tasks individually for _, task := range elasticTasks { @@ -177,19 +177,34 @@ func (pp *proportionPlugin) getVictimResources(victim *api.VictimInfo) []*resour // splitVictimTasks safely splits victim tasks into elastic and core tasks // Returns (elasticTasks, coreTasks) -func splitVictimTasks(tasks []*pod_info.PodInfo, minAvailable int32) ([]*pod_info.PodInfo, []*pod_info.PodInfo) { - totalTasks := len(tasks) - minAvailableInt := int(minAvailable) - - // Handle case where minAvailable is greater than or equal to the number of tasks - if minAvailableInt >= totalTasks { - // All tasks are considered core tasks, no elastic tasks - return nil, tasks +func splitVictimTasks(tasks []*pod_info.PodInfo, subGroups map[string]*podgroup_info.SubGroupInfo) ([]*pod_info.PodInfo, []*pod_info.PodInfo) { + subGroupsToTasks := map[string][]*pod_info.PodInfo{} + for _, task := range tasks { + subGroupName := podgroup_info.DefaultSubGroup + if task.SubGroupName != "" { + subGroupName = task.SubGroupName + } + if _, found := subGroupsToTasks[subGroupName]; !found { + subGroupsToTasks[subGroupName] = []*pod_info.PodInfo{} + } + subGroupsToTasks[subGroupName] = append(subGroupsToTasks[subGroupName], task) } - // Normal case: split tasks into elastic and core - elasticTasks := tasks[minAvailableInt:] - coreTasks := tasks[:minAvailableInt] + coreTasks := []*pod_info.PodInfo{} + elasticTasks := []*pod_info.PodInfo{} + for subGroupName, subGroupTasks := range subGroupsToTasks { + subGroup := subGroups[subGroupName] + + // Handle case where minAvailable is greater than or equal to the number of tasks + if subGroup.GetMinAvailable() >= int32(len(subGroupTasks)) { + // All tasks are considered core tasks, no elastic tasks + coreTasks = append(coreTasks, subGroupTasks...) + continue + } + + coreTasks = append(coreTasks, subGroupTasks[:subGroup.GetMinAvailable()]...) + elasticTasks = append(elasticTasks, subGroupTasks[subGroup.GetMinAvailable():]...) + } return elasticTasks, coreTasks } From c4e46c226b21db3006f29915503cbad7eeaa40f8 Mon Sep 17 00:00:00 2001 From: "Omer Dayan (SW-GPU)" Date: Tue, 26 Aug 2025 17:46:04 +0300 Subject: [PATCH 6/6] MinRuntime --- .../plugins/minruntime/minruntime.go | 34 +++++++++++++++---- 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/pkg/scheduler/plugins/minruntime/minruntime.go b/pkg/scheduler/plugins/minruntime/minruntime.go index 02f7c9d76..1538035de 100644 --- a/pkg/scheduler/plugins/minruntime/minruntime.go +++ b/pkg/scheduler/plugins/minruntime/minruntime.go @@ -126,9 +126,8 @@ func (mr *minruntimePlugin) reclaimScenarioValidatorFn(scenario api.ScenarioInfo if !protected { continue } - numVictimTasks := int32(len(victimInfo.Tasks)) - currentlyRunning := victimInfo.Job.GetActivelyRunningTasksCount() - if victimInfo.Job.GetDefaultMinAvailable() > currentlyRunning-numVictimTasks { + + if !validVictimForMinAvailable(victimInfo) { return false } } @@ -145,9 +144,8 @@ func (mr *minruntimePlugin) preemptScenarioValidatorFn(scenario api.ScenarioInfo if !protected { continue } - numVictimTasks := int32(len(victimInfo.Tasks)) - currentlyRunning := victimInfo.Job.GetActivelyRunningTasksCount() - if victimInfo.Job.GetDefaultMinAvailable() > currentlyRunning-numVictimTasks { + + if !validVictimForMinAvailable(victimInfo) { return false } } @@ -212,3 +210,27 @@ func (mr *minruntimePlugin) cacheReclaimProtection(pendingJob *podgroup_info.Pod } mr.reclaimProtectionCache[pendingJob.UID][victim.UID] = protected } + +func validVictimForMinAvailable(victimInfo *api.VictimInfo) bool { + numVictimTasksPerSubGroup := map[string]int32{} + for _, task := range victimInfo.Tasks { + subGroupName := podgroup_info.DefaultSubGroup + if task.SubGroupName != "" { + subGroupName = task.SubGroupName + } + numVictimTasksPerSubGroup[subGroupName]++ + } + + numCurrentlyRunningSubGroup := map[string]int32{} + for subGroupName := range numVictimTasksPerSubGroup { + numCurrentlyRunningSubGroup[subGroupName] = int32(victimInfo.Job.GetSubGroups()[subGroupName].GetNumActiveUsedTasks()) + } + + for subGroupName, numVictims := range numVictimTasksPerSubGroup { + subGroupCurrentlyRunning := numCurrentlyRunningSubGroup[subGroupName] + if victimInfo.Job.GetSubGroups()[subGroupName].GetMinAvailable() > subGroupCurrentlyRunning-numVictims { + return false + } + } + return true +}