Skip to content

Commit 33bd9d0

Browse files
authored
Natasha/add queue metrics (#320)
* added metrics to queue controller * added reset queue metrics when needed * added tests + fixes * small typo fix * little fix for tests * added some tests to improve coverage + fix the tests found * fix tests * CR fixes - remove unnecessary labels * CR fixes - DefaultMetricsNamespace const in common consts
1 parent 4ff3e8b commit 33bd9d0

File tree

11 files changed

+568
-6
lines changed

11 files changed

+568
-6
lines changed

cmd/queuecontroller/app/app.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ import (
2121
ctrl "sigs.k8s.io/controller-runtime"
2222
"sigs.k8s.io/controller-runtime/pkg/healthz"
2323
"sigs.k8s.io/controller-runtime/pkg/log/zap"
24+
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
2425

2526
"github.com/NVIDIA/KAI-scheduler/pkg/queuecontroller/controllers"
27+
"github.com/NVIDIA/KAI-scheduler/pkg/queuecontroller/metrics"
2628
// +kubebuilder:scaffold:imports
2729
)
2830

@@ -47,9 +49,14 @@ func Run() error {
4749

4850
initLogger()
4951

52+
metrics.InitMetrics(opts.MetricsNamespace, opts.QueueLabelToMetricLabel.Get(), opts.QueueLabelToDefaultMetricValue.Get())
53+
5054
var err error
5155
options := ctrl.Options{
52-
Scheme: scheme,
56+
Scheme: scheme,
57+
Metrics: metricsserver.Options{
58+
BindAddress: opts.MetricsAddress,
59+
},
5360
LeaderElection: opts.EnableLeaderElection,
5461
LeaderElectionID: "42ece193.run.ai",
5562
}

cmd/queuecontroller/app/options.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,31 @@ package app
55

66
import (
77
"flag"
8+
9+
"github.com/NVIDIA/KAI-scheduler/pkg/common/constants"
10+
kaiflags "github.com/NVIDIA/KAI-scheduler/pkg/common/flags"
11+
)
12+
13+
// Default values used when registering the queue controller flags.
const (
14+
// defaultSchedulingQueueLabelKey is the default value of the "queue-label-key" flag.
defaultSchedulingQueueLabelKey = "kai.scheduler/queue"
15+
// defaultMetricsAddress is the default value of the "metrics-listen-address" flag.
defaultMetricsAddress = ":8080"
816
)
917

1018
// Options holds the queue controller settings populated from command-line
// flags registered by AddFlags.
type Options struct {
1119
// EnableLeaderElection enables leader election for the controller manager ("leader-elect").
EnableLeaderElection bool
1220
// SchedulingQueueLabelKey is the scheduling queue label key name ("queue-label-key").
SchedulingQueueLabelKey string
21+
22+
// MetricsAddress is the address the metrics endpoint binds to ("metrics-listen-address").
MetricsAddress string
23+
// MetricsNamespace is the metrics namespace ("metrics-namespace").
MetricsNamespace string
24+
// QueueLabelToMetricLabel maps queue label keys to metric label keys ("queue-label-to-metric-label").
QueueLabelToMetricLabel kaiflags.StringMapFlag
25+
// QueueLabelToDefaultMetricValue maps queue label keys to the default metric value
// used when the label doesn't exist on the queue ("queue-label-to-default-metric-value").
QueueLabelToDefaultMetricValue kaiflags.StringMapFlag
1326
}
1427

1528
// AddFlags registers the queue controller flags on fs, binding each flag to
// the corresponding field of o. The two map-valued flags are registered with
// fs.Var and therefore have no explicit default value here.
func (o *Options) AddFlags(fs *flag.FlagSet) {
1629
fs.BoolVar(&o.EnableLeaderElection, "leader-elect", false, "Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.")
17-
fs.StringVar(&o.SchedulingQueueLabelKey, "queue-label-key", "kai.scheduler/queue", "Scheduling queue label key name")
30+
fs.StringVar(&o.SchedulingQueueLabelKey, "queue-label-key", defaultSchedulingQueueLabelKey, "Scheduling queue label key name.")
31+
fs.StringVar(&o.MetricsAddress, "metrics-listen-address", defaultMetricsAddress, "The address the metrics endpoint binds to.")
32+
fs.StringVar(&o.MetricsNamespace, "metrics-namespace", constants.DefaultMetricsNamespace, "Metrics namespace.")
33+
fs.Var(&o.QueueLabelToMetricLabel, "queue-label-to-metric-label", "Map of queue label keys to metric label keys, e.g. 'foo=bar,baz=qux'.")
34+
fs.Var(&o.QueueLabelToDefaultMetricValue, "queue-label-to-default-metric-value", "Map of queue label keys to default metric values, in case the label doesn't exist on the queue, e.g. 'foo=1,baz=0'.")
1835
}

cmd/scheduler/app/options/options.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,13 @@ import (
2525
"github.com/spf13/pflag"
2626
utilfeature "k8s.io/apiserver/pkg/util/feature"
2727

28+
"github.com/NVIDIA/KAI-scheduler/pkg/common/constants"
2829
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/log"
2930
)
3031

3132
const (
3233
defaultSchedulerName = "kai-scheduler"
3334
defaultResourceReservationAppLabel = "kai-resource-reservation"
34-
defaultMetricsNamespace = "kai"
3535
defaultNamespace = "kai-scheduler"
3636
defaultSchedulerPeriod = time.Second
3737
defaultStalenessGracePeriod = 60 * time.Second
@@ -99,7 +99,7 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
9999
// kai-scheduler will ignore pods with scheduler names other than specified with the option
100100
fs.StringVar(&s.SchedulerName, "scheduler-name", defaultSchedulerName, "The scheduler name in pod spec that handled by this scheduler")
101101
fs.StringVar(&s.Namspace, "namespace", defaultNamespace, "Scheduler service namespace")
102-
fs.StringVar(&s.MetricsNamespace, "metrics-namespace", defaultMetricsNamespace, "Metrics namespace")
102+
fs.StringVar(&s.MetricsNamespace, "metrics-namespace", constants.DefaultMetricsNamespace, "Metrics namespace")
103103
fs.StringVar(&s.ResourceReservationAppLabel, "resource-reservation-app-label", defaultResourceReservationAppLabel, "App label value of resource reservation pods")
104104
fs.BoolVar(&s.RestrictSchedulingNodes, "restrict-node-scheduling", false, "kai-scheduler will allocate jobs only to restricted nodes")
105105
fs.StringVar(&s.NodePoolLabelKey, "nodepool-label-key", defaultNodePoolLabelKey, "The label key by which to filter scheduling nodepool")

cmd/scheduler/app/options/options_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import (
2424
"testing"
2525
"time"
2626

27+
"github.com/NVIDIA/KAI-scheduler/pkg/common/constants"
28+
2729
"github.com/spf13/pflag"
2830
"k8s.io/apimachinery/pkg/util/diff"
2931
utilfeature "k8s.io/apiserver/pkg/util/feature"
@@ -45,7 +47,7 @@ func TestAddFlags(t *testing.T) {
4547
expected := &ServerOption{
4648
SchedulerName: defaultSchedulerName,
4749
Namspace: defaultNamespace,
48-
MetricsNamespace: defaultMetricsNamespace,
50+
MetricsNamespace: constants.DefaultMetricsNamespace,
4951
ResourceReservationAppLabel: defaultResourceReservationAppLabel,
5052
SchedulePeriod: 5 * time.Minute,
5153
PrintVersion: true,

pkg/common/constants/constants.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const (
1010
UnlimitedResourceQuantity = float64(-1)
1111
DefaultQueuePriority = 100
1212
DefaultNodePoolName = "default"
13+
DefaultMetricsNamespace = "kai"
1314

1415
// Pod Groups
1516
PodGrouperWarning = "PodGrouperWarning"

pkg/common/flags/flags_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Copyright 2025 NVIDIA CORPORATION
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package flags
5+
6+
import (
7+
. "github.com/onsi/ginkgo/v2"
8+
. "github.com/onsi/gomega"
9+
"golang.org/x/exp/slices"
10+
"strings"
11+
"testing"
12+
)
13+
14+
// TestCommonFlags is the go test entry point for this package's Ginkgo specs:
// it registers Fail as the Gomega failure handler and runs the suite named
// "common flags tests".
func TestCommonFlags(t *testing.T) {
15+
RegisterFailHandler(Fail)
16+
RunSpecs(t, "common flags tests")
17+
}
18+
19+
// Specs for StringMapFlag: each spec starts from a zero-value flag, feeds a
// string to Set, and checks the result through Get or String.
var _ = Describe("StringMapFlag", func() {
20+
It("parses an empty string as an empty map", func() {
21+
var m StringMapFlag
22+
Expect(m.Set("")).To(Succeed())
23+
Expect(m.Get()).To(BeEmpty())
24+
})
25+
26+
It("parses a single key=value pair", func() {
27+
var m StringMapFlag
28+
Expect(m.Set("foo=bar")).To(Succeed())
29+
Expect(m.Get()).To(HaveKeyWithValue("foo", "bar"))
30+
Expect(len(m.Get())).To(Equal(1))
31+
})
32+
33+
It("parses multiple key=value pairs", func() {
34+
var m StringMapFlag
35+
Expect(m.Set("foo=bar,baz=qux")).To(Succeed())
36+
Expect(m.Get()).To(HaveKeyWithValue("foo", "bar"))
37+
Expect(m.Get()).To(HaveKeyWithValue("baz", "qux"))
38+
Expect(len(m.Get())).To(Equal(2))
39+
})
40+
41+
It("overwrites duplicate keys with the last value", func() {
42+
var m StringMapFlag
43+
Expect(m.Set("foo=bar,foo=baz")).To(Succeed())
44+
Expect(m.Get()).To(HaveKeyWithValue("foo", "baz"))
45+
Expect(len(m.Get())).To(Equal(1))
46+
})
47+
48+
It("returns an error for invalid input", func() {
49+
var m StringMapFlag
// The first item "foo" has no '=' separator.
50+
err := m.Set("foo,bar=baz")
51+
Expect(err).To(HaveOccurred())
52+
})
53+
54+
It("String() returns the correct string representation", func() {
55+
var m StringMapFlag
56+
err := m.Set("foo=bar,baz=qux")
57+
Expect(err).ToNot(HaveOccurred())
58+
str := m.String()
59+
// Accept either order: the expected text must match str or its pair-reversed form.
60+
Expect([]string{str, reversePairs(str)}).To(ContainElement("foo=bar,baz=qux"))
61+
})
62+
})
63+
64+
// Helper to reverse the order of pairs in a comma-separated string
65+
// reversePairs splits s on commas, reverses the resulting items in place, and
// joins them back with commas.
func reversePairs(s string) string {
66+
pairs := strings.Split(s, ",")
67+
slices.Reverse(pairs)
68+
return strings.Join(pairs, ",")
69+
}

pkg/common/flags/string_map.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Copyright 2025 NVIDIA CORPORATION
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package flags
5+
6+
import (
	"fmt"
	"sort"
	"strings"
)
10+
11+
// StringMapFlag is a flag value that parses a comma-separated list of
// key=value items (e.g. "foo=bar,baz=qux") into a map[string]string. Its
// pointer type provides the String and Set methods required by flag.Value.
type StringMapFlag map[string]string

// Get returns the parsed map. It never returns nil: when the receiver is nil
// or nothing has been set yet, a new empty map is returned.
func (m *StringMapFlag) Get() map[string]string {
	if m == nil || *m == nil {
		return make(map[string]string)
	}
	return *m
}

// String renders the map as "key1=val1,key2=val2" with the keys in sorted
// order, so the output is deterministic. A nil receiver or an empty map yields
// the empty string; the flag package documents that String may be called on a
// zero-valued receiver such as a nil pointer.
func (m *StringMapFlag) String() string {
	if m == nil || len(*m) == 0 {
		return ""
	}

	// Map iteration order is random, so sort the keys before rendering.
	keys := make([]string, 0, len(*m))
	for k := range *m {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	var b strings.Builder
	for i, k := range keys {
		if i > 0 {
			b.WriteByte(',')
		}
		b.WriteString(k)
		b.WriteByte('=')
		b.WriteString((*m)[k])
	}
	return b.String()
}

// Set parses value as comma-separated key=value items and merges them into the
// map, allocating it if needed. Only the first '=' of an item separates key
// from value, so values may themselves contain '='. Later occurrences of a key
// overwrite earlier ones, both within one value and across repeated calls. An
// empty value is accepted and adds nothing.
//
// An item without '=' makes Set return an error; in that case none of the
// items from value are applied.
func (m *StringMapFlag) Set(value string) error {
	if *m == nil {
		*m = make(map[string]string)
	}
	if value == "" {
		return nil
	}

	// Parse everything first so a malformed item cannot leave the map
	// partially updated.
	items := strings.Split(value, ",")
	parsed := make(map[string]string, len(items))
	for _, item := range items {
		k, v, ok := strings.Cut(item, "=")
		if !ok {
			return fmt.Errorf("invalid map item: %q", item)
		}
		parsed[k] = v
	}

	for k, v := range parsed {
		(*m)[k] = v
	}
	return nil
}

pkg/queuecontroller/controllers/queue_controller.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/NVIDIA/KAI-scheduler/pkg/queuecontroller/common"
2121
"github.com/NVIDIA/KAI-scheduler/pkg/queuecontroller/controllers/childqueues_updater"
2222
"github.com/NVIDIA/KAI-scheduler/pkg/queuecontroller/controllers/resource_updater"
23+
"github.com/NVIDIA/KAI-scheduler/pkg/queuecontroller/metrics"
2324
)
2425

2526
// QueueReconciler reconciles a Queue object
@@ -54,7 +55,12 @@ func (r *QueueReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
5455
queue := &v2.Queue{}
5556
err := r.Get(ctx, req.NamespacedName, queue)
5657
if err != nil {
57-
return ctrl.Result{}, client.IgnoreNotFound(err)
58+
ignoreNotFoundErr := client.IgnoreNotFound(err)
59+
if ignoreNotFoundErr == nil {
60+
// If the queue is not found, reset its metrics
61+
metrics.ResetQueueMetrics(req.Name)
62+
}
63+
return ctrl.Result{}, ignoreNotFoundErr
5864
}
5965
originalQueue := queue.DeepCopy()
6066

@@ -73,6 +79,8 @@ func (r *QueueReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
7379
return ctrl.Result{}, fmt.Errorf("failed to patch status for queue %s, error: %v", queue.Name, err)
7480
}
7581

82+
metrics.SetQueueMetrics(queue)
83+
7684
return ctrl.Result{}, err
7785
}
7886

pkg/queuecontroller/controllers/suite_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,12 @@ import (
3838
logf "sigs.k8s.io/controller-runtime/pkg/log"
3939
"sigs.k8s.io/controller-runtime/pkg/log/zap"
4040

41+
"github.com/prometheus/client_golang/prometheus"
42+
"github.com/prometheus/client_golang/prometheus/testutil"
43+
4144
v2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2"
4245
"github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2alpha2"
46+
"github.com/NVIDIA/KAI-scheduler/pkg/queuecontroller/metrics"
4347
)
4448

4549
const (
@@ -106,6 +110,11 @@ var _ = Describe("QueueController", Ordered, func() {
106110
BeforeAll(func() {
107111
ctx, cancel = context.WithCancel(context.Background())
108112

113+
metrics.InitMetrics("testns",
114+
map[string]string{"priority": "queue_priority", "some-other-label": "some_other_label"},
115+
map[string]string{"priority": "normal"},
116+
)
117+
109118
var err error
110119
mgr, err = ctrl.NewManager(cfg, ctrl.Options{
111120
Scheme: scheme.Scheme,
@@ -134,6 +143,12 @@ var _ = Describe("QueueController", Ordered, func() {
134143
})
135144

136145
Context("When managing child queues", func() {
146+
AfterAll(func() {
147+
deleteQueue(ctx, k8sClient, "child-queue-1")
148+
deleteQueue(ctx, k8sClient, "child-queue-2")
149+
deleteQueue(ctx, k8sClient, "parent-queue")
150+
})
151+
137152
It("Should update parent queue's childQueues field", func() {
138153
parentQueue := &v2.Queue{
139154
ObjectMeta: metav1.ObjectMeta{
@@ -175,6 +190,10 @@ var _ = Describe("QueueController", Ordered, func() {
175190
})
176191

177192
Context("When managing pod groups", func() {
193+
AfterAll(func() {
194+
deleteQueue(ctx, k8sClient, "resource-queue")
195+
})
196+
178197
It("Should update queue status with pod group resources", func() {
179198
queue := &v2.Queue{
180199
ObjectMeta: metav1.ObjectMeta{
@@ -283,4 +302,67 @@ var _ = Describe("QueueController", Ordered, func() {
283302
}, timeout, interval).Should(BeTrue())
284303
})
285304
})
305+
306+
Context("When setting queue metrics", func() {
307+
AfterAll(func() {
308+
deleteQueue(ctx, k8sClient, "test-queue")
309+
})
310+
311+
It("should set metrics for a queue with resources", func() {
312+
queue := &v2.Queue{
313+
ObjectMeta: metav1.ObjectMeta{
314+
Name: "test-queue",
315+
Labels: map[string]string{"priority": "high"},
316+
},
317+
Spec: v2.QueueSpec{
318+
Resources: &v2.QueueResources{
319+
GPU: v2.QueueResource{Quota: 2},
320+
CPU: v2.QueueResource{Quota: 2000},
321+
Memory: v2.QueueResource{Quota: 4},
322+
},
323+
},
324+
}
325+
Expect(k8sClient.Create(ctx, queue)).Should(Succeed())
326+
327+
labels := []string{"test-queue", "high", ""}
328+
329+
Eventually(func(q gomega.Gomega) {
330+
expectMetricValue(q, metrics.GetQueueInfoMetric(), labels, 1)
331+
expectMetricValue(q, metrics.GetQueueDeservedGPUsMetric(), labels, 2)
332+
expectMetricValue(q, metrics.GetQueueQuotaCPUMetric(), labels, 2)
333+
expectMetricValue(q, metrics.GetQueueQuotaMemoryMetric(), labels, 4000000)
334+
}, timeout, interval).Should(Succeed())
335+
336+
// delete the queue and expect metrics to be deleted
337+
Expect(k8sClient.Delete(ctx, queue)).Should(Succeed())
338+
339+
Eventually(func(q gomega.Gomega) {
340+
gathered := testutil.CollectAndCount(metrics.GetQueueInfoMetric())
341+
q.Expect(gathered).To(Equal(0))
342+
gathered = testutil.CollectAndCount(metrics.GetQueueDeservedGPUsMetric())
343+
q.Expect(gathered).To(Equal(0))
344+
gathered = testutil.CollectAndCount(metrics.GetQueueQuotaCPUMetric())
345+
q.Expect(gathered).To(Equal(0))
346+
gathered = testutil.CollectAndCount(metrics.GetQueueQuotaMemoryMetric())
347+
q.Expect(gathered).To(Equal(0))
348+
}, timeout, interval).Should(Succeed())
349+
})
350+
})
286351
})
352+
353+
// expectMetricValue asserts, through q, that the child of gauge selected by
// the given label values can be fetched without error, is non-nil, and has a
// value (read with testutil.ToFloat64) equivalent to expected. Taking a
// gomega.Gomega lets it be called from inside an Eventually callback.
func expectMetricValue(q gomega.Gomega, gauge *prometheus.GaugeVec, labels []string, expected float64) {
354+
metricGauge, err := gauge.GetMetricWithLabelValues(labels...)
355+
q.Expect(err).To(BeNil())
356+
q.Expect(metricGauge).ToNot(BeNil())
357+
q.Expect(testutil.ToFloat64(metricGauge)).To(BeEquivalentTo(expected))
358+
}
359+
360+
// deleteQueue is a best-effort cleanup helper: it looks up the Queue named
// queueName (no namespace is set on the lookup key) and deletes it. Any error
// from the lookup makes it return without doing anything, and the error from
// Delete is discarded, so callers never observe a failure.
func deleteQueue(ctx context.Context, k8sClient client.Client, queueName string) {
361+
queueObj := &v2.Queue{}
362+
err := k8sClient.Get(ctx, types.NamespacedName{Name: queueName}, queueObj)
363+
if err != nil {
364+
// Nothing to clean up if the queue cannot be fetched.
return
365+
}
366+
367+
// Deliberately ignore the error: cleanup must not fail the test run.
_ = k8sClient.Delete(ctx, queueObj)
368+
}

0 commit comments

Comments
 (0)