Skip to content

Commit 56e6f85

Browse files
committed
Add support for a centralized controller to advertise IMEX channel
Signed-off-by: Kevin Klues <[email protected]>
1 parent e96f49f commit 56e6f85

File tree

8 files changed

+541
-189
lines changed

8 files changed

+541
-189
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
.cache/
22
.bash_history
3+
/nvidia-dra-controller
34
/nvidia-dra-plugin
45
.idea
56
[._]*.sw[a-p]

cmd/nvidia-dra-controller/imex.go

Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
/*
2+
* Copyright (c) 2024 NVIDIA CORPORATION. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"time"
23+
24+
v1 "k8s.io/api/core/v1"
25+
resourceapi "k8s.io/api/resource/v1alpha3"
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27+
"k8s.io/apimachinery/pkg/labels"
28+
"k8s.io/apimachinery/pkg/selection"
29+
"k8s.io/client-go/informers"
30+
"k8s.io/client-go/kubernetes"
31+
"k8s.io/client-go/tools/cache"
32+
"k8s.io/dynamic-resource-allocation/resourceslice"
33+
"k8s.io/klog/v2"
34+
"k8s.io/utils/ptr"
35+
)
36+
37+
const (
38+
// DriverName is the DRA driver name passed to the ResourceSlice controller
// when publishing IMEX channel pools.
DriverName = "gpu.nvidia.com"
39+
// ImexDomainLabel is the node label key whose value identifies the IMEX
// domain a node belongs to. It is used both to select the nodes to watch and
// as the node selector key of each generated pool.
ImexDomainLabel = "nvidia.com/gpu.imex-domain"
40+
// ImexChannelLimit is the number of IMEX channel devices generated for the
// pool of each IMEX domain.
ImexChannelLimit = 128
41+
)
42+
43+
// DriverResources is a locally named type with the same underlying type as
// resourceslice.DriverResources. It exists so that this package can attach
// its own methods (such as DeepCopy) to the upstream type.
type DriverResources resourceslice.DriverResources
44+
45+
func StartIMEXManager(ctx context.Context, config *Config) error {
46+
// Build a client set config
47+
csconfig, err := config.flags.kubeClientConfig.NewClientSetConfig()
48+
if err != nil {
49+
return fmt.Errorf("error creating client set config: %w", err)
50+
}
51+
52+
// Create a new clientset
53+
clientset, err := kubernetes.NewForConfig(csconfig)
54+
if err != nil {
55+
return fmt.Errorf("error creating dynamic client: %w", err)
56+
}
57+
58+
// Fetch the current Pod object
59+
pod, err := clientset.CoreV1().Pods(config.flags.namespace).Get(ctx, config.flags.podName, metav1.GetOptions{})
60+
if err != nil {
61+
return fmt.Errorf("error fetching pod: %w", err)
62+
}
63+
64+
// Set the owner of the ResourceSlices we will create
65+
owner := resourceslice.Owner{
66+
APIVersion: "v1",
67+
Kind: "Pod",
68+
Name: pod.Name,
69+
UID: pod.UID,
70+
}
71+
72+
// Stream added/removed IMEX domains from nodes over time
73+
klog.Info("Start streaming IMEX domains from nodes...")
74+
addedDomainsCh, removedDomainsCh, err := streamImexDomains(ctx, clientset)
75+
if err != nil {
76+
return fmt.Errorf("error streaming IMEX domains: %w", err)
77+
}
78+
79+
// Add/Remove resource slices from IMEX domains as they come and go
80+
klog.Info("Start publishing IMEX channels to ResourceSlices...")
81+
err = manageResourceSlices(ctx, clientset, owner, addedDomainsCh, removedDomainsCh)
82+
if err != nil {
83+
return fmt.Errorf("error managing resource slices: %w", err)
84+
}
85+
86+
return nil
87+
}
88+
89+
// manageResourceSlices reacts to added and removed IMEX domains and triggers the creation / removal of resource slices accordingly.
90+
func manageResourceSlices(ctx context.Context, clientset kubernetes.Interface, owner resourceslice.Owner, addedDomainsCh <-chan string, removedDomainsCh <-chan string) error {
91+
driverResources := resourceslice.DriverResources{}
92+
controller, err := resourceslice.StartController(ctx, clientset, DriverName, owner, &driverResources)
93+
if err != nil {
94+
return fmt.Errorf("error starting resource slice controller: %w", err)
95+
}
96+
97+
go func() {
98+
for {
99+
select {
100+
case addedDomain := <-addedDomainsCh:
101+
klog.Infof("Adding channels for new IMEX domain: %v", addedDomain)
102+
newDriverResources := DriverResources(driverResources).DeepCopy()
103+
newDriverResources.Pools[addedDomain] = generateImexChannelPool(addedDomain, ImexChannelLimit)
104+
controller.Update(&newDriverResources)
105+
driverResources = newDriverResources
106+
case removedDomain := <-removedDomainsCh:
107+
klog.Infof("Removing channels for removed IMEX domain: %v", removedDomain)
108+
newDriverResources := DriverResources(driverResources).DeepCopy()
109+
delete(newDriverResources.Pools, removedDomain)
110+
controller.Update(&newDriverResources)
111+
driverResources = newDriverResources
112+
case <-ctx.Done():
113+
return
114+
}
115+
}
116+
}()
117+
118+
return nil
119+
}
120+
121+
// DeepCopy will perform a deep copy of the provided DriverResources.
122+
func (d DriverResources) DeepCopy() resourceslice.DriverResources {
123+
driverResources := resourceslice.DriverResources{
124+
Pools: make(map[string]resourceslice.Pool),
125+
}
126+
for p := range d.Pools {
127+
driverResources.Pools[p] = generateImexChannelPool(p, ImexChannelLimit)
128+
}
129+
return driverResources
130+
}
131+
132+
// streamImexDomains returns two channels that streams imexDomans that are added and removed from nodes over time.
133+
func streamImexDomains(ctx context.Context, clientset kubernetes.Interface) (<-chan string, <-chan string, error) {
134+
// Create channels to stream IMEX domain ids that are added / removed
135+
addedDomainCh := make(chan string)
136+
removedDomainCh := make(chan string)
137+
138+
// Use a map to track how many nodes are part of a given IMEX domain
139+
nodesPerImexDomain := make(map[string]int)
140+
141+
// Build a label selector to get all nodes with ImexDomainLabel set
142+
requirement, err := labels.NewRequirement(ImexDomainLabel, selection.Exists, nil)
143+
if err != nil {
144+
return nil, nil, fmt.Errorf("error building label selector requirement: %w", err)
145+
}
146+
labelSelector := labels.NewSelector().Add(*requirement).String()
147+
148+
// Create a shared informer factory for nodes
149+
informerFactory := informers.NewSharedInformerFactoryWithOptions(
150+
clientset,
151+
time.Minute*10, // Resync period
152+
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
153+
options.LabelSelector = labelSelector
154+
}),
155+
)
156+
nodeInformer := informerFactory.Core().V1().Nodes().Informer()
157+
158+
// Set up event handlers for node events
159+
_, err = nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
160+
AddFunc: func(obj interface{}) {
161+
node := obj.(*v1.Node) // nolint:forcetypeassert
162+
imexDomain := node.Labels[ImexDomainLabel]
163+
if imexDomain != "" {
164+
nodesPerImexDomain[imexDomain]++
165+
if nodesPerImexDomain[imexDomain] == 1 {
166+
addedDomainCh <- imexDomain
167+
}
168+
}
169+
},
170+
DeleteFunc: func(obj interface{}) {
171+
node := obj.(*v1.Node) // nolint:forcetypeassert
172+
imexDomain := node.Labels[ImexDomainLabel]
173+
if imexDomain != "" {
174+
nodesPerImexDomain[imexDomain]--
175+
if nodesPerImexDomain[imexDomain] == 0 {
176+
removedDomainCh <- imexDomain
177+
}
178+
}
179+
},
180+
UpdateFunc: func(oldObj, newObj interface{}) {
181+
oldNode := oldObj.(*v1.Node) // nolint:forcetypeassert
182+
newNode := newObj.(*v1.Node) // nolint:forcetypeassert
183+
184+
oldImexDomain := oldNode.Labels[ImexDomainLabel]
185+
newImexDomain := newNode.Labels[ImexDomainLabel]
186+
187+
if oldImexDomain == newImexDomain {
188+
return
189+
}
190+
if oldImexDomain != "" {
191+
nodesPerImexDomain[oldImexDomain]--
192+
if nodesPerImexDomain[oldImexDomain] == 0 {
193+
removedDomainCh <- oldImexDomain
194+
}
195+
}
196+
if newImexDomain != "" {
197+
nodesPerImexDomain[newImexDomain]++
198+
if nodesPerImexDomain[newImexDomain] == 1 {
199+
addedDomainCh <- newImexDomain
200+
}
201+
}
202+
},
203+
})
204+
if err != nil {
205+
return nil, nil, fmt.Errorf("failed to create node informer: %w", err)
206+
}
207+
208+
// Start the informer and wait for it to sync
209+
go informerFactory.Start(ctx.Done())
210+
211+
// Wait for the informer caches to sync
212+
if !cache.WaitForCacheSync(ctx.Done(), nodeInformer.HasSynced) {
213+
return nil, nil, fmt.Errorf("failed to sync informer caches")
214+
}
215+
216+
return addedDomainCh, removedDomainCh, nil
217+
}
218+
219+
// generateImexChannelPool generates the contents of a ResourceSlice pool for a given IMEX domain.
220+
func generateImexChannelPool(imexDomain string, numChannels int) resourceslice.Pool {
221+
// Generate dchannels from 0 to numChannels
222+
var devices []resourceapi.Device
223+
for i := 0; i < numChannels; i++ {
224+
d := resourceapi.Device{
225+
Name: fmt.Sprintf("imex-channel-%d", i),
226+
Basic: &resourceapi.BasicDevice{
227+
Attributes: map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{
228+
"type": {
229+
StringValue: ptr.To("imex-channel"),
230+
},
231+
"channel": {
232+
IntValue: ptr.To(int64(i)),
233+
},
234+
},
235+
},
236+
}
237+
devices = append(devices, d)
238+
}
239+
240+
// Put them in a pool named after the IMEX domain with the IMEX domain label as a node selector
241+
pool := resourceslice.Pool{
242+
NodeSelector: &v1.NodeSelector{
243+
NodeSelectorTerms: []v1.NodeSelectorTerm{
244+
{
245+
MatchExpressions: []v1.NodeSelectorRequirement{
246+
{
247+
Key: ImexDomainLabel,
248+
Operator: v1.NodeSelectorOpIn,
249+
Values: []string{
250+
imexDomain,
251+
},
252+
},
253+
},
254+
},
255+
},
256+
},
257+
Devices: devices,
258+
}
259+
260+
return pool
261+
}

0 commit comments

Comments
 (0)