Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
## [Unreleased]

### Added
- Added support for parentless queues by adopting them under a synthetic default root queue [#845](https://github.com/NVIDIA/KAI-Scheduler/pull/845) [gshaibi](https://github.com/gshaibi)
- Added the option to disable prometheus service monitor creation [#810](https://github.com/NVIDIA/KAI-Scheduler/pull/810) [itsomri](https://github.com/itsomri)
- Fixed prometheus instance deprecation - ensure single instance [#779](https://github.com/NVIDIA/KAI-Scheduler/pull/779) [itsomri](https://github.com/itsomri)
- Added clear error messages for jobs referencing missing or orphan queues, reporting via events and conditions [#820](https://github.com/NVIDIA/KAI-Scheduler/pull/820) [gshaibi](https://github.com/gshaibi)
Expand Down
62 changes: 54 additions & 8 deletions pkg/scheduler/cache/cluster_info/cluster_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestSnapshot(t *testing.T) {
},
},
expectedNodes: 1,
expectedQueues: 2,
expectedQueues: 3, // my-department, my-queue, and synthetic "default" root
expectedBindRequests: 1,
},
"SingleFromEach2": {
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestSnapshot(t *testing.T) {
},
},
expectedNodes: 1,
expectedQueues: 2,
expectedQueues: 3, // my-department, my-queue, and synthetic "default" root
},
}

Expand Down Expand Up @@ -1418,15 +1418,46 @@ func TestSnapshotQueues(t *testing.T) {
)
snapshot, err := clusterInfo.Snapshot()
assert.Nil(t, err)
assert.Equal(t, 2, len(snapshot.Queues))
// Expecting 3 queues: default (synthetic root), department0, and queue0
// department0-a is filtered out by nodepool label
assert.Equal(t, 3, len(snapshot.Queues))
assert.Equal(t, common_info.QueueID("queue0"), snapshot.Queues["queue0"].UID)
assert.Equal(t, common_info.QueueID("department0"), snapshot.Queues["department0"].UID)
assert.Equal(t, "queue0", snapshot.Queues["queue0"].Name)
assert.Equal(t, "department-zero", snapshot.Queues["department0"].Name)
assert.Equal(t, common_info.QueueID(""), snapshot.Queues["department0"].ParentQueue)
// Parentless queues (queues without a parent) are adopted by the synthetic "default" root queue
assert.Equal(t, common_info.QueueID("department0"), snapshot.Queues["queue0"].ParentQueue)
assert.Equal(t, []common_info.QueueID{"queue0"}, snapshot.Queues["department0"].ChildQueues)
assert.Equal(t, []common_info.QueueID{}, snapshot.Queues["queue0"].ChildQueues)
// Verify the default root queue exists
assert.Equal(t, common_info.QueueID(defaultQueueName), snapshot.Queues[defaultQueueName].UID)
}

func TestSnapshotQueues_TwoLevelHierarchyLimit(t *testing.T) {
// Create 3-level hierarchy: grandparent -> parent -> child
objs := []runtime.Object{
&enginev2.Queue{
ObjectMeta: metav1.ObjectMeta{Name: "grandparent"},
Spec: enginev2.QueueSpec{},
},
&enginev2.Queue{
ObjectMeta: metav1.ObjectMeta{Name: "parent"},
Spec: enginev2.QueueSpec{ParentQueue: "grandparent"},
},
&enginev2.Queue{
ObjectMeta: metav1.ObjectMeta{Name: "child"},
Spec: enginev2.QueueSpec{ParentQueue: "parent"},
},
}

clusterInfo := newClusterInfoTests(t, clusterInfoTestParams{kaiSchedulerObjects: objs})
snapshot, err := clusterInfo.Snapshot()
assert.Nil(t, err)

// Parent should be detached from grandparent to enforce 2-level limit
assert.Equal(t, common_info.QueueID(""), snapshot.Queues["parent"].ParentQueue)
assert.Equal(t, common_info.QueueID("parent"), snapshot.Queues["child"].ParentQueue)
}

