@@ -19,6 +19,8 @@ package main
1919import (
2020 "context"
2121 "fmt"
22+ "strings"
23+ "sync"
2224 "time"
2325
2426 v1 "k8s.io/api/core/v1"
@@ -35,30 +37,34 @@ import (
3537)
3638
3739const (
38- DriverName = "gpu.nvidia.com"
39- ImexDomainLabel = "nvidia.com/gpu.imex-domain"
40- ImexChannelLimit = 128
40+ DriverName = "gpu.nvidia.com"
41+ ImexDomainLabel = "nvidia.com/gpu.imex-domain"
42+ ImexChannelLimit = 128
43+ DriverChannelLimit = 2048
4144)
4245
43- type DriverResources resourceslice.DriverResources
46+ type ImexManager struct {
47+ waitGroup sync.WaitGroup
48+ clientset kubernetes.Interface
49+ }
4450
45- func StartIMEXManager (ctx context.Context , config * Config ) error {
51+ func StartIMEXManager (ctx context.Context , config * Config ) ( * ImexManager , error ) {
4652 // Build a client set config
4753 csconfig , err := config .flags .kubeClientConfig .NewClientSetConfig ()
4854 if err != nil {
49- return fmt .Errorf ("error creating client set config: %w" , err )
55+ return nil , fmt .Errorf ("error creating client set config: %w" , err )
5056 }
5157
5258 // Create a new clientset
5359 clientset , err := kubernetes .NewForConfig (csconfig )
5460 if err != nil {
55- return fmt .Errorf ("error creating dynamic client: %w" , err )
61+ return nil , fmt .Errorf ("error creating dynamic client: %w" , err )
5662 }
5763
5864 // Fetch the current Pod object
5965 pod , err := clientset .CoreV1 ().Pods (config .flags .namespace ).Get (ctx , config .flags .podName , metav1.GetOptions {})
6066 if err != nil {
61- return fmt .Errorf ("error fetching pod: %w" , err )
67+ return nil , fmt .Errorf ("error fetching pod: %w" , err )
6268 }
6369
6470 // Set the owner of the ResourceSlices we will create
@@ -69,46 +75,61 @@ func StartIMEXManager(ctx context.Context, config *Config) error {
6975 UID : pod .UID ,
7076 }
7177
78+ // Create the manager itself
79+ m := & ImexManager {
80+ clientset : clientset ,
81+ }
82+
7283 // Stream added/removed IMEX domains from nodes over time
7384 klog .Info ("Start streaming IMEX domains from nodes..." )
74- addedDomainsCh , removedDomainsCh , err := streamImexDomains (ctx , clientset )
85+ addedDomainsCh , removedDomainsCh , err := m . streamImexDomains (ctx )
7586 if err != nil {
76- return fmt .Errorf ("error streaming IMEX domains: %w" , err )
87+ return nil , fmt .Errorf ("error streaming IMEX domains: %w" , err )
7788 }
7889
7990 // Add/Remove resource slices from IMEX domains as they come and go
8091 klog .Info ("Start publishing IMEX channels to ResourceSlices..." )
81- err = manageResourceSlices (ctx , clientset , owner , addedDomainsCh , removedDomainsCh )
92+ err = m . manageResourceSlices (ctx , owner , addedDomainsCh , removedDomainsCh )
8293 if err != nil {
83- return fmt .Errorf ("error managing resource slices: %w" , err )
94+ return nil , fmt .Errorf ("error managing resource slices: %w" , err )
8495 }
8596
86- return nil
97+ return m , nil
8798}
8899
89100// manageResourceSlices reacts to added and removed IMEX domains and triggers the creation / removal of resource slices accordingly.
90- func manageResourceSlices (ctx context.Context , clientset kubernetes.Interface , owner resourceslice.Owner , addedDomainsCh <- chan string , removedDomainsCh <- chan string ) error {
91- driverResources := resourceslice.DriverResources {}
92- controller , err := resourceslice .StartController (ctx , clientset , DriverName , owner , & driverResources )
101+ func (m * ImexManager ) manageResourceSlices (ctx context.Context , owner resourceslice.Owner , addedDomainsCh <- chan string , removedDomainsCh <- chan string ) error {
102+ driverResources := resourceslice.DriverResources {
103+ Pools : make (map [string ]resourceslice.Pool ),
104+ }
105+ controller , err := resourceslice .StartController (ctx , m .clientset , DriverName , owner , & driverResources )
93106 if err != nil {
94107 return fmt .Errorf ("error starting resource slice controller: %w" , err )
95108 }
96109
110+ imexChannelOffsets := make (map [string ]map [string ]int )
111+
112+ m .waitGroup .Add (1 )
97113 go func () {
114+ defer m .waitGroup .Done ()
98115 for {
99116 select {
100117 case addedDomain := <- addedDomainsCh :
101118 klog .Infof ("Adding channels for new IMEX domain: %v" , addedDomain )
102- newDriverResources := DriverResources (driverResources ).DeepCopy ()
103- newDriverResources .Pools [addedDomain ] = generateImexChannelPool (addedDomain , ImexChannelLimit )
104- controller .Update (& newDriverResources )
105- driverResources = newDriverResources
119+ if err := calculateImexChannelLimit (addedDomain , & imexChannelOffsets ); err != nil {
120+ klog .Error (err )
121+ return
122+ }
123+ newDriverResources := driverResources .DeepCopy ()
124+ newDriverResources .Pools [addedDomain ] = generateImexChannelPool (addedDomain , imexChannelOffsets )
125+ controller .Update (newDriverResources )
126+ driverResources = * newDriverResources
106127 case removedDomain := <- removedDomainsCh :
107128 klog .Infof ("Removing channels for removed IMEX domain: %v" , removedDomain )
108- newDriverResources := DriverResources ( driverResources ) .DeepCopy ()
129+ newDriverResources := driverResources .DeepCopy ()
109130 delete (newDriverResources .Pools , removedDomain )
110- controller .Update (& newDriverResources )
111- driverResources = newDriverResources
131+ controller .Update (newDriverResources )
132+ driverResources = * newDriverResources
112133 case <- ctx .Done ():
113134 return
114135 }
@@ -118,19 +139,23 @@ func manageResourceSlices(ctx context.Context, clientset kubernetes.Interface, o
118139 return nil
119140}
120141
121- // DeepCopy will perform a deep copy of the provided DriverResources .
122- func (d DriverResources ) DeepCopy () resourceslice. DriverResources {
123- driverResources := resourceslice. DriverResources {
124- Pools : make ( map [ string ]resourceslice. Pool ),
142+ // Stop stops a running ImexManager .
143+ func (m * ImexManager ) Stop () error {
144+ if m == nil {
145+ return nil
125146 }
126- for p := range d .Pools {
127- driverResources .Pools [p ] = generateImexChannelPool (p , ImexChannelLimit )
147+
148+ m .waitGroup .Wait ()
149+ klog .Info ("Cleaning up all resourceSlices" )
150+ if err := m .cleanupResourceSlices (); err != nil {
151+ return fmt .Errorf ("error cleaning up resource slices: %w" , err )
128152 }
129- return driverResources
153+
154+ return nil
130155}
131156
132157// streamImexDomains returns two channels that streams imexDomans that are added and removed from nodes over time.
133- func streamImexDomains (ctx context.Context , clientset kubernetes. Interface ) (<- chan string , <- chan string , error ) {
158+ func ( m * ImexManager ) streamImexDomains (ctx context.Context ) (<- chan string , <- chan string , error ) {
134159 // Create channels to stream IMEX domain ids that are added / removed
135160 addedDomainCh := make (chan string )
136161 removedDomainCh := make (chan string )
@@ -147,7 +172,7 @@ func streamImexDomains(ctx context.Context, clientset kubernetes.Interface) (<-c
147172
148173 // Create a shared informer factory for nodes
149174 informerFactory := informers .NewSharedInformerFactoryWithOptions (
150- clientset ,
175+ m . clientset ,
151176 time .Minute * 10 , // Resync period
152177 informers .WithTweakListOptions (func (options * metav1.ListOptions ) {
153178 options .LabelSelector = labelSelector
@@ -206,7 +231,11 @@ func streamImexDomains(ctx context.Context, clientset kubernetes.Interface) (<-c
206231 }
207232
208233 // Start the informer and wait for it to sync
209- go informerFactory .Start (ctx .Done ())
234+ m .waitGroup .Add (1 )
235+ go func () {
236+ defer m .waitGroup .Done ()
237+ informerFactory .Start (ctx .Done ())
238+ }()
210239
211240 // Wait for the informer caches to sync
212241 if ! cache .WaitForCacheSync (ctx .Done (), nodeInformer .HasSynced ) {
@@ -217,10 +246,13 @@ func streamImexDomains(ctx context.Context, clientset kubernetes.Interface) (<-c
217246}
218247
219248// generateImexChannelPool generates the contents of a ResourceSlice pool for a given IMEX domain.
220- func generateImexChannelPool (imexDomain string , numChannels int ) resourceslice.Pool {
221- // Generate dchannels from 0 to numChannels
249+ func generateImexChannelPool (imexDomain string , imexChannelOffsets map [string ]map [string ]int ) resourceslice.Pool {
250+ id := strings .Split (imexDomain , "." )
251+ numChannels := imexChannelOffsets [id [0 ]][id [1 ]] + ImexChannelLimit
252+
253+ // Generate dchannels from offset to offset+ImexChannelLimit
222254 var devices []resourceapi.Device
223- for i := 0 ; i < numChannels ; i ++ {
255+ for i := imexChannelOffsets [ id [ 0 ]][ id [ 1 ]] ; i < numChannels ; i ++ {
224256 d := resourceapi.Device {
225257 Name : fmt .Sprintf ("imex-channel-%d" , i ),
226258 Basic : & resourceapi.BasicDevice {
@@ -259,3 +291,46 @@ func generateImexChannelPool(imexDomain string, numChannels int) resourceslice.P
259291
260292 return pool
261293}
294+
295+ // cleanupResourceSlices removes all resource slices created by the IMEX manager.
296+ func (m * ImexManager ) cleanupResourceSlices () error {
297+ // Delete all resource slices created by the IMEX manager
298+ ops := metav1.ListOptions {
299+ FieldSelector : fmt .Sprintf ("%s=%s" , resourceapi .ResourceSliceSelectorDriver , DriverName ),
300+ }
301+ l , err := m .clientset .ResourceV1alpha3 ().ResourceSlices ().List (context .Background (), ops )
302+ if err != nil {
303+ return fmt .Errorf ("error listing resource slices: %w" , err )
304+ }
305+
306+ for _ , rs := range l .Items {
307+ err := m .clientset .ResourceV1alpha3 ().ResourceSlices ().Delete (context .Background (), rs .Name , metav1.DeleteOptions {})
308+ if err != nil {
309+ return fmt .Errorf ("error deleting resource slice %s: %w" , rs .Name , err )
310+ }
311+ }
312+
313+ return nil
314+ }
315+
316+ // calculateImexChannelLimit calculates the number of IMEX channels that can be allocated to a given IMEX domain.
317+ // if the limit is reached, it will return an error.
318+ func calculateImexChannelLimit (addedDomainsCh string , imexChannelOffsets * map [string ]map [string ]int ) error {
319+ id := strings .Split (addedDomainsCh , "." )
320+
321+ // Check if the IMEX domain is already in the map
322+ if _ , ok := (* imexChannelOffsets )[id [0 ]]; ! ok {
323+ (* imexChannelOffsets )[id [0 ]] = make (map [string ]int )
324+ }
325+
326+ // Check if the clique is already in the map
327+ if _ , ok := (* imexChannelOffsets )[id [0 ]][id [1 ]]; ! ok {
328+ offset := (len ((* imexChannelOffsets )[id [0 ]]) * ImexChannelLimit )
329+ if offset >= DriverChannelLimit {
330+ return fmt .Errorf ("error adding IMEX domain %s: channel limit reached" , id [0 ])
331+ }
332+ (* imexChannelOffsets )[id [0 ]][id [1 ]] = offset
333+ }
334+
335+ return nil
336+ }
0 commit comments