diff --git a/docs/metrics/README.md b/docs/metrics/README.md new file mode 100644 index 000000000..7f00b763b --- /dev/null +++ b/docs/metrics/README.md @@ -0,0 +1,22 @@ +# Kube-Prometheus-Stack +Install the Prometheus operator and enable a Prometheus instance (and Grafana, if desired): +```sh +helm repo add prometheus-community https://prometheus-community.github.io/helm-charts +helm repo update prometheus-community +helm upgrade -i --create-namespace -n monitoring kube-prometheus-stack prometheus-community/kube-prometheus-stack --values kube-prometheus-values.yaml +``` + +# Service Monitors for KAI services + +Install a Prometheus instance and the relevant ServiceMonitors in the kai-scheduler namespace: + +```sh +kubectl apply -f prometheus.yaml +kubectl apply -f service-monitors.yaml +``` + +To expose this Prometheus instance as a Grafana datasource, if desired, apply grafana-datasource.yaml: + +```sh +kubectl apply -f grafana-datasource.yaml +``` \ No newline at end of file diff --git a/docs/metrics/grafana-datasource.yaml b/docs/metrics/grafana-datasource.yaml new file mode 100644 index 000000000..ebbd53d5d --- /dev/null +++ b/docs/metrics/grafana-datasource.yaml @@ -0,0 +1,23 @@ +# Copyright 2025 NVIDIA CORPORATION +# SPDX-License-Identifier: Apache-2.0 + +apiVersion: v1 +kind: ConfigMap +metadata: + name: grafana-datasource-kai-prom + namespace: monitoring # Modify according to your Grafana namespace + labels: + grafana_datasource: "1" +data: + kai-prometheus.yaml: | + apiVersion: 1 + datasources: + - name: kai-prometheus + type: prometheus + access: proxy + url: http://prometheus-operated.kai-scheduler.svc:9090 # Modify according to your Prometheus URL + isDefault: false + editable: true + jsonData: + httpMethod: POST + timeInterval: 30s \ No newline at end of file diff --git a/docs/metrics/kube-prometheus-values.yaml b/docs/metrics/kube-prometheus-values.yaml new file mode 100644 index 000000000..279116bd2 --- /dev/null +++ b/docs/metrics/kube-prometheus-values.yaml @@ -0,0 +1,23 @@ +# Copyright 2025 NVIDIA CORPORATION +# SPDX-License-Identifier: Apache-2.0 + +prometheus: + enabled: true +grafana: + enabled: true + persistence: + enabled: true + type: pvc + accessModes: + - ReadWriteOnce + size: 10Gi + # storageClassName: "" # uncomment and set to your StorageClass if not using the default + finalizers: + - kubernetes.io/pvc-protection + # Keep sidecars enabled so ConfigMaps like `docs/metrics/grafana-datasource.yaml` + # (labeled with `grafana_datasource: "1"`) are automatically loaded. 
+ sidecar: + datasources: + enabled: true + dashboards: + enabled: true diff --git a/docs/metrics/prometheus.yaml b/docs/metrics/prometheus.yaml new file mode 100644 index 000000000..84d8c000e --- /dev/null +++ b/docs/metrics/prometheus.yaml @@ -0,0 +1,65 @@ +# Copyright 2025 NVIDIA CORPORATION +# SPDX-License-Identifier: Apache-2.0 + +apiVersion: v1 +kind: ServiceAccount +metadata: + name: prometheus + namespace: kai-scheduler +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: prometheus +rules: +- apiGroups: [""] + resources: + - nodes + - nodes/proxy + - services + - endpoints + - pods + verbs: ["get", "list", "watch"] +- apiGroups: [""] + resources: + - configmaps + verbs: ["get"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: prometheus +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: prometheus +subjects: +- kind: ServiceAccount + name: prometheus + namespace: kai-scheduler +--- +apiVersion: monitoring.coreos.com/v1 +kind: Prometheus +metadata: + name: kai + namespace: kai-scheduler +spec: + replicas: 1 + serviceAccountName: prometheus + enableFeatures: + - promql-experimental-functions + scrapeInterval: 1m + storage: + volumeClaimTemplate: + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 20Gi + serviceMonitorSelector: + matchLabels: + accounting: kai-scheduler + serviceMonitorNamespaceSelector: {} + podMonitorNamespaceSelector: {} + probeNamespaceSelector: {} \ No newline at end of file diff --git a/docs/metrics/service-monitors.yaml b/docs/metrics/service-monitors.yaml new file mode 100644 index 000000000..14b4f96bf --- /dev/null +++ b/docs/metrics/service-monitors.yaml @@ -0,0 +1,96 @@ +# Copyright 2025 NVIDIA CORPORATION +# SPDX-License-Identifier: Apache-2.0 + +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: binder + namespace: kai-scheduler + labels: + accounting: kai-scheduler +spec: + jobLabel: binder + namespaceSelector: + matchNames: + - kai-scheduler + selector: + matchLabels: + app: binder + endpoints: + - port: http-metrics + bearerTokenFile: /var/run/secrets/kubernetes.io/serviceaccount/token +--- +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: scheduler + namespace: kai-scheduler + labels: + accounting: kai-scheduler +spec: + jobLabel: scheduler + namespaceSelector: + matchNames: + - kai-scheduler + selector: + matchLabels: + app: scheduler + endpoints: + - port: http-metrics + bearerTokenFile: /var/run/secrets/kubernetes.io/serviceaccount/token +--- +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: queuecontroller + namespace: kai-scheduler + labels: + accounting: kai-scheduler +spec: + jobLabel: queuecontroller + namespaceSelector: + matchNames: + - kai-scheduler + selector: + matchLabels: + app: queuecontroller + endpoints: + - port: metrics +--- +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: dcgm-exporter + namespace: kai-scheduler + labels: + accounting: kai-scheduler +spec: + jobLabel: dcgm-exporter + namespaceSelector: + matchNames: + - gpu-operator + selector: + matchLabels: + app: nvidia-dcgm-exporter + endpoints: + - port: gpu-metrics +--- +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: kube-state-metrics + namespace: kai-scheduler + labels: + accounting: kai-scheduler +spec: + jobLabel: kube-state-metrics + namespaceSelector: + matchNames: + - monitoring + selector: + 
matchLabels: + app.kubernetes.io/name: kube-state-metrics + endpoints: + - port: http + interval: 30s + bearerTokenFile: /var/run/secrets/kubernetes.io/serviceaccount/token \ No newline at end of file diff --git a/go.mod b/go.mod index 4d97410f3..0b7a48f99 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/onsi/gomega v1.37.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.22.0 + github.com/prometheus/common v0.63.0 github.com/ray-project/kuberay/ray-operator v1.3.1 github.com/run-ai/kwok-operator v0.0.0-20240926063032-05b6364bc7c7 github.com/spf13/pflag v1.0.6 @@ -142,7 +143,6 @@ require ( github.com/pelletier/go-toml/v2 v2.2.3 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_model v0.6.2 // indirect - github.com/prometheus/common v0.63.0 // indirect github.com/prometheus/procfs v0.16.0 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect github.com/samber/lo v1.47.0 // indirect diff --git a/go.sum b/go.sum index 8225cc833..d052914e0 100644 --- a/go.sum +++ b/go.sum @@ -192,6 +192,8 @@ github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= @@ -233,6 +235,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU= +github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32 h1:W6apQkHrMkS0Muv8G/TipAy/FJl/rCYT0+EuS8+Z0z4= diff --git a/pkg/scheduler/api/queue_info/quota_info.go b/pkg/scheduler/api/queue_info/quota_info.go index e17cb6498..2d2174cf6 100644 --- a/pkg/scheduler/api/queue_info/quota_info.go +++ b/pkg/scheduler/api/queue_info/quota_info.go @@ -3,7 +3,10 @@ package queue_info -import "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/common_info" +import ( + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/common_info" + v1 "k8s.io/api/core/v1" +) type QueueQuota struct { GPU ResourceQuota `json:"gpu,omitempty"` @@ -20,18 +23,14 @@ type ResourceQuota struct { Limit float64 `json:"limit"` } -type QueueUsage struct { - GPU float64 `json:"gpu,omitempty"` - 
CPU float64 `json:"cpu,omitempty"` - Memory float64 `json:"memory,omitempty"` -} +type QueueUsage map[v1.ResourceName]float64 type ClusterUsage struct { - Queues map[common_info.QueueID]*QueueUsage `json:"queues"` + Queues map[common_info.QueueID]QueueUsage `json:"queues"` } func NewClusterUsage() *ClusterUsage { return &ClusterUsage{ - Queues: make(map[common_info.QueueID]*QueueUsage), + Queues: make(map[common_info.QueueID]QueueUsage), } } diff --git a/pkg/scheduler/cache/cluster_info/cluster_info_test.go b/pkg/scheduler/cache/cluster_info/cluster_info_test.go index bd49aeeb6..360aa2091 100644 --- a/pkg/scheduler/cache/cluster_info/cluster_info_test.go +++ b/pkg/scheduler/cache/cluster_info/cluster_info_test.go @@ -11,11 +11,11 @@ import ( "github.com/stretchr/testify/assert" "go.uber.org/mock/gomock" - v1core "k8s.io/api/core/v1" + corev1 "k8s.io/api/core/v1" v12 "k8s.io/api/scheduling/v1" storage "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/api/resource" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/informers" @@ -71,13 +71,13 @@ func TestSnapshot(t *testing.T) { }{ "SingleFromEach": { kubeObjects: []runtime.Object{ - &v1core.Node{ - ObjectMeta: v1.ObjectMeta{ + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ Name: "node-1", }, }, - &v1core.Pod{ - ObjectMeta: v1.ObjectMeta{ + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ Name: "my-pod", Namespace: "my-ns", }, @@ -85,7 +85,7 @@ func TestSnapshot(t *testing.T) { }, kaiSchedulerObjects: []runtime.Object{ &enginev2.Queue{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "my-department", }, Spec: enginev2.QueueSpec{ @@ -93,7 +93,7 @@ func TestSnapshot(t *testing.T) { }, }, &enginev2.Queue{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "my-queue", }, Spec: enginev2.QueueSpec{ @@ -101,7 +101,7 @@ func TestSnapshot(t *testing.T) { }, }, &schedulingv1alpha2.BindRequest{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "my-pod-1234", Namespace: "my-ns", }, @@ -117,15 +117,15 @@ func TestSnapshot(t *testing.T) { }, "SingleFromEach2": { kubeObjects: []runtime.Object{ - &v1core.Node{ - ObjectMeta: v1.ObjectMeta{ + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ Name: "node-1", }, }, }, kaiSchedulerObjects: []runtime.Object{ &enginev2.Queue{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "my-department", }, Spec: enginev2.QueueSpec{ @@ -133,7 +133,7 @@ func TestSnapshot(t *testing.T) { }, }, &enginev2.Queue{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "my-queue", }, Spec: enginev2.QueueSpec{ @@ -173,21 +173,21 @@ func TestSnapshotUsage(t *testing.T) { { name: "BasicUsage", usage: &queue_info.ClusterUsage{ - Queues: map[common_info.QueueID]*queue_info.QueueUsage{ + Queues: map[common_info.QueueID]queue_info.QueueUsage{ "queue-1": { - CPU: 10, - Memory: 10, - GPU: 10, + corev1.ResourceCPU: 10, + corev1.ResourceMemory: 10, + commonconstants.GpuResource: 10, }, }, }, err: nil, expectedUsage: &queue_info.ClusterUsage{ - Queues: map[common_info.QueueID]*queue_info.QueueUsage{ + Queues: map[common_info.QueueID]queue_info.QueueUsage{ "queue-1": { - CPU: 10, - Memory: 10, - GPU: 10, + corev1.ResourceCPU: 10, + corev1.ResourceMemory: 10, + commonconstants.GpuResource: 10, }, }, }, @@ -201,11 +201,11 @@ func TestSnapshotUsage(t *testing.T) { { name: "Error and usage", usage: &queue_info.ClusterUsage{ - Queues: 
map[common_info.QueueID]*queue_info.QueueUsage{ + Queues: map[common_info.QueueID]queue_info.QueueUsage{ "queue-1": { - CPU: 11, - Memory: 11, - GPU: 11, + corev1.ResourceCPU: 11, + corev1.ResourceMemory: 11, + commonconstants.GpuResource: 11, }, }, }, @@ -245,21 +245,21 @@ func TestSnapshotUsage(t *testing.T) { } func TestSnapshotNodes(t *testing.T) { - examplePod := &v1core.Pod{ - Spec: v1core.PodSpec{ + examplePod := &corev1.Pod{ + Spec: corev1.PodSpec{ NodeName: "node-1", - Containers: []v1core.Container{ + Containers: []corev1.Container{ { - Resources: v1core.ResourceRequirements{ - Requests: v1core.ResourceList{ + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ "cpu": resource.MustParse("2"), }, }, }, }, }, - Status: v1core.PodStatus{ - Phase: v1core.PodRunning, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, }, } exampleMIGPod := examplePod.DeepCopy() @@ -279,12 +279,12 @@ func TestSnapshotNodes(t *testing.T) { }{ "BasicUsage": { objs: []runtime.Object{ - &v1core.Node{ - ObjectMeta: v1.ObjectMeta{ + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ Name: "node-1", }, - Status: v1core.NodeStatus{ - Allocatable: v1core.ResourceList{ + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ "cpu": resource.MustParse("10"), }, }, @@ -295,18 +295,18 @@ func TestSnapshotNodes(t *testing.T) { { Name: "node-1", Idle: resource_info.ResourceFromResourceList( - v1core.ResourceList{ + corev1.ResourceList{ "cpu": resource.MustParse("8"), }, ), Used: resource_info.ResourceFromResourceList( - v1core.ResourceList{ + corev1.ResourceList{ "cpu": resource.MustParse("2"), "memory": resource.MustParse("0"), }, ), Releasing: resource_info.ResourceFromResourceList( - v1core.ResourceList{ + corev1.ResourceList{ "cpu": resource.MustParse("0"), "memory": resource.MustParse("0"), }, @@ -317,12 +317,12 @@ func TestSnapshotNodes(t *testing.T) { }, "Finished job": { objs: []runtime.Object{ - &v1core.Node{ - ObjectMeta: v1.ObjectMeta{ + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ Name: "node-1", }, - Status: v1core.NodeStatus{ - Allocatable: v1core.ResourceList{ + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ "cpu": resource.MustParse("10"), }, }, @@ -333,18 +333,18 @@ func TestSnapshotNodes(t *testing.T) { { Name: "node-1", Idle: resource_info.ResourceFromResourceList( - v1core.ResourceList{ + corev1.ResourceList{ "cpu": resource.MustParse("10"), }, ), Used: resource_info.ResourceFromResourceList( - v1core.ResourceList{ + corev1.ResourceList{ "cpu": resource.MustParse("0"), "memory": resource.MustParse("0"), }, ), Releasing: resource_info.ResourceFromResourceList( - v1core.ResourceList{ + corev1.ResourceList{ "cpu": resource.MustParse("0"), "memory": resource.MustParse("0"), }, @@ -355,28 +355,28 @@ func TestSnapshotNodes(t *testing.T) { }, "Filter Pods by nodepool": { objs: []runtime.Object{ - &v1core.Node{ - ObjectMeta: v1.ObjectMeta{ + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ Name: "node-1", Labels: map[string]string{ defaultNodePoolName: "pool-a", }, }, - Status: v1core.NodeStatus{ - Allocatable: v1core.ResourceList{ + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ "cpu": resource.MustParse("10"), }, }, }, - &v1core.Node{ - ObjectMeta: v1.ObjectMeta{ + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ Name: "node-2", Labels: map[string]string{ defaultNodePoolName: "pool-b", }, }, - Status: v1core.NodeStatus{ - Allocatable: v1core.ResourceList{ + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ "cpu": 
resource.MustParse("10"), }, }, @@ -388,18 +388,18 @@ func TestSnapshotNodes(t *testing.T) { { Name: "node-1", Idle: resource_info.ResourceFromResourceList( - v1core.ResourceList{ + corev1.ResourceList{ "cpu": resource.MustParse("8"), }, ), Used: resource_info.ResourceFromResourceList( - v1core.ResourceList{ + corev1.ResourceList{ "cpu": resource.MustParse("2"), "memory": resource.MustParse("0"), }, ), Releasing: resource_info.ResourceFromResourceList( - v1core.ResourceList{ + corev1.ResourceList{ "cpu": resource.MustParse("0"), "memory": resource.MustParse("0"), }, @@ -411,12 +411,12 @@ func TestSnapshotNodes(t *testing.T) { }, "MIG Job": { objs: []runtime.Object{ - &v1core.Node{ - ObjectMeta: v1.ObjectMeta{ + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ Name: "node-1", }, - Status: v1core.NodeStatus{ - Allocatable: v1core.ResourceList{ + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ "cpu": resource.MustParse("10"), "nvidia.com/mig-1g.5gb": resource.MustParse("10"), }, @@ -429,20 +429,20 @@ func TestSnapshotNodes(t *testing.T) { { Name: "node-1", Idle: resource_info.ResourceFromResourceList( - v1core.ResourceList{ + corev1.ResourceList{ "cpu": resource.MustParse("6"), "nvidia.com/mig-1g.5gb": resource.MustParse("6"), }, ), Used: resource_info.ResourceFromResourceList( - v1core.ResourceList{ + corev1.ResourceList{ "cpu": resource.MustParse("4"), "memory": resource.MustParse("0"), "nvidia.com/mig-1g.5gb": resource.MustParse("4"), }, ), Releasing: resource_info.ResourceFromResourceList( - v1core.ResourceList{ + corev1.ResourceList{ "cpu": resource.MustParse("0"), "memory": resource.MustParse("0"), }, @@ -494,8 +494,8 @@ func TestBindRequests(t *testing.T) { examplePodName := "pod-1" namespace1 := "namespace-1" podGroupName := "podgroup-1" - examplePod := &v1core.Pod{ - ObjectMeta: v1.ObjectMeta{ + examplePod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ Name: examplePodName, Namespace: namespace1, UID: types.UID(examplePodName), @@ -503,24 +503,24 @@ func TestBindRequests(t *testing.T) { commonconstants.PodGroupAnnotationForPod: podGroupName, }, }, - Spec: v1core.PodSpec{ - Containers: []v1core.Container{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ { - Resources: v1core.ResourceRequirements{ - Requests: v1core.ResourceList{ + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ "cpu": resource.MustParse("2"), }, }, }, }, }, - Status: v1core.PodStatus{ - Phase: v1core.PodPending, + Status: corev1.PodStatus{ + Phase: corev1.PodPending, }, } exampleQueue := &enginev2.Queue{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "queue-0", }, } @@ -537,12 +537,12 @@ func TestBindRequests(t *testing.T) { }{ "Pod with PodGroup Waiting For Binding": { kubeObjects: []runtime.Object{ - &v1core.Node{ - ObjectMeta: v1.ObjectMeta{ + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ Name: "node-1", }, - Status: v1core.NodeStatus{ - Allocatable: v1core.ResourceList{ + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ "cpu": resource.MustParse("10"), }, }, @@ -552,7 +552,7 @@ func TestBindRequests(t *testing.T) { kaiSchedulerObjects: []runtime.Object{ exampleQueue, &enginev2alpha2.PodGroup{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: podGroupName, Namespace: namespace1, }, @@ -561,7 +561,7 @@ func TestBindRequests(t *testing.T) { }, }, &schedulingv1alpha2.BindRequest{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "my-pod-1234", Namespace: namespace1, }, @@ -583,12 +583,12 @@ func 
TestBindRequests(t *testing.T) { }, "Pod with PodGroup Waiting For Binding that is failing but no at backoff limit": { kubeObjects: []runtime.Object{ - &v1core.Node{ - ObjectMeta: v1.ObjectMeta{ + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ Name: "node-1", }, - Status: v1core.NodeStatus{ - Allocatable: v1core.ResourceList{ + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ "cpu": resource.MustParse("10"), }, }, @@ -598,7 +598,7 @@ func TestBindRequests(t *testing.T) { kaiSchedulerObjects: []runtime.Object{ exampleQueue, &enginev2alpha2.PodGroup{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: podGroupName, Namespace: namespace1, }, @@ -607,7 +607,7 @@ func TestBindRequests(t *testing.T) { }, }, &schedulingv1alpha2.BindRequest{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "my-pod-1234", Namespace: namespace1, }, @@ -634,12 +634,12 @@ func TestBindRequests(t *testing.T) { }, "Pod with PodGroup Waiting For Binding that is failing and reached backoff limit": { kubeObjects: []runtime.Object{ - &v1core.Node{ - ObjectMeta: v1.ObjectMeta{ + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ Name: "node-1", }, - Status: v1core.NodeStatus{ - Allocatable: v1core.ResourceList{ + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ "cpu": resource.MustParse("10"), }, }, @@ -649,7 +649,7 @@ func TestBindRequests(t *testing.T) { kaiSchedulerObjects: []runtime.Object{ exampleQueue, &enginev2alpha2.PodGroup{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: podGroupName, Namespace: namespace1, }, @@ -658,7 +658,7 @@ func TestBindRequests(t *testing.T) { }, }, &schedulingv1alpha2.BindRequest{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "my-pod-1234", Namespace: namespace1, }, @@ -686,18 +686,18 @@ func TestBindRequests(t *testing.T) { }, "Pod pending and BindRequest to a different pod": { kubeObjects: []runtime.Object{ - &v1core.Node{ - ObjectMeta: v1.ObjectMeta{ + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ Name: "node-1", }, - Status: v1core.NodeStatus{ - Allocatable: v1core.ResourceList{ + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ "cpu": resource.MustParse("10"), }, }, }, examplePod, - func() *v1core.Pod { + func() *corev1.Pod { pod := examplePod.DeepCopy() pod.Name = "not-" + examplePod.Name pod.UID = types.UID(fmt.Sprintf("not-%s", examplePod.UID)) @@ -707,7 +707,7 @@ func TestBindRequests(t *testing.T) { kaiSchedulerObjects: []runtime.Object{ exampleQueue, &enginev2alpha2.PodGroup{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: podGroupName, Namespace: namespace1, }, @@ -716,7 +716,7 @@ func TestBindRequests(t *testing.T) { }, }, &schedulingv1alpha2.BindRequest{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("not-%s-1234", examplePodName), Namespace: namespace1, }, @@ -739,12 +739,12 @@ func TestBindRequests(t *testing.T) { }, "Pod pending and BindRequest to non existing node and is failed": { kubeObjects: []runtime.Object{ - &v1core.Node{ - ObjectMeta: v1.ObjectMeta{ + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ Name: "node-1", }, - Status: v1core.NodeStatus{ - Allocatable: v1core.ResourceList{ + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ "cpu": resource.MustParse("10"), }, }, @@ -754,7 +754,7 @@ func TestBindRequests(t *testing.T) { kaiSchedulerObjects: []runtime.Object{ exampleQueue, &enginev2alpha2.PodGroup{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: podGroupName, 
Namespace: namespace1, }, @@ -763,7 +763,7 @@ func TestBindRequests(t *testing.T) { }, }, &schedulingv1alpha2.BindRequest{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "my-pod-1234", Namespace: namespace1, }, @@ -791,12 +791,12 @@ func TestBindRequests(t *testing.T) { }, "Pod pending with stale bind request from another shard and node is not in our shard": { kubeObjects: []runtime.Object{ - &v1core.Node{ - ObjectMeta: v1.ObjectMeta{ + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ Name: "node-1", }, - Status: v1core.NodeStatus{ - Allocatable: v1core.ResourceList{ + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ "cpu": resource.MustParse("10"), }, }, @@ -806,7 +806,7 @@ func TestBindRequests(t *testing.T) { kaiSchedulerObjects: []runtime.Object{ exampleQueue, &enginev2alpha2.PodGroup{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: podGroupName, Namespace: namespace1, }, @@ -815,7 +815,7 @@ func TestBindRequests(t *testing.T) { }, }, &schedulingv1alpha2.BindRequest{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "my-pod-1234", Namespace: namespace1, Labels: map[string]string{ @@ -840,12 +840,12 @@ func TestBindRequests(t *testing.T) { }, "Pod pending with stale bind request from another shard and node is actually in our shard": { kubeObjects: []runtime.Object{ - &v1core.Node{ - ObjectMeta: v1.ObjectMeta{ + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ Name: "node-1", }, - Status: v1core.NodeStatus{ - Allocatable: v1core.ResourceList{ + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ "cpu": resource.MustParse("10"), }, }, @@ -855,7 +855,7 @@ func TestBindRequests(t *testing.T) { kaiSchedulerObjects: []runtime.Object{ exampleQueue, &enginev2alpha2.PodGroup{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: podGroupName, Namespace: namespace1, }, @@ -864,7 +864,7 @@ func TestBindRequests(t *testing.T) { }, }, &schedulingv1alpha2.BindRequest{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "my-pod-1234", Namespace: namespace1, Labels: map[string]string{ @@ -956,7 +956,7 @@ func TestSnapshotPodGroups(t *testing.T) { "BasicUsage": { objs: []runtime.Object{ &enginev2alpha2.PodGroup{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "podGroup-0", UID: "ABC", }, @@ -966,8 +966,8 @@ func TestSnapshotPodGroups(t *testing.T) { }, }, kubeObjs: []runtime.Object{ - &v1core.Pod{ - ObjectMeta: v1.ObjectMeta{ + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ commonconstants.PodGroupAnnotationForPod: "podGroup-0", }, @@ -991,7 +991,7 @@ func TestSnapshotPodGroups(t *testing.T) { "NotExistingQueue": { objs: []runtime.Object{ &enginev2alpha2.PodGroup{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "podGroup-0", UID: "ABC", }, @@ -1001,8 +1001,8 @@ func TestSnapshotPodGroups(t *testing.T) { }, }, kubeObjs: []runtime.Object{ - &v1core.Pod{ - ObjectMeta: v1.ObjectMeta{ + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ commonconstants.PodGroupAnnotationForPod: "podGroup-0", }, @@ -1014,7 +1014,7 @@ func TestSnapshotPodGroups(t *testing.T) { "filter unassigned pod groups - no scheduling backoff": { objs: []runtime.Object{ &enginev2alpha2.PodGroup{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "podGroup-0", UID: "ABC", }, @@ -1044,7 +1044,7 @@ func TestSnapshotPodGroups(t *testing.T) { "filter unassigned pod groups - no scheduling conditions": { objs: 
[]runtime.Object{ &enginev2alpha2.PodGroup{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "podGroup-0", UID: "ABC", }, @@ -1067,7 +1067,7 @@ func TestSnapshotPodGroups(t *testing.T) { "filter unassigned pod groups - unschedulable in different nodepool": { objs: []runtime.Object{ &enginev2alpha2.PodGroup{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "podGroup-0", UID: "ABC", }, @@ -1097,7 +1097,7 @@ func TestSnapshotPodGroups(t *testing.T) { "filter unassigned pod groups - unassigned": { objs: []runtime.Object{ &enginev2alpha2.PodGroup{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "podGroup-0", UID: "ABC", }, @@ -1119,7 +1119,7 @@ func TestSnapshotPodGroups(t *testing.T) { "With sub groups": { objs: []runtime.Object{ &enginev2alpha2.PodGroup{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "podGroup-0", UID: "ABC", }, @@ -1141,8 +1141,8 @@ func TestSnapshotPodGroups(t *testing.T) { }, }, kubeObjs: []runtime.Object{ - &v1core.Pod{ - ObjectMeta: v1.ObjectMeta{ + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ Namespace: testNamespace, Name: "pod-0", UID: types.UID(fmt.Sprintf("%s/pod-0", testNamespace)), @@ -1154,8 +1154,8 @@ func TestSnapshotPodGroups(t *testing.T) { }, }, }, - &v1core.Pod{ - ObjectMeta: v1.ObjectMeta{ + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ Namespace: testNamespace, Name: "pod-1", UID: types.UID(fmt.Sprintf("%s/pod-1", testNamespace)), @@ -1167,8 +1167,8 @@ func TestSnapshotPodGroups(t *testing.T) { }, }, }, - &v1core.Pod{ - ObjectMeta: v1.ObjectMeta{ + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ Namespace: testNamespace, Name: "pod-2", UID: types.UID(fmt.Sprintf("%s/pod-2", testNamespace)), @@ -1253,7 +1253,7 @@ func TestSnapshotPodGroups(t *testing.T) { func TestSnapshotQueues(t *testing.T) { objs := []runtime.Object{ &enginev2.Queue{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "department0", }, Spec: enginev2.QueueSpec{ @@ -1266,7 +1266,7 @@ func TestSnapshotQueues(t *testing.T) { }, }, &enginev2.Queue{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "department0-a", Labels: map[string]string{ nodePoolNameLabel: "nodepool-a", @@ -1281,7 +1281,7 @@ func TestSnapshotQueues(t *testing.T) { }, }, &enginev2.Queue{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "queue0", Labels: map[string]string{}, }, @@ -1320,7 +1320,7 @@ func TestSnapshotQueues(t *testing.T) { func TestSnapshotFlatHierarchy(t *testing.T) { parentQueue0 := &enginev2.Queue{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "department0", Labels: map[string]string{ nodePoolNameLabel: "nodepool-a", @@ -1340,7 +1340,7 @@ func TestSnapshotFlatHierarchy(t *testing.T) { parentQueue1.Name = "department1" queue0 := &enginev2.Queue{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "queue0", Labels: map[string]string{ nodePoolNameLabel: "nodepool-a", @@ -1351,7 +1351,7 @@ func TestSnapshotFlatHierarchy(t *testing.T) { }, } queue1 := &enginev2.Queue{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "queue1", Labels: map[string]string{ nodePoolNameLabel: "nodepool-a", @@ -1412,7 +1412,7 @@ func TestSnapshotFlatHierarchy(t *testing.T) { func TestGetPodGroupPriority(t *testing.T) { kubeObjects := []runtime.Object{ &v12.PriorityClass{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "my-priority", }, Value: 2, @@ -1439,49 +1439,49 @@ func TestGetPodGroupPriority(t *testing.T) { func 
TestSnapshotStorageObjects(t *testing.T) { kubeObjects := []runtime.Object{ &storage.CSIDriver{ - ObjectMeta: v1.ObjectMeta{Name: "csi-driver"}, + ObjectMeta: metav1.ObjectMeta{Name: "csi-driver"}, Spec: storage.CSIDriverSpec{ StorageCapacity: ptr.To(true), }, }, &storage.StorageClass{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "storage-class", }, Provisioner: "csi-driver", VolumeBindingMode: (*storage.VolumeBindingMode)(ptr.To(string(storage.VolumeBindingWaitForFirstConsumer))), }, &storage.StorageClass{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "non-csi-storage-class", }, Provisioner: "non-csi-driver", VolumeBindingMode: (*storage.VolumeBindingMode)(ptr.To(string(storage.VolumeBindingWaitForFirstConsumer))), }, &storage.StorageClass{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "immediate-binding-storage-class", }, Provisioner: "csi-driver", VolumeBindingMode: (*storage.VolumeBindingMode)(ptr.To(string(storage.VolumeBindingImmediate))), }, - &v1core.PersistentVolumeClaim{ - ObjectMeta: v1.ObjectMeta{ + &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ Name: nonOwnedClaimName, Namespace: testNamespace, UID: "csi-pvc-uid", }, - Spec: v1core.PersistentVolumeClaimSpec{ + Spec: corev1.PersistentVolumeClaimSpec{ StorageClassName: ptr.To("storage-class"), }, - Status: v1core.PersistentVolumeClaimStatus{Phase: v1core.ClaimBound}, + Status: corev1.PersistentVolumeClaimStatus{Phase: corev1.ClaimBound}, }, - &v1core.PersistentVolumeClaim{ - ObjectMeta: v1.ObjectMeta{ + &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ Name: ownedClaimName, Namespace: testNamespace, UID: "owned-csi-pvc-uid", - OwnerReferences: []v1.OwnerReference{ + OwnerReferences: []metav1.OwnerReference{ { APIVersion: "v1", Kind: "pod", @@ -1490,13 +1490,13 @@ func TestSnapshotStorageObjects(t *testing.T) { }, }, }, - Spec: v1core.PersistentVolumeClaimSpec{ + Spec: corev1.PersistentVolumeClaimSpec{ StorageClassName: ptr.To("storage-class"), }, - Status: v1core.PersistentVolumeClaimStatus{Phase: v1core.ClaimBound}, + Status: corev1.PersistentVolumeClaimStatus{Phase: corev1.ClaimBound}, }, - &v1core.Pod{ - ObjectMeta: v1.ObjectMeta{ + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ Name: "owner-pod", Namespace: testNamespace, UID: "owner-pod-uid", @@ -1504,11 +1504,11 @@ func TestSnapshotStorageObjects(t *testing.T) { commonconstants.PodGroupAnnotationForPod: "podGroup-0", }, }, - Spec: v1core.PodSpec{ - Volumes: []v1core.Volume{ + Spec: corev1.PodSpec{ + Volumes: []corev1.Volume{ { - VolumeSource: v1core.VolumeSource{ - PersistentVolumeClaim: &v1core.PersistentVolumeClaimVolumeSource{ + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ ClaimName: ownedClaimName, }, }, @@ -1520,12 +1520,12 @@ func TestSnapshotStorageObjects(t *testing.T) { kubeAiSchedOjbs := []runtime.Object{ &enginev2.Queue{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "queue-0", }, }, &enginev2alpha2.PodGroup{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "podGroup-0", UID: "ABC", }, @@ -1537,7 +1537,7 @@ func TestSnapshotStorageObjects(t *testing.T) { kueueObjects := []runtime.Object{ &kueuev1alpha1.Topology{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "topology-0", }, }, @@ -1568,7 +1568,7 @@ func TestSnapshotStorageObjects(t *testing.T) { Name: nonOwnedClaimName, Namespace: testNamespace, Size: resource.NewQuantity(0, resource.BinarySI), - 
Phase: v1core.ClaimBound, + Phase: corev1.ClaimBound, StorageClass: "storage-class", PodOwnerReference: nil, DeletedOwner: true, @@ -1578,7 +1578,7 @@ func TestSnapshotStorageObjects(t *testing.T) { Name: ownedClaimName, Namespace: testNamespace, Size: resource.NewQuantity(0, resource.BinarySI), - Phase: v1core.ClaimBound, + Phase: corev1.ClaimBound, StorageClass: "storage-class", PodOwnerReference: &storageclaim_info.PodOwnerReference{ PodID: "owner-pod-uid", @@ -1614,7 +1614,7 @@ func TestGetPodGroupPriorityNotExistingPriority(t *testing.T) { func TestGetDefaultPriority(t *testing.T) { kubeObjects := []runtime.Object{ &v12.PriorityClass{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "my-priority", }, Value: 2, @@ -1638,7 +1638,7 @@ func TestGetDefaultPriority(t *testing.T) { func TestGetDefaultPriorityNotExists(t *testing.T) { kubeObjects := []runtime.Object{ &v12.PriorityClass{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "my-priority", }, Value: 2, @@ -1671,7 +1671,7 @@ func TestGetDefaultPriorityWithError(t *testing.T) { func TestPodGroupWithIndex(t *testing.T) { podGroup := &enginev2alpha2.PodGroup{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ UID: "ABC", }, } @@ -1691,7 +1691,7 @@ func TestPodGroupWithIndex(t *testing.T) { func TestPodGroupWithIndexNonMatching(t *testing.T) { podGroup := &enginev2alpha2.PodGroup{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ UID: "ABC", }, } @@ -1815,8 +1815,8 @@ func TestIsPodGroupUpForScheduler(t *testing.T) { func TestNotSchedulingPodWithTerminatingPVC(t *testing.T) { kubeObjects := []runtime.Object{ - &v1core.Pod{ - ObjectMeta: v1.ObjectMeta{ + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ Name: "pod-1", Namespace: "test", UID: "pod-1", @@ -1824,42 +1824,42 @@ func TestNotSchedulingPodWithTerminatingPVC(t *testing.T) { commonconstants.PodGroupAnnotationForPod: "podGroup-0", }, }, - Spec: v1core.PodSpec{ - Containers: []v1core.Container{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ { Name: "container-1", }, }, - Volumes: []v1core.Volume{ + Volumes: []corev1.Volume{ { Name: "pv-1", - VolumeSource: v1core.VolumeSource{ - PersistentVolumeClaim: &v1core.PersistentVolumeClaimVolumeSource{ + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ ClaimName: "pvc-1", }, }, }, }, }, - Status: v1core.PodStatus{ - Phase: v1core.PodPending, + Status: corev1.PodStatus{ + Phase: corev1.PodPending, }, }, &storage.CSIDriver{ - ObjectMeta: v1.ObjectMeta{Name: "csi-driver"}, + ObjectMeta: metav1.ObjectMeta{Name: "csi-driver"}, Spec: storage.CSIDriverSpec{ StorageCapacity: ptr.To(true), }, }, &storage.StorageClass{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "storage-class", }, Provisioner: "csi-driver", VolumeBindingMode: (*storage.VolumeBindingMode)(ptr.To(string(storage.VolumeBindingWaitForFirstConsumer))), }, - &v1core.Node{ - ObjectMeta: v1.ObjectMeta{ + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ Name: "node-1", Labels: map[string]string{ "kubernetes.io/hostname": "node-1", @@ -1867,10 +1867,10 @@ func TestNotSchedulingPodWithTerminatingPVC(t *testing.T) { }, }, &storage.CSIStorageCapacity{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "capacity-node-1", }, - NodeTopology: &v1.LabelSelector{ + NodeTopology: &metav1.LabelSelector{ MatchLabels: map[string]string{ "kubernetes.io/hostname": "node-1", }, @@ -1879,11 +1879,11 @@ func TestNotSchedulingPodWithTerminatingPVC(t 
*testing.T) { }, } - pvc := &v1core.PersistentVolumeClaim{ - ObjectMeta: v1.ObjectMeta{ + pvc := &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ Name: "pvc-1", Namespace: "test", - OwnerReferences: []v1.OwnerReference{ + OwnerReferences: []metav1.OwnerReference{ { APIVersion: "v1", Kind: "Pod", @@ -1892,23 +1892,23 @@ func TestNotSchedulingPodWithTerminatingPVC(t *testing.T) { }, }, }, - Spec: v1core.PersistentVolumeClaimSpec{ + Spec: corev1.PersistentVolumeClaimSpec{ VolumeName: "pv-1", StorageClassName: ptr.To("storage-class"), }, - Status: v1core.PersistentVolumeClaimStatus{ - Phase: v1core.ClaimPending, + Status: corev1.PersistentVolumeClaimStatus{ + Phase: corev1.ClaimPending, }, } kubeAiSchedOjbs := []runtime.Object{ &enginev2.Queue{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "queue-0", }, }, &enginev2alpha2.PodGroup{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "podGroup-0", UID: "ABC", }, @@ -1951,7 +1951,7 @@ func TestNotSchedulingPodWithTerminatingPVC(t *testing.T) { func createFakePodGroup(name string, schedulingBackoff *int32, nodePoolName string, lastSchedulingCondition *enginev2alpha2.SchedulingCondition) *enginev2alpha2.PodGroup { result := &enginev2alpha2.PodGroup{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{}, Name: name, }, @@ -1989,21 +1989,21 @@ func TestSnapshotWithListerErrors(t *testing.T) { }, "twiceSamePod": { func(mdl *data_lister.MockDataLister) { - mdl.EXPECT().ListNodes().Return([]*v1core.Node{ + mdl.EXPECT().ListNodes().Return([]*corev1.Node{ { - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "node-0", }, }, }, nil) - mdl.EXPECT().ListPods().Return([]*v1core.Pod{ + mdl.EXPECT().ListPods().Return([]*corev1.Pod{ { - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "my-pod", }, }, { - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "my-pod", }, }, @@ -2014,16 +2014,16 @@ func TestSnapshotWithListerErrors(t *testing.T) { }, "listQueues": { func(mdl *data_lister.MockDataLister) { - mdl.EXPECT().ListNodes().Return([]*v1core.Node{}, nil) - mdl.EXPECT().ListPods().Return([]*v1core.Pod{}, nil) + mdl.EXPECT().ListNodes().Return([]*corev1.Node{}, nil) + mdl.EXPECT().ListPods().Return([]*corev1.Pod{}, nil) mdl.EXPECT().ListBindRequests().Return([]*schedulingv1alpha2.BindRequest{}, nil) mdl.EXPECT().ListQueues().Return(nil, fmt.Errorf(successErrorMsg)) }, }, "listPodGroups": { func(mdl *data_lister.MockDataLister) { - mdl.EXPECT().ListNodes().Return([]*v1core.Node{}, nil) - mdl.EXPECT().ListPods().Return([]*v1core.Pod{}, nil) + mdl.EXPECT().ListNodes().Return([]*corev1.Node{}, nil) + mdl.EXPECT().ListPods().Return([]*corev1.Pod{}, nil) mdl.EXPECT().ListBindRequests().Return([]*schedulingv1alpha2.BindRequest{}, nil) mdl.EXPECT().ListQueues().Return([]*enginev2.Queue{}, nil) mdl.EXPECT().ListResourceUsage().Return(nil, nil) @@ -2033,8 +2033,8 @@ func TestSnapshotWithListerErrors(t *testing.T) { }, "defaultPriorityClass": { func(mdl *data_lister.MockDataLister) { - mdl.EXPECT().ListNodes().Return([]*v1core.Node{}, nil) - mdl.EXPECT().ListPods().Return([]*v1core.Pod{}, nil) + mdl.EXPECT().ListNodes().Return([]*corev1.Node{}, nil) + mdl.EXPECT().ListPods().Return([]*corev1.Pod{}, nil) mdl.EXPECT().ListBindRequests().Return([]*schedulingv1alpha2.BindRequest{}, nil) mdl.EXPECT().ListQueues().Return([]*enginev2.Queue{}, nil) mdl.EXPECT().ListResourceUsage().Return(nil, nil) @@ -2043,12 +2043,12 @@ func 
TestSnapshotWithListerErrors(t *testing.T) { }, "getPriorityClassByNameAndPodByPodGroup": { func(mdl *data_lister.MockDataLister) { - mdl.EXPECT().ListNodes().Return([]*v1core.Node{}, nil) - mdl.EXPECT().ListPods().Return([]*v1core.Pod{}, nil) + mdl.EXPECT().ListNodes().Return([]*corev1.Node{}, nil) + mdl.EXPECT().ListPods().Return([]*corev1.Pod{}, nil) mdl.EXPECT().ListBindRequests().Return([]*schedulingv1alpha2.BindRequest{}, nil) mdl.EXPECT().ListQueues().Return([]*enginev2.Queue{ { - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "queue-0", }, Spec: enginev2.QueueSpec{ @@ -2056,7 +2056,7 @@ func TestSnapshotWithListerErrors(t *testing.T) { }, }, { - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "default", }, Spec: enginev2.QueueSpec{ @@ -2068,7 +2068,7 @@ func TestSnapshotWithListerErrors(t *testing.T) { mdl.EXPECT().ListPriorityClasses().Return([]*v12.PriorityClass{}, nil) mdl.EXPECT().ListPodGroups().Return([]*enginev2alpha2.PodGroup{ { - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "my-pg", }, Spec: enginev2alpha2.PodGroupSpec{Queue: "queue-0"}, @@ -2080,8 +2080,8 @@ func TestSnapshotWithListerErrors(t *testing.T) { }, "ListBindRequests": { func(mdl *data_lister.MockDataLister) { - mdl.EXPECT().ListPods().Return([]*v1core.Pod{}, nil) - mdl.EXPECT().ListNodes().Return([]*v1core.Node{}, nil) + mdl.EXPECT().ListPods().Return([]*corev1.Pod{}, nil) + mdl.EXPECT().ListNodes().Return([]*corev1.Node{}, nil) mdl.EXPECT().ListBindRequests().Return(nil, fmt.Errorf(successErrorMsg)) }, }, @@ -2215,41 +2215,41 @@ func newFakeClients(kubernetesObjects, kaiSchedulerObjects, kueueObjects []runti func TestSnapshotPodsInPartition(t *testing.T) { clusterObjects := []runtime.Object{ - &v1core.Node{ - ObjectMeta: v1.ObjectMeta{ + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ Name: "node1", Labels: map[string]string{ nodePoolNameLabel: "foo", }, }, }, - &v1core.Node{ - ObjectMeta: v1.ObjectMeta{ + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ Name: "node2", Labels: map[string]string{ nodePoolNameLabel: "bar", }, }, }, - &v1core.Pod{ - ObjectMeta: v1.ObjectMeta{ + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ Name: "pod1", Labels: map[string]string{ nodePoolNameLabel: "foo", }, }, - Spec: v1core.PodSpec{ + Spec: corev1.PodSpec{ NodeName: "node1", }, }, - &v1core.Pod{ - ObjectMeta: v1.ObjectMeta{ + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ Name: "pod2", Labels: map[string]string{ nodePoolNameLabel: "bar", }, }, - Spec: v1core.PodSpec{ + Spec: corev1.PodSpec{ NodeName: "node2", }, }, @@ -2272,19 +2272,19 @@ func TestSnapshotPodsInPartition(t *testing.T) { assert.Equal(t, "pod1", snapshot.Pods[0].Name) } -func newCompletedPod(pod *v1core.Pod) *v1core.Pod { +func newCompletedPod(pod *corev1.Pod) *corev1.Pod { newPod := pod.DeepCopy() - newPod.Status.Phase = v1core.PodSucceeded - newPod.Status.Conditions = []v1core.PodCondition{ + newPod.Status.Phase = corev1.PodSucceeded + newPod.Status.Conditions = []corev1.PodCondition{ { - Type: v1core.PodReady, - Status: v1core.ConditionTrue, + Type: corev1.PodReady, + Status: corev1.ConditionTrue, }, } return newPod } -func newPodOnNode(pod *v1core.Pod, nodeName string) *v1core.Pod { +func newPodOnNode(pod *corev1.Pod, nodeName string) *corev1.Pod { newPod := pod.DeepCopy() newPod.Spec.NodeName = nodeName newPod.Name = fmt.Sprintf("%s-%s", pod.Name, nodeName) diff --git a/pkg/scheduler/cache/usagedb/api/defaults.go b/pkg/scheduler/cache/usagedb/api/defaults.go new file mode 100644 index 
000000000..687b61a0e --- /dev/null +++ b/pkg/scheduler/cache/usagedb/api/defaults.go @@ -0,0 +1,74 @@ +// Copyright 2025 NVIDIA CORPORATION +// SPDX-License-Identifier: Apache-2.0 + +package api + +import "time" + +func (up *UsageParams) SetDefaults() { + if up.HalfLifePeriod == nil { + // noop: disabled by default + } + if up.WindowSize == nil { + windowSize := time.Hour * 24 * 7 + up.WindowSize = &windowSize + } + if up.WindowType == nil { + windowType := SlidingWindow + up.WindowType = &windowType + } +} + +// WindowType defines the type of time window for aggregating usage data +type WindowType string + +const ( + // TumblingWindow represents non-overlapping, fixed-size time windows + // Example: 1-hour windows at 0-1h, 1-2h, 2-3h + TumblingWindow WindowType = "tumbling" + + // SlidingWindow represents overlapping time windows that slide forward + // Example: a 1-hour sliding window will consider the usage of the last 1 hour prior to the current time. + SlidingWindow WindowType = "sliding" +) + +// IsValid returns true if the WindowType is a valid value +func (wt WindowType) IsValid() bool { + switch wt { + case TumblingWindow, SlidingWindow: + return true + default: + return false + } +} + +func (up *UsageParams) GetExtraDurationParamOrDefault(key string, defaultValue time.Duration) time.Duration { + if up.ExtraParams == nil { + return defaultValue + } + + value, exists := up.ExtraParams[key] + if !exists { + return defaultValue + } + + duration, err := time.ParseDuration(value) + if err != nil { + return defaultValue + } + + return duration +} + +func (up *UsageParams) GetExtraStringParamOrDefault(key string, defaultValue string) string { + if up.ExtraParams == nil { + return defaultValue + } + + value, exists := up.ExtraParams[key] + if !exists { + return defaultValue + } + + return value +} diff --git a/pkg/scheduler/cache/usagedb/api/interface.go b/pkg/scheduler/cache/usagedb/api/interface.go index fd9cadd95..6f2329446 100644 --- a/pkg/scheduler/cache/usagedb/api/interface.go +++ b/pkg/scheduler/cache/usagedb/api/interface.go @@ -37,54 +37,6 @@ type UsageParams struct { WindowSize *time.Duration `yaml:"windowSize" json:"windowSize"` // Window type for time-series aggregation. If not set, defaults to sliding. WindowType *WindowType `yaml:"windowType" json:"windowType"` -} - -func (up *UsageParams) SetDefaults() { - if up.HalfLifePeriod == nil { - // noop: disabled by default - } - if up.WindowSize == nil { - windowSize := time.Hour * 24 * 7 - up.WindowSize = &windowSize - } - if up.WindowType == nil { - windowType := SlidingWindow - up.WindowType = &windowType - } -} - -// WindowType defines the type of time window for aggregating usage data -type WindowType string - -const ( - // TumblingWindow represents non-overlapping, fixed-size time windows - // Example: 1-hour windows at 0-1h, 1-2h, 2-3h - TumblingWindow WindowType = "tumbling" - - // SlidingWindow represents overlapping time windows that slide forward - // Example: a 1-hour sliding window will consider the usage of the last 1 hour prior to the current time. 
- SlidingWindow WindowType = "sliding" -) - -// IsValid returns true if the WindowType is a valid value -func (wt WindowType) IsValid() bool { - switch wt { - case TumblingWindow, SlidingWindow: - return true - default: - return false - } -} - -// GetDefaultWindowType returns the default window type (sliding) -func GetDefaultWindowType() WindowType { - return SlidingWindow -} - -// GetWindowTypeOrDefault returns the window type if set, otherwise returns the default (sliding) -func (up *UsageParams) GetWindowTypeOrDefault() WindowType { - if up.WindowType == nil { - return GetDefaultWindowType() - } - return *up.WindowType + // ExtraParams are extra parameters for the usage db client, which are client specific. + ExtraParams map[string]string `yaml:"extraParams" json:"extraParams"` } diff --git a/pkg/scheduler/cache/usagedb/api/usage_params_test.go b/pkg/scheduler/cache/usagedb/api/usage_params_test.go index e1367c5ae..8b485ca0f 100644 --- a/pkg/scheduler/cache/usagedb/api/usage_params_test.go +++ b/pkg/scheduler/cache/usagedb/api/usage_params_test.go @@ -119,8 +119,8 @@ func TestUsageParams_GetWindowTypeOrDefault(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - result := tt.input.GetWindowTypeOrDefault() - assert.Equal(t, tt.expected, result) + tt.input.SetDefaults() + assert.Equal(t, tt.expected, *tt.input.WindowType) }) } } @@ -281,17 +281,6 @@ func TestWindowType_IsValid(t *testing.T) { } } -func TestGetDefaultWindowType(t *testing.T) { - result := GetDefaultWindowType() - assert.Equal(t, SlidingWindow, result) -} - -func TestWindowTypeConstants(t *testing.T) { - // Test that constants have expected values - assert.Equal(t, "tumbling", string(TumblingWindow)) - assert.Equal(t, "sliding", string(SlidingWindow)) -} - func TestUsageParams_ZeroValues(t *testing.T) { // Test behavior with zero duration values zeroDuration := time.Duration(0) diff --git a/pkg/scheduler/cache/usagedb/hub.go b/pkg/scheduler/cache/usagedb/hub.go index 066a4a902..6362adc8e 100644 --- a/pkg/scheduler/cache/usagedb/hub.go +++ b/pkg/scheduler/cache/usagedb/hub.go @@ -11,13 +11,15 @@ import ( "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/cache/usagedb/api" "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/cache/usagedb/fake" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/cache/usagedb/prometheus" "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/log" ) type GetClientFn func(connectionString string, usageParams *api.UsageParams) (api.Interface, error) var defaultClientMap = map[string]GetClientFn{ - "fake": fake.NewFakeClient, + "fake": fake.NewFakeClient, + "prometheus": prometheus.NewPrometheusClient, } type ClientResolver struct { diff --git a/pkg/scheduler/cache/usagedb/hub_test.go b/pkg/scheduler/cache/usagedb/hub_test.go index 70bbe6ec2..661ec9c28 100644 --- a/pkg/scheduler/cache/usagedb/hub_test.go +++ b/pkg/scheduler/cache/usagedb/hub_test.go @@ -36,15 +36,15 @@ func TestNewClientResolver(t *testing.T) { description string }{ { - name: "no overrides - should have default fake client", + name: "no overrides - should have default fake and prometheus clients", clientMapOverrides: nil, - expectedClientTypes: []string{"fake"}, - description: "When no overrides are provided, should contain only default fake client", + expectedClientTypes: []string{"fake", "prometheus"}, + description: "When no overrides are provided, should contain default fake and prometheus clients", }, { - name: "empty overrides - should have default fake client", + name: "empty overrides - should have default fake 
and prometheus clients", clientMapOverrides: map[string]GetClientFn{}, - expectedClientTypes: []string{"fake"}, + expectedClientTypes: []string{"fake", "prometheus"}, description: "Empty overrides map should result in default behavior", }, { @@ -52,16 +52,16 @@ func TestNewClientResolver(t *testing.T) { clientMapOverrides: map[string]GetClientFn{ "custom": mockClientFn, }, - expectedClientTypes: []string{"fake", "custom"}, - description: "Should contain both default fake and custom client", + expectedClientTypes: []string{"fake", "prometheus", "custom"}, + description: "Should contain default fake, prometheus, and custom clients", }, { name: "override existing client type", clientMapOverrides: map[string]GetClientFn{ "fake": mockClientFn, }, - expectedClientTypes: []string{"fake"}, - description: "Should override the default fake client with custom implementation", + expectedClientTypes: []string{"fake", "prometheus"}, + description: "Should override the default fake client with custom implementation but keep prometheus", }, { name: "multiple overrides", @@ -70,7 +70,7 @@ func TestNewClientResolver(t *testing.T) { "custom2": errorClientFn, "fake": mockClientFn, }, - expectedClientTypes: []string{"fake", "custom1", "custom2"}, + expectedClientTypes: []string{"fake", "prometheus", "custom1", "custom2"}, description: "Should contain all client types from overrides and defaults", }, } @@ -280,10 +280,6 @@ func TestClientResolver_GetClient(t *testing.T) { assert.NoError(t, err, "Unexpected error: %v", err) if tt.wantClient { assert.NotNil(t, client, "Expected client but got nil") - - // Verify the client implements the Interface - _, ok := client.(api.Interface) - assert.True(t, ok, "Client should implement api.Interface") } else { assert.Nil(t, client, "Expected nil client") } @@ -341,12 +337,14 @@ func TestClientResolver_GetClient_Integration(t *testing.T) { } func TestClientResolver_DefaultBehavior(t *testing.T) { - t.Run("default client map should contain fake", func(t *testing.T) { + t.Run("default client map should contain fake and prometheus", func(t *testing.T) { // Test the module-level defaultClientMap assert.NotNil(t, defaultClientMap) assert.Contains(t, defaultClientMap, "fake") - // Can't directly compare function pointers, but we can verify it's not nil + assert.Contains(t, defaultClientMap, "prometheus") + // Can't directly compare function pointers, but we can verify they're not nil assert.NotNil(t, defaultClientMap["fake"]) + assert.NotNil(t, defaultClientMap["prometheus"]) }) t.Run("resolver should clone default map", func(t *testing.T) { @@ -355,18 +353,21 @@ func TestClientResolver_DefaultBehavior(t *testing.T) { "custom": mockClientFn, }) - // Verify resolver1 has only fake - assert.Len(t, resolver1.clientMap, 1) + // Verify resolver1 has fake and prometheus + assert.Len(t, resolver1.clientMap, 2) assert.Contains(t, resolver1.clientMap, "fake") + assert.Contains(t, resolver1.clientMap, "prometheus") - // Verify resolver2 has both fake and custom - assert.Len(t, resolver2.clientMap, 2) + // Verify resolver2 has fake, prometheus, and custom + assert.Len(t, resolver2.clientMap, 3) assert.Contains(t, resolver2.clientMap, "fake") + assert.Contains(t, resolver2.clientMap, "prometheus") assert.Contains(t, resolver2.clientMap, "custom") // Verify that modifying one doesn't affect the default - assert.Len(t, defaultClientMap, 1) + assert.Len(t, defaultClientMap, 2) assert.Contains(t, defaultClientMap, "fake") + assert.Contains(t, defaultClientMap, "prometheus") }) } @@ -390,6 
+391,13 @@ func TestGetClient(t *testing.T) { ConnectionString: "fake-connection", }, }, + { + name: "prometheus client", + config: &usagedbapi.UsageDBConfig{ + ClientType: "prometheus", + ConnectionString: "http://localhost:9090", + }, + }, { name: "unknown client type", config: &usagedbapi.UsageDBConfig{ diff --git a/pkg/scheduler/cache/usagedb/prometheus/prometheus.go b/pkg/scheduler/cache/usagedb/prometheus/prometheus.go new file mode 100644 index 000000000..c30716b7b --- /dev/null +++ b/pkg/scheduler/cache/usagedb/prometheus/prometheus.go @@ -0,0 +1,131 @@ +// Copyright 2025 NVIDIA CORPORATION +// SPDX-License-Identifier: Apache-2.0 + +package prometheus + +import ( + "context" + "fmt" + "time" + + commonconstants "github.com/NVIDIA/KAI-scheduler/pkg/common/constants" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/common_info" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/queue_info" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/cache/usagedb/api" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/log" + promapi "github.com/prometheus/client_golang/api" + promv1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/prometheus/common/model" + v1 "k8s.io/api/core/v1" +) + +var _ api.Interface = &PrometheusClient{} + +type PrometheusClient struct { + client promv1.API + promClient promapi.Client + usageParams *api.UsageParams + + // Extra params + usageQueryTimeout time.Duration + queryResolution time.Duration + allocationMetricsMap map[string]string +} + +func NewPrometheusClient(address string, params *api.UsageParams) (api.Interface, error) { + cfg := promapi.Config{ + Address: address, + } + + client, err := promapi.NewClient(cfg) + if err != nil { + return nil, fmt.Errorf("error creating prometheus client: %v", err) + } + + v1api := promv1.NewAPI(client) + + if params.WindowType != nil && *params.WindowType == api.TumblingWindow { + log.InfraLogger.V(3).Warnf("Tumbling window is not supported for prometheus client, using sliding window instead") + windowType := api.SlidingWindow + params.WindowType = &windowType + } + + usageQueryTimeout := params.GetExtraDurationParamOrDefault("usageQueryTimeout", 10*time.Second) + queryResolution := params.GetExtraDurationParamOrDefault("queryResolution", 1*time.Minute) + + allocationMetricsMap := map[string]string{ + "gpu": params.GetExtraStringParamOrDefault("gpuAllocationMetric", "kai_queue_allocated_gpus"), + "cpu": params.GetExtraStringParamOrDefault("cpuAllocationMetric", "kai_queue_allocated_cpu_cores"), + "memory": params.GetExtraStringParamOrDefault("memoryAllocationMetric", "kai_queue_allocated_memory_bytes"), + } + + return &PrometheusClient{ + client: v1api, + promClient: client, + usageParams: params, + + usageQueryTimeout: usageQueryTimeout, + queryResolution: queryResolution, + allocationMetricsMap: allocationMetricsMap, + }, nil +} + +func (p *PrometheusClient) GetResourceUsage() (*queue_info.ClusterUsage, error) { + ctx, cancel := context.WithTimeout(context.Background(), p.usageQueryTimeout) + defer cancel() + + usage := queue_info.NewClusterUsage() + + for _, resource := range []v1.ResourceName{commonconstants.GpuResource, v1.ResourceCPU, v1.ResourceMemory} { + resourceUsage, err := p.queryResourceUsage(ctx, p.allocationMetricsMap[string(resource)]) + if err != nil { + return nil, fmt.Errorf("error querying %s and usage: %v", resource, err) + } + for queueID, queueResourceUsage := range resourceUsage { + if _, exists := usage.Queues[queueID]; !exists { + usage.Queues[queueID] = 
+				usage.Queues[queueID] = queue_info.QueueUsage{}
+			}
+			usage.Queues[queueID][resource] = queueResourceUsage
+		}
+	}
+
+	return usage, nil
+}
+
+func (p *PrometheusClient) queryResourceUsage(ctx context.Context, allocationMetric string) (map[common_info.QueueID]float64, error) {
+	queueUsage := make(map[common_info.QueueID]float64)
+
+	usageQuery := fmt.Sprintf("sum_over_time((%s)[%s:%s])",
+		allocationMetric,
+		p.usageParams.WindowSize.String(),
+		p.queryResolution.String(),
+	)
+
+	usageResult, warnings, err := p.client.Query(ctx, usageQuery, time.Now())
+	if err != nil {
+		return nil, fmt.Errorf("error running query %s: %v", usageQuery, err)
+	}
+
+	// Log warnings if any exist
+	for _, w := range warnings {
+		log.InfraLogger.V(3).Warnf("Warning querying cluster usage metric %s: %s", allocationMetric, w)
+	}
+
+	if usageResult.Type() != model.ValVector {
+		return nil, fmt.Errorf("unexpected query result: got %s, expected vector", usageResult.Type())
+	}
+
+	usageVector := usageResult.(model.Vector)
+	if len(usageVector) == 0 {
+		return nil, fmt.Errorf("no data returned for cluster usage metric %s", allocationMetric)
+	}
+
+	for _, usageSample := range usageVector {
+		queueName := string(usageSample.Metric["queue_name"])
+		value := float64(usageSample.Value)
+
+		queueUsage[common_info.QueueID(queueName)] = value
+	}
+
+	return queueUsage, nil
+}
diff --git a/pkg/scheduler/cache/usagedb/prometheus/prometheus_test.go b/pkg/scheduler/cache/usagedb/prometheus/prometheus_test.go
new file mode 100644
index 000000000..27ea3c585
--- /dev/null
+++ b/pkg/scheduler/cache/usagedb/prometheus/prometheus_test.go
@@ -0,0 +1,54 @@
+// Copyright 2025 NVIDIA CORPORATION
+// SPDX-License-Identifier: Apache-2.0
+
+package prometheus
+
+import (
+	"testing"
+
+	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/cache/usagedb/api"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestNewPrometheusClient(t *testing.T) {
+	tests := []struct {
+		name          string
+		address       string
+		expectError   bool
+		errorContains string
+	}{
+		{
+			name:          "valid address",
+			address:       "http://localhost:9090",
+			expectError:   false,
+			errorContains: "",
+		},
+		{
+			name:          "invalid address",
+			address:       "://invalid:9090",
+			expectError:   true,
+			errorContains: "error creating prometheus client",
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			params := &api.UsageParams{}
+			params.SetDefaults()
+			client, err := NewPrometheusClient(tt.address, params)
+			if tt.expectError {
+				require.Error(t, err)
+				assert.Contains(t, err.Error(), tt.errorContains)
+				assert.Nil(t, client)
+			} else {
+				require.NoError(t, err)
+				assert.NotNil(t, client)
+				promClient, ok := client.(*PrometheusClient)
+				require.True(t, ok)
+				assert.NotNil(t, promClient.client)
+				assert.NotNil(t, promClient.promClient)
+			}
+		})
+	}
+}
diff --git a/pkg/scheduler/cache/usagedb/usagedb.go b/pkg/scheduler/cache/usagedb/usagedb.go
index d7cdbc9df..1bc26de1a 100644
--- a/pkg/scheduler/cache/usagedb/usagedb.go
+++ b/pkg/scheduler/cache/usagedb/usagedb.go
@@ -11,6 +11,7 @@ import (
 	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/queue_info"
 	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/cache/usagedb/api"
 	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/log"
+	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/metrics"
 )
 
 var defaultFetchInterval = 1 * time.Minute
@@ -135,14 +136,13 @@ func (l *UsageLister) WaitForCacheSync(stopCh <-chan struct{}) bool {
 }
 
 func (l *UsageLister) fetchAndUpdateUsage() {
-	// TODO: Add metrics for fetch times
+	now := time.Now()
 	usage, err := l.client.GetResourceUsage()
 	if err != nil {
 		log.InfraLogger.V(1).Errorf("failed to fetch usage data: %v", err)
 		return
 	}
-
-	now := time.Now()
+	metrics.UpdateUsageQueryLatency(time.Since(now))
 
 	l.lastUsageDataMutex.Lock()
 	defer l.lastUsageDataMutex.Unlock()
diff --git a/pkg/scheduler/cache/usagedb/usagedb_test.go b/pkg/scheduler/cache/usagedb/usagedb_test.go
index 1b8550812..f81d8e229 100644
--- a/pkg/scheduler/cache/usagedb/usagedb_test.go
+++ b/pkg/scheduler/cache/usagedb/usagedb_test.go
@@ -7,9 +7,11 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
+
+	"github.com/NVIDIA/KAI-scheduler/pkg/common/constants"
 	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/queue_info"
 	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/cache/usagedb/fake"
-	"github.com/stretchr/testify/assert"
 )
 
 func TestNewUsageLister(t *testing.T) {
@@ -87,14 +89,14 @@ func TestGetResourceUsage(t *testing.T) {
 			name: "fresh data available",
 			setupLister: func(l *UsageLister) {
 				usage := queue_info.NewClusterUsage()
-				usage.Queues["queue1"] = &queue_info.QueueUsage{GPU: 5}
+				usage.Queues["queue1"] = queue_info.QueueUsage{constants.GpuResource: 5}
 				now := time.Now()
 				l.lastUsageData = usage
 				l.lastUsageDataTime = &now
 			},
 			wantUsage: func() *queue_info.ClusterUsage {
 				usage := queue_info.NewClusterUsage()
-				usage.Queues["queue1"] = &queue_info.QueueUsage{GPU: 5}
+				usage.Queues["queue1"] = queue_info.QueueUsage{constants.GpuResource: 5}
 				return usage
 			}(),
 		},
diff --git a/pkg/scheduler/metrics/metrics.go b/pkg/scheduler/metrics/metrics.go
index e74e805ec..aef9d29e8 100644
--- a/pkg/scheduler/metrics/metrics.go
+++ b/pkg/scheduler/metrics/metrics.go
@@ -54,6 +54,7 @@ var (
 	queueCPUUsage     *prometheus.GaugeVec
 	queueMemoryUsage  *prometheus.GaugeVec
 	queueGPUUsage     *prometheus.GaugeVec
+	usageQueryLatency *prometheus.HistogramVec
 )
 
 func init() {
@@ -175,21 +176,29 @@ func InitMetrics(namespace string) {
 	queueCPUUsage = promauto.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: namespace,
-			Name:      "queue_cpu_usage_cores",
-			Help:      "CPU usage of queue, as a gauge. Value is proportional to cpu*hours usage with time decay applied",
+			Name:      "queue_cpu_usage",
+			Help:      "CPU usage of queue, as a gauge. Units depend on UsageDB configuration",
 		}, []string{"queue_name"})
 
 	queueMemoryUsage = promauto.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: namespace,
-			Name:      "queue_memory_usage_gb",
-			Help:      "Memory usage of queue, as a gauge. Value is proportional to memory*hours usage with time decay applied",
+			Name:      "queue_memory_usage",
+			Help:      "Memory usage of queue, as a gauge. Units depend on UsageDB configuration",
 		}, []string{"queue_name"})
 
 	queueGPUUsage = promauto.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: namespace,
-			Name:      "queue_gpu_usage_devices",
-			Help:      "GPU usage of queue, as a gauge. Value is proportional to gpu*hours usage with time decay applied",
+			Name:      "queue_gpu_usage",
+			Help:      "GPU usage of queue, as a gauge. Units depend on UsageDB configuration",
 		}, []string{"queue_name"})
+
+	usageQueryLatency = promauto.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: namespace,
+			Name:      "usage_query_latency_milliseconds",
+			Help:      "Usage database query latency histogram in milliseconds",
+			Buckets:   prometheus.ExponentialBuckets(5, 2, 10),
+		}, []string{})
 }
 
 // UpdateOpenSessionDuration updates latency for open session, including all plugins
@@ -277,6 +286,10 @@ func ResetQueueUsage() {
 	queueGPUUsage.Reset()
 }
 
+func UpdateUsageQueryLatency(latency time.Duration) {
+	usageQueryLatency.WithLabelValues().Observe(float64(latency.Milliseconds()))
+}
+
 // RegisterPreemptionAttempts records number of attempts for preemption
 func RegisterPreemptionAttempts() {
 	preemptionAttempts.Inc()
diff --git a/pkg/scheduler/plugins/proportion/proportion.go b/pkg/scheduler/plugins/proportion/proportion.go
index 51c41b6d3..7640cf04b 100644
--- a/pkg/scheduler/plugins/proportion/proportion.go
+++ b/pkg/scheduler/plugins/proportion/proportion.go
@@ -312,7 +312,7 @@ func (pp *proportionPlugin) createQueueResourceAttrs(ssn *framework.Session) {
 
 		usage, found := ssn.ResourceUsage.Queues[queue.UID]
 		if found {
-			queueAttributes.SetResourceUsage(*usage)
+			queueAttributes.SetResourceUsage(usage)
 		}
 
 		pp.queues[queue.UID] = queueAttributes
diff --git a/pkg/scheduler/plugins/proportion/resource_share/queue_resource_share.go b/pkg/scheduler/plugins/proportion/resource_share/queue_resource_share.go
index a8c261041..b4a527841 100644
--- a/pkg/scheduler/plugins/proportion/resource_share/queue_resource_share.go
+++ b/pkg/scheduler/plugins/proportion/resource_share/queue_resource_share.go
@@ -7,6 +7,7 @@ import (
 	"math"
 	"slices"
 
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
 	commonconstants "github.com/NVIDIA/KAI-scheduler/pkg/common/constants"
@@ -185,14 +186,14 @@ func (qrs *QueueResourceShare) SetQuotaResources(resource ResourceName, deserved
 
 func (qrs *QueueResourceShare) GetResourceUsage() queue_info.QueueUsage {
 	return queue_info.QueueUsage{
-		GPU:    qrs.GPU.Usage,
-		CPU:    qrs.CPU.Usage,
-		Memory: qrs.Memory.Usage,
+		commonconstants.GpuResource: qrs.GPU.Usage,
+		v1.ResourceCPU:              qrs.CPU.Usage,
+		v1.ResourceMemory:           qrs.Memory.Usage,
 	}
 }
 
 func (qrs *QueueResourceShare) SetResourceUsage(usage queue_info.QueueUsage) {
-	qrs.GPU.Usage = usage.GPU
-	qrs.CPU.Usage = usage.CPU
-	qrs.Memory.Usage = usage.Memory
+	qrs.GPU.Usage = usage[commonconstants.GpuResource]
+	qrs.CPU.Usage = usage[v1.ResourceCPU]
+	qrs.Memory.Usage = usage[v1.ResourceMemory]
 }
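
The diff above wires a `prometheus` usage client in next to the existing `fake` one. As a rough illustration of how the new client is exercised, here is a minimal sketch: the `main` wrapper, the Prometheus address, and the printed output are assumptions for illustration only, while the package paths, `NewPrometheusClient`, `UsageParams.SetDefaults`, and `GetResourceUsage` are taken from the diff.

```go
package main

import (
	"fmt"

	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/cache/usagedb/api"
	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/cache/usagedb/prometheus"
)

func main() {
	// Window size/type and extra params fall back to their defaults;
	// the prometheus client forces a sliding window regardless.
	params := &api.UsageParams{}
	params.SetDefaults()

	// Address is an assumption for local testing (e.g. a port-forwarded Prometheus).
	client, err := prometheus.NewPrometheusClient("http://localhost:9090", params)
	if err != nil {
		panic(err)
	}

	// Runs one sum_over_time subquery per resource (GPU, CPU, memory) and
	// returns per-queue usage keyed by the queue_name label.
	usage, err := client.GetResourceUsage()
	if err != nil {
		panic(err)
	}
	for queueID, queueUsage := range usage.Queues {
		fmt.Printf("queue %s: %v\n", queueID, queueUsage)
	}
}
```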