Skip to content

Commit b5f70d2

Browse files
authored
add queue controller tests (#231)
1 parent ac873ab commit b5f70d2

File tree

3 files changed

+221
-15
lines changed

3 files changed

+221
-15
lines changed

CHANGELOG.md

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
66

77
## [Unreleased]
88

9-
### Added
10-
- Adding queue controller that will update queues statuses with allocation data.
11-
129
### Fixes
1310
- Fixed pod status scheduled race condition between the scheduler and the pod binding
1411
- Removed redundant `replicas` key for binder from `values.yaml` as it is not used and not supported
@@ -17,7 +14,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
1714
- Removed `runai-job-id` and `runai/job-id` annotations from pods and podgroups
1815

1916
### Added
20-
- Added [minruntime](docs/plugins/minruntime.md) plugin, allowing PodGroups to run for a configurable amount of time without being reclaimed/preempted.
17+
- Added [minruntime](docs/plugins/minruntime.md) plugin, allowing PodGroups to run for a configurable amount of time without being reclaimed/preempted.
18+
- Added queue controller that updates queue statuses with allocation data.
2119

2220

2321
## [v0.5.1] - 2025-05-20

pkg/queuecontroller/controllers/resource_updater/resource_updater.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717

1818
// +kubebuilder:rbac:groups=scheduling.run.ai,resources=queues,verbs=get;list;watch
1919
// +kubebuilder:rbac:groups=scheduling.run.ai,resources=queues/status,verbs=get;update;patch
20+
// +kubebuilder:rbac:groups=scheduling.run.ai,resources=podgroups,verbs=get;list
2021

2122
type ResourceUpdater struct {
2223
client.Client

pkg/queuecontroller/controllers/suite_test.go

Lines changed: 218 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,44 @@ limitations under the License.
1717
package controllers
1818

1919
import (
20+
"context"
2021
"path/filepath"
2122
"testing"
23+
"time"
2224

2325
. "github.com/onsi/ginkgo/v2"
26+
"github.com/onsi/gomega"
2427
. "github.com/onsi/gomega"
2528

29+
v1 "k8s.io/api/core/v1"
30+
"k8s.io/apimachinery/pkg/api/resource"
31+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32+
"k8s.io/apimachinery/pkg/types"
2633
"k8s.io/client-go/kubernetes/scheme"
2734
"k8s.io/client-go/rest"
35+
ctrl "sigs.k8s.io/controller-runtime"
2836
"sigs.k8s.io/controller-runtime/pkg/client"
2937
"sigs.k8s.io/controller-runtime/pkg/envtest"
30-
//"sigs.k8s.io/controller-runtime/pkg/envtest/printer"
3138
logf "sigs.k8s.io/controller-runtime/pkg/log"
3239
"sigs.k8s.io/controller-runtime/pkg/log/zap"
33-
//+kubebuilder:scaffold:imports
40+
41+
v2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2"
42+
"github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2alpha2"
3443
)
3544

36-
// These tests use Ginkgo (BDD-style Go testing framework). Refer to
37-
// http://onsi.github.io/ginkgo/ to learn more about Ginkgo.
45+
const (
46+
timeout = time.Second * 10
47+
interval = time.Millisecond * 250
48+
)
3849

39-
var cfg *rest.Config
40-
var k8sClient client.Client
41-
var testEnv *envtest.Environment
50+
var (
51+
cfg *rest.Config
52+
k8sClient client.Client
53+
testEnv *envtest.Environment
54+
ctx context.Context
55+
cancel context.CancelFunc
56+
managerDone chan struct{}
57+
)
4258

4359
func TestAPIs(t *testing.T) {
4460
RegisterFailHandler(Fail)
@@ -51,17 +67,20 @@ var _ = BeforeSuite(func() {
5167

5268
By("bootstrapping test environment")
5369
testEnv = &envtest.Environment{
54-
CRDDirectoryPaths: []string{filepath.Join("..", "config", "crd", "bases")},
55-
ErrorIfCRDPathMissing: false,
70+
CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "deployments", "kai-scheduler", "crds")},
71+
ErrorIfCRDPathMissing: true,
5672
}
5773

5874
var err error
59-
// cfg is defined in this file globally.
6075
cfg, err = testEnv.Start()
6176
Expect(err).NotTo(HaveOccurred())
6277
Expect(cfg).NotTo(BeNil())
6378

64-
//+kubebuilder:scaffold:scheme
79+
err = v2.AddToScheme(scheme.Scheme)
80+
Expect(err).NotTo(HaveOccurred())
81+
82+
err = v2alpha2.AddToScheme(scheme.Scheme)
83+
Expect(err).NotTo(HaveOccurred())
6584

6685
k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
6786
Expect(err).NotTo(HaveOccurred())
@@ -74,3 +93,191 @@ var _ = AfterSuite(func() {
7493
err := testEnv.Stop()
7594
Expect(err).NotTo(HaveOccurred())
7695
})
96+
97+
var _ = Describe("QueueController", Ordered, func() {
98+
var (
99+
mgr ctrl.Manager
100+
controller *QueueReconciler
101+
)
102+
103+
BeforeAll(func() {
104+
ctx, cancel = context.WithCancel(context.Background())
105+
106+
var err error
107+
mgr, err = ctrl.NewManager(cfg, ctrl.Options{
108+
Scheme: scheme.Scheme,
109+
})
110+
Expect(err).ToNot(HaveOccurred())
111+
112+
controller = &QueueReconciler{
113+
Client: mgr.GetClient(),
114+
Scheme: mgr.GetScheme(),
115+
}
116+
117+
err = controller.SetupWithManager(mgr, "kai.scheduler/queue")
118+
Expect(err).ToNot(HaveOccurred())
119+
120+
managerDone = make(chan struct{})
121+
go func() {
122+
defer close(managerDone)
123+
err := mgr.Start(ctx)
124+
Expect(err).ToNot(HaveOccurred())
125+
}()
126+
})
127+
128+
AfterAll(func() {
129+
cancel()
130+
<-managerDone
131+
})
132+
133+
Context("When managing child queues", func() {
134+
It("Should update parent queue's childQueues field", func() {
135+
parentQueue := &v2.Queue{
136+
ObjectMeta: metav1.ObjectMeta{
137+
Name: "parent-queue",
138+
},
139+
Spec: v2.QueueSpec{},
140+
}
141+
Expect(k8sClient.Create(ctx, parentQueue)).Should(Succeed())
142+
143+
childQueue1 := &v2.Queue{
144+
ObjectMeta: metav1.ObjectMeta{
145+
Name: "child-queue-1",
146+
},
147+
Spec: v2.QueueSpec{
148+
ParentQueue: "parent-queue",
149+
},
150+
}
151+
Expect(k8sClient.Create(ctx, childQueue1)).Should(Succeed())
152+
153+
childQueue2 := &v2.Queue{
154+
ObjectMeta: metav1.ObjectMeta{
155+
Name: "child-queue-2",
156+
},
157+
Spec: v2.QueueSpec{
158+
ParentQueue: "parent-queue",
159+
},
160+
}
161+
Expect(k8sClient.Create(ctx, childQueue2)).Should(Succeed())
162+
163+
Eventually(func() []string {
164+
var updatedParentQueue v2.Queue
165+
err := k8sClient.Get(ctx, types.NamespacedName{Name: "parent-queue"}, &updatedParentQueue)
166+
if err != nil {
167+
return nil
168+
}
169+
return updatedParentQueue.Status.ChildQueues
170+
}, timeout, interval).Should(ContainElements("child-queue-1", "child-queue-2"))
171+
})
172+
})
173+
174+
Context("When managing pod groups", func() {
175+
It("Should update queue status with pod group resources", func() {
176+
queue := &v2.Queue{
177+
ObjectMeta: metav1.ObjectMeta{
178+
Name: "resource-queue",
179+
},
180+
Spec: v2.QueueSpec{},
181+
}
182+
Expect(k8sClient.Create(ctx, queue)).Should(Succeed())
183+
184+
podGroup1 := &v2alpha2.PodGroup{
185+
ObjectMeta: metav1.ObjectMeta{
186+
Name: "pod-group-1",
187+
Namespace: "default",
188+
Labels: map[string]string{
189+
"kai.scheduler/queue": "resource-queue",
190+
},
191+
},
192+
Spec: v2alpha2.PodGroupSpec{
193+
Queue: "resource-queue",
194+
MinMember: 1,
195+
},
196+
}
197+
Expect(k8sClient.Create(ctx, podGroup1)).Should(Succeed())
198+
199+
podGroup2 := &v2alpha2.PodGroup{
200+
ObjectMeta: metav1.ObjectMeta{
201+
Name: "pod-group-2",
202+
Namespace: "default",
203+
Labels: map[string]string{
204+
"kai.scheduler/queue": "resource-queue",
205+
},
206+
},
207+
Spec: v2alpha2.PodGroupSpec{
208+
Queue: "resource-queue",
209+
MinMember: 1,
210+
},
211+
}
212+
Expect(k8sClient.Create(ctx, podGroup2)).Should(Succeed())
213+
214+
createdPodGroup1 := &v2alpha2.PodGroup{}
215+
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: "pod-group-1", Namespace: "default"}, createdPodGroup1)).Should(Succeed())
216+
217+
createdPodGroup2 := &v2alpha2.PodGroup{}
218+
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: "pod-group-2", Namespace: "default"}, createdPodGroup2)).Should(Succeed())
219+
220+
createdPodGroup1.Status = v2alpha2.PodGroupStatus{
221+
Running: 1,
222+
ResourcesStatus: v2alpha2.PodGroupResourcesStatus{
223+
Allocated: v1.ResourceList{
224+
"cpu": resource.MustParse("2"),
225+
"memory": resource.MustParse("4Gi"),
226+
},
227+
AllocatedNonPreemptible: v1.ResourceList{
228+
"cpu": resource.MustParse("1"),
229+
"memory": resource.MustParse("2Gi"),
230+
},
231+
Requested: v1.ResourceList{
232+
"cpu": resource.MustParse("2"),
233+
"memory": resource.MustParse("4Gi"),
234+
},
235+
},
236+
}
237+
Expect(k8sClient.Status().Update(ctx, createdPodGroup1)).Should(Succeed())
238+
239+
createdPodGroup2.Status = v2alpha2.PodGroupStatus{
240+
Running: 1,
241+
ResourcesStatus: v2alpha2.PodGroupResourcesStatus{
242+
Allocated: v1.ResourceList{
243+
"nvidia.com/gpu": resource.MustParse("2"),
244+
"cpu": resource.MustParse("3"),
245+
"memory": resource.MustParse("6Gi"),
246+
},
247+
AllocatedNonPreemptible: v1.ResourceList{
248+
"nvidia.com/gpu": resource.MustParse("1"),
249+
"cpu": resource.MustParse("2"),
250+
"memory": resource.MustParse("4Gi"),
251+
},
252+
Requested: v1.ResourceList{
253+
"nvidia.com/gpu": resource.MustParse("2"),
254+
"cpu": resource.MustParse("3"),
255+
"memory": resource.MustParse("6Gi"),
256+
},
257+
},
258+
}
259+
Expect(k8sClient.Status().Update(ctx, createdPodGroup2)).Should(Succeed())
260+
261+
Eventually(func(q gomega.Gomega) bool {
262+
var updatedQueue v2.Queue
263+
err := k8sClient.Get(ctx, types.NamespacedName{Name: "resource-queue"}, &updatedQueue)
264+
if err != nil {
265+
return false
266+
}
267+
// GinkgoLogr.Info("EREZ TEST", "allocated", updatedQueue.Status.Allocated)
268+
q.Expect(updatedQueue.Status.Allocated["cpu"]).To(Equal(resource.MustParse("5")))
269+
q.Expect(updatedQueue.Status.Allocated["memory"]).To(Equal(resource.MustParse("10Gi")))
270+
q.Expect(updatedQueue.Status.Allocated["nvidia.com/gpu"]).To(Equal(resource.MustParse("2")))
271+
272+
q.Expect(updatedQueue.Status.AllocatedNonPreemptible["cpu"]).To(Equal(resource.MustParse("3")))
273+
q.Expect(updatedQueue.Status.AllocatedNonPreemptible["memory"]).To(Equal(resource.MustParse("6Gi")))
274+
q.Expect(updatedQueue.Status.AllocatedNonPreemptible["nvidia.com/gpu"]).To(Equal(resource.MustParse("1")))
275+
276+
q.Expect(updatedQueue.Status.Requested["cpu"]).To(Equal(resource.MustParse("5")))
277+
q.Expect(updatedQueue.Status.Requested["memory"]).To(Equal(resource.MustParse("10Gi")))
278+
q.Expect(updatedQueue.Status.Requested["nvidia.com/gpu"]).To(Equal(resource.MustParse("2")))
279+
return true
280+
}, timeout, interval).Should(BeTrue())
281+
})
282+
})
283+
})

0 commit comments

Comments
 (0)