From 052252a2bd2afc06f52a851b5ce7c746d3f9e033 Mon Sep 17 00:00:00 2001 From: davidLif Date: Mon, 24 Mar 2025 23:21:19 +0200 Subject: [PATCH] Add The first e2e tests to the repo - better coverage will be added in the next PRS --- .gitignore | 3 +- build/makefile/testenv.mk | 5 +- go.mod | 12 +- go.sum | 14 +- hack/e2e-kind-config.yaml | 16 ++ hack/run-e2e-kind.sh | 42 ++++ pkg/common/constants/constants.go | 5 + pkg/common/gpu_operator_discovery/cdi.go | 46 ++++ test/e2e/modules/constant/constant.go | 17 ++ test/e2e/modules/constant/labels/labels.go | 9 + test/e2e/modules/context/cluster.go | 51 ++++ test/e2e/modules/context/connectivity.go | 111 ++++++++ test/e2e/modules/context/context.go | 129 ++++++++++ test/e2e/modules/environment/environment.go | 33 +++ .../modules/resources/capacity/calculator.go | 183 ++++++++++++++ .../e2e/modules/resources/capacity/structs.go | 105 ++++++++ .../modules/resources/capacity/validators.go | 155 ++++++++++++ .../modules/resources/fillers/filler_jobs.go | 165 ++++++++++++ test/e2e/modules/resources/rd/batch_job.go | 82 ++++++ test/e2e/modules/resources/rd/namespace.go | 41 +++ .../modules/resources/rd/persistent_volume.go | 127 ++++++++++ test/e2e/modules/resources/rd/pod.go | 236 ++++++++++++++++++ .../resources/rd/pod_group/distributed_job.go | 47 ++++ .../resources/rd/pod_group/pod_group.go | 100 ++++++++ test/e2e/modules/resources/rd/queue/queue.go | 126 ++++++++++ test/e2e/modules/resources/rd/role_binding.go | 39 +++ .../modules/resources/rd/security_context.go | 26 ++ test/e2e/modules/utils/logs.go | 69 +++++ test/e2e/modules/utils/name_generators.go | 27 ++ test/e2e/modules/utils/random.go | 11 + test/e2e/modules/wait/pod.go | 149 +++++++++++ test/e2e/modules/wait/system_pods.go | 81 ++++++ test/e2e/modules/wait/watcher/generic.go | 67 +++++ test/e2e/modules/wait/watcher/pods.go | 95 +++++++ test/e2e/modules/wait/watcher/poll.go | 42 ++++ test/e2e/modules/wait/watcher/watcher.go | 106 ++++++++ .../appying_options_suite_test.go | 20 ++ .../applying_options/namespace_test.go | 115 +++++++++ 38 files changed, 2700 insertions(+), 7 deletions(-) create mode 100644 hack/e2e-kind-config.yaml create mode 100755 hack/run-e2e-kind.sh create mode 100644 pkg/common/gpu_operator_discovery/cdi.go create mode 100644 test/e2e/modules/constant/constant.go create mode 100644 test/e2e/modules/constant/labels/labels.go create mode 100644 test/e2e/modules/context/cluster.go create mode 100644 test/e2e/modules/context/connectivity.go create mode 100644 test/e2e/modules/context/context.go create mode 100644 test/e2e/modules/environment/environment.go create mode 100644 test/e2e/modules/resources/capacity/calculator.go create mode 100644 test/e2e/modules/resources/capacity/structs.go create mode 100644 test/e2e/modules/resources/capacity/validators.go create mode 100644 test/e2e/modules/resources/fillers/filler_jobs.go create mode 100644 test/e2e/modules/resources/rd/batch_job.go create mode 100644 test/e2e/modules/resources/rd/namespace.go create mode 100644 test/e2e/modules/resources/rd/persistent_volume.go create mode 100644 test/e2e/modules/resources/rd/pod.go create mode 100644 test/e2e/modules/resources/rd/pod_group/distributed_job.go create mode 100644 test/e2e/modules/resources/rd/pod_group/pod_group.go create mode 100644 test/e2e/modules/resources/rd/queue/queue.go create mode 100644 test/e2e/modules/resources/rd/role_binding.go create mode 100644 test/e2e/modules/resources/rd/security_context.go create mode 100644 test/e2e/modules/utils/logs.go create mode 
100644 test/e2e/modules/utils/name_generators.go create mode 100644 test/e2e/modules/utils/random.go create mode 100644 test/e2e/modules/wait/pod.go create mode 100644 test/e2e/modules/wait/system_pods.go create mode 100644 test/e2e/modules/wait/watcher/generic.go create mode 100644 test/e2e/modules/wait/watcher/pods.go create mode 100644 test/e2e/modules/wait/watcher/poll.go create mode 100644 test/e2e/modules/wait/watcher/watcher.go create mode 100644 test/e2e/suites/allocate/applying_options/appying_options_suite_test.go create mode 100644 test/e2e/suites/allocate/applying_options/namespace_test.go diff --git a/.gitignore b/.gitignore index b60c46139..d5e2e96e3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ bin/ .idea/ charts/ -cover.out \ No newline at end of file +cover.out +.DS_Store diff --git a/build/makefile/testenv.mk b/build/makefile/testenv.mk index 4754a1c0b..37c2e840e 100644 --- a/build/makefile/testenv.mk +++ b/build/makefile/testenv.mk @@ -2,6 +2,9 @@ ENVTEST_K8S_VERSION = 1.32.0 ENVTEST_VERSION=release-0.20 +E2E_TESTS_DIR = "test/e2e/" +TEST_TARGETS = $(shell go list ./... | grep -v "${E2E_TESTS_DIR}") + envtest-docker-go: gocache @ ${ECHO_COMMAND} ${GREEN_CONSOLE} "${CONSOLE_PREFIX} Running unit-tests" ${BASE_CONSOLE} ${DOCKER_GO_COMMAND} make envtest-go || ${FAILURE_MESSAGE_HANDLER} @@ -10,7 +13,7 @@ envtest-docker-go: gocache envtest-go: envtest @ ${ECHO_COMMAND} ${GREEN_CONSOLE} "${CONSOLE_PREFIX} Running unit-tests" ${BASE_CONSOLE} KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path --bin-dir $(LOCALBIN))" \ - go test ./... -timeout 30m || ${FAILURE_MESSAGE_HANDLER} + go test ${TEST_TARGETS} -timeout 30m || ${FAILURE_MESSAGE_HANDLER} ${SUCCESS_MESSAGE_HANDLER} ENVTEST = $(LOCALBIN)/setup-envtest diff --git a/go.mod b/go.mod index 498b61748..ddd8cfbf8 100644 --- a/go.mod +++ b/go.mod @@ -3,10 +3,12 @@ module github.com/NVIDIA/KAI-scheduler go 1.23.4 require ( + github.com/NVIDIA/gpu-operator v1.8.3-0.20240812232433-87286e93f2c9 github.com/argoproj/argo-workflows/v3 v3.6.4 github.com/dustin/go-humanize v1.0.1 github.com/gin-contrib/pprof v1.5.2 github.com/gin-gonic/gin v1.10.0 + github.com/go-logr/logr v1.4.2 github.com/golang/glog v1.2.4 github.com/grafana/pyroscope-go v1.2.1 github.com/onsi/ginkgo v1.16.5 @@ -14,6 +16,7 @@ require ( github.com/onsi/gomega v1.36.2 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.20.5 + github.com/run-ai/kwok-operator v0.0.0-20240926063032-05b6364bc7c7 github.com/spf13/pflag v1.0.6 github.com/stretchr/testify v1.10.0 github.com/xyproto/randomstring v1.2.0 @@ -51,6 +54,7 @@ require ( k8s.io/pod-security-admission v0.32.1 k8s.io/sample-apiserver v0.32.1 k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 + knative.dev/pkg v0.0.0-20250117084104-c43477f0052b knative.dev/serving v0.44.0 sigs.k8s.io/controller-runtime v0.20.0 sigs.k8s.io/karpenter v1.2.0 @@ -59,9 +63,11 @@ require ( require ( cel.dev/expr v0.18.0 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect + github.com/NVIDIA/k8s-kata-manager v0.2.0 // indirect + github.com/NVIDIA/k8s-operator-libs v0.0.0-20240627150410-078e3039ecf7 // indirect github.com/NYTimes/gziphandler v1.1.1 // indirect github.com/antlr4-go/antlr/v4 v4.13.0 // indirect - github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a // indirect + github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect github.com/awslabs/operatorpkg v0.0.0-20241205163410-0fff9f28d115 // indirect github.com/beorn7/perks v1.0.1 // indirect 
github.com/blang/semver/v4 v4.0.0 // indirect @@ -83,7 +89,6 @@ require ( github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/gabriel-vasile/mimetype v1.4.7 // indirect github.com/gin-contrib/sse v0.1.0 // indirect - github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-logr/zapr v1.3.0 // indirect github.com/go-openapi/jsonpointer v0.21.0 // indirect @@ -132,6 +137,7 @@ require ( github.com/opencontainers/selinux v1.11.1 // indirect github.com/pelletier/go-toml/v2 v2.2.3 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.73.2 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.55.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect @@ -157,6 +163,7 @@ require ( go.opentelemetry.io/proto/otlp v1.3.1 // indirect golang.org/x/arch v0.12.0 // indirect golang.org/x/crypto v0.35.0 // indirect + golang.org/x/mod v0.22.0 // indirect golang.org/x/net v0.36.0 // indirect golang.org/x/oauth2 v0.25.0 // indirect golang.org/x/sync v0.11.0 // indirect @@ -183,7 +190,6 @@ require ( k8s.io/kube-scheduler v0.32.1 // indirect k8s.io/kubelet v0.32.1 // indirect knative.dev/networking v0.0.0-20250117155906-67d1c274ba6a // indirect - knative.dev/pkg v0.0.0-20250117084104-c43477f0052b // indirect sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.0 // indirect sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.2 // indirect diff --git a/go.sum b/go.sum index 9305f2283..920fce4f3 100644 --- a/go.sum +++ b/go.sum @@ -9,6 +9,12 @@ contrib.go.opencensus.io/exporter/prometheus v0.4.2/go.mod h1:dvEHbiKmgvbr5pjaF9 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= +github.com/NVIDIA/gpu-operator v1.8.3-0.20240812232433-87286e93f2c9 h1:9yw3Jkto9ZqtNwlnOzxAlKufMNplaWPRfkXhCUFHXgI= +github.com/NVIDIA/gpu-operator v1.8.3-0.20240812232433-87286e93f2c9/go.mod h1:lOgoRYbt1dtCVGX+EhxuZgPonfcIs41BXrnIPk3fE3I= +github.com/NVIDIA/k8s-kata-manager v0.2.0 h1:K+BFkXTOvXXj/kmbNfxFCXM+GkdOVZj2WTHQ7b2uQA0= +github.com/NVIDIA/k8s-kata-manager v0.2.0/go.mod h1:fVUz0DLzwW9RQBE59cLNTi3LrzVwSXWIogr9y5FocPM= +github.com/NVIDIA/k8s-operator-libs v0.0.0-20240627150410-078e3039ecf7 h1:NaClubDuTKoXy4Ev4ZkmpVy3u6xdwd3I1XFJdoI1r+M= +github.com/NVIDIA/k8s-operator-libs v0.0.0-20240627150410-078e3039ecf7/go.mod h1:d8YV6Am03Z9VS4fh6virN/ltOSasCZtAhkwMRU1X6Vs= github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I= github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c= github.com/Pallinder/go-randomdata v1.2.0 h1:DZ41wBchNRb/0GfsePLiSwb0PHZmT67XY00lCDlaYPg= @@ -20,8 +26,8 @@ github.com/argoproj/argo-workflows/v3 v3.6.4 h1:5+Cc1UwaQE5ka3w7R3hxZ1TK3M6VjDEX github.com/argoproj/argo-workflows/v3 v3.6.4/go.mod h1:2f5zB8CkbNCCO1od+kd1dWkVokqcuyvu+tc+Jwx1MZg= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= -github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a 
h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA= -github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= +github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 h1:DklsrG3dyBCFEj5IhUbnKptjxatkF07cF2ak3yi77so= +github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw= github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0= github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY= github.com/awslabs/operatorpkg v0.0.0-20241205163410-0fff9f28d115 h1:9nhjY3dzCpEmhpQ0vMlhB7wqucAiftLjAIEQu8uT2J4= @@ -258,6 +264,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.73.2 h1:GwlGJPK6vf1UIohpc72KJVkKYlzki1UgE3xC4bWbf20= +github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.73.2/go.mod h1:yJ3CawR/A5qEYFEeCOUVYLTwYxmacfHQhJS+b/2QiaM= github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y= github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -274,6 +282,8 @@ github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzG github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/run-ai/kwok-operator v0.0.0-20240926063032-05b6364bc7c7 h1:7sUlviMShcd8g6sf2Q93ix+geV0staOMk42Rs0rAJqA= +github.com/run-ai/kwok-operator v0.0.0-20240926063032-05b6364bc7c7/go.mod h1:vih5aAo7hS8Mt6NXsIUI8SYW+WfB4GhZOuLZzLjZWcc= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc= github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU= diff --git a/hack/e2e-kind-config.yaml b/hack/e2e-kind-config.yaml new file mode 100644 index 000000000..d8da0c1f6 --- /dev/null +++ b/hack/e2e-kind-config.yaml @@ -0,0 +1,16 @@ +# Copyright 2025 NVIDIA CORPORATION +# SPDX-License-Identifier: Apache-2.0 + +kind: Cluster +apiVersion: kind.x-k8s.io/v1alpha4 +nodes: +- role: control-plane +- role: worker + labels: + run.ai/simulated-gpu-node-pool: default +- role: worker + labels: + run.ai/simulated-gpu-node-pool: default +- role: worker + labels: + run.ai/simulated-gpu-node-pool: default diff --git a/hack/run-e2e-kind.sh b/hack/run-e2e-kind.sh new file mode 100755 index 000000000..a1ddb6b12 --- /dev/null +++ b/hack/run-e2e-kind.sh @@ -0,0 +1,42 @@ +#!/bin/bash +# Copyright 2025 NVIDIA CORPORATION +# SPDX-License-Identifier: Apache-2.0 + + +CLUSTER_NAME=${CLUSTER_NAME:-e2e-kai-scheduler} + +REPO_ROOT=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )/.. 
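+# Example usage (illustrative only; the single required argument is the path to an nvcr.io secret manifest): +#   ./hack/run-e2e-kind.sh path/to/nvcr-secret.yaml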
+KIND_CONFIG=${REPO_ROOT}/hack/e2e-kind-config.yaml +GOPATH=${HOME}/go +GOBIN=${GOPATH}/bin + +NVCR_SECRET_FILE_PATH=${1} +if [ -z "$NVCR_SECRET_FILE_PATH" ]; then + echo "Must provide a path to an appropriate secret file that contains the credentials for the nvstaging-runai helm and docker image repository" + exit 1 +fi + +kind create cluster --config ${KIND_CONFIG} --name $CLUSTER_NAME + +kubectl create namespace kai-scheduler +# Apply the secret so that the kai-scheduler system pods can pull from nvstaging-runai and the e2e tests can pull their test images +kubectl apply -f ${NVCR_SECRET_FILE_PATH} -n kai-scheduler + + +# Install the fake-gpu-operator to provide fake GPU resources for the e2e tests +helm upgrade -i gpu-operator fake-gpu-operator/fake-gpu-operator --namespace gpu-operator --create-namespace --version 0.0.53 --set topology.nodePools.default.gpuCount=8 + +helm upgrade -i kai-scheduler nvstaging-runai/kai-scheduler -n kai-scheduler --create-namespace --set "global.imagePullSecrets[0].name=nvcr-secret" --set "global.gpuSharing=true" --set "global.registry=nvcr.io/nvstaging/runai" --version v0.2.0 + +# Give the fake-gpu-operator and kai-scheduler pods time to start +sleep 30 + +# Install ginkgo if it's not installed +if [ ! -f ${GOBIN}/ginkgo ]; then + echo "Installing ginkgo" + GOBIN=${GOBIN} go install github.com/onsi/ginkgo/v2/ginkgo@v2.22.2 +fi + +${GOBIN}/ginkgo -r --keep-going --randomize-all --randomize-suites --trace -vv ${REPO_ROOT}/test/e2e/suites --label-filter '!autoscale && !scale' + +kind delete cluster --name $CLUSTER_NAME diff --git a/pkg/common/constants/constants.go b/pkg/common/constants/constants.go index 85602ccf5..a8afa3d16 100644 --- a/pkg/common/constants/constants.go +++ b/pkg/common/constants/constants.go @@ -33,4 +33,9 @@ const ( MigEnabledLabel = "node-role.kubernetes.io/runai-mig-enabled" MigStrategyLabel = "nvidia.com/mig.strategy" GpuCountLabel = "nvidia.com/gpu.count" + QueueLabelKey = "runai/queue" + + // Namespaces + SystemPodsNamespace = "kai-scheduler" + RunaiReservationNamespace = "runai-reservation" ) diff --git a/pkg/common/gpu_operator_discovery/cdi.go b/pkg/common/gpu_operator_discovery/cdi.go new file mode 100644 index 000000000..bdc489810 --- /dev/null +++ b/pkg/common/gpu_operator_discovery/cdi.go @@ -0,0 +1,46 @@ +// Copyright 2025 NVIDIA CORPORATION +// SPDX-License-Identifier: Apache-2.0 + +package gpu_operator_discovery + +import ( + "context" + "fmt" + + nvidiav1 "github.com/NVIDIA/gpu-operator/api/nvidia/v1" + kerrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + "sigs.k8s.io/controller-runtime/pkg/client" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +func IsCdiEnabled(ctx context.Context, readerClient client.Reader) (bool, error) { + nvidiaClusterPolicies := &nvidiav1.ClusterPolicyList{} + err := readerClient.List(ctx, nvidiaClusterPolicies) + if err != nil { + if meta.IsNoMatchError(err) || kerrors.IsNotFound(err) { + return false, nil + } + log := logf.FromContext(ctx) + log.Error(err, "cannot list nvidia cluster policy") + return false, err + } + + if len(nvidiaClusterPolicies.Items) == 0 { + return false, nil + } + if len(nvidiaClusterPolicies.Items) > 1 { + log := logf.FromContext(ctx) + log.Info(fmt.Sprintf("Cluster has %d clusterpolicies.nvidia.com/v1 objects."+ " First one is queried for the cdi configuration", len(nvidiaClusterPolicies.Items))) + } + + nvidiaClusterPolicy := nvidiaClusterPolicies.Items[0] + if nvidiaClusterPolicy.Spec.CDI.Enabled != nil && 
*nvidiaClusterPolicy.Spec.CDI.Enabled { + if nvidiaClusterPolicy.Spec.CDI.Default != nil && *nvidiaClusterPolicy.Spec.CDI.Default { + return true, nil + } + } + + return false, nil +} diff --git a/test/e2e/modules/constant/constant.go b/test/e2e/modules/constant/constant.go new file mode 100644 index 000000000..a7625796c --- /dev/null +++ b/test/e2e/modules/constant/constant.go @@ -0,0 +1,17 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ + +package constant + +const ( + NvidiaGPUMemoryLabelName = "nvidia.com/gpu.memory" + NodeNamePodLabelName = "kubernetes.io/hostname" + RunaiSchedulerName = "kai-scheduler" + RunaiReservationNamespace = "runai-reservation" + SystemPodsNamespace = "kai-scheduler" + NonPreemptiblePriorityThreshold = 100 + EngineTestPodsApp = "engine-e2e" + QueueLabelKey = "runai/queue" +) diff --git a/test/e2e/modules/constant/labels/labels.go b/test/e2e/modules/constant/labels/labels.go new file mode 100644 index 000000000..6f3be1c89 --- /dev/null +++ b/test/e2e/modules/constant/labels/labels.go @@ -0,0 +1,9 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ +package labels + +const ( + ReservationPod = "reservationPod" +) diff --git a/test/e2e/modules/context/cluster.go b/test/e2e/modules/context/cluster.go new file mode 100644 index 000000000..6bd5c7582 --- /dev/null +++ b/test/e2e/modules/context/cluster.go @@ -0,0 +1,51 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ +package context + +import ( + "context" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + v2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/resources/rd" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/resources/rd/queue" +) + +const ( + defaultServiceAccountName = "default" +) + +func (tc *TestContext) createClusterQueues(ctx context.Context) error { + for _, testQueue := range tc.Queues { + err := createQueueContext(ctx, testQueue) + if err != nil { + return err + } + } + return nil +} + +func createQueueContext(ctx context.Context, q *v2.Queue) error { + _, err := queue.Create(kubeAiSchedClientset, ctx, q, metav1.CreateOptions{}) + if err != nil { + return err + } + + namespaceName := queue.GetConnectedNamespaceToQueue(q) + ns := rd.CreateNamespaceObject(namespaceName, q.Name) + _, err = kubeClientset. + CoreV1(). + Namespaces(). 
+ Create(ctx, ns, metav1.CreateOptions{}) + if err != nil { + return err + } + + // TODO: add RBAC role bindings + // TODO: patch the namespace to add appropriate secret to the service account + + return nil +} diff --git a/test/e2e/modules/context/connectivity.go b/test/e2e/modules/context/connectivity.go new file mode 100644 index 000000000..04225eb9a --- /dev/null +++ b/test/e2e/modules/context/connectivity.go @@ -0,0 +1,111 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ +package context + +import ( + "fmt" + "os" + "strings" + + nvidiav1 "github.com/NVIDIA/gpu-operator/api/nvidia/v1" + "k8s.io/api/node/v1alpha1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + + v2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2" + "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2alpha2" + kubeAiSchedulerV2alpha2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2alpha2" + + kwokopv1beta1 "github.com/run-ai/kwok-operator/api/v1beta1" + + kubeAiSchedClient "github.com/NVIDIA/KAI-scheduler/pkg/apis/client/clientset/versioned" +) + +var kubeConfig *rest.Config +var kubeClientset *kubernetes.Clientset +var kubeAiSchedClientset *kubeAiSchedClient.Clientset +var controllerClient runtimeClient.WithWatch = nil + +func initConnectivity() error { + if kubeConfig == nil { + err := initKubeConfig() + if err != nil { + return err + } + } + if kubeClientset == nil { + kubeClientset = kubernetes.NewForConfigOrDie(kubeConfig) + } + if kubeAiSchedClientset == nil { + kubeAiSchedClientset = kubeAiSchedClient.NewForConfigOrDie(kubeConfig) + } + if controllerClient == nil { + var err error + controllerClient, err = runtimeClient.NewWithWatch(kubeConfig, runtimeClient.Options{}) + if err != nil { + panic(err) + } + if err = v1alpha1.AddToScheme(controllerClient.Scheme()); err != nil { + return fmt.Errorf("failed to add engine v1alpha1 to scheme: %w", err) + } + if err = v2alpha2.AddToScheme(controllerClient.Scheme()); err != nil { + return fmt.Errorf("failed to add scheduling v2alpha2 to scheme: %w", err) + } + if err = v2.AddToScheme(controllerClient.Scheme()); err != nil { + return fmt.Errorf("failed to add scheduling v2 to scheme: %w", err) + } + if err = kubeAiSchedulerV2alpha2.AddToScheme(controllerClient.Scheme()); err != nil { + return fmt.Errorf("failed to add KubeAiScheduler scheduling v2alpha2 to scheme: %w", err) + } + if err = kwokopv1beta1.AddToScheme(controllerClient.Scheme()); err != nil { + return fmt.Errorf("failed to add scheduling v1beta1 to scheme: %w", err) + } + if err = nvidiav1.AddToScheme(controllerClient.Scheme()); err != nil { + return fmt.Errorf("failed to add nvidiav1 to scheme: %w", err) + } + } + + return nil +} + +func initKubeConfig() error { + config, err := clientcmd.BuildConfigFromFlags("", getKubeConfigPath()) + if err != nil { + return err + } + + // Update throttling parameters matching to the current (v1.25) kubectl + config.QPS = 50 + config.Burst = 300 + + kubeConfig = config + return nil +} + +func validateClientSets() error { + _, err := kubeClientset.ServerVersion() + if err != nil { + return fmt.Errorf("failed connectivity check on kube client-set. Inner error: %w", err) + } + + _, err = kubeAiSchedClientset.ServerVersion() + if err != nil { + return fmt.Errorf("failed connectivity check on KubeAiScheduler client-set. 
Inner error: %w", err) + } + + return nil +} + +func getKubeConfigPath() string { + kubeconfig := os.Getenv("KUBECONFIG") + if kubeconfig == "" { + kubeconfig = fmt.Sprintf("%s/.kube/config", os.Getenv("HOME")) + } + kubeconfig = strings.Split(kubeconfig, ":")[0] + + return kubeconfig +} diff --git a/test/e2e/modules/context/context.go b/test/e2e/modules/context/context.go new file mode 100644 index 000000000..0e916be25 --- /dev/null +++ b/test/e2e/modules/context/context.go @@ -0,0 +1,129 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ +package context + +import ( + "context" + + "github.com/onsi/gomega" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + + kubeAiSchedClient "github.com/NVIDIA/KAI-scheduler/pkg/apis/client/clientset/versioned" + v2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/resources/rd" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/resources/rd/pod_group" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/resources/rd/queue" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/wait" +) + +type TestContext struct { + KubeConfig *rest.Config + KubeClientset *kubernetes.Clientset + KubeAiSchedClientset *kubeAiSchedClient.Clientset + ControllerClient runtimeClient.WithWatch + Queues []*v2.Queue + + goContext context.Context + asserter gomega.Gomega +} + +func GetConnectivity(ctx context.Context, asserter gomega.Gomega) *TestContext { + err := initConnectivity() + if err != nil { + asserter.Expect(err).NotTo(gomega.HaveOccurred(), "failed to get connectivity objects") + } + + err = validateClientSets() + if err != nil { + asserter.Expect(err).NotTo(gomega.HaveOccurred(), "client-sets validation failure") + } + + return &TestContext{ + KubeConfig: kubeConfig, + KubeClientset: kubeClientset, + KubeAiSchedClientset: kubeAiSchedClientset, + ControllerClient: controllerClient, + Queues: []*v2.Queue{}, + + goContext: ctx, + asserter: asserter, + } +} + +func (tc *TestContext) InitQueues(queues []*v2.Queue) { + if queues != nil { + tc.Queues = queues + } + + err := tc.createClusterQueues(tc.goContext) + if err != nil { + tc.asserter. + Expect(err).NotTo(gomega.HaveOccurred(), "failed to create test cluster context") + } +} + +func (tc *TestContext) AddQueues(ctx context.Context, newQueues []*v2.Queue) { + allQueues := append(tc.Queues, newQueues...) 
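+ // Only the queues passed to AddQueues are created in the cluster here (InitQueues receives newQueues); the combined list is then stored back into tc.Queues.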
+ tc.goContext = ctx + tc.InitQueues(newQueues) + tc.Queues = allQueues +} + +func (tc *TestContext) TestContextCleanup(ctx context.Context) { + namespaces, err := rd.GetE2ENamespaces(ctx, tc.KubeClientset) + tc.asserter.Expect(err).To(gomega.Succeed()) + + for _, namespace := range namespaces.Items { + tc.deleteAllObjectsInNamespace(ctx, namespace.Name) + } + + wait.ForNoE2EPods(ctx, tc.ControllerClient) + wait.ForNoReservationPods(ctx, tc.ControllerClient) + + wait.ForRunningBinderPodEvent(ctx, tc.ControllerClient) + wait.ForRunningSchedulerPodEvent(ctx, tc.ControllerClient, "scheduler") +} + +func (tc *TestContext) ClusterCleanup(ctx context.Context) { + tc.TestContextCleanup(ctx) + namespaces, err := rd.GetE2ENamespaces(ctx, tc.KubeClientset) + tc.asserter.Expect(err).To(gomega.Succeed()) + + for _, namespace := range namespaces.Items { + err = rd.DeleteNamespace(ctx, tc.KubeClientset, namespace.Name) + tc.asserter.Expect(err).To(gomega.Succeed()) + } + tc.deleteAllQueues(ctx) + + err = rd.DeleteAllStorageObjects(ctx, tc.ControllerClient) + tc.asserter.Expect(err).To(gomega.Succeed()) +} + +func (tc *TestContext) deleteAllObjectsInNamespace(ctx context.Context, namespace string) { + err := rd.DeleteAllJobsInNamespace(ctx, tc.ControllerClient, namespace) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + err = rd.DeleteAllPodsInNamespace(ctx, tc.ControllerClient, namespace) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + err = pod_group.DeleteAllInNamespace(ctx, tc.ControllerClient, namespace) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + err = rd.DeleteAllConfigMapsInNamespace(ctx, tc.ControllerClient, namespace) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) +} + +func (tc *TestContext) deleteAllQueues(ctx context.Context) { + queues, err := queue.GetAllQueues(tc.KubeAiSchedClientset, ctx) + tc.asserter.Expect(err).To(gomega.Succeed()) + for _, clusterQueue := range queues.Items { + err = queue.Delete(tc.KubeAiSchedClientset, ctx, clusterQueue.Name, + metav1.DeleteOptions{}) + tc.asserter.Expect(err).To(gomega.Succeed()) + } +} diff --git a/test/e2e/modules/environment/environment.go b/test/e2e/modules/environment/environment.go new file mode 100644 index 000000000..e0ed852ad --- /dev/null +++ b/test/e2e/modules/environment/environment.go @@ -0,0 +1,33 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ +package environment + +import ( + "context" + + v1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +var isFakeGpuEnv *bool + +func HasFakeGPUNodes(ctx context.Context, k8sClient client.Reader) (bool, error) { + if isFakeGpuEnv != nil { + return *isFakeGpuEnv, nil + } + + var nodes v1.NodeList + + err := k8sClient.List( + ctx, &nodes, + client.MatchingLabels{ + "run.ai/fake.gpu": "true", + }) + if err != nil { + return false, err + } + + return len(nodes.Items) > 0, nil +} diff --git a/test/e2e/modules/resources/capacity/calculator.go b/test/e2e/modules/resources/capacity/calculator.go new file mode 100644 index 000000000..84a420a22 --- /dev/null +++ b/test/e2e/modules/resources/capacity/calculator.go @@ -0,0 +1,183 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ +package capacity + +import ( + "context" + "fmt" + "strconv" + + "github.com/NVIDIA/KAI-scheduler/pkg/common/constants" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/constant" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + 
"k8s.io/client-go/kubernetes" + resourcehelper "k8s.io/kubectl/pkg/util/resource" +) + +func getClusterResources(clientset kubernetes.Interface) (*ResourceList, error) { + perNodeResources, err := GetNodesIdleResources(clientset) + if err != nil { + return nil, err + } + + rl := initEmptyResourcesList() + for _, nodeResources := range perNodeResources { + rl.Add(nodeResources) + } + return rl, nil +} + +func GetNodesIdleResources(clientset kubernetes.Interface) (map[string]*ResourceList, error) { + podList, nodeList, err := getPodsAndNodes(clientset) + if err != nil { + return nil, err + } + + return calcNodesResources(podList, nodeList), nil +} + +func GetNodesAllocatableResources(clientset kubernetes.Interface) (map[string]*ResourceList, error) { + nodeList, err := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return nil, fmt.Errorf("error listing Nodes: %w", err) + } + + perNodeResources := map[string]*ResourceList{} + + for _, node := range nodeList.Items { + perNodeResources[node.Name] = &ResourceList{ + Cpu: node.Status.Allocatable.Cpu().DeepCopy(), + Memory: node.Status.Allocatable.Memory().DeepCopy(), + Gpu: node.Status.Allocatable[constants.GpuResource].DeepCopy(), + GpuMemory: calcNodeGpuMemory(node), + PodCount: int(node.Status.Allocatable.Pods().Value()), + OtherResources: getOtherResourcesFromResourceList(node.Status.Allocatable), + } + } + + return perNodeResources, nil +} + +func getPodsAndNodes(clientset kubernetes.Interface) ( + *corev1.PodList, *corev1.NodeList, error) { + nodeList, err := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return nil, nil, fmt.Errorf("error listing Nodes: %w", err) + } + + podList, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return nil, nil, fmt.Errorf("error listing Pods: %w", err) + } + + nodes := map[string]bool{} + for _, node := range nodeList.Items { + nodes[node.GetName()] = true + } + + var newPodItems []corev1.Pod + for _, pod := range podList.Items { + if !nodes[pod.Spec.NodeName] { + continue + } + + newPodItems = append(newPodItems, pod) + } + podList.Items = newPodItems + + return podList, nodeList, nil +} + +func calcNodesResources(podList *corev1.PodList, nodeList *corev1.NodeList) map[string]*ResourceList { + perNodeResources := map[string]*ResourceList{} + + for _, node := range nodeList.Items { + perNodeResources[node.Name] = &ResourceList{ + Cpu: node.Status.Allocatable.Cpu().DeepCopy(), + Memory: node.Status.Allocatable.Memory().DeepCopy(), + Gpu: node.Status.Allocatable[constants.GpuResource].DeepCopy(), + GpuMemory: calcNodeGpuMemory(node), + PodCount: int(node.Status.Allocatable.Pods().Value()), + OtherResources: getOtherResourcesFromResourceList(node.Status.Allocatable), + } + } + + for _, pod := range podList.Items { + if pod.Status.Phase != corev1.PodSucceeded && pod.Status.Phase != corev1.PodFailed && + pod.Spec.NodeName != "" { + subtractPodResources(perNodeResources[pod.Spec.NodeName], &pod) + } + } + + return perNodeResources +} + +func calcNodeGpuMemory(node corev1.Node) resource.Quantity { + var nodeGpusMemory resource.Quantity + nodeGpus := node.Status.Allocatable[constants.GpuResource] + if nodeGpusMemoryStr := node.Labels[constant.NvidiaGPUMemoryLabelName]; nodeGpusMemoryStr != "" { + singleGpuMemory, _ := strconv.Atoi(nodeGpusMemoryStr) + totalNodeGpuMemory := strconv.Itoa(singleGpuMemory * int(nodeGpus.Value())) + nodeGpusMemory = resource.MustParse(totalNodeGpuMemory 
+ "Mi") + } else { + nodeGpusMemory = resource.MustParse("0") + } + return nodeGpusMemory +} + +func subtractPodResources(nodeResourceList *ResourceList, pod *corev1.Pod) { + req, _ := resourcehelper.PodRequestsAndLimits(pod) + + podResourceList := ResourcesRequestToList(req, pod.Annotations) + nodeResourceList.Sub(podResourceList) +} + +func ResourcesRequestToList(req corev1.ResourceList, podAnnotations map[string]string) *ResourceList { + podResourceList := &ResourceList{ + Cpu: *req.Cpu(), + Memory: *req.Memory(), + Gpu: calcPodGpus(podAnnotations, req), + GpuMemory: calcPodGpuMemory(podAnnotations), + PodCount: 1, + OtherResources: getOtherResourcesFromResourceList(req), + } + return podResourceList +} + +func calcPodGpus(podAnnotations map[string]string, req corev1.ResourceList) resource.Quantity { + gpuReq := req[constants.GpuResource].DeepCopy() + + if fractionalGpuStr := podAnnotations[constants.RunaiGpuFraction]; fractionalGpuStr != "" { + fractionalGpu := resource.MustParse(fractionalGpuStr) + + gpuReq.Add(fractionalGpu) + } + return gpuReq +} + +func calcPodGpuMemory(podAnnotations map[string]string) resource.Quantity { + if gpuMemoryStr := podAnnotations["gpu-memory"]; gpuMemoryStr != "" { + return resource.MustParse(gpuMemoryStr) + } else { + return resource.MustParse("0") + } +} + +func getOtherResourcesFromResourceList(list corev1.ResourceList) map[corev1.ResourceName]resource.Quantity { + otherResources := map[corev1.ResourceName]resource.Quantity{} + for key, value := range list { + if key == corev1.ResourceCPU || + key == corev1.ResourceMemory || + key == corev1.ResourcePods || + key == constants.GpuResource { + continue + } + otherResources[key] = value + } + return otherResources +} diff --git a/test/e2e/modules/resources/capacity/structs.go b/test/e2e/modules/resources/capacity/structs.go new file mode 100644 index 000000000..993a2f1b4 --- /dev/null +++ b/test/e2e/modules/resources/capacity/structs.go @@ -0,0 +1,105 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ +package capacity + +import ( + "fmt" + + v1 "k8s.io/api/core/v1" + + "k8s.io/apimachinery/pkg/api/resource" +) + +type ResourceList struct { + Cpu resource.Quantity + Memory resource.Quantity + Gpu resource.Quantity + GpuMemory resource.Quantity + PodCount int + OtherResources map[v1.ResourceName]resource.Quantity +} + +func initEmptyResourcesList() *ResourceList { + return &ResourceList{ + Cpu: resource.MustParse("0"), + Memory: resource.MustParse("0"), + Gpu: resource.MustParse("0"), + GpuMemory: resource.MustParse("0"), + PodCount: 0, + OtherResources: map[v1.ResourceName]resource.Quantity{}, + } +} + +func FromK8sResourceList(list v1.ResourceList) *ResourceList { + return &ResourceList{ + Cpu: *list.Cpu(), + Memory: *list.Memory(), + Gpu: list[v1.ResourceName("nvidia.com/gpu")], + GpuMemory: resource.Quantity{}, + } +} + +func (rl *ResourceList) Add(toAddRl *ResourceList) { + rl.Cpu.Add(toAddRl.Cpu) + rl.Memory.Add(toAddRl.Memory) + rl.Gpu.Add(toAddRl.Gpu) + rl.GpuMemory.Add(toAddRl.GpuMemory) + rl.PodCount += toAddRl.PodCount + for resourceName, quantity := range toAddRl.OtherResources { + if rl.OtherResources == nil { + rl.OtherResources = map[v1.ResourceName]resource.Quantity{} + } + if _, exists := rl.OtherResources[resourceName]; !exists { + rl.OtherResources[resourceName] = resource.MustParse("0") + } + res := rl.OtherResources[resourceName] + res.Add(quantity) + rl.OtherResources[resourceName] = res + } +} + +func (rl *ResourceList) Sub(toSubRl *ResourceList) { + 
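+ // Subtracts toSubRl from rl in place: CPU, memory, GPU, GPU memory, pod count, and every OtherResources entry.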
rl.Cpu.Sub(toSubRl.Cpu) + rl.Memory.Sub(toSubRl.Memory) + rl.Gpu.Sub(toSubRl.Gpu) + rl.GpuMemory.Sub(toSubRl.GpuMemory) + rl.PodCount -= toSubRl.PodCount + for resourceName, quantity := range toSubRl.OtherResources { + if rl.OtherResources == nil { + rl.OtherResources = map[v1.ResourceName]resource.Quantity{} + } + if _, exists := rl.OtherResources[resourceName]; !exists { + rl.OtherResources[resourceName] = resource.MustParse("0") + } + res := rl.OtherResources[resourceName] + res.Sub(quantity) + rl.OtherResources[resourceName] = res + } +} + +func (rl *ResourceList) LessOrEqual(toCmpRl *ResourceList) bool { + if regularResource := !(rl.Cpu.Cmp(toCmpRl.Cpu) > 0) && + !(rl.Memory.Cmp(toCmpRl.Memory) > 0) && + !(rl.Gpu.Cmp(toCmpRl.Gpu) > 0) && + !(rl.GpuMemory.Cmp(toCmpRl.GpuMemory) > 0) && + !(rl.PodCount > toCmpRl.PodCount); !regularResource { + return false + } + + otherResources := true + for resourceName, quantity := range rl.OtherResources { + res := resource.MustParse("0") + if _, exists := toCmpRl.OtherResources[resourceName]; exists { + res = toCmpRl.OtherResources[resourceName] + } + otherResources = otherResources && !(quantity.Cmp(res) > 0) + } + return otherResources +} + +func (rl ResourceList) String() string { + return fmt.Sprintf("{Cpu: %v, Memory: %v, gpus: %v, GpuMemory: %v, PodCount: %d, OtherResources: %v}", + rl.Cpu.String(), rl.Memory.String(), rl.Gpu.String(), rl.GpuMemory.String(), rl.PodCount, rl.OtherResources) +} diff --git a/test/e2e/modules/resources/capacity/validators.go b/test/e2e/modules/resources/capacity/validators.go new file mode 100644 index 000000000..d0ba98bdd --- /dev/null +++ b/test/e2e/modules/resources/capacity/validators.go @@ -0,0 +1,155 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ +package capacity + +import ( + "context" + "fmt" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + "golang.org/x/exp/maps" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "sigs.k8s.io/controller-runtime/pkg/client" + + gpuoperatordiscovery "github.com/NVIDIA/KAI-scheduler/pkg/common/gpu_operator_discovery" +) + +func SkipIfInsufficientClusterResources(clientset kubernetes.Interface, resourceRequest *ResourceList) { + hasResources, clusterMetrics, err := hasSufficientClusterResources(clientset, resourceRequest) + if err != nil { + gomega.Expect(err).NotTo(gomega.HaveOccurred(), + "Failed to validate sufficient resources with error") + } + if !hasResources { + ginkgo.Skip( + fmt.Sprintf( + "The current cluster doesn't have enough resources to run the test. "+ + "Requested resources: %v. Cluster resources: %v", + resourceRequest.String(), clusterMetrics.String(), + ), + ) + } +} + +func SkipIfInsufficientClusterTopologyResources(client kubernetes.Interface, nodeResourceRequests []ResourceList) { + hasResources, resources, err := hasSufficientClusterTopologyResources(client, nodeResourceRequests) + if err != nil { + gomega.Expect(err).NotTo(gomega.HaveOccurred(), + "Failed to validate sufficient resources with error") + } + if !hasResources { + ginkgo.Skip( + fmt.Sprintf( + "The current cluster doesn't have enough resources to run the test, taking into account the required topology. "+ + "Requested resources: %v, Cluster resources: %v", nodeResourceRequests, resources, + ), + ) + } +} + +// SkipIfNonHomogeneousGpuCounts skips the test if the cluster has nodes with different GPU counts, not considering cpu-only nodes. 
+func SkipIfNonHomogeneousGpuCounts(clientset kubernetes.Interface) int { + resources, err := GetNodesAllocatableResources(clientset) + gomega.Expect(err).NotTo(gomega.HaveOccurred(), "Failed to validate homogeneous gpu resources") + + gpuCounts := map[int]bool{} + for _, resource := range resources { + gpuCounts[int(resource.Gpu.Value())] = true + } + + delete(gpuCounts, 0) + if len(gpuCounts) > 1 { + ginkgo.Skip( + fmt.Sprintf( + "The current cluster has nodes with different GPU counts: %v", gpuCounts, + ), + ) + } + + return maps.Keys(gpuCounts)[0] +} + +func SkipIfCDIEnabled(ctx context.Context, client client.Client) { + cdiEnabled, err := gpuoperatordiscovery.IsCdiEnabled(ctx, client) + gomega.Expect(err).NotTo(gomega.HaveOccurred(), "Failed to check if CDI is enabled") + + if cdiEnabled { + ginkgo.Skip("CDI is enabled") + } +} + +func SkipIfCSIDriverIsMissing(ctx context.Context, client kubernetes.Interface, name string) { + _, err := client.StorageV1().CSIDrivers().Get(ctx, name, metav1.GetOptions{}) + if err != nil && errors.IsNotFound(err) { + ginkgo.Skip( + fmt.Sprintf( + "The current cluster doesn't have %v CSIDriver", name, + ), + ) + } +} + +func SkipIfCSICapacitiesAreMissing(ctx context.Context, client kubernetes.Interface) { + capacities, err := client.StorageV1().CSIStorageCapacities("openebs").List(ctx, metav1.ListOptions{}) + if err != nil || len(capacities.Items) == 0 { + ginkgo.Skip( + fmt.Sprintf("Found %d CSIStorageCapacities, err: %s", len(capacities.Items), err), + ) + } +} + +func hasSufficientClusterResources(client kubernetes.Interface, resourceRequest *ResourceList) ( + bool, *ResourceList, error) { + clusterMetrics, err := getClusterResources(client) + if err != nil { + return false, nil, err + } + + if resourceRequest.LessOrEqual(clusterMetrics) { + return true, clusterMetrics, nil + } + return false, clusterMetrics, nil +} + +func hasSufficientClusterTopologyResources(client kubernetes.Interface, nodeResourceRequests []ResourceList, +) (bool, map[string]*ResourceList, error) { + nodesResourcesMap, err := GetNodesIdleResources(client) + if err != nil { + return false, nil, err + } + var nodeResources []ResourceList + for _, nodeResource := range nodesResourcesMap { + nodeResources = append(nodeResources, *nodeResource) + } + return theseNodesSufficientForTheseRequirementsRec(nodeResourceRequests, nodeResources), nodesResourcesMap, nil +} + +func theseNodesSufficientForTheseRequirementsRec(reqs, nodes []ResourceList) bool { + if len(reqs) == 0 { + return true + } + if len(nodes) == 0 { + return false + } + + nodeReq := reqs[0] + newReqs := removeItemFromList(reqs, 0) + for index, node := range nodes { + if nodeReq.LessOrEqual(&node) { + newNodes := removeItemFromList(nodes, index) + if theseNodesSufficientForTheseRequirementsRec(newReqs, newNodes) { + return true + } + } + } + return false +} + +func removeItemFromList(list []ResourceList, index int) []ResourceList { + return append(list[:index], list[index+1:]...) +} diff --git a/test/e2e/modules/resources/fillers/filler_jobs.go b/test/e2e/modules/resources/fillers/filler_jobs.go new file mode 100644 index 000000000..b10847e68 --- /dev/null +++ b/test/e2e/modules/resources/fillers/filler_jobs.go @@ -0,0 +1,165 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ +package fillers + +import ( + "context" + "fmt" + + . 
"github.com/onsi/gomega" + "golang.org/x/exp/maps" + "golang.org/x/exp/slices" + batchv1 "k8s.io/api/batch/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + v2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/constant" + + testcontext "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/context" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/resources/capacity" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/resources/rd" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/resources/rd/queue" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/wait" +) + +var ( + maxFillerPodsPerNode = 1000 +) + +func FillAllNodesWithJobs( + ctx context.Context, testCtx *testcontext.TestContext, testQueue *v2.Queue, resources v1.ResourceRequirements, + annotations map[string]string, labels map[string]string, priorityClass string, targetNodes ...string) ([]*batchv1.Job, []*v1.Pod, error) { + var jobs []*batchv1.Job + var jobPods []*v1.Pod + + podFillerSize, err := calcNumOfFillerPods(testCtx, resources, annotations, targetNodes...) + if err != nil { + return nil, nil, err + } + + for i := 0; i < podFillerSize; i++ { + job, jobPod := createFillerJob(ctx, testCtx, testQueue, resources, annotations, labels, priorityClass, targetNodes...) + jobs = append(jobs, job) + jobPods = append(jobPods, jobPod) + } + + namespace := queue.GetConnectedNamespaceToQueue(testQueue) + wait.ForPodsScheduled(ctx, testCtx.ControllerClient, namespace, jobPods) + + // Validate that we filled the relevant nodes to capacity + extraJob, extraJobPod := createFillerJob(ctx, testCtx, testQueue, resources, annotations, labels, priorityClass, + targetNodes...) + defer rd.DeleteJob(ctx, testCtx.KubeClientset, extraJob) + wait.ForPodUnschedulable(ctx, testCtx.ControllerClient, extraJobPod) + + return jobs, jobPods, nil +} + +func createFillerJob(ctx context.Context, testCtx *testcontext.TestContext, testQueue *v2.Queue, + resources v1.ResourceRequirements, annotations, labels map[string]string, priorityClass string, targetNodes ...string) ( + *batchv1.Job, *v1.Pod) { + namespace := queue.GetConnectedNamespaceToQueue(testQueue) + + job := rd.CreateBatchJobObject(testQueue, resources) + if len(annotations) > 0 { + maps.Copy(job.Spec.Template.Annotations, annotations) + } + if len(labels) > 0 { + maps.Copy(job.Spec.Template.Labels, labels) + } + if len(targetNodes) > 0 { + setTargetNodeAffinity(targetNodes, job) + } + job.Spec.Template.Spec.PriorityClassName = priorityClass + job, err := testCtx.KubeClientset.BatchV1().Jobs(job.Namespace).Create(ctx, job, metav1.CreateOptions{}) + Expect(err).To(Succeed()) + + labelSelector := metav1.LabelSelector{ + MatchLabels: map[string]string{ + rd.BatchJobAppLabel: job.Labels[rd.BatchJobAppLabel], + }, + } + wait.ForAtLeastOnePodCreation(ctx, testCtx.ControllerClient, labelSelector) + + pods, err := testCtx.KubeClientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", rd.BatchJobAppLabel, job.Labels[rd.BatchJobAppLabel]), + }) + Expect(err).To(Succeed()) + Expect(len(pods.Items)).To(BeNumerically(">", 0)) + + job, err = testCtx.KubeClientset.BatchV1().Jobs(namespace).Get(ctx, job.Name, metav1.GetOptions{}) + Expect(err).To(Succeed()) + return job, &pods.Items[0] +} + +func setTargetNodeAffinity(targetNodes []string, job *batchv1.Job) { + var nodeSelectorRequirements []v1.NodeSelectorRequirement + for _, nodeName := range targetNodes { + nodeSelectorRequirements = 
append(nodeSelectorRequirements, v1.NodeSelectorRequirement{ + Key: constant.NodeNamePodLabelName, + Operator: v1.NodeSelectorOpIn, + Values: []string{nodeName}, + }) + } + job.Spec.Template.Spec.Affinity = &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: nodeSelectorRequirements, + }, + }, + }, + }, + } +} + +func calcNumOfFillerPods(testCtx *testcontext.TestContext, resources v1.ResourceRequirements, + annotations map[string]string, targetNodes ...string) (int, error) { + nodesIdleResources, err := capacity.GetNodesIdleResources(testCtx.KubeClientset) + if err != nil { + return -1, err + } + + podFillerSize := 0 + for nodeName, nodeIdleResources := range nodesIdleResources { + if len(targetNodes) > 0 && !slices.Contains(targetNodes, nodeName) { + continue + } + perNodeFillerPods, err := calcNumOfFillerPodsForNode(nodeIdleResources, resources, annotations) + if err != nil { + return -1, err + } + podFillerSize += perNodeFillerPods + } + return podFillerSize, nil +} + +func calcNumOfFillerPodsForNode( + nodeIdleResources *capacity.ResourceList, resources v1.ResourceRequirements, + annotations map[string]string) (int, error) { + fillerPodResources := resources.Requests + if fillerPodResources == nil { + fillerPodResources = resources.Limits + } + singleFillerPodResources := capacity.ResourcesRequestToList(fillerPodResources, annotations) + nodeFillerResources := &capacity.ResourceList{} + nodeFillerResources.Add(singleFillerPodResources) + + perNodeFillerPods := 0 + for nodeFillerResources.LessOrEqual(nodeIdleResources) { + perNodeFillerPods += 1 + nodeFillerResources.Add(singleFillerPodResources) + + if perNodeFillerPods > maxFillerPodsPerNode { + return -1, fmt.Errorf( + "the number of filler pods (according to resource calculation) is bigger than "+ "the max filler pods per node.\n 
free resources: %v, filler pod resources: %v, max filler pods: %d", + nodeFillerResources, fillerPodResources, maxFillerPodsPerNode) + } + } + return perNodeFillerPods, nil +} diff --git a/test/e2e/modules/resources/rd/batch_job.go b/test/e2e/modules/resources/rd/batch_job.go new file mode 100644 index 000000000..375c4d193 --- /dev/null +++ b/test/e2e/modules/resources/rd/batch_job.go @@ -0,0 +1,82 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ +package rd + +import ( + "context" + "fmt" + + "github.com/onsi/gomega" + batchv1 "k8s.io/api/batch/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "knative.dev/pkg/ptr" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + + v2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2" + "github.com/NVIDIA/KAI-scheduler/pkg/common/constants" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/resources/rd/queue" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/utils" +) + +const BatchJobAppLabel = "batch-job-app-name" + +func CreateBatchJobObject(podQueue *v2.Queue, resources v1.ResourceRequirements) *batchv1.Job { + namespace := queue.GetConnectedNamespaceToQueue(podQueue) + matchLabelValue := utils.GenerateRandomK8sName(10) + + pod := CreatePodObject(podQueue, resources) + pod.Labels[BatchJobAppLabel] = matchLabelValue + pod.Spec.RestartPolicy = v1.RestartPolicyNever + + return &batchv1.Job{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "batch/v1", + Kind: "Job", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: utils.GenerateRandomK8sName(10), + Namespace: namespace, + Labels: map[string]string{ + constants.AppLabelName: "engine-e2e", + BatchJobAppLabel: matchLabelValue, + }, + }, + Spec: batchv1.JobSpec{ + Template: v1.PodTemplateSpec{ + ObjectMeta: pod.ObjectMeta, + Spec: pod.Spec, + }, + }, + } +} + +func GetJobPods(ctx context.Context, client *kubernetes.Clientset, job *batchv1.Job) []v1.Pod { + pods, err := client.CoreV1().Pods(job.Namespace).List(ctx, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", BatchJobAppLabel, job.Labels[BatchJobAppLabel]), + }) + gomega.Expect(err).To(gomega.Succeed()) + return pods.Items +} + +func DeleteJob(ctx context.Context, client *kubernetes.Clientset, job *batchv1.Job) { + propagationPolicy := metav1.DeletePropagationForeground + err := client.BatchV1().Jobs(job.Namespace).Delete(ctx, job.Name, metav1.DeleteOptions{ + PropagationPolicy: &propagationPolicy, + GracePeriodSeconds: ptr.Int64(0), + }) + gomega.Expect(err).To(gomega.Succeed()) +} + +func DeleteAllJobsInNamespace(ctx context.Context, client runtimeClient.Client, namespace string) error { + err := client.DeleteAllOf( + ctx, &batchv1.Job{}, + runtimeClient.InNamespace(namespace), + runtimeClient.GracePeriodSeconds(0), + runtimeClient.PropagationPolicy(metav1.DeletePropagationForeground), + ) + return runtimeClient.IgnoreNotFound(err) +} diff --git a/test/e2e/modules/resources/rd/namespace.go b/test/e2e/modules/resources/rd/namespace.go new file mode 100644 index 000000000..2c6f716f7 --- /dev/null +++ b/test/e2e/modules/resources/rd/namespace.go @@ -0,0 +1,41 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ +package rd + +import ( + "context" + "fmt" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/NVIDIA/KAI-scheduler/pkg/common/constants" +) + +func CreateNamespaceObject(name, 
queueName string) *corev1.Namespace { + return &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + "project": queueName, + constants.QueueLabelKey: queueName, + constants.AppLabelName: "engine-e2e", + }, + }, + } +} + +func GetE2ENamespaces(ctx context.Context, kubeClient *kubernetes.Clientset) (*corev1.NamespaceList, error) { + return kubeClient.CoreV1().Namespaces().List(ctx, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=engine-e2e", constants.AppLabelName), + }) +} + +func DeleteNamespace(ctx context.Context, kubeClient *kubernetes.Clientset, namespace string) error { + err := kubeClient.CoreV1().Namespaces().Delete(ctx, namespace, metav1.DeleteOptions{}) + return client.IgnoreNotFound(err) +} diff --git a/test/e2e/modules/resources/rd/persistent_volume.go b/test/e2e/modules/resources/rd/persistent_volume.go new file mode 100644 index 000000000..bfa1e95ac --- /dev/null +++ b/test/e2e/modules/resources/rd/persistent_volume.go @@ -0,0 +1,127 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ +package rd + +import ( + "context" + "fmt" + + v1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/NVIDIA/KAI-scheduler/pkg/common/constants" +) + +func CreatePersistentVolumeObject(name string, path string) *v1.PersistentVolume { + return &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + constants.AppLabelName: "engine-e2e", + }, + }, + Spec: v1.PersistentVolumeSpec{ + StorageClassName: "manual", + AccessModes: []v1.PersistentVolumeAccessMode{ + v1.ReadWriteOnce, + }, + Capacity: v1.ResourceList{ + v1.ResourceStorage: resource.MustParse("10k"), + }, + PersistentVolumeSource: v1.PersistentVolumeSource{ + HostPath: &v1.HostPathVolumeSource{ + Path: path, + }, + }, + }, + } +} + +func CreatePersistentVolumeClaimObject(name string, storageClassName string, + quantity resource.Quantity) *v1.PersistentVolumeClaim { + return &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + constants.AppLabelName: "engine-e2e", + }, + }, + Spec: v1.PersistentVolumeClaimSpec{ + StorageClassName: ptr.To(storageClassName), + AccessModes: []v1.PersistentVolumeAccessMode{ + v1.ReadWriteOnce, + }, + Resources: v1.VolumeResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceStorage: quantity, + }, + }, + }, + } +} + +// CreateStorageClass creates StorageClass matching CSI driver for provisioning local PVs backed by LVM +// documentation: https://github.com/openebs/lvm-localpv +func CreateStorageClass(name string) *storagev1.StorageClass { + volumeBindingMode := storagev1.VolumeBindingWaitForFirstConsumer + return &storagev1.StorageClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + constants.AppLabelName: "engine-e2e", + }, + }, + Provisioner: "local.csi.openebs.io", + Parameters: map[string]string{ + "storage": "lvm", + "volgroup": "lvmvg", + }, + VolumeBindingMode: &volumeBindingMode, + } +} + +func DeleteAllStorageObjects(ctx context.Context, k8sClient client.Client) error { + err := DeleteAllPersistentVolumes(ctx, k8sClient) + if err != nil { + return err + } + + return DeleteAllStorageClasses(ctx, k8sClient) +} + +func DeleteAllPersistentVolumes(ctx context.Context, k8sClient client.Client) error { + pv := 
&v1.PersistentVolume{} + return k8sClient.DeleteAllOf(ctx, pv, client.MatchingLabels{constants.AppLabelName: "engine-e2e"}) +} + +func DeleteAllStorageClasses(ctx context.Context, k8sClient client.Client) error { + storageClass := &storagev1.StorageClass{} + return k8sClient.DeleteAllOf(ctx, storageClass, client.MatchingLabels{constants.AppLabelName: "engine-e2e"}) +} + +func GetCSICapacity(ctx context.Context, k8sClient client.Client, storageclass *storagev1.StorageClass) (storageCapacity *storagev1.CSIStorageCapacity, err error) { + var capacities storagev1.CSIStorageCapacityList + err = k8sClient.List(ctx, &capacities) + if err != nil { + return nil, err + } + + if len(capacities.Items) == 0 { + return nil, fmt.Errorf("no CSIStorageCapacities found") + } + + for i, capacity := range capacities.Items { + if capacity.StorageClassName == storageclass.Name { + return &capacities.Items[i], nil + + } + } + + return nil, fmt.Errorf("no CSIStorageCapacity found for storage class %s", storageclass.Name) +} diff --git a/test/e2e/modules/resources/rd/pod.go b/test/e2e/modules/resources/rd/pod.go new file mode 100644 index 000000000..d8fa956ed --- /dev/null +++ b/test/e2e/modules/resources/rd/pod.go @@ -0,0 +1,236 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ +package rd + +import ( + "bytes" + "context" + "fmt" + "io" + "time" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/kubernetes" + "k8s.io/utils/ptr" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + + v2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2" + "github.com/NVIDIA/KAI-scheduler/pkg/common/constants" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/constant" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/resources/rd/queue" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/utils" +) + +const ( + PodGroupLabelName = "pod-group-name" + numCreatePodRetries = 10 +) + +func IsPodScheduled(pod *v1.Pod) bool { + for _, condition := range pod.Status.Conditions { + if condition.Type == v1.PodScheduled && condition.Status == v1.ConditionTrue { + return true + } + } + return false +} + +func IsPodUnschedulable(pod *v1.Pod) bool { + for _, condition := range pod.Status.Conditions { + if condition.Type == v1.PodScheduled && condition.Status == v1.ConditionFalse && + condition.Reason == v1.PodReasonUnschedulable { + return true + } + } + return false +} + +func IsPodReady(pod *v1.Pod) bool { + for _, condition := range pod.Status.Conditions { + if condition.Type == v1.PodReady && condition.Status == v1.ConditionTrue { + return true + } + } + return false +} + +func IsPodRunning(pod *v1.Pod) bool { + return pod.Status.Phase == v1.PodRunning +} + +func IsPodEnded(pod *v1.Pod) bool { + return IsPodSucceeded(pod) || IsPodFailed(pod) +} + +func IsPodSucceeded(pod *v1.Pod) bool { + return pod.Status.Phase == v1.PodSucceeded +} + +func IsPodFailed(pod *v1.Pod) bool { + return pod.Status.Phase == v1.PodFailed +} + +func GetPod(ctx context.Context, client *kubernetes.Clientset, namespace string, name string) (*v1.Pod, error) { + pod, err := client. + CoreV1(). + Pods(namespace). + Get(ctx, name, metav1.GetOptions{}) + return pod, err +} + +func GetPodLogs(ctx context.Context, client *kubernetes.Clientset, namespace string, name string) (string, error) { + req := client. + CoreV1(). + Pods(namespace). 
+ GetLogs(name, &v1.PodLogOptions{}) + podLogs, err := req.Stream(ctx) + if err != nil { + return "", err + } + + defer podLogs.Close() + buf := new(bytes.Buffer) + _, err = io.Copy(buf, podLogs) + if err != nil { + return "", err + } + + return buf.String(), nil +} + +func CreatePod(ctx context.Context, client *kubernetes.Clientset, pod *v1.Pod) (*v1.Pod, error) { + _, err := GetPod(ctx, client, pod.Namespace, pod.Name) + if err == nil { + // pod is not expected to exist in the cluster + return nil, fmt.Errorf("pod %s/%s already exists in the cluster", pod.Namespace, pod.Name) + } + + for range numCreatePodRetries { + actualPod, err := client. + CoreV1(). + Pods(pod.Namespace). + Create(ctx, pod, metav1.CreateOptions{}) + if err == nil { + return actualPod, nil + } + if errors.IsAlreadyExists(err) { + return GetPod(ctx, client, pod.Namespace, pod.Name) + } + time.Sleep(time.Second * 2) + } + return nil, fmt.Errorf("failed to create pod <%s/%s>, error: %s", + pod.Namespace, pod.Name, err) +} + +func CreatePodObject(podQueue *v2.Queue, resources v1.ResourceRequirements) *v1.Pod { + namespace := queue.GetConnectedNamespaceToQueue(podQueue) + pod := &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: utils.GenerateRandomK8sName(10), + Namespace: namespace, + Annotations: map[string]string{}, + Labels: map[string]string{ + constants.AppLabelName: "engine-e2e", + constants.QueueLabelKey: podQueue.Name, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Image: "nvcr.io/nvidia/base/ubuntu:22.04_20240212", + Name: "ubuntu-container", + Args: []string{ + "sleep", + "infinity", + }, + Resources: resources, + SecurityContext: DefaultSecurityContext(), + ImagePullPolicy: v1.PullIfNotPresent, + }, + }, + TerminationGracePeriodSeconds: ptr.To(int64(0)), + SchedulerName: constant.RunaiSchedulerName, + Tolerations: []v1.Toleration{ + { + Key: "nvidia.com/gpu", + Operator: v1.TolerationOpExists, + Effect: v1.TaintEffectNoSchedule, + }, + }, + }, + } + + return pod +} + +func CreatePodWithPodGroupReference(queue *v2.Queue, podGroupName string, + resources v1.ResourceRequirements) *v1.Pod { + pod := CreatePodObject(queue, resources) + pod.Annotations[PodGroupLabelName] = podGroupName + pod.Labels[PodGroupLabelName] = podGroupName + return pod +} + +func NodeAffinity(nodeName string, operator v1.NodeSelectorOperator) *v1.Affinity { + return &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: constant.NodeNamePodLabelName, + Operator: operator, + Values: []string{nodeName}, + }, + }, + }, + }, + }, + }, + } +} + +func DeleteAllPodsInNamespace( + ctx context.Context, client runtimeClient.Client, namespace string, +) error { + err := client.DeleteAllOf( + ctx, &v1.Pod{}, + runtimeClient.InNamespace(namespace), + runtimeClient.GracePeriodSeconds(0), + ) + return runtimeClient.IgnoreNotFound(err) +} + +func DeleteAllConfigMapsInNamespace( + ctx context.Context, client runtimeClient.Client, namespace string, +) error { + err := client.DeleteAllOf( + ctx, &v1.ConfigMap{}, + runtimeClient.InNamespace(namespace), + runtimeClient.GracePeriodSeconds(0), + ) + return runtimeClient.IgnoreNotFound(err) +} + +func GetPodNodeAffinitySelector(pod *v1.Pod) *v1.NodeSelector { + if pod.Spec.Affinity == nil { + pod.Spec.Affinity = &v1.Affinity{} + } + if 
pod.Spec.Affinity.NodeAffinity == nil { + pod.Spec.Affinity.NodeAffinity = &v1.NodeAffinity{} + } + if pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil { + pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = &v1.NodeSelector{} + } + return pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution +} diff --git a/test/e2e/modules/resources/rd/pod_group/distributed_job.go b/test/e2e/modules/resources/rd/pod_group/distributed_job.go new file mode 100644 index 000000000..fe4b8018f --- /dev/null +++ b/test/e2e/modules/resources/rd/pod_group/distributed_job.go @@ -0,0 +1,47 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ +package pod_group + +import ( + "context" + + . "github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + + v2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2" + "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2alpha2" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/resources/rd" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/resources/rd/queue" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/utils" +) + +func CreateDistributedJob( + ctx context.Context, clientset *kubernetes.Clientset, k8sClient runtimeClient.WithWatch, + ownerQueue *v2.Queue, count int, resources v1.ResourceRequirements, + priorityClassName string, +) (*v2alpha2.PodGroup, []*v1.Pod) { + podGroup := Create( + queue.GetConnectedNamespaceToQueue(ownerQueue), "distributed-pod-group"+utils.GenerateRandomK8sName(10), ownerQueue.Name) + podGroup.Spec.PriorityClassName = priorityClassName + podGroup.Spec.MinMember = int32(count) + + pods := []*v1.Pod{} + + Expect(k8sClient.Create(ctx, podGroup)).To(Succeed()) + for i := 0; i < count; i++ { + pod := rd.CreatePodObject(ownerQueue, resources) + pod.Name = "distributed-pod-" + utils.GenerateRandomK8sName(10) + pod.Annotations[PodGroupNameAnnotation] = podGroup.Name + pod.Labels[PodGroupNameAnnotation] = podGroup.Name + pod.Spec.PriorityClassName = priorityClassName + _, err := rd.CreatePod(ctx, clientset, pod) + Expect(err).To(Succeed()) + pods = append(pods, pod) + } + + return podGroup, pods +} diff --git a/test/e2e/modules/resources/rd/pod_group/pod_group.go b/test/e2e/modules/resources/rd/pod_group/pod_group.go new file mode 100644 index 000000000..7956d240c --- /dev/null +++ b/test/e2e/modules/resources/rd/pod_group/pod_group.go @@ -0,0 +1,100 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ +package pod_group + +import ( + "context" + "regexp" + + . 
"github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + + runaiClient "github.com/NVIDIA/KAI-scheduler/pkg/apis/client/clientset/versioned" + v2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2" + "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2alpha2" + "github.com/NVIDIA/KAI-scheduler/pkg/common/constants" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/resources/rd" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/resources/rd/queue" +) + +const ( + PodGroupNameAnnotation = "pod-group-name" +) + +func Create(namespace, name, queue string) *v2alpha2.PodGroup { + podGroup := &v2alpha2.PodGroup{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "scheduling.run.ai/v2alpha2", + Kind: "PodGroup", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Annotations: map[string]string{}, + Labels: map[string]string{ + constants.AppLabelName: "engine-e2e", + }, + }, + Spec: v2alpha2.PodGroupSpec{ + MinMember: 1, + Queue: queue, + }, + } + return &*podGroup +} + +func DeleteAllInNamespace( + ctx context.Context, client runtimeClient.Client, namespace string, +) error { + err := client.DeleteAllOf( + ctx, &v2alpha2.PodGroup{}, + runtimeClient.InNamespace(namespace), + runtimeClient.GracePeriodSeconds(0), + ) + return runtimeClient.IgnoreNotFound(err) +} + +func CreateWithPods(ctx context.Context, client *kubernetes.Clientset, runaiClient *runaiClient.Clientset, + podGroupName string, q *v2.Queue, numPods int, priorityClassName *string, + requirements v1.ResourceRequirements) (*v2alpha2.PodGroup, []*v1.Pod) { + namespace := queue.GetConnectedNamespaceToQueue(q) + podGroup := Create(namespace, podGroupName, q.Name) + if priorityClassName != nil { + podGroup.Spec.PriorityClassName = *priorityClassName + } + podGroup, err := runaiClient.SchedulingV2alpha2().PodGroups(namespace).Create(ctx, podGroup, metav1.CreateOptions{}) + Expect(err).To(Succeed()) + + var pods []*v1.Pod + for i := 0; i < numPods; i++ { + pod := createPod(ctx, client, q, podGroupName, requirements) + pods = append(pods, pod) + } + return podGroup, pods +} + +func IsNotReadyForScheduling(event *v1.Event) bool { + if event.Type != v1.EventTypeNormal || event.Reason != "NotReady" { + return false + } + match, err := regexp.MatchString( + "Job is not ready for scheduling. 
Waiting for \\d pods, currently \\d existing", + event.Message) + Expect(err).To(Succeed()) + return match +} + +func createPod(ctx context.Context, client *kubernetes.Clientset, queue *v2.Queue, + podGroupName string, requirements v1.ResourceRequirements) *v1.Pod { + pod := rd.CreatePodObject(queue, requirements) + pod.Annotations[PodGroupNameAnnotation] = podGroupName + pod.Labels[PodGroupNameAnnotation] = podGroupName + pod, err := rd.CreatePod(ctx, client, pod) + Expect(err).To(Succeed()) + return pod +} diff --git a/test/e2e/modules/resources/rd/queue/queue.go b/test/e2e/modules/resources/rd/queue/queue.go new file mode 100644 index 000000000..dab766f47 --- /dev/null +++ b/test/e2e/modules/resources/rd/queue/queue.go @@ -0,0 +1,126 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ +package queue + +import ( + "context" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/scheme" + + runaiClient "github.com/NVIDIA/KAI-scheduler/pkg/apis/client/clientset/versioned" + v2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2" + "github.com/NVIDIA/KAI-scheduler/pkg/common/constants" +) + +func Create(runaiClientset *runaiClient.Clientset, ctx context.Context, queue *v2.Queue, + opts metav1.CreateOptions) (result *v2.Queue, err error) { + result = &v2.Queue{} + err = runaiClientset.SchedulingV2().RESTClient().Post(). + Resource("queues"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(queue). + Do(ctx). + Into(result) + return result, err +} + +func Delete(runaiClientset *runaiClient.Clientset, ctx context.Context, name string, + opts metav1.DeleteOptions) error { + return runaiClientset.SchedulingV2().RESTClient().Delete(). + Resource("queues"). + Name(name). + Body(&opts). + Do(ctx). 
+ Error() +} + +func GetAllQueues(runaiClientset *runaiClient.Clientset, ctx context.Context) (*v2.QueueList, error) { + return runaiClientset.SchedulingV2().Queues("").List(ctx, metav1.ListOptions{}) +} + +func CreateQueueObject(name string, parentQueueName string) *v2.Queue { + queue := &v2.Queue{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "scheduling.run.ai/v2", + Kind: "Queue", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + "project": name, + constants.AppLabelName: "engine-e2e", + }, + }, + Spec: v2.QueueSpec{ + ParentQueue: parentQueueName, + Resources: &v2.QueueResources{ + GPU: v2.QueueResource{ + Quota: -1, + OverQuotaWeight: 1, + Limit: -1, + }, + CPU: v2.QueueResource{ + Quota: -1, + OverQuotaWeight: 1, + Limit: -1, + }, + Memory: v2.QueueResource{ + Quota: -1, + OverQuotaWeight: 1, + Limit: -1, + }, + }, + }, + } + return queue +} + +func CreateQueueObjectWithGpuResource(name string, gpuResource v2.QueueResource, + parentQueueName string) *v2.Queue { + queue := &v2.Queue{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "scheduling.run.ai/v2", + Kind: "Queue", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + "project": name, + constants.AppLabelName: "engine-e2e", + }, + }, + Spec: v2.QueueSpec{ + ParentQueue: parentQueueName, + Resources: &v2.QueueResources{ + GPU: gpuResource, + CPU: v2.QueueResource{ + Quota: -1, + OverQuotaWeight: 1, + Limit: -1, + }, + Memory: v2.QueueResource{ + Quota: -1, + OverQuotaWeight: 1, + Limit: -1, + }, + }, + }, + } + return queue +} + +func GetConnectedNamespaceToQueue(q *v2.Queue) string { + return "runai-" + q.Name +} + +func ConnectQueuesWithSharedParent(parentQueue *v2.Queue, childrenQueues ...*v2.Queue) { + parentQueue.Spec.ParentQueue = "" + parentQueue.Labels["run.ai/department-queue"] = "true" + + for _, childQueue := range childrenQueues { + childQueue.Spec.ParentQueue = parentQueue.Name + } +} diff --git a/test/e2e/modules/resources/rd/role_binding.go b/test/e2e/modules/resources/rd/role_binding.go new file mode 100644 index 000000000..845d588b0 --- /dev/null +++ b/test/e2e/modules/resources/rd/role_binding.go @@ -0,0 +1,39 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ +package rd + +import ( + "fmt" + + rbacv1 "k8s.io/api/rbac/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func CreateRoleBindingObject(name string, clusterRoleRefName string, serviceAccountName string, + namespace string) *rbacv1.RoleBinding { + rb := &rbacv1.RoleBinding{ + TypeMeta: metav1.TypeMeta{ + APIVersion: fmt.Sprintf("%s/%s", + rbacv1.SchemeGroupVersion.Group, rbacv1.SchemeGroupVersion.Version), + Kind: "RoleBinding", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + RoleRef: rbacv1.RoleRef{ + APIGroup: rbacv1.SchemeGroupVersion.Group, + Kind: "ClusterRole", + Name: clusterRoleRefName, + }, + Subjects: []rbacv1.Subject{ + { + Kind: "ServiceAccount", + Name: serviceAccountName, + Namespace: namespace, + }, + }, + } + return rb +} diff --git a/test/e2e/modules/resources/rd/security_context.go b/test/e2e/modules/resources/rd/security_context.go new file mode 100644 index 000000000..7fc874a67 --- /dev/null +++ b/test/e2e/modules/resources/rd/security_context.go @@ -0,0 +1,26 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ +package rd + +import ( + v1 "k8s.io/api/core/v1" + "k8s.io/utils/ptr" +) + +// DefaultSecurityContext Set container security context that is valid by default openshift clusters +func 
DefaultSecurityContext() *v1.SecurityContext { + return &v1.SecurityContext{ + AllowPrivilegeEscalation: ptr.To(false), + RunAsNonRoot: ptr.To(true), + RunAsUser: ptr.To(int64(1000)), + Capabilities: &v1.Capabilities{ + Add: []v1.Capability{}, + Drop: []v1.Capability{}, + }, + SeccompProfile: &v1.SeccompProfile{ + Type: v1.SeccompProfileTypeRuntimeDefault, + }, + } +} diff --git a/test/e2e/modules/utils/logs.go b/test/e2e/modules/utils/logs.go new file mode 100644 index 000000000..a1f841aed --- /dev/null +++ b/test/e2e/modules/utils/logs.go @@ -0,0 +1,69 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ +package utils + +import ( + "context" + "flag" + "fmt" + "strings" + + "github.com/go-logr/logr" + "go.uber.org/zap/zapcore" + v1 "k8s.io/api/core/v1" + ctrl "sigs.k8s.io/controller-runtime" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + + "github.com/NVIDIA/KAI-scheduler/pkg/common/constants" +) + +func SetLogger() { + opts := zap.Options{ + Development: true, + TimeEncoder: zapcore.ISO8601TimeEncoder, + } + opts.BindFlags(flag.CommandLine) + flag.Parse() + ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) +} + +func LogClusterState(client runtimeClient.WithWatch, logger logr.Logger) { + e2ePods := &v1.PodList{} + err := client.List(context.Background(), e2ePods, runtimeClient.MatchingLabels(map[string]string{ + constants.AppLabelName: "engine-e2e", + })) + if err != nil { + return + } + logger.Info(fmt.Sprintf("Failed test cluster state - E2e pods: \n%v", podListPrinting(e2ePods))) +} + +func podListPrinting(podList *v1.PodList) string { + var podListRepresentationString strings.Builder + + for _, pod := range podList.Items { + podListRepresentationString.WriteString( + fmt.Sprintf("Namespace: %s, Name: %s, Phase: %s, Node: %s, Annotations: %v\n", + pod.Namespace, pod.Name, + pod.Status.Phase, pod.Spec.NodeName, pod.Annotations), + ) + for _, container := range pod.Spec.Containers { + podListRepresentationString.WriteString( + fmt.Sprintf("\tContainer: %s, resources: %v\n", container.Name, container.Resources), + ) + } + for _, condition := range pod.Status.Conditions { + if condition.Type == v1.PodScheduled && condition.Status == v1.ConditionFalse { + podListRepresentationString.WriteString( + fmt.Sprintf("\tScheduling failed: %s\n", condition.Message), + ) + } + } + + } + + return podListRepresentationString.String() +} diff --git a/test/e2e/modules/utils/name_generators.go b/test/e2e/modules/utils/name_generators.go new file mode 100644 index 000000000..abb195dd4 --- /dev/null +++ b/test/e2e/modules/utils/name_generators.go @@ -0,0 +1,27 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ +package utils + +import ( + "math/rand" + "time" +) + +var generatedNames = make(map[string]bool) + +func GenerateRandomK8sName(l int) string { + str := "abcdefghijklmnopqrstuvwxyz" + bytes := []byte(str) + var result []byte + r := rand.New(rand.NewSource(time.Now().UnixNano())) + for i := 0; i < l; i++ { + result = append(result, bytes[r.Intn(len(bytes))]) + } + if generatedNames[string(result)] { + return GenerateRandomK8sName(l) + } + generatedNames[string(result)] = true + return string(result) +} diff --git a/test/e2e/modules/utils/random.go b/test/e2e/modules/utils/random.go new file mode 100644 index 000000000..6475b1ec0 --- /dev/null +++ b/test/e2e/modules/utils/random.go @@ -0,0 +1,11 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/
+package utils + +import "math/rand" + +func RandomIntBetween(min, max int) int { + return rand.Intn(max-min) + min +} diff --git a/test/e2e/modules/wait/pod.go b/test/e2e/modules/wait/pod.go new file mode 100644 index 000000000..2e10b368d --- /dev/null +++ b/test/e2e/modules/wait/pod.go @@ -0,0 +1,149 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ +package wait + +import ( + "context" + "fmt" + + . "github.com/onsi/ginkgo/v2" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/NVIDIA/KAI-scheduler/pkg/common/constants" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/resources/rd" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/wait/watcher" +) + +type checkCondition func(watch.Event) bool +type checkPodCondition func(*v1.Pod) bool + +func podEventCheck(condition checkPodCondition) func(event watch.Event) bool { + return func(event watch.Event) bool { + pod, ok := event.Object.(*v1.Pod) + if !ok { + return false + } + return condition(pod) + } +} + +func ForAtLeastNPodsScheduled(ctx context.Context, client runtimeClient.WithWatch, namespace string, + pods []*v1.Pod, minRequired int) { + pw := watcher.NewPodsWatcher(client, podEventCheck(rd.IsPodScheduled), namespace, pods, minRequired) + if !watcher.ForEvent(ctx, client, pw) { + Fail(fmt.Sprintf("Failed to wait for at least %d pods to be scheduled in namespace %s", + minRequired, namespace)) + } +} + +func ForAtLeastNPodsUnschedulable(ctx context.Context, client runtimeClient.WithWatch, namespace string, + pods []*v1.Pod, minRequired int) { + pw := watcher.NewPodsWatcher(client, podEventCheck(rd.IsPodUnschedulable), namespace, pods, minRequired) + if !watcher.ForEvent(ctx, client, pw) { + Fail(fmt.Sprintf("Failed to wait for at least %d pods to be unschedulable in namespace %s", + minRequired, namespace)) + } +} + +func ForPodsScheduled(ctx context.Context, client runtimeClient.WithWatch, namespace string, pods []*v1.Pod) { + pw := watcher.NewPodsWatcher(client, podEventCheck(rd.IsPodScheduled), namespace, pods, len(pods)) + if !watcher.ForEvent(ctx, client, pw) { + Fail(fmt.Sprintf("Failed to wait for pods to be scheduled in namespace %s", namespace)) + } +} + +func ForPodsReady(ctx context.Context, client runtimeClient.WithWatch, namespace string, pods []*v1.Pod) { + pw := watcher.NewPodsWatcher(client, podEventCheck(rd.IsPodReady), namespace, pods, len(pods)) + if !watcher.ForEvent(ctx, client, pw) { + Fail(fmt.Sprintf("Failed to wait for pods to be ready in namespace %s", namespace)) + } +} + +func ForPodScheduled(ctx context.Context, client runtimeClient.WithWatch, pod *v1.Pod) { + pw := watcher.NewPodsWatcher(client, podEventCheck(rd.IsPodScheduled), pod.Namespace, []*v1.Pod{pod}, 1) + if !watcher.ForEvent(ctx, client, pw) { + Fail(fmt.Sprintf("Failed to wait for pod %s/%s to be scheduled", pod.Namespace, pod.Name)) + } +} + +func ForPodReady(ctx context.Context, client runtimeClient.WithWatch, pod *v1.Pod) { + pw := watcher.NewPodsWatcher(client, podEventCheck(rd.IsPodReady), pod.Namespace, []*v1.Pod{pod}, 1) + if !watcher.ForEvent(ctx, client, pw) { + Fail(fmt.Sprintf("Failed to wait for pod %s/%s to be ready", pod.Namespace, pod.Name)) + } +} + +func ForPodUnschedulable(ctx context.Context, client runtimeClient.WithWatch, pod *v1.Pod) { + pw := watcher.NewPodsWatcher(client, podEventCheck(rd.IsPodUnschedulable), pod.Namespace, []*v1.Pod{pod}, 1) + if 
!watcher.ForEvent(ctx, client, pw) { + Fail(fmt.Sprintf("Failed to wait for pod %s/%s to be unschedulable", pod.Namespace, pod.Name)) + } +} + +func ForPodSucceededOrError(ctx context.Context, client runtimeClient.WithWatch, pod *v1.Pod) { + pw := watcher.NewPodsWatcher(client, podEventCheck(rd.IsPodEnded), pod.Namespace, []*v1.Pod{pod}, 1) + if !watcher.ForEvent(ctx, client, pw) { + Fail(fmt.Sprintf("Failed to wait for pod %s/%s to finish (success or error)", pod.Namespace, pod.Name)) + } +} + +func ForAtLeastOnePodCreation(ctx context.Context, client runtimeClient.WithWatch, selector metav1.LabelSelector) { + ForAtLeastNPodCreation(ctx, client, selector, 1) +} + +func ForAtLeastNPodCreation(ctx context.Context, client runtimeClient.WithWatch, selector metav1.LabelSelector, n int) { + condition := func(event watch.Event) bool { + podsListObj, ok := event.Object.(*v1.PodList) + if !ok { + return false + } + return len(podsListObj.Items) >= n + } + pw := watcher.NewGenericWatcher[v1.PodList](client, condition, runtimeClient.MatchingLabels(selector.MatchLabels)) + if !watcher.ForEvent(ctx, client, pw) { + Fail(fmt.Sprintf("Failed to watch for %d pods creation with selector <%v>", n, selector)) + } +} + +func ForPodsWithCondition( + ctx context.Context, client runtimeClient.WithWatch, checkCondition checkCondition, + listOptions ...runtimeClient.ListOption) { + condition := func(event watch.Event) bool { + _, ok := event.Object.(*v1.PodList) + if !ok { + return false + } + return checkCondition(event) + } + pw := watcher.NewGenericWatcher[v1.PodList](client, condition, listOptions...) + if !watcher.ForEvent(ctx, client, pw) { + Fail("Failed to watch pods for condition") + } +} + +func ForPodsToBeDeleted(ctx context.Context, client runtimeClient.WithWatch, listOptions ...runtimeClient.ListOption) { + condition := func(event watch.Event) bool { + podsListObj, ok := event.Object.(*v1.PodList) + if !ok { + return false + } + return len(podsListObj.Items) == 0 + } + pw := watcher.NewGenericWatcher[v1.PodList](client, condition, listOptions...) + if !watcher.ForEvent(ctx, client, pw) { + Fail("Failed to wait for pods to be deleted") + } +} + +func ForNoE2EPods(ctx context.Context, client runtimeClient.WithWatch) { + ForPodsToBeDeleted(ctx, client, runtimeClient.MatchingLabels{constants.AppLabelName: "engine-e2e"}) +} + +func ForNoReservationPods(ctx context.Context, client runtimeClient.WithWatch) { + ForPodsToBeDeleted(ctx, client, runtimeClient.InNamespace(constants.RunaiReservationNamespace)) +} diff --git a/test/e2e/modules/wait/system_pods.go b/test/e2e/modules/wait/system_pods.go new file mode 100644 index 000000000..e3e2a55bf --- /dev/null +++ b/test/e2e/modules/wait/system_pods.go @@ -0,0 +1,81 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ +package wait + +import ( + "context" + "fmt" + + . 
"github.com/onsi/ginkgo/v2" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/watch" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/NVIDIA/KAI-scheduler/pkg/common/constants" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/constant" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/resources/rd" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/wait/watcher" +) + +func ForRunaiComponentPod( + ctx context.Context, client runtimeClient.WithWatch, + appLabelComponentName string, condition checkCondition, +) { + pw := watcher.NewGenericWatcher[v1.PodList](client, watcher.CheckCondition(condition), + runtimeClient.InNamespace(constant.SystemPodsNamespace), + runtimeClient.MatchingLabels{constants.AppLabelName: appLabelComponentName}) + + if !watcher.ForEvent(ctx, client, pw) { + Fail(fmt.Sprintf("Failed to wait for %s pod", appLabelComponentName)) + } +} + +func ForExactlyNComponentPodsToExist( + ctx context.Context, client runtimeClient.WithWatch, + appLabelComponentName string, n int, +) { + ForPodsWithCondition(ctx, client, func(event watch.Event) bool { + pods, ok := event.Object.(*v1.PodList) + if !ok { + return false + } + return len(pods.Items) == n + }, + runtimeClient.InNamespace(constants.SystemPodsNamespace), + runtimeClient.MatchingLabels{constants.AppLabelName: appLabelComponentName}, + ) +} + +func ForRunningSchedulerPodEvent(ctx context.Context, client runtimeClient.WithWatch, schedulerAppName string) { + runningCondition := func(event watch.Event) bool { + podListObj, ok := event.Object.(*v1.PodList) + if !ok { + Fail(fmt.Sprintf("Failed to process event for pod %s", event.Object)) + } + if len(podListObj.Items) != 1 { + return false + } + + objPod := &podListObj.Items[0] + return rd.IsPodRunning(objPod) + } + ForRunaiComponentPod(ctx, client, schedulerAppName, runningCondition) +} + +func ForRunningBinderPodEvent(ctx context.Context, client runtimeClient.WithWatch) { + runningCondition := func(event watch.Event) bool { + podListObj, ok := event.Object.(*v1.PodList) + if !ok { + Fail(fmt.Sprintf("Failed to process event for pod %s", event.Object)) + } + if len(podListObj.Items) != 1 { + return false + } + + objPod := &podListObj.Items[0] + return rd.IsPodRunning(objPod) + } + ForRunaiComponentPod(ctx, client, "binder", runningCondition) +} diff --git a/test/e2e/modules/wait/watcher/generic.go b/test/e2e/modules/wait/watcher/generic.go new file mode 100644 index 000000000..d13ee1f03 --- /dev/null +++ b/test/e2e/modules/wait/watcher/generic.go @@ -0,0 +1,67 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ +package watcher + +import ( + "context" + "fmt" + "reflect" + + . "github.com/onsi/ginkgo/v2" + "k8s.io/apimachinery/pkg/watch" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" +) + +type GenericWatcher[T any, PT interface { + *T + runtimeClient.ObjectList +}] struct { + client runtimeClient.WithWatch + condition CheckCondition + listOptions []runtimeClient.ListOption + isSatisfied bool +} + +func NewGenericWatcher[T any, PT interface { + *T + runtimeClient.ObjectList +}]( + client runtimeClient.WithWatch, condition CheckCondition, listOptions ...runtimeClient.ListOption, +) *GenericWatcher[T, PT] { + return &GenericWatcher[T, PT]{ + client, + condition, + listOptions, + false, + } +} + +func (w *GenericWatcher[T, PT]) watch(ctx context.Context) watch.Interface { + var objectList T + pObjectList := PT(&objectList) + watcher, err := w.client.Watch(ctx, pObjectList, w.listOptions...) 
+ if err != nil { + Fail(fmt.Sprintf("Failed to create watcher for type: %v. err: %v", reflect.TypeOf(pObjectList), err)) + } + return watcher +} + +func (w *GenericWatcher[T, PT]) sync(ctx context.Context) { + var objectList T + pObjectList := PT(&objectList) + err := w.client.List(ctx, pObjectList, w.listOptions...) + if err != nil { + Fail(fmt.Sprintf("Failed to list type: %v, err: %v", reflect.TypeOf(pObjectList), err)) + } + w.isSatisfied = w.condition(watch.Event{Object: pObjectList}) +} + +func (w *GenericWatcher[T, PT]) processEvent(ctx context.Context, _ watch.Event) { + w.sync(ctx) +} + +func (w *GenericWatcher[T, PT]) satisfied() bool { + return w.isSatisfied +} diff --git a/test/e2e/modules/wait/watcher/pods.go b/test/e2e/modules/wait/watcher/pods.go new file mode 100644 index 000000000..7b253b0d3 --- /dev/null +++ b/test/e2e/modules/wait/watcher/pods.go @@ -0,0 +1,95 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ +package watcher + +import ( + "context" + "fmt" + + . "github.com/onsi/ginkgo/v2" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" +) + +type PodsWatcher struct { + client runtimeClient.WithWatch + condition CheckCondition + namespace string + satisfiedMap map[string]bool + minRequired int +} + +func NewPodsWatcher( + client runtimeClient.WithWatch, condition CheckCondition, namespace string, pods []*corev1.Pod, minRequired int, +) *PodsWatcher { + return &PodsWatcher{ + client, + condition, + namespace, + initSatisfiedPodsMap(pods), + minRequired, + } +} + +func initSatisfiedPodsMap(pods []*corev1.Pod) map[string]bool { + satisfiedPodsMap := map[string]bool{} + for _, pod := range pods { + podKey := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name} + satisfiedPodsMap[podKey.String()] = false + } + return satisfiedPodsMap +} + +func (w *PodsWatcher) watch(ctx context.Context) watch.Interface { + podsList := &corev1.PodList{} + watcher, err := w.client.Watch(ctx, podsList, runtimeClient.InNamespace(w.namespace)) + if err != nil { + Fail(fmt.Sprintf("Failed to create watcher for pods: %v", err)) + } + return watcher +} + +func (w *PodsWatcher) sync(ctx context.Context) { + podsList := &corev1.PodList{} + err := w.client.List(ctx, podsList, runtimeClient.InNamespace(w.namespace)) + if err != nil { + Fail(fmt.Sprintf("Failed to list pods: %v", err)) + } + + for _, pod := range podsList.Items { + podKey := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name} + alreadySatisfied, relevant := w.satisfiedMap[podKey.String()] + if !relevant || alreadySatisfied { + continue + } + w.satisfiedMap[podKey.String()] = w.condition(watch.Event{Object: &pod}) + } +} + +func (w *PodsWatcher) processEvent(_ context.Context, event watch.Event) { + pod, ok := event.Object.(*corev1.Pod) + if !ok { + return + } + + podKey := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name} + alreadySatisfied, relevant := w.satisfiedMap[podKey.String()] + if !relevant || alreadySatisfied { + return + } + w.satisfiedMap[podKey.String()] = w.condition(event) +} + +func (w *PodsWatcher) satisfied() bool { + counter := 0 + for _, satisfied := range w.satisfiedMap { + if satisfied { + counter += 1 + } + } + return counter >= w.minRequired +} diff --git a/test/e2e/modules/wait/watcher/poll.go b/test/e2e/modules/wait/watcher/poll.go new file mode 100644 index 000000000..760dae2f4 --- /dev/null +++ b/test/e2e/modules/wait/watcher/poll.go 
@@ -0,0 +1,42 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ +package watcher + +import ( + "context" + + "k8s.io/apimachinery/pkg/watch" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" +) + +type PollCondition func() bool + +type PollWatcher struct { + condition PollCondition + isSatisfied bool +} + +func NewPollWatcher(client runtimeClient.WithWatch, condition PollCondition) *PollWatcher { + return &PollWatcher{ + condition, + false, + } +} + +func (w *PollWatcher) watch(context.Context) watch.Interface { + return watch.NewEmptyWatch() +} + +func (w *PollWatcher) sync(context.Context) { + w.isSatisfied = w.condition() +} + +func (w *PollWatcher) processEvent(ctx context.Context, _ watch.Event) { + w.sync(ctx) +} + +func (w *PollWatcher) satisfied() bool { + return w.isSatisfied +} diff --git a/test/e2e/modules/wait/watcher/watcher.go b/test/e2e/modules/wait/watcher/watcher.go new file mode 100644 index 000000000..382715d61 --- /dev/null +++ b/test/e2e/modules/wait/watcher/watcher.go @@ -0,0 +1,106 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ +package watcher + +import ( + "context" + "errors" + "fmt" + "strings" + "time" + + "k8s.io/apimachinery/pkg/watch" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/utils" +) + +type Interface interface { + watch(ctx context.Context) watch.Interface + sync(ctx context.Context) + processEvent(ctx context.Context, event watch.Event) + satisfied() bool +} + +type CheckCondition func(watch.Event) bool + +var ( + FlowTimeout = 5 * time.Minute +) + +func ForEvent(ctx context.Context, client runtimeClient.WithWatch, eventWatcher Interface) bool { + return ForEventCustomTimeout(ctx, client, eventWatcher, FlowTimeout) +} + +func ForEventCustomTimeout(ctx context.Context, client runtimeClient.WithWatch, eventWatcher Interface, + waitTime time.Duration) bool { + logger := log.FromContext(ctx) + + watcher := eventWatcher.watch(ctx) + defer watcher.Stop() + + eventWatcher.sync(ctx) + if eventWatcher.satisfied() { + return true + } + + timer := time.NewTimer(waitTime) + defer timer.Stop() + + pollInterval := 2 * time.Second + ticker := time.NewTicker(pollInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + if errors.Is(ctx.Err(), context.Canceled) { + logger.Error(ctx.Err(), "WaitForEvent has been canceled") + utils.LogClusterState(client, logger) + return true + } + return false + case <-timer.C: + eventWatcher.sync(ctx) + if eventWatcher.satisfied() { + return true + } + logger.Error(nil, "WaitForEvent timed out") + utils.LogClusterState(client, logger) + return false + case <-ticker.C: + eventWatcher.sync(ctx) + if eventWatcher.satisfied() { + return true + } + case event := <-watcher.ResultChan(): + if event.Type == watch.Error { + err := ignoreContextCancelled(event) + if err != nil { + logger.Error(err, "Error event received for WaitForEvent function") + utils.LogClusterState(client, logger) + return false + } + return true + } + + eventWatcher.processEvent(ctx, event) + if eventWatcher.satisfied() { + return true + } + } + } +} + +func ignoreContextCancelled(event watch.Event) error { + objectAsString := fmt.Sprintf("%v", event.Object) + if strings.Contains(objectAsString, "context cancelled") { + fmt.Printf("Cancellation event received %v, %v", event.Type, event.Object) + return nil + } + + return fmt.Errorf("error event received %v, %v", 
event.Type, event.Object) +} diff --git a/test/e2e/suites/allocate/applying_options/appying_options_suite_test.go b/test/e2e/suites/allocate/applying_options/appying_options_suite_test.go new file mode 100644 index 000000000..944be13e0 --- /dev/null +++ b/test/e2e/suites/allocate/applying_options/appying_options_suite_test.go @@ -0,0 +1,20 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ +package applying_options + +import ( + "testing" + + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/utils" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestPredicates(t *testing.T) { + utils.SetLogger() + RegisterFailHandler(Fail) + RunSpecs(t, "Applying options Suite") +} diff --git a/test/e2e/suites/allocate/applying_options/namespace_test.go b/test/e2e/suites/allocate/applying_options/namespace_test.go new file mode 100644 index 000000000..800215b32 --- /dev/null +++ b/test/e2e/suites/allocate/applying_options/namespace_test.go @@ -0,0 +1,115 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ +package applying_options + +import ( + "context" + "fmt" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + + v2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2" + "github.com/NVIDIA/KAI-scheduler/pkg/common/constants" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/constant/labels" + testcontext "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/context" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/resources/capacity" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/resources/rd" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/resources/rd/queue" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/utils" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/wait" +) + +const numCreatePodRetries = 10 + +var _ = Describe("Namespace options", Ordered, func() { + var ( + testCtx *testcontext.TestContext + ) + + BeforeAll(func(ctx context.Context) { + testCtx = testcontext.GetConnectivity(ctx, Default) + parentQueue := queue.CreateQueueObject(utils.GenerateRandomK8sName(10), "") + childQueue := queue.CreateQueueObject(utils.GenerateRandomK8sName(10), parentQueue.Name) + testCtx.InitQueues([]*v2.Queue{childQueue, parentQueue}) + }) + + AfterAll(func(ctx context.Context) { + testCtx.ClusterCleanup(ctx) + }) + + AfterEach(func(ctx context.Context) { + testCtx.TestContextCleanup(ctx) + }) + + It("Namespace in Request", func(ctx context.Context) { + pod := rd.CreatePodObject(testCtx.Queues[0], v1.ResourceRequirements{}) + + targetNamespace := pod.Namespace + pod.Namespace = "" // Clean target namespace from the pod object + + actualPod, err := createPodInNamespace(ctx, testCtx.KubeClientset, pod, targetNamespace) + Expect(err).NotTo(HaveOccurred()) + + wait.ForPodScheduled(ctx, testCtx.ControllerClient, actualPod) + }) + + Context("Binding Webhook", func() { + BeforeAll(func(ctx context.Context) { + capacity.SkipIfInsufficientClusterResources(testCtx.KubeClientset, + &capacity.ResourceList{ + Gpu: resource.MustParse("1"), + PodCount: 1, + }, + ) + }) + + It("Fraction GPU request", Label(labels.ReservationPod), func(ctx context.Context) { + pod := rd.CreatePodObject(testCtx.Queues[0], v1.ResourceRequirements{}) + pod.Annotations = map[string]string{ + constants.RunaiGpuFraction: "0.5", + } + + 
targetNamespace := pod.Namespace + pod.Namespace = "" // Clean target namespace from the pod object + + actualPod, err := createPodInNamespace(ctx, testCtx.KubeClientset, pod, targetNamespace) + Expect(err).NotTo(HaveOccurred()) + + wait.ForPodReady(ctx, testCtx.ControllerClient, actualPod) + }) + }) +}) + +func createPodInNamespace(ctx context.Context, client *kubernetes.Clientset, pod *v1.Pod, + namespace string) (*v1.Pod, error) { + actualPod, err := rd.GetPod(ctx, client, pod.Namespace, pod.Name) + if err == nil { + // pod is not expected to exist in the cluster + return nil, fmt.Errorf("pod %s/%s already exists in the cluster", namespace, pod.Name) + } + + for range numCreatePodRetries { + actualPod, err = client. + CoreV1(). + Pods(namespace). + Create(ctx, pod, metav1.CreateOptions{}) + if err == nil { + return actualPod, nil + } + if errors.IsAlreadyExists(err) { + return rd.GetPod(ctx, client, pod.Namespace, pod.Name) + } + time.Sleep(time.Second * 2) + } + return nil, fmt.Errorf("failed to create pod <%s/%s>, error: %s", + pod.Namespace, pod.Name, err) +}
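The suite above drives single pods through the new helpers; broader coverage is planned for the next PRs. As a reference for how the pod-group and wait helpers added here are meant to compose, below is a minimal sketch of a gang-scheduling spec. It is illustrative only: the package name, suite name, and the "train" priority class are assumptions rather than definitions from this patch; the rest follows the helper signatures introduced above.

package distributed_sketch

import (
	"context"

	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"
	v1 "k8s.io/api/core/v1"

	v2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2"
	testcontext "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/context"
	"github.com/NVIDIA/KAI-scheduler/test/e2e/modules/resources/rd/pod_group"
	"github.com/NVIDIA/KAI-scheduler/test/e2e/modules/resources/rd/queue"
	"github.com/NVIDIA/KAI-scheduler/test/e2e/modules/utils"
	"github.com/NVIDIA/KAI-scheduler/test/e2e/modules/wait"
)

var _ = Describe("Distributed job scheduling (sketch)", Ordered, func() {
	var testCtx *testcontext.TestContext

	BeforeAll(func(ctx context.Context) {
		// Connect to the cluster and set up a parent/child queue pair, mirroring the applying_options suite.
		testCtx = testcontext.GetConnectivity(ctx, Default)
		parentQueue := queue.CreateQueueObject(utils.GenerateRandomK8sName(10), "")
		childQueue := queue.CreateQueueObject(utils.GenerateRandomK8sName(10), parentQueue.Name)
		testCtx.InitQueues([]*v2.Queue{childQueue, parentQueue})
	})

	AfterEach(func(ctx context.Context) {
		testCtx.TestContextCleanup(ctx)
	})

	AfterAll(func(ctx context.Context) {
		testCtx.ClusterCleanup(ctx)
	})

	It("schedules every pod of a distributed job", func(ctx context.Context) {
		childQueue := testCtx.Queues[0]

		// Creates a PodGroup with MinMember=3 plus three pods referencing it; "train" is an assumed
		// priority class name used only for illustration.
		_, pods := pod_group.CreateDistributedJob(ctx, testCtx.KubeClientset, testCtx.ControllerClient,
			childQueue, 3, v1.ResourceRequirements{}, "train")

		// Wait until every member of the group is scheduled in the queue's namespace, or fail on timeout.
		wait.ForPodsScheduled(ctx, testCtx.ControllerClient, queue.GetConnectedNamespaceToQueue(childQueue), pods)
	})
})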