
Commit 781fe28

[WIP] Topology aware subGroupSet (#556)
* Added SubGroupSet data structure
* Clone PodSet
* RootSubGroupName
* Lower text
* Moved global topology constraint from PodGroupInfo to SubGroupSet
* SubSet plugin signature gets subGroupSet
* Implement subGroupSet topology
* Add TopologyConstraint on every SubGroup
* Snapshot topology constraint
* Changed TopologyConstraint description to reflect proper group type it applies to
* Added more tests to cover TopologyConstraints at different subgroup levels
* Small enhancement in job_filtering internal implementation
* Changed jobHasTopologyRequiredConstraint function name to hasTopologyRequiredConstraint

---------

Co-authored-by: Roman Baron <[email protected]>
1 parent 98b1080 commit 781fe28

18 files changed, +1161 -285 lines

deployments/kai-scheduler/crds/scheduling.run.ai_podgroups.yaml

Lines changed: 23 additions & 0 deletions
@@ -116,6 +116,29 @@ spec:
           description: Parent is an optional attribute that specifies
             the name of the parent SubGroup
           type: string
+        topologyConstraint:
+          description: TopologyConstraint defines the topology constraints
+            for this SubGroup
+          properties:
+            preferredTopologyLevel:
+              description: |-
+                PreferredTopologyLevel defines the preferred level in the topology hierarchy
+                that this constraint applies to (e.g., "rack", "zone", "datacenter").
+                Jobs will be scheduled to maintain locality at this level when possible.
+              type: string
+            requiredTopologyLevel:
+              description: |-
+                RequiredTopologyLevel defines the maximal level in the topology hierarchy
+                that all pods must be scheduled within.
+                If set, all pods of the job must be scheduled within a single domain at this level.
+              type: string
+            topology:
+              description: |-
+                Topology specifies the name of the topology CRD that defines the
+                physical layout to use for this constraint. This allows for supporting
+                multiple different topology configurations in the same cluster.
+              type: string
+          type: object
       required:
       - name
       type: object

pkg/apis/scheduling/v2alpha2/podgroup_types.go

Lines changed: 3 additions & 0 deletions
@@ -120,6 +120,9 @@ type SubGroup struct {
     // Parent is an optional attribute that specifies the name of the parent SubGroup
     // +kubebuilder:validation:Optional
     Parent *string `json:"parent,omitempty"`
+
+    // TopologyConstraint defines the topology constraints for this SubGroup
+    TopologyConstraint *TopologyConstraint `json:"topologyConstraint,omitempty"`
 }
 
 // PodGroupStatus defines the observed state of PodGroup
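Note: with this field, the constraint settings (topology, requiredTopologyLevel, preferredTopologyLevel) can be set per SubGroup instead of only once on the whole PodGroup. As a rough illustration only, not taken from this commit (the PodGroupSpec type name, the Name field, and the topology/level values are assumptions), a spec with constraints at two subgroup levels might be built like this:

package main

import (
    v2alpha2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2alpha2"
)

func main() {
    parent := "workers"

    // The parent subgroup must stay within one datacenter; the child
    // subgroup only prefers rack locality inside it.
    _ = v2alpha2.PodGroupSpec{
        SubGroups: []v2alpha2.SubGroup{
            {
                Name: parent,
                TopologyConstraint: &v2alpha2.TopologyConstraint{
                    Topology:              "network-topology", // name of a topology resource (illustrative)
                    RequiredTopologyLevel: "datacenter",
                },
            },
            {
                Name:   "workers-a",
                Parent: &parent,
                TopologyConstraint: &v2alpha2.TopologyConstraint{
                    Topology:               "network-topology",
                    PreferredTopologyLevel: "rack",
                },
            },
        },
    }
}

Here the parent subgroup pins the group to a single domain at the required level, while the child only expresses a locality preference, matching the required vs. preferred semantics described in the CRD above.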

pkg/apis/scheduling/v2alpha2/zz_generated.deepcopy.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default.

pkg/scheduler/actions/common/allocate.go

Lines changed: 1 addition & 1 deletion
@@ -27,7 +27,7 @@ func AllocateJob(ssn *framework.Session, stmt *framework.Statement, nodes []*nod
         return false
     }
 
-    nodeSets, err := ssn.SubsetNodesFn(job, tasksToAllocate, nodes)
+    nodeSets, err := ssn.SubsetNodesFn(job, job.RootSubGroupSet, tasksToAllocate, nodes)
     if err != nil {
         log.InfraLogger.Errorf(
             "Failed to run SubsetNodes on job <%s/%s>: %v", job.Namespace, job.Namespace, err)

pkg/scheduler/api/podgroup_info/job_info.go

Lines changed: 29 additions & 45 deletions
@@ -38,7 +38,6 @@ import (
     "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_status"
     "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/podgroup_info/subgroup_info"
     "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/resource_info"
-    "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/topology_info"
     "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/log"
 )
 
@@ -85,8 +84,8 @@ type PodGroupInfo struct {
     PodGroup    *enginev2alpha2.PodGroup
     PodGroupUID types.UID
 
-    TopologyConstraint *topology_info.TopologyConstraintInfo
-    PodSets            map[string]*subgroup_info.PodSet
+    RootSubGroupSet *subgroup_info.SubGroupSet
+    PodSets         map[string]*subgroup_info.PodSet
 
     StalenessInfo
 
@@ -100,6 +99,9 @@ type PodGroupInfo struct {
 }
 
 func NewPodGroupInfo(uid common_info.PodGroupID, tasks ...*pod_info.PodInfo) *PodGroupInfo {
+    defaultSubGroupSet := subgroup_info.NewSubGroupSet(subgroup_info.RootSubGroupSetName, nil)
+    defaultSubGroupSet.AddPodSet(subgroup_info.NewPodSet(DefaultSubGroup, 1, nil))
+
     podGroupInfo := &PodGroupInfo{
         UID:       uid,
         Allocated: resource_info.EmptyResource(),
@@ -113,10 +115,8 @@ func NewPodGroupInfo(uid common_info.PodGroupID, tasks ...*pod_info.PodInfo) *Po
             TimeStamp: nil,
             Stale:     false,
         },
-
-        PodSets: map[string]*subgroup_info.PodSet{
-            DefaultSubGroup: subgroup_info.NewPodSet(DefaultSubGroup, 1, nil),
-        },
+        RootSubGroupSet: defaultSubGroupSet,
+        PodSets:         defaultSubGroupSet.GetPodSets(),
 
         LastStartTimestamp:   nil,
         activeAllocatedCount: ptr.To(0),
@@ -155,14 +155,10 @@ func (pgi *PodGroupInfo) SetPodGroup(pg *enginev2alpha2.PodGroup) {
     pgi.CreationTimestamp = pg.GetCreationTimestamp()
     pgi.PodGroup = pg
     pgi.PodGroupUID = pg.UID
-    pgi.setSubGroups(pg)
-
-    if pg.Spec.TopologyConstraint.Topology != "" {
-        pgi.TopologyConstraint = &topology_info.TopologyConstraintInfo{
-            Topology:       pg.Spec.TopologyConstraint.Topology,
-            RequiredLevel:  pg.Spec.TopologyConstraint.RequiredTopologyLevel,
-            PreferredLevel: pg.Spec.TopologyConstraint.PreferredTopologyLevel,
-        }
+    err := pgi.setSubGroups(pg)
+    if err != nil {
+        log.InfraLogger.V(7).Warnf("Failed to set subgroups for podgroup <%s> err: %v",
+            pg.Namespace, pg.Name)
     }
 
     if pg.Annotations[commonconstants.StalePodgroupTimeStamp] != "" {
@@ -191,24 +187,27 @@
         pgi.Name, pgi.PodGroupUID)
 }
 
-func (pgi *PodGroupInfo) setSubGroups(podGroup *enginev2alpha2.PodGroup) {
-    if len(podGroup.Spec.SubGroups) > 0 {
-        pgi.PodSets = map[string]*subgroup_info.PodSet{}
-        for _, sg := range podGroup.Spec.SubGroups {
-            subGroupInfo := subgroup_info.FromSubGroup(&sg)
-            pgi.PodSets[subGroupInfo.GetName()] = subGroupInfo
+func (pgi *PodGroupInfo) setSubGroups(podGroup *enginev2alpha2.PodGroup) error {
+    if len(podGroup.Spec.SubGroups) == 0 {
+        if defaultSubGroup, found := pgi.PodSets[DefaultSubGroup]; found {
+            defaultSubGroup.SetMinAvailable(max(podGroup.Spec.MinMember, 1))
         }
-        return
+        return nil
     }
-    if pgi.PodSets == nil {
-        pgi.PodSets = map[string]*subgroup_info.PodSet{}
+    rootSubGroupSet, err := subgroup_info.FromPodGroup(podGroup)
+    if err != nil {
+        return err
     }
-    defaultSubGroup, found := pgi.PodSets[DefaultSubGroup]
-    if !found {
-        pgi.PodSets[DefaultSubGroup] = subgroup_info.NewPodSet(DefaultSubGroup, max(podGroup.Spec.MinMember, 1), nil)
+    pgi.RootSubGroupSet = rootSubGroupSet
+    podSets := rootSubGroupSet.GetPodSets()
+    if len(podSets) > 0 {
+        pgi.PodSets = podSets
     } else {
-        defaultSubGroup.SetMinAvailable(max(podGroup.Spec.MinMember, 1))
+        for _, podSet := range pgi.PodSets {
+            rootSubGroupSet.AddPodSet(podSet)
+        }
     }
+    return nil
 }
 
 func (pgi *PodGroupInfo) addTaskIndex(ti *pod_info.PodInfo) {
@@ -474,29 +473,14 @@ func (pgi *PodGroupInfo) CloneWithTasks(tasks []*pod_info.PodInfo) *PodGroupInfo
         PodGroup:    pgi.PodGroup,
         PodGroupUID: pgi.PodGroupUID,
 
-        PodSets: map[string]*subgroup_info.PodSet{},
-        TopologyConstraint: func() *topology_info.TopologyConstraintInfo {
-            if pgi.TopologyConstraint == nil {
-                return nil
-            }
-            return &topology_info.TopologyConstraintInfo{
-                Topology:       pgi.TopologyConstraint.Topology,
-                RequiredLevel:  pgi.TopologyConstraint.RequiredLevel,
-                PreferredLevel: pgi.TopologyConstraint.PreferredLevel,
-            }
-        }(),
-
         PodStatusIndex:       map[pod_status.PodStatus]pod_info.PodsMap{},
         activeAllocatedCount: ptr.To(0),
     }
 
     pgi.CreationTimestamp.DeepCopyInto(&info.CreationTimestamp)
 
-    for _, podSet := range pgi.PodSets {
-        info.PodSets[podSet.GetName()] = subgroup_info.NewPodSet(
-            podSet.GetName(), podSet.GetMinAvailable(), nil,
-        )
-    }
+    info.RootSubGroupSet = pgi.RootSubGroupSet.Clone()
+    info.PodSets = info.RootSubGroupSet.GetPodSets()
 
     for _, task := range tasks {
         info.AddTaskInfo(task.Clone())
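Note: after this change PodGroupInfo owns a subgroup tree (RootSubGroupSet), and PodSets is just the flat name-to-PodSet view derived from it. A minimal standalone sketch of that default wiring, using only constructors and methods that appear in this diff (the literal subgroup name and minAvailable values are illustrative; the trailing nil arguments are passed exactly as the diff does):

package main

import (
    "fmt"

    "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/podgroup_info/subgroup_info"
)

func main() {
    // Root of the subgroup tree, as built by NewPodGroupInfo.
    root := subgroup_info.NewSubGroupSet(subgroup_info.RootSubGroupSetName, nil)

    // Default case: a single PodSet under the root with minAvailable = 1.
    root.AddPodSet(subgroup_info.NewPodSet("default-subgroup", 1, nil))

    // PodGroupInfo.PodSets is the flat name -> PodSet view of the same tree.
    podSets := root.GetPodSets()

    // With no subgroups in the spec, SetPodGroup only refreshes minAvailable
    // on the default PodSet (mirroring setSubGroups above).
    if ps, found := podSets["default-subgroup"]; found {
        ps.SetMinAvailable(4)
    }

    // CloneWithTasks clones the whole tree and re-derives the flat map.
    cloned := root.Clone()
    fmt.Println(len(cloned.GetPodSets()))
}

Deriving the map from the tree, instead of maintaining it separately as before, is what lets CloneWithTasks shrink to the two lines shown above.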

pkg/scheduler/api/podgroup_info/job_info_test.go

Lines changed: 56 additions & 38 deletions
@@ -62,6 +62,15 @@ func TestAddTaskInfo(t *testing.T) {
     case01_pod4 := common_info.BuildPod(case01_ns, "p4", "n1", v1.PodPending, common_info.BuildResourceList("1000m", "1G"), []metav1.OwnerReference{case01_owner}, make(map[string]string), podAnnotations)
     case01_task4 := pod_info.NewTaskInfo(case01_pod4)
 
+    subGroupSet := subgroup_info.NewSubGroupSet(subgroup_info.RootSubGroupSetName, nil)
+    defaultSubGroup := subgroup_info.NewPodSet(DefaultSubGroup, 1, nil).WithPodInfos(pod_info.PodsMap{
+        case01_task1.UID: case01_task1,
+        case01_task2.UID: case01_task2,
+        case01_task3.UID: case01_task3,
+        case01_task4.UID: case01_task4,
+    })
+    subGroupSet.AddPodSet(defaultSubGroup)
+
     tests := []struct {
         name     string
         uid      common_info.PodGroupID
@@ -73,15 +82,10 @@
             uid:  case01_uid,
             pods: []*v1.Pod{case01_pod1, case01_pod2, case01_pod3, case01_pod4},
             expected: &PodGroupInfo{
-                UID:       case01_uid,
-                Allocated: common_info.BuildResource("4000m", "4G"),
-                PodSets: map[string]*subgroup_info.PodSet{DefaultSubGroup: subgroup_info.NewPodSet(DefaultSubGroup, 1, nil).
-                    WithPodInfos(pod_info.PodsMap{
-                        case01_task1.UID: case01_task1,
-                        case01_task2.UID: case01_task2,
-                        case01_task3.UID: case01_task3,
-                        case01_task4.UID: case01_task4,
-                    })},
+                UID:             case01_uid,
+                Allocated:       common_info.BuildResource("4000m", "4G"),
+                RootSubGroupSet: subGroupSet,
+                PodSets:         map[string]*subgroup_info.PodSet{DefaultSubGroup: defaultSubGroup},
                 PodStatusIndex: map[pod_status.PodStatus]pod_info.PodsMap{
                     pod_status.Running: {
                         case01_task2.UID: case01_task2,
@@ -154,50 +158,64 @@ func TestDeleteTaskInfo(t *testing.T) {
             uid:    case01_uid,
             pods:   []*v1.Pod{case01_pod1, case01_pod2, case01_pod3},
             rmPods: []*v1.Pod{case01_pod2},
-            expected: &PodGroupInfo{
-                UID:       case01_uid,
-                Allocated: common_info.BuildResource("3000m", "3G"),
-                PodSets: map[string]*subgroup_info.PodSet{DefaultSubGroup: subgroup_info.NewPodSet(DefaultSubGroup, 1, nil).
+            expected: func() *PodGroupInfo {
+                subGroupSet := subgroup_info.NewSubGroupSet(subgroup_info.RootSubGroupSetName, nil)
+                defaultSubGroup := subgroup_info.NewPodSet(DefaultSubGroup, 1, nil).
                     WithPodInfos(pod_info.PodsMap{
                         case01_task1.UID: case01_task1,
                         case01_task2.UID: case01_task2,
                         case01_task3.UID: case01_task3,
-                    })},
-                PodStatusIndex: map[pod_status.PodStatus]pod_info.PodsMap{
-                    pod_status.Pending: {case01_task1.UID: case01_task1},
-                    pod_status.Running: {case01_task3.UID: case01_task3},
-                },
-                activeAllocatedCount: ptr.To(1),
-                JobFitErrors:         make(v2alpha2.UnschedulableExplanations, 0),
-                NodesFitErrors:       map[common_info.PodID]*common_info.FitErrors{},
-            },
+                    })
+                subGroupSet.AddPodSet(defaultSubGroup)
+
+                return &PodGroupInfo{
+                    UID:             case01_uid,
+                    Allocated:       common_info.BuildResource("3000m", "3G"),
+                    RootSubGroupSet: subGroupSet,
+                    PodSets:         map[string]*subgroup_info.PodSet{DefaultSubGroup: defaultSubGroup},
+                    PodStatusIndex: map[pod_status.PodStatus]pod_info.PodsMap{
+                        pod_status.Pending: {case01_task1.UID: case01_task1},
+                        pod_status.Running: {case01_task3.UID: case01_task3},
+                    },
+                    activeAllocatedCount: ptr.To(1),
+                    JobFitErrors:         make(v2alpha2.UnschedulableExplanations, 0),
+                    NodesFitErrors:       map[common_info.PodID]*common_info.FitErrors{},
+                }
+            }(),
         },
         {
             name:   "add 2 pending owner pod, 1 running owner pod, remove 1 pending owner pod",
             uid:    case02_uid,
             pods:   []*v1.Pod{case02_pod1, case02_pod2, case02_pod3},
             rmPods: []*v1.Pod{case02_pod2},
-            expected: &PodGroupInfo{
-                UID:       case02_uid,
-                Allocated: common_info.BuildResource("3000m", "3G"),
-                PodSets: map[string]*subgroup_info.PodSet{DefaultSubGroup: subgroup_info.NewPodSet(DefaultSubGroup, 1, nil).
+            expected: func() *PodGroupInfo {
+                subGroupSet := subgroup_info.NewSubGroupSet(subgroup_info.RootSubGroupSetName, nil)
+                defaultSubGroup := subgroup_info.NewPodSet(DefaultSubGroup, 1, nil).
                     WithPodInfos(pod_info.PodsMap{
                         case02_task1.UID: case02_task1,
                         case02_task2.UID: case02_task2,
                         case02_task3.UID: case02_task3,
-                    })},
-                PodStatusIndex: map[pod_status.PodStatus]pod_info.PodsMap{
-                    pod_status.Pending: {
-                        case02_task1.UID: case02_task1,
-                    },
-                    pod_status.Running: {
-                        case02_task3.UID: case02_task3,
+                    })
+                subGroupSet.AddPodSet(defaultSubGroup)
+
+                return &PodGroupInfo{
+                    UID:             case02_uid,
+                    Allocated:       common_info.BuildResource("3000m", "3G"),
+                    RootSubGroupSet: subGroupSet,
+                    PodSets:         map[string]*subgroup_info.PodSet{DefaultSubGroup: defaultSubGroup},
+                    PodStatusIndex: map[pod_status.PodStatus]pod_info.PodsMap{
+                        pod_status.Pending: {
+                            case02_task1.UID: case02_task1,
+                        },
+                        pod_status.Running: {
+                            case02_task3.UID: case02_task3,
+                        },
                     },
-                },
-                activeAllocatedCount: ptr.To(1),
-                JobFitErrors:         make(v2alpha2.UnschedulableExplanations, 0),
-                NodesFitErrors:       map[common_info.PodID]*common_info.FitErrors{},
-            },
+                    activeAllocatedCount: ptr.To(1),
+                    JobFitErrors:         make(v2alpha2.UnschedulableExplanations, 0),
+                    NodesFitErrors:       map[common_info.PodID]*common_info.FitErrors{},
+                }
+            }(),
         },
     }
 