Skip to content

Commit 94ffd3b

Browse files
authored
Given topology domains AllocatablePods, calculate the domains to be used for job allocation - topology job filter PRs part 3 (#354)
Given topology domains AllocatablePods, calculate the domains to be used for job allocation
1 parent d1645f5 commit 94ffd3b

File tree

2 files changed

+1112
-10
lines changed

2 files changed

+1112
-10
lines changed

pkg/scheduler/plugins/topology/topology_plugin_job_filtering.go

Lines changed: 136 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,14 @@ package topology
55

66
import (
77
"fmt"
8+
"slices"
9+
"sort"
810

911
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/node_info"
1012
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_info"
1113
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/podgroup_info"
1214
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/resource_info"
15+
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/log"
1316
)
1417

1518
type jobAllocationMetaData struct {
@@ -27,17 +30,21 @@ func (t *topologyPlugin) prePredicateFn(_ *pod_info.PodInfo, job *podgroup_info.
2730
return nil
2831
}
2932

30-
// Calc tree job allocation data
31-
_, err = t.calcTreeAllocatable(job, topologyTree)
33+
defer t.treeAllocateableCleanup(topologyTree)
34+
maxAllocatablePods, err := t.calcTreeAllocatable(job, topologyTree)
3235
if err != nil {
3336
return err
3437
}
3538

36-
// Clean allocation data from the tree
37-
for _, levelDomains := range topologyTree.DomainsByLevel {
38-
for _, domain := range levelDomains {
39-
domain.AllocatablePods = 0
40-
}
39+
if maxAllocatablePods < len(podgroup_info.GetTasksToAllocate(job, t.subGroupOrderFunc, t.taskOrderFunc, true)) {
40+
log.InfraLogger.V(6).Infof("no relevant domains found for job %s, workload topology name: %s",
41+
job.PodGroup.Name, topologyTree.Name)
42+
return nil
43+
}
44+
45+
_, err = t.getBestjobAllocateableDomains(job, topologyTree)
46+
if err != nil {
47+
return err
4148
}
4249

4350
return nil
@@ -92,7 +99,7 @@ func (t *topologyPlugin) calcSubTreeAllocatable(jobAllocationData *jobAllocation
9299

93100
if len(rootDomain.Children) == 0 {
94101
for _, node := range rootDomain.Nodes {
95-
rootDomain.AllocatablePods += calcNodeAccomedation(jobAllocationData, node)
102+
rootDomain.AllocatablePods += calcNodeAccommodation(jobAllocationData, node)
96103
}
97104
return rootDomain.AllocatablePods, nil
98105
}
@@ -107,7 +114,7 @@ func (t *topologyPlugin) calcSubTreeAllocatable(jobAllocationData *jobAllocation
107114
return rootDomain.AllocatablePods, nil
108115
}
109116

110-
func calcNodeAccomedation(jobAllocationMetaData *jobAllocationMetaData, node *node_info.NodeInfo) int {
117+
func calcNodeAccommodation(jobAllocationMetaData *jobAllocationMetaData, node *node_info.NodeInfo) int {
111118
allocateablePodsCount := 0
112119
for _, resourceRepresentorPod := range jobAllocationMetaData.allocationTestPods {
113120
if node.IsTaskAllocatable(resourceRepresentorPod) {
@@ -145,10 +152,129 @@ func calcNextAllocationTestPodResources(previousTestResources, maxPodResources *
145152
}
146153
} else {
147154
updatedGpuResource := resource_info.NewGpuResourceRequirementWithMultiFraction(
148-
nPlus1Resources.GetNumOfGpuDevices(),
155+
nPlus1Resources.GetNumOfGpuDevices()+maxPodResources.GetNumOfGpuDevices(),
149156
nPlus1Resources.GpuFractionalPortion(),
150157
nPlus1Resources.GpuMemory())
151158
nPlus1Resources.GpuResourceRequirement = *updatedGpuResource
152159
}
153160
return nPlus1Resources
154161
}
162+
163+
func (t *topologyPlugin) getBestjobAllocateableDomains(job *podgroup_info.PodGroupInfo, topologyTree *TopologyInfo) ([]*TopologyDomainInfo, error) {
164+
relevantLevels, err := t.calculateRelevantDomainLevels(job, topologyTree.Name, topologyTree)
165+
if err != nil {
166+
return nil, err
167+
}
168+
taskToAllocateCount := len(podgroup_info.GetTasksToAllocate(job, t.subGroupOrderFunc, t.taskOrderFunc, true))
169+
170+
maxDepthDomains := []*TopologyDomainInfo{}
171+
for _, level := range relevantLevels {
172+
for _, domain := range topologyTree.DomainsByLevel[level] {
173+
if domain.AllocatablePods < taskToAllocateCount { // Filter domains that cannot allocate the job
174+
continue
175+
}
176+
177+
maxDepthDomains = append(maxDepthDomains, domain)
178+
}
179+
if len(maxDepthDomains) > 0 {
180+
break
181+
}
182+
}
183+
184+
if len(maxDepthDomains) == 0 {
185+
return nil, fmt.Errorf("no domains found for the job %s, workload topology name: %s",
186+
job.PodGroup.Name, topologyTree.Name)
187+
}
188+
189+
if job.PodGroup.Spec.TopologyConstraint.PreferredTopologyLevel != "" &&
190+
maxDepthDomains[0].Level != job.PodGroup.Spec.TopologyConstraint.PreferredTopologyLevel {
191+
// If Preferred is defined and we couldn't find a domain on the prefered level,
192+
// return a children subset and not a single domain
193+
return t.improveChoiceForPreference(maxDepthDomains, job)
194+
}
195+
196+
// For stage 1, return a single domain
197+
return []*TopologyDomainInfo{maxDepthDomains[0]}, nil
198+
}
199+
200+
func (*topologyPlugin) calculateRelevantDomainLevels(
201+
job *podgroup_info.PodGroupInfo, jobTopologyName string,
202+
topologyTree *TopologyInfo) ([]string, error) {
203+
requiredPlacement := job.PodGroup.Spec.TopologyConstraint.RequiredTopologyLevel
204+
preferredPlacement := job.PodGroup.Spec.TopologyConstraint.PreferredTopologyLevel
205+
if requiredPlacement == "" && preferredPlacement == "" {
206+
return nil, fmt.Errorf("no topology placement annotations found for job %s, workload topology name: %s", job.PodGroup.Name, jobTopologyName)
207+
}
208+
209+
foundRequiredLevel := false
210+
foundPreferredLevel := false
211+
relevantLevels := []string{}
212+
abovePreferredLevel := preferredPlacement == ""
213+
for _, level := range topologyTree.TopologyResource.Spec.Levels {
214+
if preferredPlacement != "" && preferredPlacement == level.NodeLabel {
215+
foundPreferredLevel = true
216+
abovePreferredLevel = true
217+
}
218+
219+
if !abovePreferredLevel {
220+
continue
221+
}
222+
relevantLevels = append(relevantLevels, level.NodeLabel)
223+
224+
if requiredPlacement != "" && requiredPlacement == level.NodeLabel {
225+
foundRequiredLevel = true
226+
break // Next level won't fulfill the required placement
227+
}
228+
}
229+
if requiredPlacement != "" && !foundRequiredLevel {
230+
return nil, fmt.Errorf("the topology %s doesn't have a level matching the required(%s) spesified for the job %s",
231+
jobTopologyName, requiredPlacement, job.Name,
232+
)
233+
}
234+
if preferredPlacement != "" && !foundPreferredLevel {
235+
return nil, fmt.Errorf("the topology %s doesn't have a level matching the preffered(%s) spesified for the job %s",
236+
jobTopologyName, preferredPlacement, job.Name,
237+
)
238+
}
239+
return relevantLevels, nil
240+
}
241+
242+
func (t *topologyPlugin) improveChoiceForPreference(maxDepthDomains []*TopologyDomainInfo, job *podgroup_info.PodGroupInfo) ([]*TopologyDomainInfo, error) {
243+
taskToAllocateCount := len(podgroup_info.GetTasksToAllocate(job, t.subGroupOrderFunc, t.taskOrderFunc, true))
244+
// Look for a subgroup of children domains that allows the job to be allocated
245+
// and return the one with the least number of domains required for the allocation
246+
bestChildrenSubset := []*TopologyDomainInfo{}
247+
for _, domain := range maxDepthDomains {
248+
childDomainSubset := getJobAllocateableChildrenSubset(domain, taskToAllocateCount)
249+
if len(bestChildrenSubset) == 0 || len(childDomainSubset) < len(bestChildrenSubset) {
250+
bestChildrenSubset = childDomainSubset
251+
}
252+
}
253+
return bestChildrenSubset, nil
254+
}
255+
256+
func getJobAllocateableChildrenSubset(domain *TopologyDomainInfo, taskToAllocateCount int) []*TopologyDomainInfo {
257+
children := slices.Clone(domain.Children)
258+
sort.SliceStable(children, func(i, j int) bool {
259+
return children[i].AllocatablePods > children[j].AllocatablePods
260+
})
261+
262+
allocateablePodsSum := 0
263+
childDomainSubset := []*TopologyDomainInfo{}
264+
for _, childDomain := range children {
265+
allocateablePodsSum += childDomain.AllocatablePods
266+
childDomainSubset = append(childDomainSubset, childDomain)
267+
if allocateablePodsSum >= taskToAllocateCount {
268+
break
269+
}
270+
}
271+
return childDomainSubset
272+
}
273+
274+
func (*topologyPlugin) treeAllocateableCleanup(topologyTree *TopologyInfo) {
275+
for _, levelDomains := range topologyTree.DomainsByLevel {
276+
for _, domain := range levelDomains {
277+
domain.AllocatablePods = 0
278+
}
279+
}
280+
}

0 commit comments

Comments
 (0)