Skip to content

Commit 8735165

Browse files
committed
Add support for a centralized controller to advertise IMEX channel
Signed-off-by: Kevin Klues <[email protected]>
1 parent 2746f19 commit 8735165

File tree

8 files changed

+535
-189
lines changed

8 files changed

+535
-189
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
.cache/
22
.bash_history
3+
/nvidia-dra-controller
34
/nvidia-dra-plugin
45
.idea
56
[._]*.sw[a-p]

cmd/nvidia-dra-controller/imex.go

Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
/*
2+
* Copyright (c) 2024 NVIDIA CORPORATION. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"time"
23+
24+
"k8s.io/api/core/v1"
25+
resourceapi "k8s.io/api/resource/v1alpha3"
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27+
"k8s.io/apimachinery/pkg/labels"
28+
"k8s.io/apimachinery/pkg/selection"
29+
"k8s.io/client-go/informers"
30+
"k8s.io/client-go/kubernetes"
31+
"k8s.io/client-go/tools/cache"
32+
"k8s.io/dynamic-resource-allocation/resourceslice"
33+
"k8s.io/klog/v2"
34+
"k8s.io/utils/ptr"
35+
)
36+
37+
const (
38+
DriverName = "gpu.nvidia.com"
39+
ImexDomainLabel = "nvidia.com/gpu.imex-domain"
40+
ImexChannelLimit = 128
41+
)
42+
43+
type DriverResources resourceslice.DriverResources
44+
45+
func StartIMEXManager(ctx context.Context, config *Config) error {
46+
// Build a client set config
47+
csconfig, err := config.flags.kubeClientConfig.NewClientSetConfig()
48+
if err != nil {
49+
return fmt.Errorf("error creating client set config: %w", err)
50+
}
51+
52+
// Create a new clientset
53+
clientset, err := kubernetes.NewForConfig(csconfig)
54+
if err != nil {
55+
return fmt.Errorf("error creating dynamic client: %w", err)
56+
}
57+
58+
// Fetch the current Pod object
59+
pod, err := clientset.CoreV1().Pods(config.flags.namespace).Get(ctx, config.flags.podName, metav1.GetOptions{})
60+
if err != nil {
61+
return fmt.Errorf("error fetching pod: %w", err)
62+
}
63+
64+
// Set the owner of the ResourceSlices we will create
65+
owner := resourceslice.Owner{
66+
APIVersion: "v1",
67+
Kind: "Pod",
68+
Name: pod.Name,
69+
UID: pod.UID,
70+
}
71+
72+
// Stream added/removed IMEX domains from nodes over time
73+
klog.Info("Start streaming IMEX domains from nodes...")
74+
addedDomainsCh, removedDomainsCh, err := streamImexDomains(ctx, clientset)
75+
if err != nil {
76+
return fmt.Errorf("error streaming IMEX domains: %w", err)
77+
}
78+
79+
// Add/Remove resource slices from IMEX domains as they come and go
80+
klog.Info("Start publishing IMEX channels to ResourceSlices...")
81+
err = manageResourceSlices(ctx, clientset, owner, addedDomainsCh, removedDomainsCh)
82+
if err != nil {
83+
return fmt.Errorf("error managing resource slices: %w", err)
84+
}
85+
86+
return nil
87+
}
88+
89+
// manageResourceSlices reacts to added and removed IMEX domains and triggers the creation / removal of resource slices accordingly.
90+
func manageResourceSlices(ctx context.Context, clientset kubernetes.Interface, owner resourceslice.Owner, addedDomainsCh <-chan string, removedDomainsCh <-chan string) error {
91+
driverResources := resourceslice.DriverResources{}
92+
controller := resourceslice.StartController(ctx, clientset, DriverName, owner, &driverResources)
93+
94+
go func() {
95+
for {
96+
select {
97+
case addedDomain := <-addedDomainsCh:
98+
klog.Infof("Adding channels for new IMEX domain: %v", addedDomain)
99+
newDriverResources := DriverResources(driverResources).DeepCopy()
100+
newDriverResources.Pools[addedDomain] = generateImexChannelPool(addedDomain, ImexChannelLimit)
101+
controller.Update(&newDriverResources)
102+
driverResources = newDriverResources
103+
case removedDomain := <-removedDomainsCh:
104+
klog.Infof("Removing channels for removed IMEX domain: %v", removedDomain)
105+
newDriverResources := DriverResources(driverResources).DeepCopy()
106+
delete(newDriverResources.Pools, removedDomain)
107+
controller.Update(&newDriverResources)
108+
driverResources = newDriverResources
109+
case <-ctx.Done():
110+
return
111+
}
112+
}
113+
}()
114+
115+
return nil
116+
}
117+
118+
// DeepCopy will perform a deep copy of the provided DriverResources
119+
func (d DriverResources) DeepCopy() resourceslice.DriverResources {
120+
driverResources := resourceslice.DriverResources{
121+
Pools: make(map[string]resourceslice.Pool),
122+
}
123+
for p := range d.Pools {
124+
driverResources.Pools[p] = generateImexChannelPool(p, ImexChannelLimit)
125+
}
126+
return driverResources
127+
}
128+
129+
// streamImexDomains returns two channels that streams imexDomans that are added and removed from nodes over time.
130+
func streamImexDomains(ctx context.Context, clientset kubernetes.Interface) (<-chan string, <-chan string, error) {
131+
// Create channels to stream IMEX domain ids that are added / removed
132+
addedDomainCh := make(chan string)
133+
removedDomainCh := make(chan string)
134+
135+
// Use a map to track how many nodes are part of a given IMEX domain
136+
nodesPerImexDomain := make(map[string]int)
137+
138+
// Build a label selector to get all nodes with ImexDomainLabel set
139+
requirement, err := labels.NewRequirement(ImexDomainLabel, selection.Exists, nil)
140+
if err != nil {
141+
return nil, nil, fmt.Errorf("error building label selector requirement: %w", err)
142+
}
143+
labelSelector := labels.NewSelector().Add(*requirement).String()
144+
145+
// Create a shared informer factory for nodes
146+
informerFactory := informers.NewSharedInformerFactoryWithOptions(
147+
clientset,
148+
time.Minute*10, // Resync period
149+
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
150+
options.LabelSelector = labelSelector
151+
}),
152+
)
153+
nodeInformer := informerFactory.Core().V1().Nodes().Informer()
154+
155+
// Set up event handlers for node events
156+
nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
157+
AddFunc: func(obj interface{}) {
158+
node := obj.(*v1.Node)
159+
imexDomain := node.Labels[ImexDomainLabel]
160+
if imexDomain != "" {
161+
nodesPerImexDomain[imexDomain]++
162+
if nodesPerImexDomain[imexDomain] == 1 {
163+
addedDomainCh <- imexDomain
164+
}
165+
}
166+
},
167+
DeleteFunc: func(obj interface{}) {
168+
node := obj.(*v1.Node)
169+
imexDomain := node.Labels[ImexDomainLabel]
170+
if imexDomain != "" {
171+
nodesPerImexDomain[imexDomain]--
172+
if nodesPerImexDomain[imexDomain] == 0 {
173+
removedDomainCh <- imexDomain
174+
}
175+
}
176+
},
177+
UpdateFunc: func(oldObj, newObj interface{}) {
178+
oldNode := oldObj.(*v1.Node)
179+
newNode := newObj.(*v1.Node)
180+
181+
oldImexDomain := oldNode.Labels[ImexDomainLabel]
182+
newImexDomain := newNode.Labels[ImexDomainLabel]
183+
184+
if oldImexDomain == newImexDomain {
185+
return
186+
}
187+
if oldImexDomain != "" {
188+
nodesPerImexDomain[oldImexDomain]--
189+
if nodesPerImexDomain[oldImexDomain] == 0 {
190+
removedDomainCh <- oldImexDomain
191+
}
192+
}
193+
if newImexDomain != "" {
194+
nodesPerImexDomain[newImexDomain]++
195+
if nodesPerImexDomain[newImexDomain] == 1 {
196+
addedDomainCh <- newImexDomain
197+
}
198+
}
199+
},
200+
})
201+
202+
// Start the informer and wait for it to sync
203+
go informerFactory.Start(ctx.Done())
204+
205+
// Wait for the informer caches to sync
206+
if !cache.WaitForCacheSync(ctx.Done(), nodeInformer.HasSynced) {
207+
return nil, nil, fmt.Errorf("failed to sync informer caches")
208+
}
209+
210+
return addedDomainCh, removedDomainCh, nil
211+
}
212+
213+
// generateImexChannelPool generates the contents of a ResourceSlice pool for a given IMEX domain.
214+
func generateImexChannelPool(imexDomain string, numChannels int) resourceslice.Pool {
215+
// Generate dchannels from 0 to numChannels
216+
var devices []resourceapi.Device
217+
for i := 0; i < numChannels; i++ {
218+
d := resourceapi.Device{
219+
Name: fmt.Sprintf("imex-channel-%d", i),
220+
Basic: &resourceapi.BasicDevice{
221+
Attributes: map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{
222+
"type": {
223+
StringValue: ptr.To("imex-channel"),
224+
},
225+
"channel": {
226+
IntValue: ptr.To(int64(i)),
227+
},
228+
},
229+
},
230+
}
231+
devices = append(devices, d)
232+
}
233+
234+
// Put them in a pool named after the IMEX domain with the IMEX domain label as a node selector
235+
pool := resourceslice.Pool{
236+
NodeSelector: &v1.NodeSelector{
237+
NodeSelectorTerms: []v1.NodeSelectorTerm{
238+
{
239+
MatchExpressions: []v1.NodeSelectorRequirement{
240+
{
241+
Key: ImexDomainLabel,
242+
Operator: v1.NodeSelectorOpIn,
243+
Values: []string{
244+
imexDomain,
245+
},
246+
},
247+
},
248+
},
249+
},
250+
},
251+
Devices: devices,
252+
}
253+
254+
return pool
255+
}

0 commit comments

Comments
 (0)