Skip to content

Commit f44935f

Browse files
authored
Added SubGroups to PodGroup CRD and SubGroupInfo to PodGroupInfo (#342)
Added SubGroups to PodGroup CRD and SubGroupInfo to PodGroupInfo
1 parent f9bacb0 commit f44935f

File tree

10 files changed

+310
-9
lines changed

10 files changed

+310
-9
lines changed

deployments/crds/internal/scheduling.run.ai_podgroups.yaml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,25 @@ spec:
8787
-1 and 1
8888
format: int32
8989
type: integer
90+
subGroups:
91+
description: SubGroups defines finer-grained subsets of pods within
92+
the PodGroup with individual scheduling constraints
93+
items:
94+
properties:
95+
minMember:
96+
description: |-
97+
MinMember defines the minimal number of members/tasks to run this SubGroup;
98+
if there's not enough resources to start all tasks, the scheduler will not start anyone.
99+
format: int32
100+
type: integer
101+
name:
102+
description: Name uniquely identifies the SubGroup within the
103+
parent PodGroup.
104+
type: string
105+
required:
106+
- name
107+
type: object
108+
type: array
90109
topologyConstraint:
91110
description: TopologyConstraint defines the topology constraints for
92111
this PodGroup

pkg/apis/scheduling/v2alpha2/podgroup_types.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,18 @@ type PodGroupSpec struct {
6666

6767
// TopologyConstraint defines the topology constraints for this PodGroup
6868
TopologyConstraint TopologyConstraint `json:"topologyConstraint,omitempty"`
69+
70+
// SubGroups defines finer-grained subsets of pods within the PodGroup with individual scheduling constraints
71+
SubGroups []SubGroup `json:"subGroups,omitempty"`
72+
}
73+
74+
type SubGroup struct {
75+
// Name uniquely identifies the SubGroup within the parent PodGroup.
76+
Name string `json:"name"`
77+
78+
// MinMember defines the minimal number of members/tasks to run this SubGroup;
79+
// if there's not enough resources to start all tasks, the scheduler will not start anyone.
80+
MinMember int32 `json:"minMember,omitempty"`
6981
}
7082

7183
// PodGroupStatus defines the observed state of PodGroup

pkg/apis/scheduling/v2alpha2/zz_generated.deepcopy.go

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/scheduler/api/pod_info/pod_info.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ const (
4242
GPUGroup = "runai-gpu-group"
4343
ReceivedResourceTypeAnnotationName = "received-resource-type"
4444
WholeGpuIndicator = "-2"
45+
SubGroupLabelKey = "kai.scheduler/subgroup-name"
4546
)
4647

4748
type ResourceRequestType string
@@ -71,6 +72,8 @@ type PodInfo struct {
7172
Name string
7273
Namespace string
7374

75+
SubGroupName string
76+
7477
ResourceRequestType ResourceRequestType
7578
ResourceReceivedType ResourceReceivedType
7679

@@ -175,6 +178,7 @@ func NewTaskInfoWithBindRequest(pod *v1.Pod, bindRequest *bindrequest_info.BindR
175178
Job: getPodGroupID(pod),
176179
Name: pod.Name,
177180
Namespace: pod.Namespace,
181+
SubGroupName: pod.Labels[SubGroupLabelKey],
178182
NodeName: nodeName,
179183
Status: getTaskStatus(pod, bindRequest),
180184
IsVirtualStatus: false,

pkg/scheduler/api/podgroup_info/job_info.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ type PodGroupInfo struct {
8686
LastStartTimestamp *time.Time
8787
PodGroup *enginev2alpha2.PodGroup
8888
PodGroupUID types.UID
89+
SubGroups map[string]*SubGroupInfo
8990

9091
StalenessInfo
9192

@@ -115,6 +116,8 @@ func NewPodGroupInfo(uid common_info.PodGroupID, tasks ...*pod_info.PodInfo) *Po
115116
Stale: false,
116117
},
117118

119+
SubGroups: map[string]*SubGroupInfo{},
120+
118121
LastStartTimestamp: nil,
119122
activeAllocatedCount: ptr.To(0),
120123
}
@@ -146,6 +149,11 @@ func (pgi *PodGroupInfo) SetPodGroup(pg *enginev2alpha2.PodGroup) {
146149
pgi.PodGroup = pg
147150
pgi.PodGroupUID = pg.UID
148151

152+
for _, sg := range pg.Spec.SubGroups {
153+
subGroupInfo := fromSubGroup(&sg)
154+
pgi.SubGroups[subGroupInfo.Name] = subGroupInfo
155+
}
156+
149157
if pg.Annotations[commonconstants.StalePodgroupTimeStamp] != "" {
150158
staleTimeStamp, err := time.Parse(time.RFC3339, pg.Annotations[commonconstants.StalePodgroupTimeStamp])
151159
if err != nil {
@@ -187,6 +195,10 @@ func (pgi *PodGroupInfo) addTaskIndex(ti *pod_info.PodInfo) {
187195

188196
func (pgi *PodGroupInfo) AddTaskInfo(ti *pod_info.PodInfo) {
189197
pgi.PodInfos[ti.UID] = ti
198+
subGroup, found := pgi.SubGroups[ti.SubGroupName]
199+
if found {
200+
subGroup.assignTask(ti)
201+
}
190202
pgi.addTaskIndex(ti)
191203

192204
if pod_status.AllocatedStatus(ti.Status) {
@@ -397,6 +409,7 @@ func (pgi *PodGroupInfo) CloneWithTasks(tasks []*pod_info.PodInfo) *PodGroupInfo
397409

398410
PodGroup: pgi.PodGroup,
399411
PodGroupUID: pgi.PodGroupUID,
412+
SubGroups: map[string]*SubGroupInfo{},
400413

401414
PodStatusIndex: map[pod_status.PodStatus]pod_info.PodsMap{},
402415
PodInfos: pod_info.PodsMap{},
@@ -405,6 +418,10 @@ func (pgi *PodGroupInfo) CloneWithTasks(tasks []*pod_info.PodInfo) *PodGroupInfo
405418

406419
pgi.CreationTimestamp.DeepCopyInto(&info.CreationTimestamp)
407420

421+
for _, subGroup := range pgi.SubGroups {
422+
info.SubGroups[subGroup.Name] = newSubGroupInfo(subGroup.Name, subGroup.MinAvailable)
423+
}
424+
408425
for _, task := range tasks {
409426
info.AddTaskInfo(task.Clone())
410427
}

pkg/scheduler/api/podgroup_info/job_info_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ func TestAddTaskInfo(t *testing.T) {
7474
expected: &PodGroupInfo{
7575
UID: case01_uid,
7676
Allocated: common_info.BuildResource("4000m", "4G"),
77+
SubGroups: map[string]*SubGroupInfo{},
7778
PodInfos: pod_info.PodsMap{
7879
case01_task1.UID: case01_task1,
7980
case01_task2.UID: case01_task2,
@@ -155,6 +156,7 @@ func TestDeleteTaskInfo(t *testing.T) {
155156
expected: &PodGroupInfo{
156157
UID: case01_uid,
157158
Allocated: common_info.BuildResource("3000m", "3G"),
159+
SubGroups: map[string]*SubGroupInfo{},
158160
PodInfos: pod_info.PodsMap{
159161
case01_task1.UID: case01_task1,
160162
case01_task2.UID: case01_task2,
@@ -177,6 +179,7 @@ func TestDeleteTaskInfo(t *testing.T) {
177179
expected: &PodGroupInfo{
178180
UID: case02_uid,
179181
Allocated: common_info.BuildResource("3000m", "3G"),
182+
SubGroups: map[string]*SubGroupInfo{},
180183
PodInfos: pod_info.PodsMap{
181184
case02_task1.UID: case02_task1,
182185
case02_task2.UID: case02_task2,
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Copyright 2025 NVIDIA CORPORATION
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package podgroup_info
5+
6+
import (
7+
"github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2alpha2"
8+
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_info"
9+
)
10+
11+
type SubGroupInfo struct {
12+
Name string
13+
MinAvailable int32
14+
PodInfos pod_info.PodsMap
15+
}
16+
17+
func newSubGroupInfo(name string, minAvailable int32) *SubGroupInfo {
18+
return &SubGroupInfo{
19+
Name: name,
20+
MinAvailable: minAvailable,
21+
PodInfos: pod_info.PodsMap{},
22+
}
23+
}
24+
25+
func fromSubGroup(subGroup *v2alpha2.SubGroup) *SubGroupInfo {
26+
return newSubGroupInfo(subGroup.Name, subGroup.MinMember)
27+
}
28+
29+
func (sgi *SubGroupInfo) assignTask(ti *pod_info.PodInfo) {
30+
sgi.PodInfos[ti.UID] = ti
31+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Copyright 2025 NVIDIA CORPORATION
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package podgroup_info
5+
6+
import (
7+
"testing"
8+
9+
"github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2alpha2"
10+
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_info"
11+
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_status"
12+
)
13+
14+
func TestNewSubGroupInfo(t *testing.T) {
15+
name := "my-subgroup"
16+
minAvailable := int32(4)
17+
sgi := newSubGroupInfo(name, minAvailable)
18+
19+
if sgi.Name != name {
20+
t.Errorf("Expected Name %s, got %s", name, sgi.Name)
21+
}
22+
if sgi.MinAvailable != minAvailable {
23+
t.Errorf("Expected MinAvailable %d, got %d", minAvailable, sgi.MinAvailable)
24+
}
25+
if len(sgi.PodInfos) != 0 {
26+
t.Errorf("Expected empty PodInfos, got %d items", len(sgi.PodInfos))
27+
}
28+
}
29+
30+
func TestFromSubGroup(t *testing.T) {
31+
subGroup := &v2alpha2.SubGroup{
32+
Name: "test-subgroup",
33+
MinMember: 3,
34+
}
35+
36+
sgi := fromSubGroup(subGroup)
37+
if sgi.Name != subGroup.Name {
38+
t.Errorf("Expected name %s, got %s", subGroup.Name, sgi.Name)
39+
}
40+
if sgi.MinAvailable != subGroup.MinMember {
41+
t.Errorf("Expected MinAvailable %d, got %d", subGroup.MinMember, sgi.MinAvailable)
42+
}
43+
if len(sgi.PodInfos) != 0 {
44+
t.Errorf("Expected empty PodInfos, got %d items", len(sgi.PodInfos))
45+
}
46+
}
47+
48+
func TestAddTaskInfoToSubGroup(t *testing.T) {
49+
sgi := newSubGroupInfo("test", 1)
50+
podInfo := &pod_info.PodInfo{
51+
UID: "pod-1",
52+
Status: pod_status.Pending,
53+
}
54+
55+
sgi.assignTask(podInfo)
56+
if len(sgi.PodInfos) != 1 {
57+
t.Errorf("Expected 1 pod info object, got %d", len(sgi.PodInfos))
58+
}
59+
if sgi.PodInfos[podInfo.UID] != podInfo {
60+
t.Error("Pod info not properly stored in map")
61+
}
62+
63+
podInfo2 := &pod_info.PodInfo{
64+
UID: "pod-2",
65+
Status: pod_status.Pending,
66+
}
67+
sgi.assignTask(podInfo2)
68+
if len(sgi.PodInfos) != 2 {
69+
t.Errorf("Expected 2 pod info objects, got %d", len(sgi.PodInfos))
70+
}
71+
72+
if sgi.PodInfos[podInfo2.UID] != podInfo2 {
73+
t.Error("Pod info 2 not properly stored in map")
74+
}
75+
}

0 commit comments

Comments
 (0)