@@ -19,6 +19,7 @@ package main
1919import (
2020 "context"
2121 "fmt"
22+ "strings"
2223 "sync"
2324 "time"
2425
@@ -36,17 +37,20 @@ import (
3637)
3738
3839const (
39- DriverName = "gpu.nvidia.com"
40- ImexDomainLabel = "nvidia.com/gpu.imex-domain"
41- ImexChannelLimit = 128
// DriverName is the driver name handed to the resource slice controller.
40+ DriverName = "gpu.nvidia.com"
// ImexDomainLabel is presumably the node label that carries a node's IMEX
// domain; its use is not visible in this excerpt (TODO confirm).
41+ ImexDomainLabel = "nvidia.com/gpu.imex-domain"
// ResourceSliceImexChannelLimit is the number of IMEX channels generated for
// a single pool, and the step between the channel offsets of two pools.
42+ ResourceSliceImexChannelLimit = 128
// DriverImexChannelLimit is the exclusive upper bound for channel offsets
// handed out to pools.
43+ DriverImexChannelLimit = 2048
4244)
4345
// ImexManager reacts to IMEX domains being added and removed and keeps the
// corresponding resource slices up to date.
4446type ImexManager struct {
// waitGroup tracks the background goroutine started by manageResourceSlices.
4547 waitGroup sync.WaitGroup
// clientset is the Kubernetes client passed to the resource slice controller.
4648 clientset kubernetes.Interface
4749}
4850
49- type DriverResources resourceslice.DriverResources
51+ // imexDomainOffsets represents the offset for assigning IMEX channels
52+ // to ResourceSlices for each <imex-domain, cliqueid> combination.
// The outer key is the IMEX domain, the inner key is the clique ID, and the
// value is the first channel number assigned to that combination.
53+ type imexDomainOffsets map [string ]map [string ]int
5054
5155func StartIMEXManager (ctx context.Context , config * Config ) (* ImexManager , error ) {
5256 // Build a client set config
@@ -99,29 +103,34 @@ func StartIMEXManager(ctx context.Context, config *Config) (*ImexManager, error)
99103
100104// manageResourceSlices reacts to added and removed IMEX domains and triggers the creation / removal of resource slices accordingly.
101105func (m * ImexManager ) manageResourceSlices (ctx context.Context , owner resourceslice.Owner , addedDomainsCh <- chan string , removedDomainsCh <- chan string ) error {
102- driverResources := resourceslice.DriverResources {}
103- controller , err := resourceslice .StartController (ctx , m .clientset , DriverName , owner , & driverResources )
106+ driverResources := & resourceslice.DriverResources {}
107+ controller , err := resourceslice .StartController (ctx , m .clientset , DriverName , owner , driverResources )
104108 if err != nil {
105109 return fmt .Errorf ("error starting resource slice controller: %w" , err )
106110 }
107111
112+ imexDomainOffsets := new (imexDomainOffsets )
108113 m .waitGroup .Add (1 )
109114 go func () {
110115 defer m .waitGroup .Done ()
111116 for {
112117 select {
113118 case addedDomain := <- addedDomainsCh :
119+ offset , err := imexDomainOffsets .add (addedDomain , ResourceSliceImexChannelLimit , DriverImexChannelLimit )
120+ if err != nil {
121+ klog .Errorf ("Error calculating channel offset for IMEX domain %s: %v" , addedDomain , err )
122+ return
123+ }
114124 klog .Infof ("Adding channels for new IMEX domain: %v" , addedDomain )
115- newDriverResources := DriverResources (driverResources ).DeepCopy ()
116- newDriverResources .Pools [addedDomain ] = generateImexChannelPool (addedDomain , ImexChannelLimit )
117- controller .Update (& newDriverResources )
118- driverResources = newDriverResources
125+ driverResources := driverResources .DeepCopy ()
126+ driverResources .Pools [addedDomain ] = generateImexChannelPool (addedDomain , offset , ResourceSliceImexChannelLimit )
127+ controller .Update (driverResources )
119128 case removedDomain := <- removedDomainsCh :
120129 klog .Infof ("Removing channels for removed IMEX domain: %v" , removedDomain )
121- newDriverResources := DriverResources ( driverResources ) .DeepCopy ()
122- delete (newDriverResources .Pools , removedDomain )
123- controller . Update ( & newDriverResources )
124- driverResources = newDriverResources
130+ driverResources := driverResources .DeepCopy ()
131+ delete (driverResources .Pools , removedDomain )
132+ imexDomainOffsets . remove ( removedDomain )
133+ controller . Update ( driverResources )
125134 case <- ctx .Done ():
126135 return
127136 }
@@ -146,17 +155,6 @@ func (m *ImexManager) Stop() error {
146155 return nil
147156}
148157
149- // DeepCopy will perform a deep copy of the provided DriverResources.
150- func (d DriverResources ) DeepCopy () resourceslice.DriverResources {
151- driverResources := resourceslice.DriverResources {
152- Pools : make (map [string ]resourceslice.Pool ),
153- }
154- for p := range d .Pools {
155- driverResources .Pools [p ] = generateImexChannelPool (p , ImexChannelLimit )
156- }
157- return driverResources
158- }
159-
160158// streamImexDomains returns two channels that stream the IMEX domains that are added to and removed from nodes over time.
161159func (m * ImexManager ) streamImexDomains (ctx context.Context ) (<- chan string , <- chan string , error ) {
162160 // Create channels to stream IMEX domain ids that are added / removed
@@ -249,10 +247,10 @@ func (m *ImexManager) streamImexDomains(ctx context.Context) (<-chan string, <-c
249247}
250248
251249// generateImexChannelPool generates the contents of a ResourceSlice pool for a given IMEX domain.
252- func generateImexChannelPool (imexDomain string , numChannels int ) resourceslice.Pool {
253- // Generate dchannels from 0 to numChannels
250+ func generateImexChannelPool (imexDomain string , startChannel int , numChannels int ) resourceslice.Pool {
251+ // Generate channels from startChannel to startChannel+numChannels (exclusive)
254252 var devices []resourceapi.Device
255- for i := 0 ; i < numChannels ; i ++ {
253+ for i := startChannel ; i < ( startChannel + numChannels ) ; i ++ {
256254 d := resourceapi.Device {
257255 Name : fmt .Sprintf ("imex-channel-%d" , i ),
258256 Basic : & resourceapi.BasicDevice {
@@ -312,3 +310,60 @@ func (m *ImexManager) cleanupResourceSlices() error {
312310
313311 return nil
314312}
313+
314+ // add sets the offset where an IMEX domain's channels should start counting from.
315+ func (offsets imexDomainOffsets ) add (imexDomain string , resourceSliceImexChannelLimit , driverImexChannelLimit int ) (int , error ) {
316+ // Split the incoming imexDomain to split off its cliqueID
317+ id := strings .SplitN (imexDomain , "." , 2 )
318+ if len (id ) != 2 {
319+ return - 1 , fmt .Errorf ("error adding IMEX domain %s: invalid format" , imexDomain )
320+ }
321+ imexDomain = id [0 ]
322+ cliqueID := id [1 ]
323+
324+ // Check if the IMEX domain is already in the map
325+ if _ , ok := offsets [imexDomain ]; ! ok {
326+ offsets [imexDomain ] = make (map [string ]int )
327+ }
328+
329+ // Return early if the clique is already in the map
330+ if offset , exists := offsets [imexDomain ][cliqueID ]; exists {
331+ return offset , nil
332+ }
333+
334+ // Track used offsets for the current imexDomain
335+ usedOffsets := make (map [int ]struct {})
336+ for _ , v := range offsets [imexDomain ] {
337+ usedOffsets [v ] = struct {}{}
338+ }
339+
340+ // Look for the first unused offset, stepping by resourceSliceImexChannelLimit
341+ var offset int
342+ for offset = 0 ; offset < driverImexChannelLimit ; offset += resourceSliceImexChannelLimit {
343+ if _ , exists := usedOffsets [offset ]; ! exists {
344+ break
345+ }
346+ }
347+
348+ // If we reach the limit, return an error
349+ if offset == driverImexChannelLimit {
350+ return - 1 , fmt .Errorf ("error adding IMEX domain %s: channel limit reached" , imexDomain )
351+ }
352+ offsets [imexDomain ][cliqueID ] = offset
353+
354+ return offset , nil
355+ }
356+
357+ func (offsets imexDomainOffsets ) remove (imexDomain string ) {
358+ id := strings .SplitN (imexDomain , "." , 2 )
359+ if len (id ) != 2 {
360+ return
361+ }
362+ imexDomain = id [0 ]
363+ cliqueID := id [1 ]
364+
365+ delete (offsets [imexDomain ], cliqueID )
366+ if len (offsets [imexDomain ]) == 0 {
367+ delete (offsets , imexDomain )
368+ }
369+ }
0 commit comments