@@ -19,6 +19,7 @@ package main
1919import (
2020 "context"
2121 "fmt"
22+ "strings"
2223 "sync"
2324 "time"
2425
@@ -36,17 +37,20 @@ import (
3637)
3738
3839const (
39- DriverName = "gpu.nvidia.com"
40- ImexDomainLabel = "nvidia.com/gpu.imex-domain"
41- ImexChannelLimit = 128
40+ DriverName = "gpu.nvidia.com"
41+ ImexDomainLabel = "nvidia.com/gpu.imex-domain"
42+ ResourceSliceImexChannelLimit = 128
43+ DriverImexChannelLimit = 2048
4244)
4345
4446type ImexManager struct {
4547 waitGroup sync.WaitGroup
4648 clientset kubernetes.Interface
4749}
4850
49- type DriverResources resourceslice.DriverResources
51+ // imexDomainOffsets maps an IMEX domain to its clique IDs, and each clique ID to the
52+ // starting channel offset reserved for that <imex-domain, cliqueid> combination.
53+ type imexDomainOffsets map [string ]map [string ]int
5054
5155func StartIMEXManager (ctx context.Context , config * Config ) (* ImexManager , error ) {
5256 // Build a client set config
@@ -99,29 +103,34 @@ func StartIMEXManager(ctx context.Context, config *Config) (*ImexManager, error)
99103
100104// manageResourceSlices reacts to added and removed IMEX domains and triggers the creation / removal of resource slices accordingly.
101105func (m * ImexManager ) manageResourceSlices (ctx context.Context , owner resourceslice.Owner , addedDomainsCh <- chan string , removedDomainsCh <- chan string ) error {
102- driverResources := resourceslice.DriverResources {}
103- controller , err := resourceslice .StartController (ctx , m .clientset , DriverName , owner , & driverResources )
106+ driverResources := & resourceslice.DriverResources {}
107+ controller , err := resourceslice .StartController (ctx , m .clientset , DriverName , owner , driverResources )
104108 if err != nil {
105109 return fmt .Errorf ("error starting resource slice controller: %w" , err )
106110 }
107111
112+ imexDomainOffsets := new (imexDomainOffsets )
108113 m .waitGroup .Add (1 )
109114 go func () {
110115 defer m .waitGroup .Done ()
111116 for {
112117 select {
113118 case addedDomain := <- addedDomainsCh :
119+ offset , err := imexDomainOffsets .add (addedDomain , ResourceSliceImexChannelLimit , DriverImexChannelLimit )
120+ if err != nil {
121+ klog .Errorf ("Error calculating channel offset for IMEX domain %s: %v" , addedDomain , err )
122+ return
123+ }
114124 klog .Infof ("Adding channels for new IMEX domain: %v" , addedDomain )
115- newDriverResources := DriverResources (driverResources ).DeepCopy ()
116- newDriverResources .Pools [addedDomain ] = generateImexChannelPool (addedDomain , ImexChannelLimit )
117- controller .Update (& newDriverResources )
118- driverResources = newDriverResources
125+ driverResources := driverResources .DeepCopy ()
126+ driverResources .Pools [addedDomain ] = generateImexChannelPool (addedDomain , offset , ResourceSliceImexChannelLimit )
127+ controller .Update (driverResources )
119128 case removedDomain := <- removedDomainsCh :
120129 klog .Infof ("Removing channels for removed IMEX domain: %v" , removedDomain )
121- newDriverResources := DriverResources ( driverResources ) .DeepCopy ()
122- delete (newDriverResources .Pools , removedDomain )
123- controller . Update ( & newDriverResources )
124- driverResources = newDriverResources
130+ driverResources := driverResources .DeepCopy ()
131+ delete (driverResources .Pools , removedDomain )
132+ imexDomainOffsets . remove ( removedDomain )
133+ controller . Update ( driverResources )
125134 case <- ctx .Done ():
126135 return
127136 }
@@ -146,17 +155,6 @@ func (m *ImexManager) Stop() error {
146155 return nil
147156}
148157
149- // DeepCopy will perform a deep copy of the provided DriverResources.
150- func (d DriverResources ) DeepCopy () resourceslice.DriverResources {
151- driverResources := resourceslice.DriverResources {
152- Pools : make (map [string ]resourceslice.Pool ),
153- }
154- for p := range d .Pools {
155- driverResources .Pools [p ] = generateImexChannelPool (p , ImexChannelLimit )
156- }
157- return driverResources
158- }
159-
160158// streamImexDomains returns two channels that stream the IMEX domains that are added to and removed from nodes over time.
161159func (m * ImexManager ) streamImexDomains (ctx context.Context ) (<- chan string , <- chan string , error ) {
162160 // Create channels to stream IMEX domain ids that are added / removed
@@ -249,10 +247,10 @@ func (m *ImexManager) streamImexDomains(ctx context.Context) (<-chan string, <-c
249247}
250248
251249// generateImexChannelPool generates the contents of a ResourceSlice pool for a given IMEX domain.
252- func generateImexChannelPool (imexDomain string , numChannels int ) resourceslice.Pool {
253- // Generate dchannels from 0 to numChannels
250+ func generateImexChannelPool (imexDomain string , startChannel int , numChannels int ) resourceslice.Pool {
251+ // Generate channels from startChannel up to (but not including) startChannel+numChannels
254252 var devices []resourceapi.Device
255- for i := 0 ; i < numChannels ; i ++ {
253+ for i := startChannel ; i < ( startChannel + numChannels ) ; i ++ {
256254 d := resourceapi.Device {
257255 Name : fmt .Sprintf ("imex-channel-%d" , i ),
258256 Basic : & resourceapi.BasicDevice {
@@ -312,3 +310,59 @@ func (m *ImexManager) cleanupResourceSlices() error {
312310
313311 return nil
314312}
313+
314+ // add sets the offset where an IMEX domain's channels should start counting from.
315+ func (offsets imexDomainOffsets ) add (imexDomain string , resourceSliceImexChannelLimit , driverImexChannelLimit int ) (int , error ) {
316+ id := strings .SplitN (imexDomain , "." , 2 )
317+ if len (id ) != 2 {
318+ return - 1 , fmt .Errorf ("error adding IMEX domain %s: invalid format" , imexDomain )
319+ }
320+ domain := id [0 ]
321+ cliqueID := id [1 ]
322+
323+ // Check if the IMEX domain is already in the map.
324+ if _ , ok := offsets [domain ]; ! ok {
325+ offsets [id [0 ]] = make (map [string ]int )
326+ }
327+
328+ // Check if the clique is already in the map.
329+ var offset int
330+ if _ , ok := offsets [domain ][cliqueID ]; ! ok {
331+ // Look for an offset number that is not already in use.
332+ // stepping by ResourceSliceImexChannelLimit to avoid overlapping with other cliques.
333+ for offset = 0 ; offset < driverImexChannelLimit ; offset += resourceSliceImexChannelLimit {
334+ used := false
335+ for _ , v := range offsets [domain ] {
336+ if offset == v {
337+ used = true
338+ break
339+ }
340+ }
341+ if ! used {
342+ break
343+ }
344+ }
345+
346+ // If we reach the limit, return an error.
347+ if offset >= driverImexChannelLimit {
348+ return - 1 , fmt .Errorf ("error adding IMEX domain %s: channel limit reached" , id [0 ])
349+ }
350+ offsets [domain ][cliqueID ] = offset
351+ }
352+
353+ return offset , nil
354+ }
355+
356+ func (i imexDomainOffsets ) remove (imexDomain string ) {
357+ id := strings .SplitN (imexDomain , "." , 2 )
358+ if len (id ) != 2 {
359+ return
360+ }
361+ domain := id [0 ]
362+ cliqueID := id [1 ]
363+
364+ delete (i [domain ], cliqueID )
365+ if len (i [domain ]) == 0 {
366+ delete (i , domain )
367+ }
368+ }
0 commit comments