Skip to content

Commit d62e5fe

Browse files
committed
Add time-aware configs to scheduling shard
1 parent 62e2e1c commit d62e5fe

File tree

5 files changed

+167
-14
lines changed

5 files changed

+167
-14
lines changed

pkg/apis/kai/v1/schedulingshard_types.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"k8s.io/utils/ptr"
2222

2323
"github.com/NVIDIA/KAI-scheduler/pkg/apis/kai/v1/common"
24+
usagedbapi "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/cache/usagedb/api"
2425
)
2526

2627
const (
@@ -58,6 +59,14 @@ type SchedulingShardSpec struct {
5859
// MinRuntime specifies the minimum runtime of jobs in the shard
5960
// +kubebuilder:validation:Optional
6061
MinRuntime *MinRuntime `json:"minRuntime,omitempty"`
62+
63+
// KValue specifies the kValue for the proportion plugin. Default is 1.0.
64+
// +kubebuilder:validation:Optional
65+
KValue *float64 `json:"kValue,omitempty"`
66+
67+
// UsageDBConfig defines configuration for the usage db client
68+
// +kubebuilder:validation:Optional
69+
UsageDBConfig *usagedbapi.UsageDBConfig `yaml:"usageDBConfig,omitempty" json:"usageDBConfig,omitempty"`
6170
}
6271

6372
func (s *SchedulingShardSpec) SetDefaultsWhereNeeded() {

pkg/apis/kai/v1/zz_generated.deepcopy.go

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/operator/operands/scheduler/resources_for_shard.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,12 @@ import (
1212
"github.com/spf13/pflag"
1313
"golang.org/x/exp/slices"
1414

15-
"gopkg.in/yaml.v3"
1615
v1 "k8s.io/api/apps/v1"
1716
corev1 "k8s.io/api/core/v1"
1817
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1918
"k8s.io/apimachinery/pkg/util/intstr"
2019
"sigs.k8s.io/controller-runtime/pkg/client"
20+
"sigs.k8s.io/yaml"
2121

2222
"github.com/NVIDIA/KAI-scheduler/cmd/scheduler/app/options"
2323
kaiv1 "github.com/NVIDIA/KAI-scheduler/pkg/apis/kai/v1"
@@ -136,11 +136,18 @@ func (s *SchedulerForShard) configMapForShard(
136136

137137
innerConfig.Actions = strings.Join(actions, ", ")
138138

139+
var proportionArgs map[string]string
140+
if shard.Spec.KValue != nil {
141+
proportionArgs = map[string]string{
142+
"kValue": strconv.FormatFloat(*shard.Spec.KValue, 'f', -1, 64),
143+
}
144+
}
145+
139146
innerConfig.Tiers = []conf.Tier{
140147
{
141148
Plugins: []conf.PluginOption{
142149
{Name: "predicates"},
143-
{Name: "proportion"},
150+
{Name: "proportion", Arguments: proportionArgs},
144151
{Name: "priority"},
145152
{Name: "nodeavailability"},
146153
{Name: "resourcetype"},
@@ -185,6 +192,10 @@ func (s *SchedulerForShard) configMapForShard(
185192
innerConfig.QueueDepthPerAction = shard.Spec.QueueDepthPerAction
186193
}
187194

195+
if shard.Spec.UsageDBConfig != nil {
196+
innerConfig.UsageDBConfig = shard.Spec.UsageDBConfig
197+
}
198+
188199
data, marshalErr := yaml.Marshal(&innerConfig)
189200
if marshalErr != nil {
190201
return nil, marshalErr
@@ -300,15 +311,17 @@ func addMinRuntimePluginIfNeeded(plugins *[]conf.PluginOption, minRuntime *kaiv1
300311
return
301312
}
302313

303-
minRuntimePlugin := conf.PluginOption{Name: "minruntime", Arguments: map[string]string{}}
314+
minRuntimeArgs := make(map[string]string)
304315

305316
if minRuntime.PreemptMinRuntime != nil {
306-
minRuntimePlugin.Arguments["defaultPreemptMinRuntime"] = *minRuntime.PreemptMinRuntime
317+
minRuntimeArgs["defaultPreemptMinRuntime"] = *minRuntime.PreemptMinRuntime
307318
}
308319
if minRuntime.ReclaimMinRuntime != nil {
309-
minRuntimePlugin.Arguments["defaultReclaimMinRuntime"] = *minRuntime.ReclaimMinRuntime
320+
minRuntimeArgs["defaultReclaimMinRuntime"] = *minRuntime.ReclaimMinRuntime
310321
}
311322

323+
minRuntimePlugin := conf.PluginOption{Name: "minruntime", Arguments: minRuntimeArgs}
324+
312325
*plugins = append(*plugins, minRuntimePlugin)
313326
}
314327

pkg/operator/operands/scheduler/resources_test.go

Lines changed: 114 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@ import (
88
"fmt"
99
"strings"
1010
"testing"
11-
12-
"gopkg.in/yaml.v3"
11+
"time"
1312

1413
"github.com/spf13/pflag"
1514

1615
"github.com/NVIDIA/KAI-scheduler/cmd/scheduler/app/options"
1716
kaiv1 "github.com/NVIDIA/KAI-scheduler/pkg/apis/kai/v1"
1817
kaiv1qc "github.com/NVIDIA/KAI-scheduler/pkg/apis/kai/v1/queue_controller"
1918
kaiv1scheduler "github.com/NVIDIA/KAI-scheduler/pkg/apis/kai/v1/scheduler"
19+
usagedbapi "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/cache/usagedb/api"
2020
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/conf"
2121

2222
"github.com/stretchr/testify/assert"
@@ -26,6 +26,7 @@ import (
2626
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2727
"k8s.io/utils/ptr"
2828
"sigs.k8s.io/controller-runtime/pkg/client/fake"
29+
"sigs.k8s.io/yaml"
2930
)
3031

3132
func TestDeploymentForShard(t *testing.T) {
@@ -432,6 +433,59 @@ tiers:
432433
},
433434
expectedErr: true,
434435
},
436+
{
437+
name: "usage DB configuration",
438+
config: &kaiv1.Config{
439+
Spec: kaiv1.ConfigSpec{},
440+
},
441+
shard: &kaiv1.SchedulingShard{
442+
Spec: kaiv1.SchedulingShardSpec{
443+
UsageDBConfig: &usagedbapi.UsageDBConfig{
444+
ClientType: "prometheus",
445+
ConnectionString: "http://prometheus-operated.kai-scheduler.svc.cluster.local:9090",
446+
UsageParams: &usagedbapi.UsageParams{
447+
HalfLifePeriod: &metav1.Duration{Duration: 10 * time.Minute},
448+
WindowSize: &metav1.Duration{Duration: 10 * time.Minute},
449+
WindowType: ptr.To(usagedbapi.SlidingWindow),
450+
},
451+
},
452+
},
453+
},
454+
expected: map[string]string{
455+
"config.yaml": `actions: allocate,consolidation,reclaim,preempt,stalegangeviction
456+
tiers:
457+
- plugins:
458+
- name: predicates
459+
- name: proportion
460+
- name: priority
461+
- name: nodeavailability
462+
- name: resourcetype
463+
- name: podaffinity
464+
- name: elastic
465+
- name: kubeflow
466+
- name: ray
467+
- name: subgrouporder
468+
- name: taskorder
469+
- name: nominatednode
470+
- name: dynamicresources
471+
- name: minruntime
472+
- name: topology
473+
- name: snapshot
474+
- name: gpupack
475+
- name: nodeplacement
476+
arguments:
477+
cpu: binpack
478+
gpu: binpack
479+
- name: gpusharingorder
480+
usageDBConfig:
481+
clientType: prometheus
482+
connectionString: http://prometheus-operated.kai-scheduler.svc.cluster.local:9090
483+
usageParams:
484+
halfLifePeriod: 10m
485+
windowSize: 10m
486+
windowType: sliding`,
487+
},
488+
},
435489
}
436490

437491
for _, tt := range tests {
@@ -607,3 +661,61 @@ func TestServiceAccountForScheduler(t *testing.T) {
607661
})
608662
}
609663
}
664+
665+
func TestMarshalingShardVsConfig(t *testing.T) {
666+
shardSpecString := `
667+
spec:
668+
partitionLabelValue: ""
669+
placementStrategy:
670+
cpu: binpack
671+
gpu: binpack
672+
usageDBConfig:
673+
clientType: prometheus
674+
connectionString: http://prometheus-operated.kai-scheduler.svc.cluster.local:9090
675+
usageParams:
676+
halfLifePeriod: 10m
677+
windowSize: 10m
678+
windowType: sliding
679+
`
680+
681+
shardSpec := &kaiv1.SchedulingShardSpec{}
682+
err := yaml.Unmarshal([]byte(shardSpecString), shardSpec)
683+
assert.NoError(t, err)
684+
685+
configString := `actions: allocate,consolidation,reclaim,preempt,stalegangeviction
686+
tiers:
687+
- plugins:
688+
- name: predicates
689+
- name: proportion
690+
- name: priority
691+
- name: nodeavailability
692+
- name: resourcetype
693+
- name: podaffinity
694+
- name: elastic
695+
- name: kubeflow
696+
- name: ray
697+
- name: subgrouporder
698+
- name: taskorder
699+
- name: nominatednode
700+
- name: dynamicresources
701+
- name: minruntime
702+
- name: topology
703+
- name: snapshot
704+
- name: gpupack
705+
- name: nodeplacement
706+
arguments:
707+
cpu: binpack
708+
gpu: binpack
709+
- name: gpusharingorder
710+
usageDBConfig:
711+
clientType: prometheus
712+
connectionString: http://prometheus-operated.kai-scheduler.svc.cluster.local:9090
713+
usageParams:
714+
halfLifePeriod: 10m
715+
windowSize: 10m
716+
windowType: sliding
717+
`
718+
config := &conf.SchedulerConfiguration{}
719+
err = yaml.Unmarshal([]byte(configString), config)
720+
assert.NoError(t, err)
721+
}

pkg/scheduler/cache/usagedb/prometheus/prometheus.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
1919
"github.com/prometheus/common/model"
2020
v1 "k8s.io/api/core/v1"
21+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2122
)
2223

2324
const (
@@ -139,7 +140,7 @@ func (p *PrometheusClient) GetResourceUsage() (*queue_info.ClusterUsage, error)
139140
func (p *PrometheusClient) queryResourceCapacity(ctx context.Context, capacityMetric string, queryByWindow usageWindowQueryFunction) (float64, error) {
140141
decayedCapacityMetric := capacityMetric
141142
if p.usageParams.HalfLifePeriod != nil {
142-
decayedCapacityMetric = fmt.Sprintf("((%s) * (%s))", capacityMetric, getExponentialDecayQuery(&p.usageParams.HalfLifePeriod.Duration))
143+
decayedCapacityMetric = fmt.Sprintf("((%s) * (%s))", capacityMetric, getExponentialDecayQuery(p.usageParams.HalfLifePeriod))
143144
}
144145

145146
capacityResult, warnings, err := queryByWindow(ctx, decayedCapacityMetric)
@@ -170,7 +171,7 @@ func (p *PrometheusClient) queryResourceUsage(
170171

171172
decayedAllocationMetric := allocationMetric
172173
if p.usageParams.HalfLifePeriod != nil {
173-
decayedAllocationMetric = fmt.Sprintf("((%s) * (%s))", allocationMetric, getExponentialDecayQuery(&p.usageParams.HalfLifePeriod.Duration))
174+
decayedAllocationMetric = fmt.Sprintf("((%s) * (%s))", allocationMetric, getExponentialDecayQuery(p.usageParams.HalfLifePeriod))
174175
}
175176

176177
usageResult, warnings, err := queryByWindow(ctx, decayedAllocationMetric)
@@ -189,7 +190,8 @@ func (p *PrometheusClient) queryResourceUsage(
189190

190191
usageVector := usageResult.(model.Vector)
191192
if len(usageVector) == 0 {
192-
return nil, fmt.Errorf("no data returned for cluster usage metric %s", decayedAllocationMetric)
193+
log.InfraLogger.V(3).Warnf("No data returned for cluster usage metric %s", decayedAllocationMetric)
194+
return queueUsage, nil
193195
}
194196

195197
for _, usageSample := range usageVector {
@@ -205,11 +207,14 @@ func (p *PrometheusClient) queryResourceUsage(
205207
func (p *PrometheusClient) querySlidingTimeWindow(ctx context.Context, decayedAllocationMetric string) (model.Value, promv1.Warnings, error) {
206208
usageQuery := fmt.Sprintf("sum_over_time((%s)[%s:%s])",
207209
decayedAllocationMetric,
208-
p.usageParams.WindowSize.String(),
210+
p.usageParams.WindowSize.Duration.String(),
209211
p.queryResolution.String(),
210212
)
211213

212214
usageResult, warnings, err := p.client.Query(ctx, usageQuery, time.Now())
215+
if err != nil {
216+
err = fmt.Errorf("%w, full query: %s", err, usageQuery)
217+
}
213218
return usageResult, warnings, err
214219
}
215220

@@ -222,11 +227,16 @@ func (p *PrometheusClient) queryTumblingTimeWindow(ctx context.Context, decayedA
222227
End: time.Now(),
223228
Step: p.queryResolution,
224229
})
230+
231+
if err != nil {
232+
err = fmt.Errorf("%w, full query: %s", err, usageQuery)
233+
}
234+
225235
return usageResult, warnings, err
226236
}
227237

228238
func (p *PrometheusClient) getLatestUsageResetTime() time.Time {
229-
maxWindowStartingPoint := time.Now().Add(-*&p.usageParams.WindowSize.Duration)
239+
maxWindowStartingPoint := time.Now().Add(-p.usageParams.WindowSize.Duration)
230240
lastUsageReset := maxWindowStartingPoint
231241
nextInWindowReset := maxWindowStartingPoint
232242

@@ -237,12 +247,12 @@ func (p *PrometheusClient) getLatestUsageResetTime() time.Time {
237247
return lastUsageReset
238248
}
239249

240-
func getExponentialDecayQuery(halfLifePeriod *time.Duration) string {
250+
func getExponentialDecayQuery(halfLifePeriod *metav1.Duration) string {
241251
if halfLifePeriod == nil {
242252
return ""
243253
}
244254

245-
halfLifeSeconds := halfLifePeriod.Seconds()
255+
halfLifeSeconds := halfLifePeriod.Duration.Seconds()
246256
now := time.Now().Unix()
247257

248258
return fmt.Sprintf("0.5^((%d - time()) / %f)", now, halfLifeSeconds)

0 commit comments

Comments
 (0)