
Commit 4102829

itsomri and enoodle authored
Elastic reclaim panic (#390)
* Fix panic in elastic reclaim scenarios

---------

Co-authored-by: Erez Freiberger <[email protected]>
1 parent 8fcb26c commit 4102829

File tree

4 files changed: +162 additions, -11 deletions

- CHANGELOG.md
- pkg/scheduler/plugins/proportion/proportion.go
- pkg/scheduler/plugins/proportion/proportion_test.go
- test/e2e/suites/reclaim/reclaim_elastic_test.go

CHANGELOG.md

Lines changed: 3 additions & 0 deletions

@@ -6,6 +6,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
 
 ## [Unreleased]
 
+### Fixed
+- Fixed scheduler panic in some elastic reclaim scenarios
+
 ## [v0.8.0] - 2025-08-18
 
 ### Added

pkg/scheduler/plugins/proportion/proportion.go

Lines changed: 30 additions & 9 deletions

@@ -155,25 +155,46 @@ func (pp *proportionPlugin) reclaimableFn(
 
 func (pp *proportionPlugin) getVictimResources(victim *api.VictimInfo) []*resource_info.Resource {
     var victimResources []*resource_info.Resource
-    if len(victim.Tasks) > int(victim.Job.GetDefaultMinAvailable()) {
-        elasticTasks := victim.Tasks[victim.Job.GetDefaultMinAvailable():]
-        for _, task := range elasticTasks {
-            resources := getResources(pp.allowConsolidatingReclaim, task)
-            if resources == nil {
-                continue
-            }
-            victimResources = append(victimResources, resources)
+
+    elasticTasks, coreTasks := splitVictimTasks(victim.Tasks, victim.Job.GetDefaultMinAvailable())
+
+    // Process elastic tasks individually
+    for _, task := range elasticTasks {
+        resources := getResources(pp.allowConsolidatingReclaim, task)
+        if resources == nil {
+            continue
         }
+        victimResources = append(victimResources, resources)
     }
 
-    resources := getResources(pp.allowConsolidatingReclaim, victim.Tasks[:victim.Job.GetDefaultMinAvailable()]...)
+    // Process core tasks as a group
+    resources := getResources(pp.allowConsolidatingReclaim, coreTasks...)
     if resources != nil {
         victimResources = append(victimResources, resources)
     }
 
     return victimResources
 }
 
+// splitVictimTasks safely splits victim tasks into elastic and core tasks.
+// Returns (elasticTasks, coreTasks).
+func splitVictimTasks(tasks []*pod_info.PodInfo, minAvailable int32) ([]*pod_info.PodInfo, []*pod_info.PodInfo) {
+    totalTasks := len(tasks)
+    minAvailableInt := int(minAvailable)
+
+    // Handle case where minAvailable is greater than or equal to the number of tasks
+    if minAvailableInt >= totalTasks {
+        // All tasks are considered core tasks, no elastic tasks
+        return nil, tasks
+    }
+
+    // Normal case: split tasks into elastic and core
+    elasticTasks := tasks[minAvailableInt:]
+    coreTasks := tasks[:minAvailableInt]
+
+    return elasticTasks, coreTasks
+}
+
 func getResources(ignoreReallocatedTasks bool, pods ...*pod_info.PodInfo) *resource_info.Resource {
     resources := make([]*resource_info.ResourceRequirements, 0, len(pods))
     for _, task := range pods {
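The root cause is visible in the deleted lines: slicing a Go slice past its bounds panics at runtime with "slice bounds out of range", which happened whenever a victim job had fewer tasks than its default MinAvailable. Below is a minimal, self-contained sketch of the guarded split, not the scheduler's code; the Task type stands in for *pod_info.PodInfo and only the slicing behaviour matters.

```go
package main

import "fmt"

// Task stands in for *pod_info.PodInfo; only the split logic is shown.
type Task struct{ Name string }

// splitTasks mirrors the idea of splitVictimTasks above: tasks beyond
// minAvailable are elastic, the first minAvailable tasks form the job's core.
func splitTasks(tasks []*Task, minAvailable int32) (elastic, core []*Task) {
	n := int(minAvailable)
	if n >= len(tasks) {
		// Without this guard, tasks[n:] panics with
		// "slice bounds out of range" whenever n > len(tasks).
		return nil, tasks
	}
	return tasks[n:], tasks[:n]
}

func main() {
	tasks := []*Task{{Name: "pod-0"}}

	// minAvailable exceeds the task count: the pre-fix code path that panicked.
	elastic, core := splitTasks(tasks, 2)
	fmt.Println(len(elastic), len(core)) // 0 1 -- no panic, all tasks treated as core
}
```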

pkg/scheduler/plugins/proportion/proportion_test.go

Lines changed: 123 additions & 0 deletions

@@ -22,6 +22,7 @@ import (
     "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/node_info"
     "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_info"
     "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_status"
+    "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/podgroup_info"
     "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/resource_info"
     "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/cache"
     "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/conf"
@@ -773,6 +774,128 @@ var _ = Describe("Set Fair Share in Proportion", func() {
         }
 
     })
+
+    Context("getVictimResources", func() {
+        It("should handle case where MinAvailable is greater than number of tasks (panic fix)", func() {
+            plugin := &proportionPlugin{
+                allowConsolidatingReclaim: true,
+            }
+
+            // Create a victim with only 1 task but MinAvailable = 2
+            // This should cause a slice bounds panic without the fix
+            victim := &api.VictimInfo{
+                Job: &podgroup_info.PodGroupInfo{},
+                Tasks: []*pod_info.PodInfo{
+                    {
+                        Status:           pod_status.Pending,
+                        AcceptedResource: common_info.BuildResourceRequirements("1", "1Gi"),
+                    },
+                },
+            }
+            victim.Job.SetDefaultMinAvailable(2)
+
+            // This should not panic
+            result := plugin.getVictimResources(victim)
+            // Should return resources for the single task that exists
+            Expect(len(result)).To(Equal(1))
+            Expect(result[0]).ToNot(BeNil())
+            Expect(result[0].Cpu()).To(Equal(1000.0))
+        })
+
+        It("should correctly split elastic and core tasks when MinAvailable is less than task count", func() {
+            plugin := &proportionPlugin{
+                allowConsolidatingReclaim: true,
+            }
+
+            // Create a victim with 3 tasks but MinAvailable = 1
+            victim := &api.VictimInfo{
+                Job: &podgroup_info.PodGroupInfo{},
+                Tasks: []*pod_info.PodInfo{
+                    {
+                        Status:           pod_status.Pending,
+                        AcceptedResource: common_info.BuildResourceRequirements("1", "1Gi"),
+                    },
+                    {
+                        Status:           pod_status.Pending,
+                        AcceptedResource: common_info.BuildResourceRequirements("1", "1Gi"),
+                    },
+                    {
+                        Status:           pod_status.Pending,
+                        AcceptedResource: common_info.BuildResourceRequirements("1", "1Gi"),
+                    },
+                },
+            }
+            victim.Job.SetDefaultMinAvailable(1)
+
+            result := plugin.getVictimResources(victim)
+
+            // Should return 3 resources: 2 elastic tasks + 1 core task group
+            Expect(len(result)).To(Equal(3))
+            for _, res := range result {
+                Expect(res).ToNot(BeNil())
+                Expect(res.Cpu()).To(Equal(1000.0))
+            }
+        })
+
+        It("should handle case where MinAvailable equals task count", func() {
+            plugin := &proportionPlugin{
+                allowConsolidatingReclaim: true,
+            }
+
+            // Create a victim with 2 tasks and MinAvailable = 2
+            victim := &api.VictimInfo{
+                Job: &podgroup_info.PodGroupInfo{},
+                Tasks: []*pod_info.PodInfo{
+                    {
+                        Status:           pod_status.Pending,
+                        AcceptedResource: common_info.BuildResourceRequirements("1", "1Gi"),
+                    },
+                    {
+                        Status:           pod_status.Pending,
+                        AcceptedResource: common_info.BuildResourceRequirements("1", "1Gi"),
+                    },
+                },
+            }
+            victim.Job.SetDefaultMinAvailable(2)
+
+            result := plugin.getVictimResources(victim)
+
+            // Should return 1 resource for all core tasks (no elastic tasks)
+            Expect(len(result)).To(Equal(1))
+            Expect(result[0]).ToNot(BeNil())
+            Expect(result[0].Cpu()).To(Equal(2000.0)) // Combined resources
+        })
+
+        It("should handle zero MinAvailable", func() {
+            plugin := &proportionPlugin{
+                allowConsolidatingReclaim: true,
+            }
+
+            victim := &api.VictimInfo{
+                Job: &podgroup_info.PodGroupInfo{},
+                Tasks: []*pod_info.PodInfo{
+                    {
+                        Status:           pod_status.Pending,
+                        AcceptedResource: common_info.BuildResourceRequirements("1", "1Gi"),
+                    },
+                    {
+                        Status:           pod_status.Pending,
+                        AcceptedResource: common_info.BuildResourceRequirements("1", "1Gi"),
+                    },
+                },
+            }
+            victim.Job.SetDefaultMinAvailable(0)
+
+            result := plugin.getVictimResources(victim)
+
+            // Should return 2 resources (each task individually as elastic)
+            Expect(len(result)).To(Equal(2))
+            for _, res := range result {
+                Expect(res).ToNot(BeNil())
+                Expect(res.Cpu()).To(Equal(1000.0))
+            }
+        })
+    })
 })
 
 var _ = Describe("New", func() {
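The CPU expectations of 1000.0 and 2000.0 follow from CPU requests being tracked in millicores. A quick standalone check of that conversion using only k8s.io/apimachinery; the assumption that the scheduler's resource_info Cpu() accessor reports the same unit is inferred from the test values above, not shown here.

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// A "1"-CPU request is 1000 millicores, which is why a single task shows up
	// as 1000.0 and two combined core tasks as 2000.0 in the assertions above.
	one := resource.MustParse("1")
	two := resource.MustParse("2")
	fmt.Println(one.MilliValue(), two.MilliValue()) // 1000 2000
}
```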

test/e2e/suites/reclaim/reclaim_elastic_test.go

Lines changed: 6 additions & 2 deletions

@@ -14,7 +14,9 @@ import (
     v1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/types"
     "k8s.io/apimachinery/pkg/watch"
+    "sigs.k8s.io/controller-runtime/pkg/client"
 
     v2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2"
     "github.com/NVIDIA/KAI-scheduler/pkg/common/constants"
@@ -190,7 +192,7 @@ var _ = Describe("Reclaim with Elastic Jobs", Ordered, func() {
 
     It("Reclaim elastic job partially for a distributed job", func(ctx context.Context) {
         testCtx = testcontext.GetConnectivity(ctx, Default)
-        parentQueue, reclaimeeQueue, reclaimerQueue = createQueues(3, 1, 2)
+        parentQueue, reclaimeeQueue, reclaimerQueue = createQueues(4, 2, 2)
         reclaimeeQueue.Spec.Resources.GPU.OverQuotaWeight = 0
         testCtx.InitQueues([]*v2.Queue{parentQueue, reclaimeeQueue, reclaimerQueue})
         reclaimeeNamespace = queue.GetConnectedNamespaceToQueue(reclaimeeQueue)
@@ -204,6 +206,8 @@ var _ = Describe("Reclaim with Elastic Jobs", Ordered, func() {
         reclaimeePodGroup, reclaimeePods := pod_group.CreateWithPods(ctx, testCtx.KubeClientset, testCtx.KubeAiSchedClientset,
             "elastic-reclaimee-job", reclaimeeQueue, 3, nil,
             reclaimeePodRequirements)
+        Expect(testCtx.ControllerClient.Patch(
+            ctx, reclaimeePodGroup, client.RawPatch(types.JSONPatchType, []byte(`[{"op": "replace", "path": "/spec/minMember", "value": 2}]`)))).To(Succeed())
         wait.ForPodsScheduled(ctx, testCtx.ControllerClient, reclaimeeNamespace, reclaimeePods)
 
         // reclaimer job
@@ -225,7 +229,7 @@ var _ = Describe("Reclaim with Elastic Jobs", Ordered, func() {
                 LabelSelector: fmt.Sprintf("%s=%s", podGroupLabelName, reclaimeePodGroup.Name),
             })
             Expect(err).To(Succeed())
-            return len(pods.Items) == 1
+            return len(pods.Items) == 2
         })
     })
     It("Reclaim elastic job with min runtime protecting", func(ctx context.Context) {
