From 7da4d42c98e34c5eacbb14576895bdd038dc3001 Mon Sep 17 00:00:00 2001 From: itsomri Date: Tue, 11 Nov 2025 15:45:57 +0200 Subject: [PATCH 1/3] Fix serialization of conf object --- pkg/env-tests/timeaware/timeaware.go | 6 +- .../operands/scheduler/resources_test.go | 6 +- pkg/operator/operands/scheduler/scheduler.go | 15 ---- .../operands/scheduler/scheduler_test.go | 4 +- pkg/scheduler/cache/cache.go | 6 +- pkg/scheduler/cache/usagedb/api/defaults.go | 46 ++++++------ pkg/scheduler/cache/usagedb/api/interface.go | 70 +++++++++++++++---- .../cache/usagedb/api/usage_params_test.go | 65 +++++++++-------- .../cache/usagedb/fake/fake_with_history.go | 10 +-- pkg/scheduler/conf/scheduler_conf.go | 16 ++--- .../conf_util/scheduler_conf_util.go | 2 +- .../conf_util/scheduler_conf_util_test.go | 27 ++++--- 12 files changed, 157 insertions(+), 116 deletions(-) diff --git a/pkg/env-tests/timeaware/timeaware.go b/pkg/env-tests/timeaware/timeaware.go index 30ebf500a..7aca5ee37 100644 --- a/pkg/env-tests/timeaware/timeaware.go +++ b/pkg/env-tests/timeaware/timeaware.go @@ -120,9 +120,9 @@ func setupControllers(backgroundCtx context.Context, cfg *rest.Config, ClientType: "fake-with-history", ConnectionString: "fake-connection", UsageParams: &api.UsageParams{ - WindowSize: &[]time.Duration{time.Second * time.Duration(*windowSize)}[0], - FetchInterval: &[]time.Duration{time.Millisecond}[0], - HalfLifePeriod: &[]time.Duration{time.Second * time.Duration(*halfLifePeriod)}[0], + WindowSize: &metav1.Duration{Duration: time.Second * time.Duration(*windowSize)}, + FetchInterval: &metav1.Duration{Duration: time.Millisecond}, + HalfLifePeriod: &metav1.Duration{Duration: time.Second * time.Duration(*halfLifePeriod)}, }, } schedulerConf.UsageDBConfig.UsageParams.SetDefaults() diff --git a/pkg/operator/operands/scheduler/resources_test.go b/pkg/operator/operands/scheduler/resources_test.go index bf299defc..4f1c069a3 100644 --- a/pkg/operator/operands/scheduler/resources_test.go +++ b/pkg/operator/operands/scheduler/resources_test.go @@ -187,7 +187,7 @@ func TestValidateJobDepthMap(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - innerConfig := config{ + innerConfig := conf.SchedulerConfiguration{ Actions: strings.Join(tt.actions, ", "), } @@ -454,7 +454,7 @@ tiers: require.True(t, found, "ConfigMap missing config.yaml") // Unmarshal expected YAML from test case - var expectedConfig config + var expectedConfig conf.SchedulerConfiguration if _, ok := tt.expected["config.yaml"]; !ok { t.Fatal("Test case must provide expected YAML for config.yaml") } @@ -462,7 +462,7 @@ tiers: require.NoError(t, err, "Failed to unmarshal expected config") // Unmarshal actual YAML from ConfigMap - var actualConfig config + var actualConfig conf.SchedulerConfiguration err = yaml.Unmarshal([]byte(actualYAML), &actualConfig) require.NoError(t, err, "Failed to unmarshal actual config") diff --git a/pkg/operator/operands/scheduler/scheduler.go b/pkg/operator/operands/scheduler/scheduler.go index 77066c41e..f577916dd 100644 --- a/pkg/operator/operands/scheduler/scheduler.go +++ b/pkg/operator/operands/scheduler/scheduler.go @@ -22,21 +22,6 @@ const ( defaultResourceName = "scheduler" ) -type config struct { - Actions string `yaml:"actions"` - Tiers []tier `yaml:"tiers,omitempty"` - QueueDepthPerAction map[string]int `yaml:"queueDepthPerAction,omitempty"` -} - -type tier struct { - Plugins []plugin `yaml:"plugins"` -} - -type plugin struct { - Name string `yaml:"name"` - Arguments map[string]string `yaml:"arguments,omitempty"` -} - type SchedulerForShard struct { schedulingShard *kaiv1.SchedulingShard diff --git a/pkg/operator/operands/scheduler/scheduler_test.go b/pkg/operator/operands/scheduler/scheduler_test.go index 3a69495ba..4e0a6299b 100644 --- a/pkg/operator/operands/scheduler/scheduler_test.go +++ b/pkg/operator/operands/scheduler/scheduler_test.go @@ -137,7 +137,7 @@ var _ = Describe("Scheduler", func() { cm := cmObj.(*v1.ConfigMap) Expect(err).To(BeNil()) - Expect(cm.Data["config.yaml"]).To(Equal(`actions: allocate, consolidation, reclaim, preempt, stalegangeviction + Expect(cm.Data["config.yaml"]).To(MatchYAML(`actions: allocate, consolidation, reclaim, preempt, stalegangeviction tiers: - plugins: - name: predicates @@ -176,7 +176,7 @@ tiers: cm := cmObj.(*v1.ConfigMap) Expect(err).To(BeNil()) - Expect(cm.Data["config.yaml"]).To(Equal(`actions: allocate, reclaim, preempt, stalegangeviction + Expect(cm.Data["config.yaml"]).To(MatchYAML(`actions: allocate, reclaim, preempt, stalegangeviction tiers: - plugins: - name: predicates diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index f5225ed67..3571907b6 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -169,9 +169,9 @@ func newSchedulerCache(schedulerCacheParams *SchedulerCacheParams) *SchedulerCac if schedulerCacheParams.UsageDBClient != nil { sc.usageLister = usagedb.NewUsageLister(schedulerCacheParams.UsageDBClient, - schedulerCacheParams.UsageDBParams.FetchInterval, - schedulerCacheParams.UsageDBParams.StalenessPeriod, - schedulerCacheParams.UsageDBParams.WaitTimeout) + &schedulerCacheParams.UsageDBParams.FetchInterval.Duration, + &schedulerCacheParams.UsageDBParams.StalenessPeriod.Duration, + &schedulerCacheParams.UsageDBParams.WaitTimeout.Duration) } clusterInfo, err := cluster_info.New(sc.informerFactory, sc.kubeAiSchedulerInformerFactory, sc.kueueInformerFactory, sc.usageLister, sc.schedulingNodePoolParams, diff --git a/pkg/scheduler/cache/usagedb/api/defaults.go b/pkg/scheduler/cache/usagedb/api/defaults.go index 644234a72..7adc12d32 100644 --- a/pkg/scheduler/cache/usagedb/api/defaults.go +++ b/pkg/scheduler/cache/usagedb/api/defaults.go @@ -3,31 +3,31 @@ package api -import "time" +import ( + "time" -func (up *UsageParams) SetDefaults() { - if up.HalfLifePeriod == nil { + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func (p *UsageParams) SetDefaults() { + if p.HalfLifePeriod == nil { // noop: disabled by default } - if up.WindowSize == nil { - windowSize := time.Hour * 24 * 7 - up.WindowSize = &windowSize + if p.WindowSize == nil { + p.WindowSize = &metav1.Duration{Duration: time.Hour * 24 * 7} } - if up.WindowType == nil { + if p.WindowType == nil { windowType := SlidingWindow - up.WindowType = &windowType + p.WindowType = &windowType } - if up.FetchInterval == nil { - fetchInterval := 1 * time.Minute - up.FetchInterval = &fetchInterval + if p.FetchInterval == nil { + p.FetchInterval = &metav1.Duration{Duration: 1 * time.Minute} } - if up.StalenessPeriod == nil { - stalenessPeriod := 5 * time.Minute - up.StalenessPeriod = &stalenessPeriod + if p.StalenessPeriod == nil { + p.StalenessPeriod = &metav1.Duration{Duration: 5 * time.Minute} } - if up.WaitTimeout == nil { - waitTimeout := 1 * time.Minute - up.WaitTimeout = &waitTimeout + if p.WaitTimeout == nil { + p.WaitTimeout = &metav1.Duration{Duration: 1 * time.Minute} } } @@ -54,12 +54,12 @@ func (wt WindowType) IsValid() bool { } } -func (up *UsageParams) GetExtraDurationParamOrDefault(key string, defaultValue time.Duration) time.Duration { - if up.ExtraParams == nil { +func (p *UsageParams) GetExtraDurationParamOrDefault(key string, defaultValue time.Duration) time.Duration { + if p.ExtraParams == nil { return defaultValue } - value, exists := up.ExtraParams[key] + value, exists := p.ExtraParams[key] if !exists { return defaultValue } @@ -72,12 +72,12 @@ func (up *UsageParams) GetExtraDurationParamOrDefault(key string, defaultValue t return duration } -func (up *UsageParams) GetExtraStringParamOrDefault(key string, defaultValue string) string { - if up.ExtraParams == nil { +func (p *UsageParams) GetExtraStringParamOrDefault(key string, defaultValue string) string { + if p.ExtraParams == nil { return defaultValue } - value, exists := up.ExtraParams[key] + value, exists := p.ExtraParams[key] if !exists { return defaultValue } diff --git a/pkg/scheduler/cache/usagedb/api/interface.go b/pkg/scheduler/cache/usagedb/api/interface.go index befa0ae53..723d0e147 100644 --- a/pkg/scheduler/cache/usagedb/api/interface.go +++ b/pkg/scheduler/cache/usagedb/api/interface.go @@ -4,9 +4,8 @@ package api import ( - "time" - "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/queue_info" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) type Interface interface { @@ -15,8 +14,8 @@ type Interface interface { type UsageDBConfig struct { ClientType string `yaml:"clientType" json:"clientType"` ConnectionString string `yaml:"connectionString" json:"connectionString"` - ConnectionStringEnvVar string `yaml:"connectionStringEnvVar" json:"connectionStringEnvVar"` - UsageParams *UsageParams `yaml:"usageParams" json:"usageParams"` + ConnectionStringEnvVar string `yaml:"connectionStringEnvVar,omitempty" json:"connectionStringEnvVar,omitempty"` + UsageParams *UsageParams `yaml:"usageParams,omitempty" json:"usageParams,omitempty"` } // GetUsageParams returns the usage params if set, and default params if not set. @@ -29,23 +28,70 @@ func (c *UsageDBConfig) GetUsageParams() *UsageParams { return &up } +func (c *UsageDBConfig) DeepCopy() *UsageDBConfig { + out := new(UsageDBConfig) + out.ClientType = c.ClientType + out.ConnectionString = c.ConnectionString + out.ConnectionStringEnvVar = c.ConnectionStringEnvVar + if c.UsageParams != nil { + out.UsageParams = c.UsageParams.DeepCopy() + } + return out +} + // UsageParams defines common params for all usage db clients. Some clients may not support all the params. type UsageParams struct { // Half life period of the usage. If not set, or set to 0, the usage will not be decayed. - HalfLifePeriod *time.Duration `yaml:"halfLifePeriod" json:"halfLifePeriod"` + HalfLifePeriod *metav1.Duration `yaml:"halfLifePeriod,omitempty" json:"halfLifePeriod,omitempty"` // Window size of the usage. Default is 1 week. - WindowSize *time.Duration `yaml:"windowSize" json:"windowSize"` + WindowSize *metav1.Duration `yaml:"windowSize,omitempty" json:"windowSize,omitempty"` // Window type for time-series aggregation. If not set, defaults to sliding. - WindowType *WindowType `yaml:"windowType" json:"windowType"` + WindowType *WindowType `yaml:"windowType,omitempty" json:"windowType,omitempty"` // A cron string used to determine when to reset resource usage for all queues. - TumblingWindowCronString string `yaml:"tumblingWindowCronString" json:"tumblingWindowCronString"` + TumblingWindowCronString string `yaml:"tumblingWindowCronString,omitempty" json:"tumblingWindowCronString,omitempty"` // Fetch interval of the usage. Default is 1 minute. - FetchInterval *time.Duration `yaml:"fetchInterval" json:"fetchInterval"` + FetchInterval *metav1.Duration `yaml:"fetchInterval,omitempty" json:"fetchInterval,omitempty"` // Staleness period of the usage. Default is 5 minutes. - StalenessPeriod *time.Duration `yaml:"stalenessPeriod" json:"stalenessPeriod"` + StalenessPeriod *metav1.Duration `yaml:"stalenessPeriod,omitempty" json:"stalenessPeriod,omitempty"` // Wait timeout of the usage. Default is 1 minute. - WaitTimeout *time.Duration `yaml:"waitTimeout" json:"waitTimeout"` + WaitTimeout *metav1.Duration `yaml:"waitTimeout,omitempty" json:"waitTimeout,omitempty"` // ExtraParams are extra parameters for the usage db client, which are client specific. - ExtraParams map[string]string `yaml:"extraParams" json:"extraParams"` + ExtraParams map[string]string `yaml:"extraParams,omitempty" json:"extraParams,omitempty"` +} + +func (p *UsageParams) DeepCopy() *UsageParams { + out := new(UsageParams) + if p.HalfLifePeriod != nil { + duration := *p.HalfLifePeriod + out.HalfLifePeriod = &duration + } + if p.WindowSize != nil { + duration := *p.WindowSize + out.WindowSize = &duration + } + if p.WindowType != nil { + windowType := *p.WindowType + out.WindowType = &windowType + } + out.TumblingWindowCronString = p.TumblingWindowCronString + if p.FetchInterval != nil { + duration := *p.FetchInterval + out.FetchInterval = &duration + } + if p.StalenessPeriod != nil { + duration := *p.StalenessPeriod + out.StalenessPeriod = &duration + } + if p.WaitTimeout != nil { + duration := *p.WaitTimeout + out.WaitTimeout = &duration + } + if p.ExtraParams != nil { + out.ExtraParams = make(map[string]string, len(p.ExtraParams)) + for k, v := range p.ExtraParams { + out.ExtraParams[k] = v + } + } + return out } diff --git a/pkg/scheduler/cache/usagedb/api/usage_params_test.go b/pkg/scheduler/cache/usagedb/api/usage_params_test.go index 8b485ca0f..3007ebc0f 100644 --- a/pkg/scheduler/cache/usagedb/api/usage_params_test.go +++ b/pkg/scheduler/cache/usagedb/api/usage_params_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func TestUsageParams_SetDefaults(t *testing.T) { @@ -22,29 +23,29 @@ func TestUsageParams_SetDefaults(t *testing.T) { input: &UsageParams{}, expected: &UsageParams{ HalfLifePeriod: nil, // should remain nil (disabled by default) - WindowSize: &[]time.Duration{time.Hour * 24 * 7}[0], + WindowSize: &metav1.Duration{Duration: time.Hour * 24 * 7}, WindowType: &[]WindowType{SlidingWindow}[0], }, }, { name: "params with half life set should preserve it", input: &UsageParams{ - HalfLifePeriod: &[]time.Duration{30 * time.Minute}[0], + HalfLifePeriod: &metav1.Duration{Duration: 30 * time.Minute}, }, expected: &UsageParams{ - HalfLifePeriod: &[]time.Duration{30 * time.Minute}[0], - WindowSize: &[]time.Duration{time.Hour * 24 * 7}[0], + HalfLifePeriod: &metav1.Duration{Duration: 30 * time.Minute}, + WindowSize: &metav1.Duration{Duration: time.Hour * 24 * 7}, WindowType: &[]WindowType{SlidingWindow}[0], }, }, { name: "params with window size set should preserve it", input: &UsageParams{ - WindowSize: &[]time.Duration{2 * time.Hour}[0], + WindowSize: &metav1.Duration{Duration: 2 * time.Hour}, }, expected: &UsageParams{ HalfLifePeriod: nil, - WindowSize: &[]time.Duration{2 * time.Hour}[0], + WindowSize: &metav1.Duration{Duration: 2 * time.Hour}, WindowType: &[]WindowType{SlidingWindow}[0], }, }, @@ -55,20 +56,20 @@ func TestUsageParams_SetDefaults(t *testing.T) { }, expected: &UsageParams{ HalfLifePeriod: nil, - WindowSize: &[]time.Duration{time.Hour * 24 * 7}[0], + WindowSize: &metav1.Duration{Duration: time.Hour * 24 * 7}, WindowType: &[]WindowType{TumblingWindow}[0], }, }, { name: "all params set should preserve all", input: &UsageParams{ - HalfLifePeriod: &[]time.Duration{45 * time.Minute}[0], - WindowSize: &[]time.Duration{3 * time.Hour}[0], + HalfLifePeriod: &metav1.Duration{Duration: 45 * time.Minute}, + WindowSize: &metav1.Duration{Duration: 3 * time.Hour}, WindowType: &[]WindowType{TumblingWindow}[0], }, expected: &UsageParams{ - HalfLifePeriod: &[]time.Duration{45 * time.Minute}[0], - WindowSize: &[]time.Duration{3 * time.Hour}[0], + HalfLifePeriod: &metav1.Duration{Duration: 45 * time.Minute}, + WindowSize: &metav1.Duration{Duration: 3 * time.Hour}, WindowType: &[]WindowType{TumblingWindow}[0], }, }, @@ -82,11 +83,11 @@ func TestUsageParams_SetDefaults(t *testing.T) { assert.Nil(t, tt.input.HalfLifePeriod) } else { require.NotNil(t, tt.input.HalfLifePeriod) - assert.Equal(t, *tt.expected.HalfLifePeriod, *tt.input.HalfLifePeriod) + assert.Equal(t, tt.expected.HalfLifePeriod.Duration, tt.input.HalfLifePeriod.Duration) } require.NotNil(t, tt.input.WindowSize) - assert.Equal(t, *tt.expected.WindowSize, *tt.input.WindowSize) + assert.Equal(t, tt.expected.WindowSize.Duration, tt.input.WindowSize.Duration) require.NotNil(t, tt.input.WindowType) assert.Equal(t, *tt.expected.WindowType, *tt.input.WindowType) @@ -140,7 +141,7 @@ func TestUsageDBConfig_GetUsageParams(t *testing.T) { }, expected: &UsageParams{ HalfLifePeriod: nil, - WindowSize: &[]time.Duration{time.Hour * 24 * 7}[0], + WindowSize: &metav1.Duration{Duration: time.Hour * 24 * 7}, WindowType: &[]WindowType{SlidingWindow}[0], }, }, @@ -153,7 +154,7 @@ func TestUsageDBConfig_GetUsageParams(t *testing.T) { }, expected: &UsageParams{ HalfLifePeriod: nil, - WindowSize: &[]time.Duration{time.Hour * 24 * 7}[0], + WindowSize: &metav1.Duration{Duration: time.Hour * 24 * 7}, WindowType: &[]WindowType{SlidingWindow}[0], }, }, @@ -163,12 +164,12 @@ func TestUsageDBConfig_GetUsageParams(t *testing.T) { ClientType: "prometheus", ConnectionString: "http://localhost:9090", UsageParams: &UsageParams{ - HalfLifePeriod: &[]time.Duration{30 * time.Minute}[0], + HalfLifePeriod: &metav1.Duration{Duration: 30 * time.Minute}, }, }, expected: &UsageParams{ - HalfLifePeriod: &[]time.Duration{30 * time.Minute}[0], - WindowSize: &[]time.Duration{time.Hour * 24 * 7}[0], + HalfLifePeriod: &metav1.Duration{Duration: 30 * time.Minute}, + WindowSize: &metav1.Duration{Duration: time.Hour * 24 * 7}, WindowType: &[]WindowType{SlidingWindow}[0], }, }, @@ -178,14 +179,14 @@ func TestUsageDBConfig_GetUsageParams(t *testing.T) { ClientType: "prometheus", ConnectionString: "http://localhost:9090", UsageParams: &UsageParams{ - HalfLifePeriod: &[]time.Duration{45 * time.Minute}[0], - WindowSize: &[]time.Duration{2 * time.Hour}[0], + HalfLifePeriod: &metav1.Duration{Duration: 45 * time.Minute}, + WindowSize: &metav1.Duration{Duration: 2 * time.Hour}, WindowType: &[]WindowType{TumblingWindow}[0], }, }, expected: &UsageParams{ - HalfLifePeriod: &[]time.Duration{45 * time.Minute}[0], - WindowSize: &[]time.Duration{2 * time.Hour}[0], + HalfLifePeriod: &metav1.Duration{Duration: 45 * time.Minute}, + WindowSize: &metav1.Duration{Duration: 2 * time.Hour}, WindowType: &[]WindowType{TumblingWindow}[0], }, }, @@ -200,11 +201,11 @@ func TestUsageDBConfig_GetUsageParams(t *testing.T) { assert.Nil(t, result.HalfLifePeriod) } else { require.NotNil(t, result.HalfLifePeriod) - assert.Equal(t, *tt.expected.HalfLifePeriod, *result.HalfLifePeriod) + assert.Equal(t, tt.expected.HalfLifePeriod.Duration, result.HalfLifePeriod.Duration) } require.NotNil(t, result.WindowSize) - assert.Equal(t, *tt.expected.WindowSize, *result.WindowSize) + assert.Equal(t, tt.expected.WindowSize.Duration, result.WindowSize.Duration) require.NotNil(t, result.WindowType) assert.Equal(t, *tt.expected.WindowType, *result.WindowType) @@ -215,7 +216,7 @@ func TestUsageDBConfig_GetUsageParams(t *testing.T) { func TestUsageDBConfig_GetUsageParams_ImmutableOriginal(t *testing.T) { // Test that GetUsageParams doesn't modify the original config originalParams := &UsageParams{ - HalfLifePeriod: &[]time.Duration{30 * time.Minute}[0], + HalfLifePeriod: &metav1.Duration{Duration: 30 * time.Minute}, } config := &UsageDBConfig{ @@ -227,12 +228,11 @@ func TestUsageDBConfig_GetUsageParams_ImmutableOriginal(t *testing.T) { result := config.GetUsageParams() // Modify the result - newWindowSize := 5 * time.Hour - result.WindowSize = &newWindowSize + result.WindowSize = &metav1.Duration{Duration: 5 * time.Hour} // Original should remain unchanged assert.Nil(t, originalParams.WindowSize) - assert.Equal(t, 30*time.Minute, *originalParams.HalfLifePeriod) + assert.Equal(t, 30*time.Minute, originalParams.HalfLifePeriod.Duration) } func TestWindowType_IsValid(t *testing.T) { @@ -283,20 +283,19 @@ func TestWindowType_IsValid(t *testing.T) { func TestUsageParams_ZeroValues(t *testing.T) { // Test behavior with zero duration values - zeroDuration := time.Duration(0) params := &UsageParams{ - HalfLifePeriod: &zeroDuration, - WindowSize: &zeroDuration, + HalfLifePeriod: &metav1.Duration{Duration: time.Duration(0)}, + WindowSize: &metav1.Duration{Duration: time.Duration(0)}, } params.SetDefaults() // Zero values should be preserved, not replaced with defaults require.NotNil(t, params.HalfLifePeriod) - assert.Equal(t, time.Duration(0), *params.HalfLifePeriod) + assert.Equal(t, time.Duration(0), params.HalfLifePeriod.Duration) require.NotNil(t, params.WindowSize) - assert.Equal(t, time.Duration(0), *params.WindowSize) + assert.Equal(t, time.Duration(0), params.WindowSize.Duration) require.NotNil(t, params.WindowType) assert.Equal(t, SlidingWindow, *params.WindowType) diff --git a/pkg/scheduler/cache/usagedb/fake/fake_with_history.go b/pkg/scheduler/cache/usagedb/fake/fake_with_history.go index 9311824ec..7131d9aaf 100644 --- a/pkg/scheduler/cache/usagedb/fake/fake_with_history.go +++ b/pkg/scheduler/cache/usagedb/fake/fake_with_history.go @@ -6,12 +6,12 @@ package fake import ( "math" "sync" - "time" "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/common_info" "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/queue_info" "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/cache/usagedb/api" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) type FakeUsageDBClient struct { @@ -49,7 +49,7 @@ func (f *FakeUsageDBClient) GetResourceUsage() (*queue_info.ClusterUsage, error) usage := queue_info.NewClusterUsage() var windowStart, windowEnd int - size := f.usageParams.WindowSize.Seconds() + size := f.usageParams.WindowSize.Duration.Seconds() l := len(f.allocationHistory) if l == 0 { return usage, nil @@ -108,8 +108,8 @@ func (f *FakeUsageDBClient) AppendQueueAllocation(queueAllocations map[common_in type AllocationHistory []map[common_info.QueueID]queue_info.QueueUsage type ClusterCapacityHistory []map[v1.ResourceName]float64 -func getDecaySlice(length int, period *time.Duration) []float64 { - if period == nil || period.Seconds() == 0 { +func getDecaySlice(length int, period *metav1.Duration) []float64 { + if period == nil || period.Duration.Seconds() == 0 { decaySlice := make([]float64, length) for i := range decaySlice { decaySlice[i] = 1 @@ -117,7 +117,7 @@ func getDecaySlice(length int, period *time.Duration) []float64 { return decaySlice } - seconds := period.Seconds() + seconds := period.Duration.Seconds() decaySlice := make([]float64, length) for i := range decaySlice { val := math.Pow(0.5, float64(length-i)/seconds) diff --git a/pkg/scheduler/conf/scheduler_conf.go b/pkg/scheduler/conf/scheduler_conf.go index 396ae0fa9..d58e7a219 100644 --- a/pkg/scheduler/conf/scheduler_conf.go +++ b/pkg/scheduler/conf/scheduler_conf.go @@ -69,21 +69,21 @@ type PluginOption struct { // The name of Plugin Name string `yaml:"name" json:"name"` // JobOrderDisabled defines whether jobOrderFn is disabled - JobOrderDisabled bool `yaml:"disableJobOrder" json:"disableJobOrder"` + JobOrderDisabled bool `yaml:"disableJobOrder,omitempty" json:"disableJobOrder,omitempty"` // TaskOrderDisabled defines whether taskOrderFn is disabled - TaskOrderDisabled bool `yaml:"disableTaskOrder" json:"disableTaskOrder"` + TaskOrderDisabled bool `yaml:"disableTaskOrder,omitempty" json:"disableTaskOrder,omitempty"` // PreemptableDisabled defines whether preemptableFn is disabled - PreemptableDisabled bool `yaml:"disablePreemptable" json:"disablePreemptable"` + PreemptableDisabled bool `yaml:"disablePreemptable,omitempty" json:"disablePreemptable,omitempty"` // ReclaimableDisabled defines whether reclaimableFn is disabled - ReclaimableDisabled bool `yaml:"disableReclaimable" json:"disableReclaimable"` + ReclaimableDisabled bool `yaml:"disableReclaimable,omitempty" json:"disableReclaimable,omitempty"` // QueueOrderDisabled defines whether queueOrderFn is disabled - QueueOrderDisabled bool `yaml:"disableQueueOrder" json:"disableQueueOrder"` + QueueOrderDisabled bool `yaml:"disableQueueOrder,omitempty" json:"disableQueueOrder,omitempty"` // PredicateDisabled defines whether predicateFn is disabled - PredicateDisabled bool `yaml:"disablePredicate" json:"disablePredicate"` + PredicateDisabled bool `yaml:"disablePredicate,omitempty" json:"disablePredicate,omitempty"` // NodeOrderDisabled defines whether NodeOrderFn is disabled - NodeOrderDisabled bool `yaml:"disableNodeOrder" json:"disableNodeOrder"` + NodeOrderDisabled bool `yaml:"disableNodeOrder,omitempty" json:"disableNodeOrder,omitempty"` // Arguments defines the different arguments that can be given to different plugins - Arguments map[string]string `yaml:"arguments" json:"arguments"` + Arguments map[string]string `yaml:"arguments,omitempty" json:"arguments,omitempty"` } type SchedulingNodePoolParams struct { diff --git a/pkg/scheduler/conf_util/scheduler_conf_util.go b/pkg/scheduler/conf_util/scheduler_conf_util.go index 7273cf560..d7f361ebc 100644 --- a/pkg/scheduler/conf_util/scheduler_conf_util.go +++ b/pkg/scheduler/conf_util/scheduler_conf_util.go @@ -24,7 +24,7 @@ import ( "io/ioutil" "strings" - "gopkg.in/yaml.v2" + "sigs.k8s.io/yaml" "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/conf" "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/framework" diff --git a/pkg/scheduler/conf_util/scheduler_conf_util_test.go b/pkg/scheduler/conf_util/scheduler_conf_util_test.go index 081153116..8e088925f 100644 --- a/pkg/scheduler/conf_util/scheduler_conf_util_test.go +++ b/pkg/scheduler/conf_util/scheduler_conf_util_test.go @@ -8,7 +8,7 @@ import ( "reflect" "testing" - "gopkg.in/yaml.v2" + "sigs.k8s.io/yaml" "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/actions" "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/conf" @@ -53,8 +53,7 @@ func TestResolveConfigurationFromFile(t *testing.T) { { Plugins: []conf.PluginOption{ { - Name: "n1", - Arguments: map[string]string{}, + Name: "n1", }, }, }, @@ -87,13 +86,11 @@ func TestResolveConfigurationFromFile(t *testing.T) { { Plugins: []conf.PluginOption{ { - Name: "n1", - Arguments: map[string]string{}, + Name: "n1", }, }, }, }, - QueueDepthPerAction: nil, }, wantErr: false, }, @@ -133,8 +130,22 @@ func TestResolveConfigurationFromFile(t *testing.T) { t.Errorf("ResolveConfigurationFromFile() error = %v, wantErr %v", err, tt.wantErr) return } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("ResolveConfigurationFromFile() got = %v, want %v", got, tt.want) + if !reflect.DeepEqual(*got, *tt.want) { + t.Errorf("ResolveConfigurationFromFile() got: \n%+v, want: \n%+v", *got, *tt.want) + t.Errorf("Actions equal: %v", got.Actions == tt.want.Actions) + t.Errorf("Tiers equal: %v", reflect.DeepEqual(got.Tiers, tt.want.Tiers)) + t.Errorf("QueueDepthPerAction equal: %v", reflect.DeepEqual(got.QueueDepthPerAction, tt.want.QueueDepthPerAction)) + t.Errorf("UsageDBConfig equal: %v", reflect.DeepEqual(got.UsageDBConfig, tt.want.UsageDBConfig)) + if len(got.Tiers) > 0 && len(tt.want.Tiers) > 0 { + t.Errorf("First tier plugins equal: %v", reflect.DeepEqual(got.Tiers[0].Plugins, tt.want.Tiers[0].Plugins)) + if len(got.Tiers[0].Plugins) > 0 && len(tt.want.Tiers[0].Plugins) > 0 { + t.Errorf("First plugin Arguments: got=%v (nil=%v), want=%v (nil=%v)", + got.Tiers[0].Plugins[0].Arguments, + got.Tiers[0].Plugins[0].Arguments == nil, + tt.want.Tiers[0].Plugins[0].Arguments, + tt.want.Tiers[0].Plugins[0].Arguments == nil) + } + } } }) } From 7c869e24d69a3bf54b1e8109ccfd813a610afb4b Mon Sep 17 00:00:00 2001 From: itsomri Date: Tue, 11 Nov 2025 16:15:22 +0200 Subject: [PATCH 2/3] Fixed types --- .../operands/scheduler/resources_for_shard.go | 21 ++++++++++--------- .../operands/scheduler/resources_test.go | 1 + .../cache/usagedb/prometheus/prometheus.go | 6 +++--- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/pkg/operator/operands/scheduler/resources_for_shard.go b/pkg/operator/operands/scheduler/resources_for_shard.go index ae5e7888c..f69a2e342 100644 --- a/pkg/operator/operands/scheduler/resources_for_shard.go +++ b/pkg/operator/operands/scheduler/resources_for_shard.go @@ -9,6 +9,7 @@ import ( "strconv" "strings" + "github.com/spf13/pflag" "golang.org/x/exp/slices" "gopkg.in/yaml.v3" @@ -22,7 +23,7 @@ import ( kaiv1 "github.com/NVIDIA/KAI-scheduler/pkg/apis/kai/v1" kaiConfigUtils "github.com/NVIDIA/KAI-scheduler/pkg/operator/config" "github.com/NVIDIA/KAI-scheduler/pkg/operator/operands/common" - "github.com/spf13/pflag" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/conf" ) const ( @@ -125,7 +126,7 @@ func (s *SchedulerForShard) configMapForShard( APIVersion: "v1", } placementArguments := calculatePlacementArguments(shard.Spec.PlacementStrategy) - innerConfig := config{} + innerConfig := conf.SchedulerConfiguration{} actions := []string{"allocate"} if placementArguments[gpuResource] != spreadStrategy && placementArguments[cpuResource] != spreadStrategy { @@ -135,9 +136,9 @@ func (s *SchedulerForShard) configMapForShard( innerConfig.Actions = strings.Join(actions, ", ") - innerConfig.Tiers = []tier{ + innerConfig.Tiers = []conf.Tier{ { - Plugins: []plugin{ + Plugins: []conf.PluginOption{ {Name: "predicates"}, {Name: "proportion"}, {Name: "priority"}, @@ -160,8 +161,8 @@ func (s *SchedulerForShard) configMapForShard( innerConfig.Tiers[0].Plugins = append( innerConfig.Tiers[0].Plugins, - plugin{Name: fmt.Sprintf("gpu%s", strings.Replace(placementArguments[gpuResource], "bin", "", 1))}, - plugin{ + conf.PluginOption{Name: fmt.Sprintf("gpu%s", strings.Replace(placementArguments[gpuResource], "bin", "", 1))}, + conf.PluginOption{ Name: "nodeplacement", Arguments: placementArguments, }, @@ -170,7 +171,7 @@ func (s *SchedulerForShard) configMapForShard( if placementArguments[gpuResource] == binpackStrategy { innerConfig.Tiers[0].Plugins = append( innerConfig.Tiers[0].Plugins, - plugin{Name: "gpusharingorder"}, + conf.PluginOption{Name: "gpusharingorder"}, ) } @@ -195,7 +196,7 @@ func (s *SchedulerForShard) configMapForShard( return schedulerConfig, nil } -func validateJobDepthMap(shard *kaiv1.SchedulingShard, innerConfig config, actions []string) error { +func validateJobDepthMap(shard *kaiv1.SchedulingShard, innerConfig conf.SchedulerConfiguration, actions []string) error { for actionToConfigure := range shard.Spec.QueueDepthPerAction { if !slices.Contains(actions, actionToConfigure) { return fmt.Errorf(invalidJobDepthMapError, innerConfig.Actions, actionToConfigure) @@ -294,12 +295,12 @@ func calculatePlacementArguments(placementStrategy *kaiv1.PlacementStrategy) map } } -func addMinRuntimePluginIfNeeded(plugins *[]plugin, minRuntime *kaiv1.MinRuntime) { +func addMinRuntimePluginIfNeeded(plugins *[]conf.PluginOption, minRuntime *kaiv1.MinRuntime) { if minRuntime == nil || (minRuntime.PreemptMinRuntime == nil && minRuntime.ReclaimMinRuntime == nil) { return } - minRuntimePlugin := plugin{Name: "minruntime", Arguments: map[string]string{}} + minRuntimePlugin := conf.PluginOption{Name: "minruntime", Arguments: map[string]string{}} if minRuntime.PreemptMinRuntime != nil { minRuntimePlugin.Arguments["defaultPreemptMinRuntime"] = *minRuntime.PreemptMinRuntime diff --git a/pkg/operator/operands/scheduler/resources_test.go b/pkg/operator/operands/scheduler/resources_test.go index 4f1c069a3..ebf729e91 100644 --- a/pkg/operator/operands/scheduler/resources_test.go +++ b/pkg/operator/operands/scheduler/resources_test.go @@ -17,6 +17,7 @@ import ( kaiv1 "github.com/NVIDIA/KAI-scheduler/pkg/apis/kai/v1" kaiv1qc "github.com/NVIDIA/KAI-scheduler/pkg/apis/kai/v1/queue_controller" kaiv1scheduler "github.com/NVIDIA/KAI-scheduler/pkg/apis/kai/v1/scheduler" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/conf" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" diff --git a/pkg/scheduler/cache/usagedb/prometheus/prometheus.go b/pkg/scheduler/cache/usagedb/prometheus/prometheus.go index cb673e268..a8b4536e7 100644 --- a/pkg/scheduler/cache/usagedb/prometheus/prometheus.go +++ b/pkg/scheduler/cache/usagedb/prometheus/prometheus.go @@ -139,7 +139,7 @@ func (p *PrometheusClient) GetResourceUsage() (*queue_info.ClusterUsage, error) func (p *PrometheusClient) queryResourceCapacity(ctx context.Context, capacityMetric string, queryByWindow usageWindowQueryFunction) (float64, error) { decayedCapacityMetric := capacityMetric if p.usageParams.HalfLifePeriod != nil { - decayedCapacityMetric = fmt.Sprintf("((%s) * (%s))", capacityMetric, getExponentialDecayQuery(p.usageParams.HalfLifePeriod)) + decayedCapacityMetric = fmt.Sprintf("((%s) * (%s))", capacityMetric, getExponentialDecayQuery(&p.usageParams.HalfLifePeriod.Duration)) } capacityResult, warnings, err := queryByWindow(ctx, decayedCapacityMetric) @@ -170,7 +170,7 @@ func (p *PrometheusClient) queryResourceUsage( decayedAllocationMetric := allocationMetric if p.usageParams.HalfLifePeriod != nil { - decayedAllocationMetric = fmt.Sprintf("((%s) * (%s))", allocationMetric, getExponentialDecayQuery(p.usageParams.HalfLifePeriod)) + decayedAllocationMetric = fmt.Sprintf("((%s) * (%s))", allocationMetric, getExponentialDecayQuery(&p.usageParams.HalfLifePeriod.Duration)) } usageResult, warnings, err := queryByWindow(ctx, decayedAllocationMetric) @@ -226,7 +226,7 @@ func (p *PrometheusClient) queryTumblingTimeWindow(ctx context.Context, decayedA } func (p *PrometheusClient) getLatestUsageResetTime() time.Time { - maxWindowStartingPoint := time.Now().Add(-*p.usageParams.WindowSize) + maxWindowStartingPoint := time.Now().Add(-*&p.usageParams.WindowSize.Duration) lastUsageReset := maxWindowStartingPoint nextInWindowReset := maxWindowStartingPoint From 7ad90c5bc419da7bb3a79ae702e27857526621f7 Mon Sep 17 00:00:00 2001 From: itsomri Date: Tue, 11 Nov 2025 17:08:00 +0200 Subject: [PATCH 3/3] Fix pointer dereference --- pkg/scheduler/conf_util/scheduler_conf_util_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/scheduler/conf_util/scheduler_conf_util_test.go b/pkg/scheduler/conf_util/scheduler_conf_util_test.go index 8e088925f..625cbff88 100644 --- a/pkg/scheduler/conf_util/scheduler_conf_util_test.go +++ b/pkg/scheduler/conf_util/scheduler_conf_util_test.go @@ -130,7 +130,7 @@ func TestResolveConfigurationFromFile(t *testing.T) { t.Errorf("ResolveConfigurationFromFile() error = %v, wantErr %v", err, tt.wantErr) return } - if !reflect.DeepEqual(*got, *tt.want) { + if !reflect.DeepEqual(got, tt.want) { t.Errorf("ResolveConfigurationFromFile() got: \n%+v, want: \n%+v", *got, *tt.want) t.Errorf("Actions equal: %v", got.Actions == tt.want.Actions) t.Errorf("Tiers equal: %v", reflect.DeepEqual(got.Tiers, tt.want.Tiers))