Skip to content

Commit 4899978

Browse files
authored
plugin server - adding option for plugins to expose http api (#12)
1 parent 644903c commit 4899978

File tree

11 files changed

+89
-11
lines changed

11 files changed

+89
-11
lines changed

cmd/scheduler/app/options/options.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ type ServerOption struct {
5454
AllowConsolidatingReclaim bool
5555
NumOfStatusRecordingWorkers int
5656
GlobalDefaultStalenessGracePeriod time.Duration
57+
PluginServerPort int
5758

5859
QPS int
5960
Burst int
@@ -97,6 +98,7 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
9798
fs.BoolVar(&s.AllowConsolidatingReclaim, "allow-consolidating-reclaim", true, "Do not count pipelined pods towards 'reclaimed' resources")
9899
fs.IntVar(&s.NumOfStatusRecordingWorkers, "num-of-status-recording-workers", defaultNumOfStatusRecordingWorkers, "specifies the max number of go routines spawned to update pod and podgroups conditions and events. Defaults to 5")
99100
fs.DurationVar(&s.GlobalDefaultStalenessGracePeriod, "default-staleness-grace-period", defaultStalenessGracePeriod, "Global default staleness grace period duration. Negative values means infinite. Defaults to 60s")
101+
fs.IntVar(&s.PluginServerPort, "plugin-server-port", 8081, "The port to bind for plugin server requests")
100102

101103
utilfeature.DefaultMutableFeatureGate.AddFlag(fs)
102104
}

cmd/scheduler/app/options/options_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ func TestAddFlags(t *testing.T) {
4646
GlobalDefaultStalenessGracePeriod: defaultStalenessGracePeriod,
4747
NumOfStatusRecordingWorkers: defaultNumOfStatusRecordingWorkers,
4848
NodePoolLabelKey: defaultNodePoolLabelKey,
49+
PluginServerPort: 8081,
4950
}
5051

5152
if !reflect.DeepEqual(expected, s) {

cmd/scheduler/app/server.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,11 @@ func RunApp() error {
8585
os.Exit(1)
8686
}
8787

88+
mux := http.NewServeMux()
89+
go func() {
90+
_ = http.ListenAndServe(fmt.Sprintf(":%d", so.PluginServerPort), mux)
91+
}()
92+
8893
setupProfiling(so)
8994
if err := setupLogging(so); err != nil {
9095
fmt.Printf("Failed to initialize loggers: %v", err)
@@ -96,7 +101,7 @@ func RunApp() error {
96101
config.QPS = float32(so.QPS)
97102
config.Burst = so.Burst
98103

99-
return Run(so, config)
104+
return Run(so, config, mux)
100105
}
101106

102107
func setupProfiling(so *options.ServerOption) {
@@ -122,14 +127,15 @@ func setupLogging(so *options.ServerOption) error {
122127
return nil
123128
}
124129

125-
func Run(opt *options.ServerOption, config *restclient.Config) error {
130+
func Run(opt *options.ServerOption, config *restclient.Config, mux *http.ServeMux) error {
126131
if opt.PrintVersion {
127132
version.PrintVersion()
128133
}
129134

130135
scheduler, err := scheduler.NewScheduler(config,
131136
opt.SchedulerConf,
132137
BuildSchedulerParams(opt),
138+
mux,
133139
)
134140
if err != nil {
135141
return err

pkg/env-tests/scheduler/scheduler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func RunScheduler(cfg *rest.Config, stopCh chan struct{}) error {
3939

4040
params := app.BuildSchedulerParams(opt)
4141

42-
s, err := scheduler.NewScheduler(cfg, "", params)
42+
s, err := scheduler.NewScheduler(cfg, "", params, nil)
4343
if err != nil {
4444
return err
4545
}

pkg/scheduler/framework/framework.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package framework
55

66
import (
7+
"net/http"
78
"time"
89

910
"k8s.io/apimachinery/pkg/types"
@@ -15,11 +16,15 @@ import (
1516
)
1617

1718
func OpenSession(cache cache.Cache, config *conf.SchedulerConfiguration,
18-
schedulerParams *conf.SchedulerParams, sessionId types.UID) (*Session, error) {
19+
schedulerParams *conf.SchedulerParams, sessionId types.UID, mux *http.ServeMux) (*Session, error) {
1920
openSessionStart := time.Now()
2021
defer metrics.UpdateOpenSessionDuration(openSessionStart)
2122

22-
ssn, err := openSession(cache, sessionId, *schedulerParams)
23+
if server == nil {
24+
server = newPluginServer(mux)
25+
}
26+
27+
ssn, err := openSession(cache, sessionId, *schedulerParams, mux)
2328
if err != nil {
2429
return nil, err
2530
}

pkg/scheduler/framework/server.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright 2025 NVIDIA CORPORATION
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package framework
5+
6+
import (
7+
"fmt"
8+
"net/http"
9+
)
10+
11+
type PluginServer struct {
12+
registeredPlugins map[string]func(http.ResponseWriter, *http.Request)
13+
mux *http.ServeMux
14+
}
15+
16+
func newPluginServer(mux *http.ServeMux) *PluginServer {
17+
ps := &PluginServer{
18+
registeredPlugins: map[string]func(http.ResponseWriter, *http.Request){},
19+
mux: mux,
20+
}
21+
if ps.mux != nil {
22+
ps.mux.HandleFunc("/", ps.handlePlugin)
23+
}
24+
25+
return ps
26+
}
27+
28+
func (ps *PluginServer) registerPlugin(path string, handler func(http.ResponseWriter, *http.Request)) error {
29+
if ps.mux == nil {
30+
return fmt.Errorf("server not initialized")
31+
}
32+
33+
ps.registeredPlugins[path] = handler
34+
return nil
35+
}
36+
37+
func (ps *PluginServer) handlePlugin(w http.ResponseWriter, r *http.Request) {
38+
path := r.URL.Path
39+
handler, ok := ps.registeredPlugins[path]
40+
if !ok {
41+
http.Error(w, "plugin not found", http.StatusNotFound)
42+
return
43+
}
44+
handler(w, r)
45+
}

pkg/scheduler/framework/session.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package framework
55

66
import (
77
"fmt"
8+
"net/http"
89
"sort"
910
"sync"
1011
"time"
@@ -29,6 +30,8 @@ import (
2930
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/scheduler_util"
3031
)
3132

33+
var server *PluginServer
34+
3235
type Session struct {
3336
UID types.UID
3437
Cache cache.Cache
@@ -60,6 +63,7 @@ type Session struct {
6063
plugins map[string]Plugin
6164
eventHandlers []*EventHandler
6265
schedulerParams conf.SchedulerParams
66+
mux *http.ServeMux
6367

6468
k8sPodState map[types.UID]k8s_internal.SessionState
6569
}
@@ -307,7 +311,7 @@ func (ssn *Session) clear() {
307311
ssn.JobOrderFns = nil
308312
}
309313

310-
func openSession(cache cache.Cache, sessionId types.UID, schedulerParams conf.SchedulerParams) (*Session, error) {
314+
func openSession(cache cache.Cache, sessionId types.UID, schedulerParams conf.SchedulerParams, mux *http.ServeMux) (*Session, error) {
311315
ssn := &Session{
312316
UID: sessionId,
313317
Cache: cache,
@@ -318,8 +322,8 @@ func openSession(cache cache.Cache, sessionId types.UID, schedulerParams conf.Sc
318322

319323
plugins: map[string]Plugin{},
320324
schedulerParams: schedulerParams,
321-
322-
k8sPodState: map[types.UID]k8s_internal.SessionState{},
325+
mux: mux,
326+
k8sPodState: map[types.UID]k8s_internal.SessionState{},
323327
}
324328

325329
log.InfraLogger.V(2).Infof("Taking cluster snapshot ...")

pkg/scheduler/framework/session_plugins.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
package framework
55

66
import (
7+
"net/http"
8+
79
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api"
810
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/common_info"
911
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/node_info"
@@ -63,6 +65,13 @@ func (ssn *Session) AddGetQueueAllocatedResourcesFn(of api.QueueResource) {
6365
ssn.GetQueueAllocatedResourcesFns = append(ssn.GetQueueAllocatedResourcesFns, of)
6466
}
6567

68+
func (ssn *Session) AddHttpHandler(path string, handler func(http.ResponseWriter, *http.Request)) {
69+
if server == nil {
70+
return
71+
}
72+
server.registerPlugin(path, handler)
73+
}
74+
6675
func (ssn *Session) CanReclaimResources(reclaimer *reclaimer_info.ReclaimerInfo) bool {
6776
for _, canReclaimFn := range ssn.CanReclaimResourcesFns {
6877
return canReclaimFn(reclaimer)

pkg/scheduler/plugins/podaffinity/podaffinity_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,9 @@ func testPodPreferredAffinity(testData testInput, clusterAffinityInfo pod_affini
203203
mockCache,
204204
&conf.SchedulerConfiguration{Tiers: []conf.Tier{}},
205205
&conf.SchedulerParams{},
206-
sessionId)
206+
sessionId,
207+
nil,
208+
)
207209
Expect(err).To(Succeed())
208210

209211
Expect(ssn.NodePreOrderFns).To(HaveLen(0), "Expected no node pre-order functions")

pkg/scheduler/plugins/proportion/proportion_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -765,7 +765,7 @@ var _ = Describe("Set Fair Share in Proportion", func() {
765765
RestrictSchedulingNodes: testData.isRestrictNode,
766766
SchedulerName: schedulerName,
767767
},
768-
"1")
768+
"1", nil)
769769
if got := getNodeResources(session, testData.node); !reflect.DeepEqual(got, testData.want) {
770770
Fail(fmt.Sprintf("getNodeResources() = %v, want %v", got, testData.want))
771771
}

0 commit comments

Comments
 (0)