3,073 changes: 3,073 additions & 0 deletions cmd/time-aware-simulator/examples/oscillating.csv


9 changes: 8 additions & 1 deletion cmd/time-aware-simulator/examples/plot_simple.py
@@ -9,6 +9,8 @@
parser = argparse.ArgumentParser(description='Plot simulation results from CSV file')
parser.add_argument('input', nargs='?', default='simulation_results.csv',
help='Path to the CSV file (default: simulation_results.csv)')
parser.add_argument('--output', '-o', type=str, default=None,
help='Save plot to PNG file instead of displaying it')
args = parser.parse_args()

df = pd.read_csv(args.input)
@@ -38,5 +40,10 @@
ax2.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

if args.output:
plt.savefig(args.output, dpi=300, bbox_inches='tight')
print(f"Plot saved to {args.output}")
else:
plt.show()

Binary file added cmd/time-aware-simulator/examples/results.png
53 changes: 53 additions & 0 deletions deployments/kai-scheduler/crds/kai.scheduler_schedulingshards.yaml
@@ -55,6 +55,10 @@ spec:
* Only valid flags defined in the scheduler's flag set will be accepted
* Duplicated flags will override the behavior of flags generated by other fields
type: object
kValue:
description: KValue specifies the kValue for the proportion plugin.
Default is 1.0.
type: number
minRuntime:
description: MinRuntime specifies the minimum runtime of a jobs in
the shard
@@ -87,6 +91,55 @@ spec:
description: QueueDepthPerAction max number of jobs to try for action
per queue
type: object
usageDBConfig:
description: UsageDBConfig defines configuration for the usage db
client
properties:
clientType:
type: string
connectionString:
type: string
connectionStringEnvVar:
type: string
usageParams:
description: UsageParams defines common params for all usage db
clients. Some clients may not support all the params.
properties:
extraParams:
additionalProperties:
type: string
description: ExtraParams are extra parameters for the usage
db client, which are client specific.
type: object
fetchInterval:
description: Fetch interval of the usage. Default is 1 minute.
type: string
halfLifePeriod:
description: Half life period of the usage. If not set, or
set to 0, the usage will not be decayed.
type: string
stalenessPeriod:
description: Staleness period of the usage. Default is 5 minutes.
type: string
tumblingWindowCronString:
description: A cron string used to determine when to reset
resource usage for all queues.
type: string
waitTimeout:
description: Wait timeout of the usage. Default is 1 minute.
type: string
windowSize:
description: Window size of the usage. Default is 1 week.
type: string
windowType:
description: Window type for time-series aggregation. If not
set, defaults to sliding.
type: string
type: object
required:
- clientType
- connectionString
type: object
type: object
status:
description: SchedulingShardStatus defines the observed state of SchedulingShard
190 changes: 190 additions & 0 deletions docs/timeaware/README.md
@@ -0,0 +1,190 @@
# Time-Aware Fairness

Time-aware fairness is a feature in KAI-Scheduler that takes queues' historical resource usage into account when making allocation and reclaim decisions. Key features are:

1. Consider past usage for order of allocation: all else being equal, queues with higher past usage will get to run jobs after queues with lower usage
2. Reclaim based on usage: queues which are starved over time will reclaim resources from queues which used a lot of resources.
   1. Note: this does not affect in-quota allocation: deserved quota still takes precedence over time-aware fairness


> **Prerequisites**: Familiarity with [fairness](../fairness/README.md)

## How it works

At a high level: resource usage data in the cluster is collected and persisted in Prometheus. The scheduler then uses it in its fairness calculations: the more resources a queue has consumed, the fewer over-quota resources it gets compared to other queues. Over time, such queues' over-quota resources are reclaimed by more starved queues, achieving a fairer allocation of resources.

### Resource usage data

Queue historical resource usage data is collected in a Prometheus instance in the cluster *(external Prometheus instances can be used - see [External prometheus](#external-prometheus))*. The scheduler configuration determines the time period to consider, and optionally a time decay that gives more weight to recent usage than to older usage.

The metrics are collected continuously: the pod-group-controller publishes resource usage for individual pod-groups on their status; the queue-controller aggregates these per queue and publishes them as a metric, which Prometheus collects and persists.

If configured, the scheduler applies an [exponential time decay](https://en.wikipedia.org/wiki/Exponential_decay) controlled by a half-life period. An example makes this intuitive: with a half-life of one hour, usage (for example, 1 gpu-second) that occurred an hour ago is considered half as significant as a gpu-second consumed just now.

Mathematically, the following formula is applied to historical usage:

$$U = 0.5^{\frac{\Delta{t}}{t_{1/2}}}*A$$

Where:

- $U$ is the usage
- $t_{1/2}$ is the half life constant set by the user
- $\Delta{t}$ is the time elapsed since that usage
- $A$ is the allocated resource
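The formula above can be sketched as a small function (time units are the caller's choice, as long as $\Delta{t}$ and $t_{1/2}$ use the same one):

```python
def decayed_usage(allocated: float, elapsed: float, half_life: float) -> float:
    """Apply exponential time decay: U = 0.5^(elapsed / half_life) * A."""
    return 0.5 ** (elapsed / half_life) * allocated

# A gpu-second consumed exactly one half-life ago counts half as much:
print(decayed_usage(1.0, elapsed=3600, half_life=3600))
# Usage consumed just now is not discounted at all:
print(decayed_usage(1.0, elapsed=0, half_life=3600))
```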

#### Normalization to cluster capacity

The aggregated usage for each queue is then normalized to the **cluster capacity** over the relevant time period: the scheduler looks at the resources available in the cluster during that period and divides each queue's usage by it. For example, in a cluster with 10 GPUs, over a time period of 10 hours, a queue which consumed 24 GPU-hours (whether as 8 GPUs for 3 hours or 12 GPUs for 2 hours) will get a normalized usage score of 0.24 (24 GPU-hours used out of a potential 100). This normalization ensures that a small amount of resource usage in a vacant cluster will not result in a heavy penalty.
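The normalization in the example works out as follows:

```python
def normalized_usage(gpu_hours_used: float, cluster_gpus: int, window_hours: float) -> float:
    """Usage divided by the cluster's total capacity over the window."""
    capacity_gpu_hours = cluster_gpus * window_hours  # e.g. 10 GPUs * 10 h = 100
    return gpu_hours_used / capacity_gpu_hours

# 24 GPU-hours out of a potential 100:
print(normalized_usage(24, cluster_gpus=10, window_hours=10))
```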

### Effect on fair share

Usually, over-quota resources are divided among queues proportionally to their Over-Quota Weight. With time-aware fairness, queues with historical usage get relatively fewer resources in over-quota. The significance of historical usage in this calculation is controlled by a parameter called "kValue": the larger it is, the more significant the historical usage becomes.
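To illustrate the direction of the effect (this is an illustrative penalty form, *not* the proportion plugin's actual formula - the hypothetical `weight / (1 + k * usage)` is only a stand-in):

```python
def over_quota_split(weights, usages, k):
    """Hypothetical sketch: each queue's over-quota share shrinks as its
    normalized historical usage grows, more aggressively for larger k."""
    raw = [w / (1.0 + k * u) for w, u in zip(weights, usages)]
    total = sum(raw)
    return [r / total for r in raw]

# Two queues with equal weight; queue A has normalized usage 0.24, B has none.
print(over_quota_split([1, 1], [0.24, 0.0], k=1.0))  # A gets less than B
print(over_quota_split([1, 1], [0.24, 0.0], k=4.0))  # larger k: stronger penalty
```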

Check out the [time aware simulator](../../cmd/time-aware-simulator/README.md) to better understand scheduling behavior over time.

### Example

The following plot demonstrates GPU allocation over time in a 16-GPU cluster with two queues, each having a deserved quota of 0 and an Over-Quota Weight of 1 for GPUs, and each trying to run single-pod, 16-GPU jobs.

![Time-aware fairness GPU allocation over time](./results.png)

*Time units are intentionally omitted*

## Setup and Configurations

> Note: this section is not finalized and is expected to change in an upcoming KAI release

### Enabling prometheus

> Using a KAI-operated Prometheus assumes that the [prometheus operator](https://prometheus-operator.dev/docs/getting-started/installation/) is installed in the cluster

To enable Prometheus via the kai-operator, apply the following patch:
```sh
kubectl patch config kai-config --type merge -p '{"spec":{"prometheus":{"enabled":true}}}'
```

You can also customize the following configurations:

```
externalPrometheusHealthProbe # defines the configuration for external Prometheus connectivity validation, with defaults.
externalPrometheusUrl # defines the URL of an external Prometheus instance to use. When set, KAI will not deploy its own Prometheus but will configure ServiceMonitors for the external instance and validate connectivity
retentionPeriod # defines how long to retain data (e.g., "2w", "1d", "30d")
sampleInterval # defines the interval of sampling (e.g., "1m", "30s", "5m")
serviceMonitor # defines ServiceMonitor configuration for KAI services
storageClassName # defines the name of the storageClass that will be used to store the TSDB data. defaults to "standard".
storageSize # defines the size of the storage (e.g., "20Gi", "30Gi")
```

If you choose to use your own Prometheus, make sure it is configured to watch the relevant ServiceMonitors via the `accounting: kai` label. For example:
``` yaml
apiVersion: monitoring.coreos.com/v1
kind: Prometheus
metadata:
name: external-prometheus
namespace: other-namespace
spec:
... # Other prometheus configurations..
serviceMonitorSelector:
matchLabels:
accounting: kai
...
```

### Scheduler configurations

To use time-aware fairness, you need to configure the scheduler to connect to Prometheus. If there is more than one scheduling shard in the cluster, each shard can be configured independently.

To edit the default scheduling shard:

```sh
kubectl edit schedulingshard default
```
*Replace `default` with the shard name if relevant*

Add the following section under `spec`:
```yaml
usageDBConfig:
clientType: prometheus
connectionString: http://prometheus-operated.kai-scheduler.svc.cluster.local:9090
usageParams:
halfLifePeriod: 10m # Change to the desired value
windowSize: 10m # Change to the desired value
windowType: sliding # Change to the desired value (sliding/tumbling)
```
*This configuration assumes the KAI-operated Prometheus; change `connectionString` if relevant. Set `windowSize` and `halfLifePeriod` to the desired values.*

### External prometheus

You can configure kai-scheduler to connect to any external DB that is compatible with the Prometheus API - simply set the `connectionString` accordingly. Note that it has to be reachable from the scheduler pod and must contain the queue-controller and kube-state-metrics metrics.

### kValue

KValue is a parameter used by the proportion plugin to determine the significance of historical usage in fairness calculations; higher values mean a more aggressive effect on fairness. To set it, add it to the scheduling shard spec:
```sh
kubectl edit schedulingshard default
```

```yaml
spec:
kValue: 0.5
```

### Advanced: overriding metrics

> *This configuration should not be changed under normal conditions*

In some cases, an admin might want to configure the scheduler to query different metrics for the usage and capacity of certain resources. This can be done with the following config:

```sh
kubectl edit schedulingshard default
```

```yaml
usageDBConfig:
  usageParams:
    extraParams:
      gpuAllocationMetric: kai_queue_allocated_gpus
      cpuAllocationMetric: kai_queue_allocated_cpu_cores
      memoryAllocationMetric: kai_queue_allocated_memory_bytes
      gpuCapacityMetric: 'sum(kube_node_status_capacity{resource="nvidia_com_gpu"})'
      cpuCapacityMetric: 'sum(kube_node_status_capacity{resource="cpu"})'
      memoryCapacityMetric: 'sum(kube_node_status_capacity{resource="memory"})'
```

## Troubleshooting

### Dependencies

Before enabling Prometheus in the KAI config, make sure the Prometheus operator is installed. If it is not, you will see the following condition in the KAI config:

``` sh
kubectl describe config kai-config
```
```
Status:
Conditions:
...
Last Transition Time: 2025-11-10T11:25:48Z
Message: KAI-prometheus: no matches for kind "Prometheus" in version "monitoring.coreos.com/v1"
KAI-prometheus: not available
Observed Generation: 2
Reason: available
Status: False
Type: Available
```

If the operator is missing, simply follow the [prometheus installation instructions](https://prometheus-operator.dev/docs/getting-started/installation/).

To collect cluster capacity metrics, [kube-state-metrics](https://artifacthub.io/packages/helm/prometheus-community/kube-state-metrics/) needs to be installed as well. By default, the kai operator creates a ServiceMonitor for it, assuming it is installed in the `monitoring` or `default` namespace.

### Missing metrics

If the scheduler is unable to collect usage metrics from Prometheus, you will see a log message similar to this:

```
2025-11-10T12:33:07.318Z ERROR usagedb/usagedb.go:142 failed to fetch usage data: error querying nvidia.com/gpu and capacity: error querying cluster capacity metric ((sum(kube_node_status_capacity{resource="nvidia_com_gpu"})) * (0.5^((1762777987 - time()) / 600.000000))): bad_data: invalid parameter "query": 1:124: parse error: unexpected character in duration expression: '&'
```

When this happens, verify:

- Prometheus connectivity from the scheduler pod
- Availability of the queried metrics in Prometheus
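A short script can check Prometheus connectivity and metric availability in one go (a sketch; it assumes the kai-operated Prometheus endpoint from the configuration above - adjust `PROM_URL` to match your `connectionString`):

```python
import json
import urllib.parse
import urllib.request

# Adjust to match your usageDBConfig connectionString.
PROM_URL = "http://prometheus-operated.kai-scheduler.svc.cluster.local:9090"

def check_metric(metric: str, prom_url: str = PROM_URL) -> str:
    """Query Prometheus for `metric` and return a short status string."""
    url = prom_url + "/api/v1/query?" + urllib.parse.urlencode({"query": metric})
    try:
        with urllib.request.urlopen(url, timeout=5) as resp:
            body = json.load(resp)
    except Exception as exc:  # connection refused, DNS failure, timeout, ...
        return f"unreachable: {exc}"
    if body.get("status") != "success" or not body["data"]["result"]:
        return "reachable, but metric has no series (check ServiceMonitors)"
    return f"ok: {len(body['data']['result'])} series"

# One of the usage metrics listed in the overrides section:
print(check_metric("kai_queue_allocated_gpus"))
```

Run it from a pod with access to the Prometheus service (or against a `kubectl port-forward`ed endpoint).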
Binary file added docs/timeaware/results.png
9 changes: 9 additions & 0 deletions pkg/apis/kai/v1/schedulingshard_types.go
@@ -21,6 +21,7 @@ import (
"k8s.io/utils/ptr"

"github.com/NVIDIA/KAI-scheduler/pkg/apis/kai/v1/common"
usagedbapi "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/cache/usagedb/api"
)

const (
@@ -58,6 +59,14 @@ type SchedulingShardSpec struct {
// MinRuntime specifies the minimum runtime of a jobs in the shard
// +kubebuilder:validation:Optional
MinRuntime *MinRuntime `json:"minRuntime,omitempty"`

// KValue specifies the kValue for the proportion plugin. Default is 1.0.
// +kubebuilder:validation:Optional
KValue *float64 `json:"kValue,omitempty"`

// UsageDBConfig defines configuration for the usage db client
// +kubebuilder:validation:Optional
UsageDBConfig *usagedbapi.UsageDBConfig `yaml:"usageDBConfig,omitempty" json:"usageDBConfig,omitempty"`
}

func (s *SchedulingShardSpec) SetDefaultsWhereNeeded() {
9 changes: 9 additions & 0 deletions pkg/apis/kai/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

6 changes: 3 additions & 3 deletions pkg/env-tests/timeaware/timeaware.go
@@ -120,9 +120,9 @@ func setupControllers(backgroundCtx context.Context, cfg *rest.Config,
ClientType: "fake-with-history",
ConnectionString: "fake-connection",
UsageParams: &api.UsageParams{
WindowSize: &[]time.Duration{time.Second * time.Duration(*windowSize)}[0],
FetchInterval: &[]time.Duration{time.Millisecond}[0],
HalfLifePeriod: &[]time.Duration{time.Second * time.Duration(*halfLifePeriod)}[0],
WindowSize: &metav1.Duration{Duration: time.Second * time.Duration(*windowSize)},
FetchInterval: &metav1.Duration{Duration: time.Millisecond},
HalfLifePeriod: &metav1.Duration{Duration: time.Second * time.Duration(*halfLifePeriod)},
},
}
schedulerConf.UsageDBConfig.UsageParams.SetDefaults()
2 changes: 1 addition & 1 deletion pkg/operator/operands/common/common.go
@@ -32,7 +32,7 @@ var KaiServicesForServiceMonitor = []struct {
Port string
JobLabel string
}{
{"queuecontroller", "metrics", "queuecontroller"},
{"queue-controller", "metrics", "queue-controller"},
}

func AllControllersAvailable(
4 changes: 2 additions & 2 deletions pkg/operator/operands/known_types/prometheus.go
@@ -146,10 +146,10 @@ func getCurrentServiceMonitorState(ctx context.Context, runtimeClient client.Cli
if err != nil {
// If field indexer is not available, fall back to listing all ServiceMonitor resources
// and filter by owner reference manually
log.FromContext(ctx).Info("Failed to list ServiceMonitor. error: %v", err)
log.FromContext(ctx).Info("Failed to list ServiceMonitor", "error", err)
err = runtimeClient.List(ctx, serviceMonitorList)
if err != nil {
log.FromContext(ctx).Error(err, "Failed to manually list ServiceMonitor resource. error: %v", err)
log.FromContext(ctx).Error(err, "Failed to manually list ServiceMonitor resource", "error", err)
return nil, err
}

8 changes: 4 additions & 4 deletions pkg/operator/operands/prometheus/prometheus_test.go
@@ -104,7 +104,7 @@ var _ = Describe("Prometheus", func() {
It("should return Prometheus object when Prometheus Operator is installed", func(ctx context.Context) {
objects, err := prometheus.DesiredState(ctx, fakeKubeClient, kaiConfig)
Expect(err).To(BeNil())
Expect(len(objects)).To(Equal(3)) // ServiceAccount, Prometheus, 1 ServiceMonitor
Expect(len(objects)).To(Equal(4)) // ServiceAccount, Prometheus, 2 ServiceMonitors

prometheusObj := test_utils.FindTypeInObjects[*monitoringv1.Prometheus](objects)
Expect(prometheusObj).NotTo(BeNil())
@@ -131,7 +131,7 @@

objects, err := prometheus.DesiredState(ctx, fakeKubeClient, kaiConfig)
Expect(err).To(BeNil())
Expect(len(objects)).To(Equal(3)) // ServiceAccount, Prometheus, 1 ServiceMonitor
Expect(len(objects)).To(Equal(4)) // ServiceAccount, Prometheus, 2 ServiceMonitor

prometheusObj := test_utils.FindTypeInObjects[*monitoringv1.Prometheus](objects)
Expect(prometheusObj).NotTo(BeNil())
@@ -474,11 +474,11 @@ var _ = Describe("prometheusForKAIConfig", func() {
// The function skips Prometheus CR creation and only creates ServiceMonitors
Expect(err).To(BeNil())
Expect(objects).NotTo(BeNil())
Expect(len(objects)).To(Equal(1)) // 1 ServiceMonitor
Expect(len(objects)).To(Equal(2)) // 2 ServiceMonitors

serviceMonitor := test_utils.FindTypeInObjects[*monitoringv1.ServiceMonitor](objects)
Expect(serviceMonitor).NotTo(BeNil())
Expect((*serviceMonitor).Name).To(Equal("queuecontroller"))
Expect((*serviceMonitor).Name).To(Equal("queue-controller"))
})

It("should return empty objects list when ServiceMonitors are disabled", func(ctx context.Context) {