Commit 6656c1b

Add time aware configs to scheduling shard
1 parent 8e26997 commit 6656c1b

File tree

5 files changed: +175 -21 lines changed

  pkg/apis/kai/v1/schedulingshard_types.go
  pkg/apis/kai/v1/zz_generated.deepcopy.go
  pkg/operator/operands/scheduler/resources_for_shard.go
  pkg/operator/operands/scheduler/resources_test.go
  pkg/scheduler/cache/usagedb/prometheus/prometheus.go

pkg/apis/kai/v1/schedulingshard_types.go

Lines changed: 9 additions & 0 deletions
@@ -21,6 +21,7 @@ import (
 	"k8s.io/utils/ptr"
 
 	"github.com/NVIDIA/KAI-scheduler/pkg/apis/kai/v1/common"
+	usagedbapi "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/cache/usagedb/api"
 )
 
 const (
@@ -58,6 +59,14 @@ type SchedulingShardSpec struct {
 	// MinRuntime specifies the minimum runtime of a jobs in the shard
 	// +kubebuilder:validation:Optional
 	MinRuntime *MinRuntime `json:"minRuntime,omitempty"`
+
+	// KValue specifies the kValue for the proportion plugin. Default is 1.0.
+	// +kubebuilder:validation:Optional
+	KValue *float64 `json:"kValue,omitempty"`
+
+	// UsageDBConfig defines configuration for the usage db client
+	// +kubebuilder:validation:Optional
+	UsageDBConfig *usagedbapi.UsageDBConfig `yaml:"usageDBConfig,omitempty" json:"usageDBConfig,omitempty"`
 }
 
 func (s *SchedulingShardSpec) SetDefaultsWhereNeeded() {
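For orientation, here is a minimal Go sketch (not part of the commit) of how the two new fields could be populated; field and type names follow the diff above, and the concrete values are borrowed from the new test case later in this commit.

// Illustrative only: constructs a SchedulingShardSpec using the new fields.
package example

import (
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/ptr"

	kaiv1 "github.com/NVIDIA/KAI-scheduler/pkg/apis/kai/v1"
	usagedbapi "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/cache/usagedb/api"
)

func exampleShardSpec() kaiv1.SchedulingShardSpec {
	return kaiv1.SchedulingShardSpec{
		// kValue forwarded to the proportion plugin; 1.0 is the documented default.
		KValue: ptr.To(1.0),
		// Time-aware usage source the scheduler shard should query.
		UsageDBConfig: &usagedbapi.UsageDBConfig{
			ClientType:       "prometheus",
			ConnectionString: "http://prometheus-operated.kai-scheduler.svc.cluster.local:9090",
			UsageParams: &usagedbapi.UsageParams{
				HalfLifePeriod: &metav1.Duration{Duration: 10 * time.Minute},
				WindowSize:     &metav1.Duration{Duration: 10 * time.Minute},
				WindowType:     ptr.To(usagedbapi.SlidingWindow),
			},
		},
	}
}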

pkg/apis/kai/v1/zz_generated.deepcopy.go

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default.

pkg/operator/operands/scheduler/resources_for_shard.go

Lines changed: 27 additions & 14 deletions
@@ -11,17 +11,18 @@ import (
 
 	"golang.org/x/exp/slices"
 
-	"gopkg.in/yaml.v3"
 	v1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/yaml"
 
 	"github.com/NVIDIA/KAI-scheduler/cmd/scheduler/app/options"
 	kaiv1 "github.com/NVIDIA/KAI-scheduler/pkg/apis/kai/v1"
 	kaiConfigUtils "github.com/NVIDIA/KAI-scheduler/pkg/operator/config"
 	"github.com/NVIDIA/KAI-scheduler/pkg/operator/operands/common"
+	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/conf"
 	"github.com/spf13/pflag"
 )
 
@@ -125,7 +126,7 @@ func (s *SchedulerForShard) configMapForShard(
 		APIVersion: "v1",
 	}
 	placementArguments := calculatePlacementArguments(shard.Spec.PlacementStrategy)
-	innerConfig := config{}
+	innerConfig := conf.SchedulerConfiguration{}
 
 	actions := []string{"allocate"}
 	if placementArguments[gpuResource] != spreadStrategy && placementArguments[cpuResource] != spreadStrategy {
@@ -135,11 +136,18 @@ func (s *SchedulerForShard) configMapForShard(
 
 	innerConfig.Actions = strings.Join(actions, ", ")
 
-	innerConfig.Tiers = []tier{
+	var proportionArgs map[string]string
+	if shard.Spec.KValue != nil {
+		proportionArgs = map[string]string{
+			"kValue": strconv.FormatFloat(*shard.Spec.KValue, 'f', -1, 64),
+		}
+	}
+
+	innerConfig.Tiers = []conf.Tier{
 		{
-			Plugins: []plugin{
+			Plugins: []conf.PluginOption{
 				{Name: "predicates"},
-				{Name: "proportion"},
+				{Name: "proportion", Arguments: proportionArgs},
 				{Name: "priority"},
 				{Name: "nodeavailability"},
 				{Name: "resourcetype"},
@@ -160,8 +168,8 @@ func (s *SchedulerForShard) configMapForShard(
 
 	innerConfig.Tiers[0].Plugins = append(
 		innerConfig.Tiers[0].Plugins,
-		plugin{Name: fmt.Sprintf("gpu%s", strings.Replace(placementArguments[gpuResource], "bin", "", 1))},
-		plugin{
+		conf.PluginOption{Name: fmt.Sprintf("gpu%s", strings.Replace(placementArguments[gpuResource], "bin", "", 1))},
+		conf.PluginOption{
 			Name:      "nodeplacement",
 			Arguments: placementArguments,
 		},
@@ -170,7 +178,7 @@ func (s *SchedulerForShard) configMapForShard(
 	if placementArguments[gpuResource] == binpackStrategy {
 		innerConfig.Tiers[0].Plugins = append(
 			innerConfig.Tiers[0].Plugins,
-			plugin{Name: "gpusharingorder"},
+			conf.PluginOption{Name: "gpusharingorder"},
 		)
 	}
 
@@ -184,6 +192,10 @@ func (s *SchedulerForShard) configMapForShard(
 		innerConfig.QueueDepthPerAction = shard.Spec.QueueDepthPerAction
 	}
 
+	if shard.Spec.UsageDBConfig != nil {
+		innerConfig.UsageDBConfig = shard.Spec.UsageDBConfig
+	}
+
 	data, marshalErr := yaml.Marshal(&innerConfig)
 	if marshalErr != nil {
 		return nil, marshalErr
@@ -195,7 +207,7 @@ func (s *SchedulerForShard) configMapForShard(
 	return schedulerConfig, nil
 }
 
-func validateJobDepthMap(shard *kaiv1.SchedulingShard, innerConfig config, actions []string) error {
+func validateJobDepthMap(shard *kaiv1.SchedulingShard, innerConfig conf.SchedulerConfiguration, actions []string) error {
 	for actionToConfigure := range shard.Spec.QueueDepthPerAction {
 		if !slices.Contains(actions, actionToConfigure) {
 			return fmt.Errorf(invalidJobDepthMapError, innerConfig.Actions, actionToConfigure)
@@ -294,20 +306,21 @@ func calculatePlacementArguments(placementStrategy *kaiv1.PlacementStrategy) map
 	}
 }
 
-func addMinRuntimePluginIfNeeded(plugins *[]plugin, minRuntime *kaiv1.MinRuntime) {
+func addMinRuntimePluginIfNeeded(plugins *[]conf.PluginOption, minRuntime *kaiv1.MinRuntime) {
 	if minRuntime == nil || (minRuntime.PreemptMinRuntime == nil && minRuntime.ReclaimMinRuntime == nil) {
 		return
 	}
 
-	minRuntimePlugin := plugin{Name: "minruntime", Arguments: map[string]string{}}
-
+	minRuntimeArgs := make(map[string]string)
 	if minRuntime.PreemptMinRuntime != nil {
-		minRuntimePlugin.Arguments["defaultPreemptMinRuntime"] = *minRuntime.PreemptMinRuntime
+		minRuntimeArgs["defaultPreemptMinRuntime"] = *minRuntime.PreemptMinRuntime
 	}
 	if minRuntime.ReclaimMinRuntime != nil {
-		minRuntimePlugin.Arguments["defaultReclaimMinRuntime"] = *minRuntime.ReclaimMinRuntime
+		minRuntimeArgs["defaultReclaimMinRuntime"] = *minRuntime.ReclaimMinRuntime
 	}
 
+	minRuntimePlugin := conf.PluginOption{Name: "minruntime", Arguments: minRuntimeArgs}
+
 	*plugins = append(*plugins, minRuntimePlugin)
 }
 
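A note on the kValue conversion in the hunks above: strconv.FormatFloat with precision -1 emits the shortest string that parses back to the same float, which keeps the generated config readable. A standalone sketch (hypothetical helper, not the operator code itself):

package example

import (
	"fmt"
	"strconv"
)

// formatProportionArgs mirrors the conversion used in configMapForShard: a nil
// KValue yields no arguments, so the proportion plugin keeps its default (1.0).
func formatProportionArgs(kValue *float64) map[string]string {
	if kValue == nil {
		return nil
	}
	return map[string]string{
		// precision -1: shortest exact representation, e.g. 0.75 -> "0.75", 2 -> "2"
		"kValue": strconv.FormatFloat(*kValue, 'f', -1, 64),
	}
}

func printExample() {
	k := 0.75 // assumed example value
	fmt.Println(formatProportionArgs(&k)) // map[kValue:0.75]
}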

pkg/operator/operands/scheduler/resources_test.go

Lines changed: 115 additions & 2 deletions
@@ -8,15 +8,16 @@ import (
 	"fmt"
 	"strings"
 	"testing"
-
-	"gopkg.in/yaml.v3"
+	"time"
 
 	"github.com/spf13/pflag"
 
 	"github.com/NVIDIA/KAI-scheduler/cmd/scheduler/app/options"
 	kaiv1 "github.com/NVIDIA/KAI-scheduler/pkg/apis/kai/v1"
 	kaiv1qc "github.com/NVIDIA/KAI-scheduler/pkg/apis/kai/v1/queue_controller"
 	kaiv1scheduler "github.com/NVIDIA/KAI-scheduler/pkg/apis/kai/v1/scheduler"
+	usagedbapi "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/cache/usagedb/api"
+	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/conf"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -25,6 +26,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/utils/ptr"
 	"sigs.k8s.io/controller-runtime/pkg/client/fake"
+	"sigs.k8s.io/yaml"
 )
 
 func TestDeploymentForShard(t *testing.T) {
@@ -431,6 +433,59 @@
 			},
 			expectedErr: true,
 		},
+		{
+			name: "usage DB configuration",
+			config: &kaiv1.Config{
+				Spec: kaiv1.ConfigSpec{},
+			},
+			shard: &kaiv1.SchedulingShard{
+				Spec: kaiv1.SchedulingShardSpec{
+					UsageDBConfig: &usagedbapi.UsageDBConfig{
+						ClientType:       "prometheus",
+						ConnectionString: "http://prometheus-operated.kai-scheduler.svc.cluster.local:9090",
+						UsageParams: &usagedbapi.UsageParams{
+							HalfLifePeriod: &metav1.Duration{Duration: 10 * time.Minute},
+							WindowSize:     &metav1.Duration{Duration: 10 * time.Minute},
+							WindowType:     ptr.To(usagedbapi.SlidingWindow),
+						},
+					},
+				},
+			},
+			expected: map[string]string{
+				"config.yaml": `actions: allocate,consolidation,reclaim,preempt,stalegangeviction
+tiers:
+- plugins:
+  - name: predicates
+  - name: proportion
+  - name: priority
+  - name: nodeavailability
+  - name: resourcetype
+  - name: podaffinity
+  - name: elastic
+  - name: kubeflow
+  - name: ray
+  - name: subgrouporder
+  - name: taskorder
+  - name: nominatednode
+  - name: dynamicresources
+  - name: minruntime
+  - name: topology
+  - name: snapshot
+  - name: gpupack
+  - name: nodeplacement
+    arguments:
+      cpu: binpack
+      gpu: binpack
+  - name: gpusharingorder
+usageDBConfig:
+  clientType: prometheus
+  connectionString: http://prometheus-operated.kai-scheduler.svc.cluster.local:9090
+  usageParams:
+    halfLifePeriod: 10m
+    windowSize: 10m
+    windowType: sliding`,
+			},
+		},
 	}
 
 	for _, tt := range tests {
@@ -606,3 +661,61 @@ func TestServiceAccountForScheduler(t *testing.T) {
 		})
 	}
 }
+
+func TestMarshalingShardVsConfig(t *testing.T) {
+	shardSpecString := `
+spec:
+  partitionLabelValue: ""
+  placementStrategy:
+    cpu: binpack
+    gpu: binpack
+  usageDBConfig:
+    clientType: prometheus
+    connectionString: http://prometheus-operated.kai-scheduler.svc.cluster.local:9090
+    usageParams:
+      halfLifePeriod: 10m
+      windowSize: 10m
+      windowType: sliding
+`
+
+	shardSpec := &kaiv1.SchedulingShardSpec{}
+	err := yaml.Unmarshal([]byte(shardSpecString), shardSpec)
+	assert.NoError(t, err)
+
+	configString := `actions: allocate,consolidation,reclaim,preempt,stalegangeviction
+tiers:
+- plugins:
+  - name: predicates
+  - name: proportion
+  - name: priority
+  - name: nodeavailability
+  - name: resourcetype
+  - name: podaffinity
+  - name: elastic
+  - name: kubeflow
+  - name: ray
+  - name: subgrouporder
+  - name: taskorder
+  - name: nominatednode
+  - name: dynamicresources
+  - name: minruntime
+  - name: topology
+  - name: snapshot
+  - name: gpupack
+  - name: nodeplacement
+    arguments:
+      cpu: binpack
+      gpu: binpack
+  - name: gpusharingorder
+usageDBConfig:
+  clientType: prometheus
+  connectionString: http://prometheus-operated.kai-scheduler.svc.cluster.local:9090
+  usageParams:
+    halfLifePeriod: 10m
+    windowSize: 10m
+    windowType: sliding
+`
+	config := &conf.SchedulerConfiguration{}
+	err = yaml.Unmarshal([]byte(configString), config)
+	assert.NoError(t, err)
+}
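The swap from gopkg.in/yaml.v3 to sigs.k8s.io/yaml in these files is what makes the expected YAML above line up: sigs.k8s.io/yaml marshals through encoding/json, so fields tagged only with `json:"..."` (as on UsageDBConfig and the metav1 types) keep their camelCase names without separate yaml tags. A minimal sketch of that behaviour; the usageParams struct here is a stand-in, not the real API type:

package example

import (
	"fmt"

	"sigs.k8s.io/yaml"
)

// Stand-in struct with only json tags, similar to the usage DB API types.
type usageParams struct {
	HalfLifePeriod string `json:"halfLifePeriod,omitempty"`
	WindowType     string `json:"windowType,omitempty"`
}

func printExample() {
	out, err := yaml.Marshal(usageParams{HalfLifePeriod: "10m", WindowType: "sliding"})
	if err != nil {
		panic(err)
	}
	// Prints roughly:
	//   halfLifePeriod: 10m
	//   windowType: sliding
	fmt.Print(string(out))
}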

pkg/scheduler/cache/usagedb/prometheus/prometheus.go

Lines changed: 15 additions & 5 deletions
@@ -18,6 +18,7 @@ import (
 	promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
 	"github.com/prometheus/common/model"
 	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
 const (
@@ -189,7 +190,8 @@ func (p *PrometheusClient) queryResourceUsage(
 
 	usageVector := usageResult.(model.Vector)
 	if len(usageVector) == 0 {
-		return nil, fmt.Errorf("no data returned for cluster usage metric %s", decayedAllocationMetric)
+		log.InfraLogger.V(3).Warnf("No data returned for cluster usage metric %s", decayedAllocationMetric)
+		return queueUsage, nil
 	}
 
 	for _, usageSample := range usageVector {
@@ -205,11 +207,14 @@
 func (p *PrometheusClient) querySlidingTimeWindow(ctx context.Context, decayedAllocationMetric string) (model.Value, promv1.Warnings, error) {
 	usageQuery := fmt.Sprintf("sum_over_time((%s)[%s:%s])",
 		decayedAllocationMetric,
-		p.usageParams.WindowSize.String(),
+		p.usageParams.WindowSize.Duration.String(),
 		p.queryResolution.String(),
 	)
 
 	usageResult, warnings, err := p.client.Query(ctx, usageQuery, time.Now())
+	if err != nil {
+		err = fmt.Errorf("%w, full query: %s", err, usageQuery)
+	}
 	return usageResult, warnings, err
 }
 
@@ -222,11 +227,16 @@ func (p *PrometheusClient) queryTumblingTimeWindow(ctx context.Context, decayedA
 		End:   time.Now(),
 		Step:  p.queryResolution,
 	})
+
+	if err != nil {
+		err = fmt.Errorf("%w, full query: %s", err, usageQuery)
+	}
+
 	return usageResult, warnings, err
 }
 
 func (p *PrometheusClient) getLatestUsageResetTime() time.Time {
-	maxWindowStartingPoint := time.Now().Add(-*p.usageParams.WindowSize)
+	maxWindowStartingPoint := time.Now().Add(-p.usageParams.WindowSize.Duration)
 	lastUsageReset := maxWindowStartingPoint
 	nextInWindowReset := maxWindowStartingPoint
 
@@ -237,12 +247,12 @@
 	return lastUsageReset
 }
 
-func getExponentialDecayQuery(halfLifePeriod *time.Duration) string {
+func getExponentialDecayQuery(halfLifePeriod *metav1.Duration) string {
 	if halfLifePeriod == nil {
 		return ""
 	}
 
-	halfLifeSeconds := halfLifePeriod.Seconds()
+	halfLifeSeconds := halfLifePeriod.Duration.Seconds()
 	now := time.Now().Unix()
 
 	return fmt.Sprintf("0.5^((%d - time()) / %f)", now, halfLifeSeconds)
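For intuition, the PromQL expression built here weights each sample by 0.5^((now - time()) / halfLifeSeconds), so usage that is one half-life old counts half as much as current usage. A small self-contained sketch of that weighting (illustrative only, not scheduler code):

package main

import (
	"fmt"
	"math"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// decayWeight returns the exponential-decay factor applied to usage that is
// `age` old, given the configured half-life; a nil half-life means no decay.
func decayWeight(age time.Duration, halfLife *metav1.Duration) float64 {
	if halfLife == nil {
		return 1.0
	}
	return math.Pow(0.5, age.Seconds()/halfLife.Duration.Seconds())
}

func main() {
	halfLife := &metav1.Duration{Duration: 10 * time.Minute}
	fmt.Println(decayWeight(0, halfLife))              // 1
	fmt.Println(decayWeight(10*time.Minute, halfLife)) // 0.5
	fmt.Println(decayWeight(20*time.Minute, halfLife)) // 0.25
}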
