Skip to content

Commit 97583b7

Browse files
authored
Add job order reflection plugin with HTTP endpoint (#445)
* feat: (GH:#135) Add JobOrder plugin to reflect job order in http endpoint Signed-off-by: Saurabh Kumar Singh <[email protected]> --------- Signed-off-by: Saurabh Kumar Singh <[email protected]>
1 parent f6ab7bd commit 97583b7

File tree

3 files changed

+183
-0
lines changed

3 files changed

+183
-0
lines changed

pkg/scheduler/plugins/factory.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/plugins/priority"
3737
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/plugins/proportion"
3838
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/plugins/ray"
39+
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/plugins/reflectjoborder"
3940
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/plugins/resourcetype"
4041
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/plugins/snapshot"
4142
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/plugins/subgrouporder"
@@ -69,4 +70,7 @@ func InitDefaultPlugins() {
6970

7071
// Other Plugins
7172
framework.RegisterPluginBuilder("snapshot", snapshot.New)
73+
74+
// Always register the Job Order Plugin last.
75+
framework.RegisterPluginBuilder("reflectjoborder", reflectjoborder.New)
7276
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// Copyright 2025 NVIDIA CORPORATION
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package reflectjoborder
5+
6+
import (
7+
"encoding/json"
8+
"net/http"
9+
10+
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/actions/utils"
11+
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/common_info"
12+
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/framework"
13+
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/log"
14+
)
15+
16+
type JobOrder struct {
17+
ID common_info.PodGroupID `json:"id"`
18+
Priority int32 `json:"priority"`
19+
}
20+
21+
type ReflectJobOrder struct {
22+
GlobalOrder []JobOrder `json:"global_order"`
23+
QueueOrder map[common_info.QueueID][]JobOrder `json:"queue_order"`
24+
}
25+
26+
type JobOrderPlugin struct {
27+
session *framework.Session
28+
ReflectJobOrder *ReflectJobOrder
29+
}
30+
31+
func (jp *JobOrderPlugin) Name() string {
32+
return "joborder"
33+
}
34+
35+
func New(arguments map[string]string) framework.Plugin {
36+
return &JobOrderPlugin{}
37+
}
38+
39+
func (jp *JobOrderPlugin) OnSessionOpen(ssn *framework.Session) {
40+
jp.session = ssn
41+
log.InfraLogger.V(3).Info("Job Order registering get-jobs")
42+
43+
jp.ReflectJobOrder = &ReflectJobOrder{
44+
GlobalOrder: make([]JobOrder, 0),
45+
QueueOrder: make(map[common_info.QueueID][]JobOrder),
46+
}
47+
48+
jobsOrderByQueues := utils.NewJobsOrderByQueues(ssn, utils.JobsOrderInitOptions{
49+
FilterNonPending: true,
50+
FilterUnready: true,
51+
MaxJobsQueueDepth: ssn.GetJobsDepth(framework.Allocate),
52+
})
53+
jobsOrderByQueues.InitializeWithJobs(ssn.PodGroupInfos)
54+
55+
for !jobsOrderByQueues.IsEmpty() {
56+
job := jobsOrderByQueues.PopNextJob()
57+
jobOrder := JobOrder{
58+
ID: job.UID,
59+
Priority: job.Priority,
60+
}
61+
jp.ReflectJobOrder.GlobalOrder = append(jp.ReflectJobOrder.GlobalOrder, jobOrder)
62+
jp.ReflectJobOrder.QueueOrder[job.Queue] = append(jp.ReflectJobOrder.QueueOrder[job.Queue], jobOrder)
63+
}
64+
65+
ssn.AddHttpHandler("/get-job-order", jp.serveJobs)
66+
}
67+
68+
// OnSessionClose does nothing; the plugin performs no work when a session
// is closed.
func (jp *JobOrderPlugin) OnSessionClose(ssn *framework.Session) {
	// Intentionally empty.
}
69+
70+
func (jp *JobOrderPlugin) serveJobs(w http.ResponseWriter, r *http.Request) {
71+
if jp.ReflectJobOrder == nil {
72+
http.Error(w, "Job order data not ready", http.StatusServiceUnavailable)
73+
return
74+
}
75+
w.Header().Set("Content-Type", "application/json")
76+
enc := json.NewEncoder(w)
77+
enc.SetIndent("", " ")
78+
if err := enc.Encode(jp.ReflectJobOrder); err != nil {
79+
http.Error(w, "Failed to encode job order data", http.StatusInternalServerError)
80+
}
81+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Copyright 2025 NVIDIA CORPORATION
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package reflectjoborder
5+
6+
import (
7+
"bytes"
8+
"encoding/json"
9+
"net/http"
10+
"net/http/httptest"
11+
"testing"
12+
13+
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/common_info"
14+
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/podgroup_info"
15+
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/conf"
16+
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/framework"
17+
)
18+
19+
func TestJobOrderPlugin_OnSessionOpen(t *testing.T) {
20+
ssn := &framework.Session{
21+
PodGroupInfos: map[common_info.PodGroupID]*podgroup_info.PodGroupInfo{
22+
"pg1": {UID: "pg1", Priority: 5, Queue: "q1"},
23+
"pg2": {UID: "pg2", Priority: 2, Queue: "q2"},
24+
},
25+
Config: &conf.SchedulerConfiguration{
26+
QueueDepthPerAction: map[string]int{"Allocate": 10},
27+
},
28+
}
29+
plugin := &JobOrderPlugin{}
30+
plugin.OnSessionOpen(ssn)
31+
32+
if plugin.ReflectJobOrder == nil {
33+
t.Fatalf("ReflectJobOrder should be initialized")
34+
}
35+
}
36+
37+
// Test serveJobs returns correct JSON and status when ReflectJobOrder is set
38+
func TestServeJobs_ReflectJobOrderReady(t *testing.T) {
39+
plugin := &JobOrderPlugin{
40+
ReflectJobOrder: &ReflectJobOrder{
41+
GlobalOrder: []JobOrder{{ID: "pg1", Priority: 10}},
42+
QueueOrder: map[common_info.QueueID][]JobOrder{"q1": {{ID: "pg1", Priority: 10}}},
43+
},
44+
}
45+
req := httptest.NewRequest(http.MethodGet, "/get-job-order", nil)
46+
rr := httptest.NewRecorder()
47+
plugin.serveJobs(rr, req)
48+
if rr.Code != http.StatusOK {
49+
t.Errorf("Expected HTTP 200 OK, got %d", rr.Code)
50+
}
51+
var resp ReflectJobOrder
52+
if err := json.NewDecoder(bytes.NewReader(rr.Body.Bytes())).Decode(&resp); err != nil {
53+
t.Fatalf("Failed to decode serveJobs response: %v", err)
54+
}
55+
if len(resp.GlobalOrder) != 1 || resp.GlobalOrder[0].Priority != 10 {
56+
t.Errorf("Unexpected response json: %+v", resp)
57+
}
58+
if len(resp.QueueOrder) != 1 {
59+
t.Errorf("Expected 1 queue, got %d", len(resp.QueueOrder))
60+
}
61+
}
62+
63+
// Test serveJobs returns 503 if ReflectJobOrder is nil
64+
func TestServeJobs_ReflectJobOrderNotReady(t *testing.T) {
65+
plugin := &JobOrderPlugin{ReflectJobOrder: nil}
66+
req := httptest.NewRequest(http.MethodGet, "/get-job-order", nil)
67+
rr := httptest.NewRecorder()
68+
plugin.serveJobs(rr, req)
69+
if rr.Code != http.StatusServiceUnavailable {
70+
t.Errorf("Expected HTTP 503, got %d", rr.Code)
71+
}
72+
if !bytes.Contains(rr.Body.Bytes(), []byte("Job order data not ready")) {
73+
t.Errorf("Expected error message in body, got: %s", rr.Body.String())
74+
}
75+
}
76+
77+
// Helpers for exercising serveJobs when writing the response body fails.
type (
	// brokenWriter wraps an http.ResponseWriter and overrides only Write,
	// so Header and WriteHeader still go to the embedded writer.
	brokenWriter struct{ http.ResponseWriter }

	// encodeError is a minimal error carrying a fixed message.
	encodeError struct{ msg string }
)

// errEncode is the error returned by every brokenWriter.Write call.
var errEncode = &encodeError{msg: "forced encode error"}

// Write ignores its input and always fails with errEncode.
func (b *brokenWriter) Write(_ []byte) (int, error) {
	return 0, errEncode
}

// Error returns the stored message.
func (e *encodeError) Error() string {
	return e.msg
}
87+
88+
func TestServeJobs_EncodeError(t *testing.T) {
89+
plugin := &JobOrderPlugin{ReflectJobOrder: &ReflectJobOrder{}}
90+
req := httptest.NewRequest(http.MethodGet, "/get-job-order", nil)
91+
rr := httptest.NewRecorder()
92+
bw := &brokenWriter{rr}
93+
plugin.serveJobs(bw, req)
94+
// Should write 500 error
95+
if rr.Code != http.StatusInternalServerError {
96+
t.Errorf("Expected HTTP 500, got %d", rr.Code)
97+
}
98+
}

0 commit comments

Comments
 (0)