func TestSnapshotFlatHierarchy(t *testing.T) {
Expand Down Expand Up @@ -1486,14 +1517,16 @@ func TestSnapshotFlatHierarchy(t *testing.T) {

snapshot, err := clusterInfo.Snapshot()
assert.Nil(t, err)
assert.Equal(t, 3, len(snapshot.Queues))
// In ProjectLevelFairness mode, all queues are flattened under the synthetic "default" root:
// default, department0, department1, queue0, queue1
assert.Equal(t, 5, len(snapshot.Queues))

defaultParentQueueId := common_info.QueueID(defaultQueueName)
parentQueue, found := snapshot.Queues[defaultParentQueueId]
defaultQueue, found := snapshot.Queues[defaultParentQueueId]
assert.True(t, found)
assert.Equal(t, parentQueue.Name, defaultQueueName)
assert.Equal(t, parentQueue.UID, defaultParentQueueId)
assert.Equal(t, parentQueue.Resources, queue_info.QueueQuota{
assert.Equal(t, defaultQueue.Name, defaultQueueName)
assert.Equal(t, defaultQueue.UID, defaultParentQueueId)
assert.Equal(t, defaultQueue.Resources, queue_info.QueueQuota{
GPU: queue_info.ResourceQuota{
Quota: -1,
OverQuotaWeight: 1,
Expand All @@ -1510,13 +1543,26 @@ func TestSnapshotFlatHierarchy(t *testing.T) {
Limit: -1,
},
})

// In flat hierarchy mode, all queues should have "default" as their parent
snapshotDept0, found := snapshot.Queues[common_info.QueueID(parentQueue0.Name)]
assert.True(t, found)
assert.Equal(t, snapshotDept0.ParentQueue, defaultParentQueueId)

snapshotDept1, found := snapshot.Queues[common_info.QueueID(parentQueue1.Name)]
assert.True(t, found)
assert.Equal(t, snapshotDept1.ParentQueue, defaultParentQueueId)

snapshotQueue0, found := snapshot.Queues[common_info.QueueID(queue0.Name)]
assert.True(t, found)
assert.Equal(t, snapshotQueue0.ParentQueue, defaultParentQueueId)

snapshotQueue1, found := snapshot.Queues[common_info.QueueID(queue1.Name)]
assert.True(t, found)
assert.Equal(t, snapshotQueue1.ParentQueue, defaultParentQueueId)

// Default queue should have all 4 user queues as children
assert.ElementsMatch(t, []common_info.QueueID{"department0", "department1", "queue0", "queue1"}, defaultQueue.ChildQueues)
}

func TestGetPodGroupPriority(t *testing.T) {
Expand Down
44 changes: 28 additions & 16 deletions pkg/scheduler/cache/cluster_info/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ import (
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/common_info"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/queue_info"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/log"

"github.com/samber/lo"
)

const (
defaultQueueName = "default"
defaultQueueName = "__default__"
)

func (c *ClusterInfo) getDefaultParentQueue() *queue_info.QueueInfo {
Expand Down Expand Up @@ -55,25 +57,33 @@ func (c *ClusterInfo) snapshotQueues() (map[common_info.QueueID]*queue_info.Queu
return nil, err
}

result := map[common_info.QueueID]*queue_info.QueueInfo{}
if c.fairnessLevelType == FullFairness {
for _, queue := range queues {
queueInfo := queue_info.NewQueueInfo(queue)
result[queueInfo.UID] = queueInfo
for _, queue := range queues {
// Adopt parentless queues (no parent specified) under the default root queue.
// In ProjectLevelFairness mode, flatten all queues under the default root.
if len(queue.Spec.ParentQueue) == 0 || c.fairnessLevelType == ProjectLevelFairness {
queue.Spec.ParentQueue = defaultQueueName
}
} else if c.fairnessLevelType == ProjectLevelFairness {
defaultParentQueue := c.getDefaultParentQueue()
result[defaultParentQueue.UID] = defaultParentQueue

for _, queue := range queues {
if len(queue.Spec.ParentQueue) > 0 {
queue.Spec.ParentQueue = defaultQueueName
queueInfo := queue_info.NewQueueInfo(queue)
result[queueInfo.UID] = queueInfo
}
}

queuesByName := lo.SliceToMap(queues, func(queue *enginev2.Queue) (string, *enginev2.Queue) {
return queue.Name, queue
})
// Detach queues from their grandparents to enforce 2-level hierarchy limit
// TODO: Remove this restriction when n-level hierarchy support is added
for _, queue := range queues {
if queue.Spec.ParentQueue != "" && queuesByName[queue.Spec.ParentQueue] != nil {
queuesByName[queue.Spec.ParentQueue].Spec.ParentQueue = ""
}
}

result := map[common_info.QueueID]*queue_info.QueueInfo{}
defaultParentQueue := c.getDefaultParentQueue()
result[defaultParentQueue.UID] = defaultParentQueue
for _, queue := range queues {
queueInfo := queue_info.NewQueueInfo(queue)
result[queueInfo.UID] = queueInfo
}

return result, nil
}

Expand Down Expand Up @@ -102,6 +112,8 @@ func updateQueueChildren(queues map[common_info.QueueID]*queue_info.QueueInfo) {
}
}

// cleanQueueOrphans removes queues that reference a non-existent parent queue
// (parentless queues are not considered orphans)
func cleanQueueOrphans(queues map[common_info.QueueID]*queue_info.QueueInfo) {
for queueId, queue := range queues {
if queue.ParentQueue != "" {
Expand Down
Loading