Skip to content

Commit 52b9162

Browse files
authored
Topology Plugin - Domain Packing + Node Sorting (#558)
1 parent 4184dd5 commit 52b9162

File tree

17 files changed

+1756
-70
lines changed

17 files changed

+1756
-70
lines changed

pkg/scheduler/actions/allocate/allocateTopology_test.go

Lines changed: 763 additions & 5 deletions
Large diffs are not rendered by default.

pkg/scheduler/actions/common/allocate.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ import (
1919

2020
func AllocateJob(ssn *framework.Session, stmt *framework.Statement, nodes []*node_info.NodeInfo,
2121
job *podgroup_info.PodGroupInfo, isPipelineOnly bool) bool {
22+
ssn.PreJobAllocation(job)
23+
2224
tasksToAllocate := podgroup_info.GetTasksToAllocate(job, ssn.PodSetOrderFn, ssn.TaskOrderFn, !isPipelineOnly)
2325

2426
result := ssn.IsJobOverQueueCapacityFn(job, tasksToAllocate)

pkg/scheduler/api/podgroup_info/subgroup_info/subgroup_info.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ import (
88
)
99

1010
type SubGroupInfo struct {
11+
parent *SubGroupSet
1112
name string
1213
topologyConstraint *topology_info.TopologyConstraintInfo
1314
}
1415

1516
func newSubGroupInfo(name string, topologyConstraint *topology_info.TopologyConstraintInfo) *SubGroupInfo {
1617
return &SubGroupInfo{
18+
parent: nil,
1719
name: name,
1820
topologyConstraint: topologyConstraint,
1921
}
@@ -26,3 +28,11 @@ func (sgi *SubGroupInfo) GetName() string {
2628
func (sgi *SubGroupInfo) GetTopologyConstraint() *topology_info.TopologyConstraintInfo {
2729
return sgi.topologyConstraint
2830
}
31+
32+
func (sgi *SubGroupInfo) SetParent(parent *SubGroupSet) {
33+
sgi.parent = parent
34+
}
35+
36+
func (sgi *SubGroupInfo) GetParent() *SubGroupSet {
37+
return sgi.parent
38+
}

pkg/scheduler/api/podgroup_info/subgroup_info/subgroupset.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ func (sgs *SubGroupSet) AddSubGroup(subGroup *SubGroupSet) {
3030
"cannot have additional nested subgroup %s", sgs.GetName(), subGroup.GetName())
3131
return
3232
}
33+
subGroup.SetParent(sgs)
3334
sgs.groups = append(sgs.groups, subGroup)
3435
}
3536

@@ -39,6 +40,7 @@ func (sgs *SubGroupSet) AddPodSet(podSet *PodSet) {
3940
"it cannot references podset %s", sgs.GetName(), podSet.GetName())
4041
return
4142
}
43+
podSet.SetParent(sgs)
4244
sgs.podSets = append(sgs.podSets, podSet)
4345
}
4446

@@ -76,3 +78,11 @@ func (sgs *SubGroupSet) GetAllPodSets() map[string]*PodSet {
7678
}
7779
return result
7880
}
81+
82+
func (sgs *SubGroupSet) SetParent(parent *SubGroupSet) {
83+
sgs.parent = parent
84+
}
85+
86+
func (sgs *SubGroupSet) GetParent() *SubGroupSet {
87+
return sgs.parent
88+
}

pkg/scheduler/api/types.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ type OnJobSolutionStartFn func()
7373
// BindRequestMutateFn allows plugins to add annotations before BindRequest creation.
7474
type BindRequestMutateFn func(pod *pod_info.PodInfo, nodeName string) map[string]string
7575

