
Commit 626bc15

Better topology allocation for non-homogeneous jobs (#604)
If the topology plugin detects a non-homogeneous job (different pods require different resource types), it checks whether a domain can allocate the job by comparing the job's requested resources to the domain's free resources as a ratio, instead of counting how many representor pods each node in the domain can accommodate. Small additional improvements: 1- Calculate AllocatablePods only for the domains relevant to the node subset. 2- Cache the ratio of job requested resources to free domain resources when sorting the topology tree. Known issue: fractional GPUs are not counted as part of the free resources.
1 parent 7b92996 commit 626bc15
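
The core idea for heterogeneous jobs is a ratio test: sum the job's requests, divide by the domain's idle-or-releasing resources, and treat anything above 1.0 (maxAllocatableTasksRatio) as not allocatable, with a large sentinel (requiredResourceNotInDomainRatio = 1000.0) when the domain lacks a required resource entirely. The sketch below only illustrates that idea with plain maps and hypothetical names (jobToFreeRatio, jobRequests, domainFree); it is not the plugin's actual getJobRatioToFreeResources implementation, which is outside this diff.

```go
package main

import "fmt"

// resources stands in for the scheduler's resource_info types in this sketch.
type resources map[string]float64

// jobToFreeRatio returns the worst-case ratio of requested to free resources
// across all resource types. A ratio above 1.0 means at least one resource
// type does not fit in the domain.
func jobToFreeRatio(requested, free resources) float64 {
	ratio := 0.0
	for name, req := range requested {
		if req == 0 {
			continue
		}
		avail := free[name]
		if avail == 0 {
			return 1000.0 // mirrors the requiredResourceNotInDomainRatio sentinel: resource missing entirely
		}
		if r := req / avail; r > ratio {
			ratio = r
		}
	}
	return ratio
}

func main() {
	// A heterogeneous job: GPU pods requesting 8 GPUs in total plus CPU-only pods requesting 24 CPUs.
	jobRequests := resources{"gpu": 8, "cpu": 24}
	domainFree := resources{"gpu": 12, "cpu": 16}

	ratio := jobToFreeRatio(jobRequests, domainFree)
	fmt.Printf("ratio=%.2f allocatable=%v\n", ratio, ratio <= 1.0)
}
```

With these illustrative numbers the CPU request is the bottleneck (24/16 = 1.50 > 1.0), so the domain would be filtered out; a homogeneous job would instead keep using the representor-pod (AllocatablePods) accounting shown in the diff below.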

File tree

4 files changed: +208 −66 lines


pkg/scheduler/api/resource_info/resource_info.go

Lines changed: 3 additions & 0 deletions
@@ -134,6 +134,9 @@ func (r *Resource) DetailedString() string {
 }
 
 func (r *Resource) AddResourceRequirements(req *ResourceRequirements) {
+    if req == nil {
+        return
+    }
     r.BaseResource.Add(&req.BaseResource)
     r.gpus += req.GPUs()
     for migProfile, migCount := range req.MigResources() {

pkg/scheduler/plugins/topology/job_filtering.go

Lines changed: 102 additions & 31 deletions
@@ -15,10 +15,12 @@ import (
     "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/podgroup_info"
     "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/podgroup_info/subgroup_info"
     "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/resource_info"
+    v1 "k8s.io/api/core/v1"
 )
 
 const (
     requiredResourceNotInDomainRatio = 1000.0
+    maxAllocatableTasksRatio = 1.0
 )
 
 type jobAllocationMetaData struct {
@@ -44,38 +46,42 @@ func (t *topologyPlugin) subSetNodesFn(
         return []node_info.NodeSet{nodeSet}, nil
     }
 
+    nodeSetDomain, ok := t.nodeSetToDomain[topologyTree.Name][getNodeSetID(nodeSet)]
+    if !ok {
+        return nil, fmt.Errorf("domain not found for node set in topology %s", topologyTree.Name)
+    }
+
     t.treeAllocatableCleanup(topologyTree)
-    maxAllocatablePods, err := t.calcTreeAllocatable(tasks, topologyTree, nodeSet)
-    if err != nil {
-        return nil, err
+    calcSubTreeFreeResources(nodeSetDomain)
+    if useRepresentorPodsAccounting(tasks) {
+        if err := t.calcTreeAllocatable(tasks, nodeSetDomain); err != nil {
+            return nil, err
+        }
     }
 
-    if maxAllocatablePods < len(tasks) {
+    tasksResources, tasksCount := getTasksAllocationMetadata(tasks)
+
+    if !isJobAllocatableOnDomain(tasksResources, tasksCount, nodeSetDomain) {
         job.SetJobFitError(
             podgroup_info.PodSchedulingErrors,
             fmt.Sprintf("No relevant domains found for workload in topology tree: %s", topologyTree.Name),
             nil)
         return []node_info.NodeSet{}, nil
     }
 
-    domain, ok := t.nodeSetToDomain[topologyTree.Name][getNodeSetID(nodeSet)]
-    if !ok {
-        return nil, fmt.Errorf("domain not found for node set in topology %s", topologyTree.Name)
-    }
-
     // Sorting the tree for both packing and closest preferred level domain scoring
     preferredLevel := DomainLevel(subGroup.GetTopologyConstraint().PreferredLevel)
     requiredLevel := DomainLevel(subGroup.GetTopologyConstraint().RequiredLevel)
     maxDepthLevel := preferredLevel
     if maxDepthLevel == "" {
         maxDepthLevel = requiredLevel
     }
-    sortTreeFromRoot(tasks, domain, maxDepthLevel)
+    sortTreeFromRoot(tasks, nodeSetDomain, maxDepthLevel)
     if preferredLevel != "" {
-        t.subGroupNodeScores[subGroup.GetName()] = calculateNodeScores(domain, preferredLevel)
+        t.subGroupNodeScores[subGroup.GetName()] = calculateNodeScores(nodeSetDomain, preferredLevel)
     }
 
-    jobAllocatableDomains, err := t.getJobAllocatableDomains(job, subGroup, podSets, len(tasks), topologyTree)
+    jobAllocatableDomains, err := t.getJobAllocatableDomains(job, subGroup, podSets, tasksResources, tasksCount, topologyTree)
     if err != nil {
         return nil, err
     }
@@ -94,6 +100,15 @@ func (t *topologyPlugin) subSetNodesFn(
     return domainNodeSets, nil
 }
 
+func getTasksAllocationMetadata(tasks []*pod_info.PodInfo) (*resource_info.Resource, int) {
+    tasksResources := resource_info.NewResource(0, 0, 0)
+    for _, task := range tasks {
+        tasksResources.AddResourceRequirements(task.ResReq)
+    }
+    tasksCount := len(tasks)
+    return tasksResources, tasksCount
+}
+
 func (t *topologyPlugin) getJobTopology(subGroup *subgroup_info.SubGroupInfo) (*Info, bool) {
     if subGroup.GetTopologyConstraint() == nil {
         return nil, true
@@ -109,20 +124,17 @@ func (t *topologyPlugin) getJobTopology(subGroup *subgroup_info.SubGroupInfo) (*
     return topologyTree, true
 }
 
-func (t *topologyPlugin) calcTreeAllocatable(tasks []*pod_info.PodInfo, topologyTree *Info, nodeSet node_info.NodeSet) (int, error) {
-    jobAllocationData, err := initJobAllocationMetadataStruct(tasks)
+func (t *topologyPlugin) calcTreeAllocatable(tasks []*pod_info.PodInfo, domain *DomainInfo) error {
+    jobAllocationData, err := initTasksRepresentorMetadataStruct(tasks)
     if err != nil {
-        return 0, err
+        return err
     }
 
-    nodes := map[string]bool{}
-    for _, node := range nodeSet {
-        nodes[node.Name] = true
-    }
-    return t.calcSubTreeAllocatable(jobAllocationData, topologyTree.DomainsByLevel[rootLevel][rootDomainId], nodes)
+    _, err = t.calcSubTreeAllocatable(jobAllocationData, domain)
+    return err
 }
 
-func initJobAllocationMetadataStruct(tasksToAllocate []*pod_info.PodInfo) (*jobAllocationMetaData, error) {
+func initTasksRepresentorMetadataStruct(tasksToAllocate []*pod_info.PodInfo) (*jobAllocationMetaData, error) {
     maxPodResources := resource_info.NewResourceRequirements(0, 0, 0)
     for _, podInfo := range tasksToAllocate {
         err := maxPodResources.SetMaxResource(podInfo.ResReq)
@@ -142,24 +154,22 @@ func initJobAllocationMetadataStruct(tasksToAllocate []*pod_info.PodInfo) (*jobA
 }
 
 func (t *topologyPlugin) calcSubTreeAllocatable(
-    jobAllocationData *jobAllocationMetaData, domain *DomainInfo, nodes map[string]bool,
+    jobAllocationData *jobAllocationMetaData, domain *DomainInfo,
 ) (int, error) {
     if domain == nil {
         return 0, nil
     }
+    domain.AllocatablePods = 0 // reset the allocatable pods count for the domain
 
     if len(domain.Children) == 0 {
         for _, node := range domain.Nodes {
-            if _, inSubset := nodes[node.Name]; !inSubset {
-                continue
-            }
             domain.AllocatablePods += calcNodeAccommodation(jobAllocationData, node)
         }
         return domain.AllocatablePods, nil
     }
 
     for _, child := range domain.Children {
-        childAllocatable, err := t.calcSubTreeAllocatable(jobAllocationData, child, nodes)
+        childAllocatable, err := t.calcSubTreeAllocatable(jobAllocationData, child)
         if err != nil {
             return 0, err
         }
@@ -168,6 +178,27 @@ func (t *topologyPlugin) calcSubTreeAllocatable(
     return domain.AllocatablePods, nil
 }
 
+func calcSubTreeFreeResources(domain *DomainInfo) *resource_info.Resource {
+    if domain == nil {
+        return nil
+    }
+
+    if len(domain.Children) == 0 {
+        for _, node := range domain.Nodes {
+            domain.IdleOrReleasingResources.Add(node.Idle)
+            domain.IdleOrReleasingResources.Add(node.Releasing)
+            // Ignore fractions of GPUs for now
+        }
+        return domain.IdleOrReleasingResources
+    }
+
+    for _, child := range domain.Children {
+        subdomainFreeResources := calcSubTreeFreeResources(child)
+        domain.IdleOrReleasingResources.Add(subdomainFreeResources)
+    }
+    return domain.IdleOrReleasingResources
+}
+
 func calcNodeAccommodation(jobAllocationMetaData *jobAllocationMetaData, node *node_info.NodeInfo) int {
     if jobAllocationMetaData.maxPodResources.LessEqual(resource_info.EmptyResourceRequirements()) {
         return len(jobAllocationMetaData.tasksToAllocate)
@@ -220,7 +251,7 @@ func calcNextAllocationTestPodResources(previousTestResources, maxPodResources *
 
 func (t *topologyPlugin) getJobAllocatableDomains(
     job *podgroup_info.PodGroupInfo, subGroup *subgroup_info.SubGroupInfo, podSets map[string]*subgroup_info.PodSet,
-    taskToAllocateCount int, topologyTree *Info,
+    tasksResources *resource_info.Resource, tasksCount int, topologyTree *Info,
 ) ([]*DomainInfo, error) {
     relevantLevels, err := t.calculateRelevantDomainLevels(subGroup, topologyTree)
     if err != nil {
@@ -239,7 +270,7 @@ func (t *topologyPlugin) getJobAllocatableDomains(
     var domains []*DomainInfo
     for _, level := range relevantLevels {
         for _, domain := range relevantDomainsByLevel[level] {
-            if domain.AllocatablePods < taskToAllocateCount { // Filter domains that cannot allocate the job
+            if !isJobAllocatableOnDomain(tasksResources, tasksCount, domain) { // Filter domains that cannot allocate the job
                 continue
             }
 
@@ -305,6 +336,14 @@ func hasTopologyRequiredConstraint(subGroup *subgroup_info.SubGroupInfo) bool {
     return subGroup.GetTopologyConstraint().RequiredLevel != ""
 }
 
+func isJobAllocatableOnDomain(tasksResources *resource_info.Resource, tasksCount int, domain *DomainInfo) bool {
+    if domain.AllocatablePods != allocatablePodsNotSet {
+        return domain.AllocatablePods >= tasksCount
+    }
+
+    return getJobRatioToFreeResources(tasksResources, domain) <= maxAllocatableTasksRatio
+}
+
 func (*topologyPlugin) calculateRelevantDomainLevels(
     subGroup *subgroup_info.SubGroupInfo, topologyTree *Info,
 ) ([]DomainLevel, error) {
@@ -355,7 +394,8 @@ func (*topologyPlugin) calculateRelevantDomainLevels(
 func (*topologyPlugin) treeAllocatableCleanup(topologyTree *Info) {
     for _, levelDomains := range topologyTree.DomainsByLevel {
         for _, domain := range levelDomains {
-            domain.AllocatablePods = 0
+            domain.AllocatablePods = allocatablePodsNotSet
+            domain.IdleOrReleasingResources = resource_info.EmptyResource()
         }
     }
 }
@@ -378,9 +418,14 @@ func sortTree(tasksResources *resource_info.Resource, root *DomainInfo, maxDepth
         return
     }
 
+    domainRatiosCache := make(map[DomainID]float64, len(root.Children))
+    for _, child := range root.Children {
+        domainRatiosCache[child.ID] = getJobRatioToFreeResources(tasksResources, child)
+    }
+
     slices.SortFunc(root.Children, func(i, j *DomainInfo) int {
-        iRatio := getJobRatioToFreeResources(tasksResources, i)
-        jRatio := getJobRatioToFreeResources(tasksResources, j)
+        iRatio := domainRatiosCache[i.ID]
+        jRatio := domainRatiosCache[j.ID]
         if c := cmp.Compare(jRatio, iRatio); c != 0 {
             return c
         }
@@ -447,3 +492,29 @@ func sortDomainInfos(topologyTree *Info, domainInfos []*DomainInfo) []*DomainInf
 
     return sortedDomainInfos
 }
+
+// useRepresentorPodsAccounting checks if the tasks are using representor pods accounting.
+// If all the tasks are homogeneous, i.e. all the tasks have the same type of resource requirements, then use representor pods accounting (AllocatablePods).
+// If the tasks are heterogeneous, i.e. some of the tasks require resources that other tasks do not require,
+// then use the job resources sum to see if a domain can allocate the job.
+func useRepresentorPodsAccounting(tasks []*pod_info.PodInfo) bool {
+    extendedResources := map[v1.ResourceName]int{}
+    podsUsingGpu := 0
+    for _, task := range tasks {
+        for resourceName := range task.ResReq.BaseResource.ScalarResources() {
+            extendedResources[resourceName] += 1
+        }
+        if task.ResReq.GPUs() > 0 {
+            podsUsingGpu += 1
+        }
+    }
+    if podsUsingGpu != len(tasks) && podsUsingGpu != 0 {
+        return false
+    }
+    for _, count := range extendedResources {
+        if count != len(tasks) {
+            return false
+        }
+    }
+    return true
+}
