Skip to content

Commit 91bea3f

Browse files
committed
SplitVictimTasks
1 parent 59cb432 commit 91bea3f

File tree

1 file changed

+27
-12
lines changed

1 file changed

+27
-12
lines changed

pkg/scheduler/plugins/proportion/proportion.go

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ func (pp *proportionPlugin) reclaimableFn(
155155
func (pp *proportionPlugin) getVictimResources(victim *api.VictimInfo) []*resource_info.Resource {
156156
var victimResources []*resource_info.Resource
157157

158-
elasticTasks, coreTasks := splitVictimTasks(victim.Tasks, victim.Job.GetDefaultMinAvailable())
158+
elasticTasks, coreTasks := splitVictimTasks(victim.Tasks, victim.Job.GetSubGroups())
159159

160160
// Process elastic tasks individually
161161
for _, task := range elasticTasks {
@@ -177,19 +177,34 @@ func (pp *proportionPlugin) getVictimResources(victim *api.VictimInfo) []*resour
177177

178178
// splitVictimTasks safely splits victim tasks into elastic and core tasks
179179
// Returns (elasticTasks, coreTasks)
180-
func splitVictimTasks(tasks []*pod_info.PodInfo, minAvailable int32) ([]*pod_info.PodInfo, []*pod_info.PodInfo) {
181-
totalTasks := len(tasks)
182-
minAvailableInt := int(minAvailable)
183-
184-
// Handle case where minAvailable is greater than or equal to the number of tasks
185-
if minAvailableInt >= totalTasks {
186-
// All tasks are considered core tasks, no elastic tasks
187-
return nil, tasks
180+
func splitVictimTasks(tasks []*pod_info.PodInfo, subGroups map[string]*podgroup_info.SubGroupInfo) ([]*pod_info.PodInfo, []*pod_info.PodInfo) {
181+
subGroupsToTasks := map[string][]*pod_info.PodInfo{}
182+
for _, task := range tasks {
183+
subGroupName := podgroup_info.DefaultSubGroup
184+
if task.SubGroupName != "" {
185+
subGroupName = task.SubGroupName
186+
}
187+
if _, found := subGroupsToTasks[subGroupName]; !found {
188+
subGroupsToTasks[subGroupName] = []*pod_info.PodInfo{}
189+
}
190+
subGroupsToTasks[subGroupName] = append(subGroupsToTasks[subGroupName], task)
188191
}
189192

190-
// Normal case: split tasks into elastic and core
191-
elasticTasks := tasks[minAvailableInt:]
192-
coreTasks := tasks[:minAvailableInt]
193+
coreTasks := []*pod_info.PodInfo{}
194+
elasticTasks := []*pod_info.PodInfo{}
195+
for subGroupName, subGroupTasks := range subGroupsToTasks {
196+
subGroup := subGroups[subGroupName]
197+
198+
// Handle case where minAvailable is greater than or equal to the number of tasks
199+
if subGroup.GetMinAvailable() >= int32(len(subGroupTasks)) {
200+
// All tasks are considered core tasks, no elastic tasks
201+
coreTasks = append(coreTasks, subGroupTasks...)
202+
continue
203+
}
204+
205+
coreTasks = append(coreTasks, subGroupTasks[:subGroup.GetMinAvailable()]...)
206+
elasticTasks = append(elasticTasks, subGroupTasks[subGroup.GetMinAvailable():]...)
207+
}
193208

194209
return elasticTasks, coreTasks
195210
}

0 commit comments

Comments
 (0)