76+
// PreJobAllocationFn is a plugin callback invoked with a job right before its allocation starts.
77+
type PreJobAllocationFn func(job *podgroup_info.PodGroupInfo)
78+
7679
type SchedulableResult struct {
7780
IsSchedulable bool
7881
Reason v2alpha2.UnschedulableReason

pkg/scheduler/framework/session.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ type Session struct {
8585
PrePredicateFns []api.PrePredicateFn
8686
PredicateFns []api.PredicateFn
8787
BindRequestMutateFns []api.BindRequestMutateFn
88+
PreJobAllocationFns []api.PreJobAllocationFn
8889

8990
Config *conf.SchedulerConfiguration
9091
plugins map[string]Plugin

pkg/scheduler/framework/session_plugins.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,10 @@ func (ssn *Session) AddBindRequestMutateFn(fn api.BindRequestMutateFn) {
112112
ssn.BindRequestMutateFns = append(ssn.BindRequestMutateFns, fn)
113113
}
114114

115+
func (ssn *Session) AddPreJobAllocationFn(fn api.PreJobAllocationFn) {
116+
ssn.PreJobAllocationFns = append(ssn.PreJobAllocationFns, fn)
117+
}
118+
115119
func (ssn *Session) CanReclaimResources(reclaimer *podgroup_info.PodGroupInfo) bool {
116120
for _, canReclaimFn := range ssn.CanReclaimResourcesFns {
117121
return canReclaimFn(reclaimer)
@@ -444,3 +448,9 @@ func (ssn *Session) MutateBindRequestAnnotations(pod *pod_info.PodInfo, nodeName
444448
}
445449
return annotations
446450
}
451+
452+
func (ssn *Session) PreJobAllocation(job *podgroup_info.PodGroupInfo) {
453+
for _, preJobAllocationFn := range ssn.PreJobAllocationFns {
454+
preJobAllocationFn(job)
455+
}
456+
}

pkg/scheduler/plugins/topology/job_filtering.go

Lines changed: 55 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@
44
package topology
55

66
import (
7+
"cmp"
78
"fmt"
8-
"sort"
9+
"slices"
910

1011
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/node_info"
1112
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_info"
@@ -38,7 +39,7 @@ func (t *topologyPlugin) subSetNodesFn(
3839
return []node_info.NodeSet{nodeSet}, nil
3940
}
4041

41-
defer t.treeAllocatableCleanup(topologyTree)
42+
t.treeAllocatableCleanup(topologyTree)
4243
maxAllocatablePods, err := t.calcTreeAllocatable(tasks, topologyTree, nodeSet)
4344
if err != nil {
4445
return nil, err
@@ -52,12 +53,24 @@ func (t *topologyPlugin) subSetNodesFn(
5253
return []node_info.NodeSet{}, nil
5354
}
5455

56+
// Sorting the tree for both packing and closest preferred level domain scoring
57+
preferredLevel := DomainLevel(subGroup.GetTopologyConstraint().PreferredLevel)
58+
requiredLevel := DomainLevel(subGroup.GetTopologyConstraint().RequiredLevel)
59+
maxDepthLevel := preferredLevel
60+
if maxDepthLevel == "" {
61+
maxDepthLevel = requiredLevel
62+
}
63+
sortTree(topologyTree.DomainsByLevel[rootLevel][rootDomainId], maxDepthLevel)
64+
if preferredLevel != "" {
65+
t.subGroupNodeScores[subGroup.GetName()] = calculateNodeScores(topologyTree.DomainsByLevel[rootLevel][rootDomainId], preferredLevel)
66+
}
67+
5568
jobAllocatableDomains, err := t.getJobAllocatableDomains(job, subGroup, podSets, len(tasks), topologyTree)
5669
if err != nil {
5770
return nil, err
5871
}
5972

60-
jobAllocatableDomains = sortDomainInfos(jobAllocatableDomains)
73+
jobAllocatableDomains = sortDomainInfos(topologyTree, jobAllocatableDomains)
6174

6275
var domainNodeSets []node_info.NodeSet
6376
for _, jobAllocatableDomain := range jobAllocatableDomains {
@@ -156,7 +169,7 @@ func calcNodeAccommodation(jobAllocationMetaData *jobAllocationMetaData, node *n
156169
}
157170
// Add more to jobResourcesAllocationsRepresenters until the node cannot accommodate any more pods
158171
if allocatablePodsCount == len(jobAllocationMetaData.allocationTestPods) {
159-
for i := allocatablePodsCount; i < len(jobAllocationMetaData.tasksToAllocate); i++ {
172+
for i := allocatablePodsCount; ; i++ {
160173
latestTestPod := jobAllocationMetaData.allocationTestPods[len(jobAllocationMetaData.allocationTestPods)-1]
161174

162175
iAllocationsTestPod := &pod_info.PodInfo{
@@ -333,18 +346,45 @@ func (*topologyPlugin) treeAllocatableCleanup(topologyTree *Info) {
333346
}
334347
}
335348

336-
func sortDomainInfos(domainInfos []*DomainInfo) []*DomainInfo {
337-
sort.SliceStable(domainInfos, func(i, j int) bool {
338-
if domainInfos[i].Level != domainInfos[j].Level {
339-
return false
340-
}
349+
// sortTree recursively sorts the topology tree for bin-packing behavior.
350+
// Domains are sorted by AllocatablePods (ascending) to prioritize filling domains
351+
// with fewer available resources first, implementing a bin-packing strategy.
352+
// Domains with equal AllocatablePods are ordered by ID so that the result is deterministic.
353+
func sortTree(root *DomainInfo, maxDepthLevel DomainLevel) {
354+
if root == nil || maxDepthLevel == "" {
355+
return
356+
}
341357

342-
iDomainGPUs := domainInfos[i].GetNonAllocatedGPUsInDomain()
343-
jDomainGPUs := domainInfos[j].GetNonAllocatedGPUsInDomain()
344-
if iDomainGPUs != jDomainGPUs {
345-
return iDomainGPUs < jDomainGPUs
358+
slices.SortFunc(root.Children, func(i, j *DomainInfo) int {
359+
if c := cmp.Compare(i.AllocatablePods, j.AllocatablePods); c != 0 {
360+
return c
346361
}
347-
return domainInfos[i].ID < domainInfos[j].ID
362+
return cmp.Compare(i.ID, j.ID)
348363
})
349-
return domainInfos
364+
365+
if root.Level == maxDepthLevel {
366+
return
367+
}
368+
369+
for _, child := range root.Children {
370+
sortTree(child, maxDepthLevel)
371+
}
372+
}
373+
374+
// sortDomainInfos orders domains according to the sorted topology tree for consistent allocation.
375+
// Assumes the topology tree has already been sorted (see sortTree).
376+
func sortDomainInfos(topologyTree *Info, domainInfos []*DomainInfo) []*DomainInfo {
377+
root := topologyTree.DomainsByLevel[rootLevel][rootDomainId]
378+
reverseLevelOrderedDomains := reverseLevelOrder(root)
379+
380+
sortedDomainInfos := make([]*DomainInfo, 0, len(domainInfos))
381+
for _, domain := range reverseLevelOrderedDomains {
382+
for _, domainInfo := range domainInfos {
383+
if domain.ID == domainInfo.ID && domain.Level == domainInfo.Level {
384+
sortedDomainInfos = append(sortedDomainInfos, domainInfo)
385+
}
386+
}
387+
}
388+
389+
return sortedDomainInfos
350390
}

pkg/scheduler/plugins/topology/job_filtering_test.go

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,9 @@ func TestTopologyPlugin_subsetNodesFn(t *testing.T) {
115115
}
116116

117117
// Set parent relationships
118-
tree.DomainsByLevel["zone"]["zone1"].Children = map[DomainID]*DomainInfo{
119-
"rack1.zone1": tree.DomainsByLevel["rack"]["rack1.zone1"],
120-
"rack2.zone1": tree.DomainsByLevel["rack"]["rack2.zone1"],
118+
tree.DomainsByLevel["zone"]["zone1"].Children = []*DomainInfo{
119+
tree.DomainsByLevel["rack"]["rack1.zone1"],
120+
tree.DomainsByLevel["rack"]["rack2.zone1"],
121121
}
122122

123123
return tree
@@ -353,8 +353,8 @@ func TestTopologyPlugin_subsetNodesFn(t *testing.T) {
353353
}
354354

355355
// Set parent relationships
356-
tree.DomainsByLevel["zone"]["zone1"].Children = map[DomainID]*DomainInfo{
357-
"rack1.zone1": tree.DomainsByLevel["rack"]["rack1.zone1"],
356+
tree.DomainsByLevel["zone"]["zone1"].Children = []*DomainInfo{
357+
tree.DomainsByLevel["rack"]["rack1.zone1"],
358358
}
359359

360360
return tree
@@ -399,6 +399,7 @@ func TestTopologyPlugin_subsetNodesFn(t *testing.T) {
399399
TopologyTrees: map[string]*Info{
400400
"test-topology": topologyTree,
401401
},
402+
subGroupNodeScores: map[subgroupName]map[string]float64{},
402403
}
403404

404405
// Call the function under test
@@ -419,7 +420,7 @@ func TestTopologyPlugin_subsetNodesFn(t *testing.T) {
419420
}
420421

421422
if tt.expectedJobFitError != "" {
422-
if job.JobFitErrors == nil || len(job.JobFitErrors) == 0 {
423+
if len(job.JobFitErrors) == 0 {
423424
t.Errorf("expected job fit error '%s', but got nil", tt.expectedJobFitError)
424425
}
425426
if job.JobFitErrors[0].Message != tt.expectedJobFitError {
@@ -859,9 +860,9 @@ func TestTopologyPlugin_calcTreeAllocatable(t *testing.T) {
859860
}
860861

861862
// Set parent relationships
862-
tree.DomainsByLevel["zone"]["zone1"].Children = map[DomainID]*DomainInfo{
863-
"rack1.zone1": tree.DomainsByLevel["rack"]["rack1.zone1"],
864-
"rack2.zone1": tree.DomainsByLevel["rack"]["rack2.zone1"],
863+
tree.DomainsByLevel["zone"]["zone1"].Children = []*DomainInfo{
864+
tree.DomainsByLevel["rack"]["rack1.zone1"],
865+
tree.DomainsByLevel["rack"]["rack2.zone1"],
865866
}
866867

867868
return tree
@@ -957,9 +958,9 @@ func TestTopologyPlugin_calcTreeAllocatable(t *testing.T) {
957958
}
958959

959960
// Set parent relationships
960-
tree.DomainsByLevel["zone"]["zone1"].Children = map[DomainID]*DomainInfo{
961-
"rack1.zone1": tree.DomainsByLevel["rack"]["rack1.zone1"],
962-
"rack2.zone1": tree.DomainsByLevel["rack"]["rack2.zone1"],
961+
tree.DomainsByLevel["zone"]["zone1"].Children = []*DomainInfo{
962+
tree.DomainsByLevel["rack"]["rack1.zone1"],
963+
tree.DomainsByLevel["rack"]["rack2.zone1"],
963964
}
964965

965966
return tree
@@ -1061,9 +1062,9 @@ func TestTopologyPlugin_calcTreeAllocatable(t *testing.T) {
10611062
}
10621063

10631064
// Set parent relationships
1064-
tree.DomainsByLevel["zone"]["zone1"].Children = map[DomainID]*DomainInfo{
1065-
"rack1.zone1": tree.DomainsByLevel["rack"]["rack1.zone1"],
1066-
"rack2.zone1": tree.DomainsByLevel["rack"]["rack2.zone1"],
1065+
tree.DomainsByLevel["zone"]["zone1"].Children = []*DomainInfo{
1066+
tree.DomainsByLevel["rack"]["rack1.zone1"],
1067+
tree.DomainsByLevel["rack"]["rack2.zone1"],
10671068
}
10681069

10691070
return tree
@@ -1226,9 +1227,9 @@ func TestTopologyPlugin_calcTreeAllocatable(t *testing.T) {
12261227
}
12271228

12281229
// Set parent relationships
1229-
tree.DomainsByLevel["zone"]["zone1"].Children = map[DomainID]*DomainInfo{
1230-
"rack1.zone1": tree.DomainsByLevel["rack"]["rack1.zone1"],
1231-
"rack2.zone1": tree.DomainsByLevel["rack"]["rack2.zone1"],
1230+
tree.DomainsByLevel["zone"]["zone1"].Children = []*DomainInfo{
1231+
tree.DomainsByLevel["rack"]["rack1.zone1"],
1232+
tree.DomainsByLevel["rack"]["rack2.zone1"],
12321233
}
12331234

12341235
return tree
@@ -1767,13 +1768,13 @@ func TestTopologyPlugin_getJobAllocatableDomains(t *testing.T) {
17671768
ID: "zone1.region1",
17681769
Level: "zone",
17691770
AllocatablePods: 6,
1770-
Children: map[DomainID]*DomainInfo{
1771-
"rack1.zone1.region1": {
1771+
Children: []*DomainInfo{
1772+
{
17721773
ID: "rack1.zone1.region1",
17731774
Level: "rack",
17741775
AllocatablePods: 3,
17751776
},
1776-
"rack2.zone1.region1": {
1777+
{
17771778
ID: "rack2.zone1.region1",
17781779
Level: "rack",
17791780
AllocatablePods: 3,
@@ -1784,28 +1785,28 @@ func TestTopologyPlugin_getJobAllocatableDomains(t *testing.T) {
17841785
ID: "zone2.region1",
17851786
Level: "zone",
17861787
AllocatablePods: 6,
1787-
Children: map[DomainID]*DomainInfo{
1788-
"rack1.zone2.region1": {
1788+
Children: []*DomainInfo{
1789+
{
17891790
ID: "rack1.zone2.region1",
17901791
Level: "rack",
17911792
AllocatablePods: 2,
17921793
},
1793-
"rack2.zone1.region1": {
1794+
{
17941795
ID: "rack2.zone1.region1",
17951796
Level: "rack",
17961797
AllocatablePods: 1,
17971798
},
1798-
"rack3.zone2.region1": {
1799+
{
17991800
ID: "rack3.zone2.region1",
18001801
Level: "rack",
18011802
AllocatablePods: 1,
18021803
},
1803-
"rack4.zone2.region1": {
1804+
{
18041805
ID: "rack4.zone2.region1",
18051806
Level: "rack",
18061807
AllocatablePods: 1,
18071808
},
1808-
"rack5.zone2.region1": {
1809+
{
18091810
ID: "rack5.zone2.region1",
18101811
Level: "rack",
18111812
AllocatablePods: 1,

0 commit comments

Comments
 (0)