@@ -19,6 +19,7 @@ package main
1919import (
2020 "context"
2121 "fmt"
22+ "sync"
2223 "time"
2324
2425 v1 "k8s.io/api/core/v1"
@@ -40,25 +41,30 @@ const (
4041 ImexChannelLimit = 128
4142)
4243
44+ type ImexManager struct {
45+ waitGroup sync.WaitGroup
46+ clientset kubernetes.Interface
47+ }
48+
4349type DriverResources resourceslice.DriverResources
4450
45- func StartIMEXManager (ctx context.Context , config * Config ) error {
51+ func StartIMEXManager (ctx context.Context , config * Config ) ( * ImexManager , error ) {
4652 // Build a client set config
4753 csconfig , err := config .flags .kubeClientConfig .NewClientSetConfig ()
4854 if err != nil {
49- return fmt .Errorf ("error creating client set config: %w" , err )
55+ return nil , fmt .Errorf ("error creating client set config: %w" , err )
5056 }
5157
5258 // Create a new clientset
5359 clientset , err := kubernetes .NewForConfig (csconfig )
5460 if err != nil {
55- return fmt .Errorf ("error creating dynamic client: %w" , err )
61+ return nil , fmt .Errorf ("error creating dynamic client: %w" , err )
5662 }
5763
5864 // Fetch the current Pod object
5965 pod , err := clientset .CoreV1 ().Pods (config .flags .namespace ).Get (ctx , config .flags .podName , metav1.GetOptions {})
6066 if err != nil {
61- return fmt .Errorf ("error fetching pod: %w" , err )
67+ return nil , fmt .Errorf ("error fetching pod: %w" , err )
6268 }
6369
6470 // Set the owner of the ResourceSlices we will create
@@ -69,32 +75,39 @@ func StartIMEXManager(ctx context.Context, config *Config) error {
6975 UID : pod .UID ,
7076 }
7177
78+ // Create the manager itself
79+ m := & ImexManager {
80+ clientset : clientset ,
81+ }
82+
7283 // Stream added/removed IMEX domains from nodes over time
7384 klog .Info ("Start streaming IMEX domains from nodes..." )
74- addedDomainsCh , removedDomainsCh , err := streamImexDomains (ctx , clientset )
85+ addedDomainsCh , removedDomainsCh , err := m . streamImexDomains (ctx )
7586 if err != nil {
76- return fmt .Errorf ("error streaming IMEX domains: %w" , err )
87+ return nil , fmt .Errorf ("error streaming IMEX domains: %w" , err )
7788 }
7889
7990 // Add/Remove resource slices from IMEX domains as they come and go
8091 klog .Info ("Start publishing IMEX channels to ResourceSlices..." )
81- err = manageResourceSlices (ctx , clientset , owner , addedDomainsCh , removedDomainsCh )
92+ err = m . manageResourceSlices (ctx , owner , addedDomainsCh , removedDomainsCh )
8293 if err != nil {
83- return fmt .Errorf ("error managing resource slices: %w" , err )
94+ return nil , fmt .Errorf ("error managing resource slices: %w" , err )
8495 }
8596
86- return nil
97+ return m , nil
8798}
8899
89100// manageResourceSlices reacts to added and removed IMEX domains and triggers the creation / removal of resource slices accordingly.
90- func manageResourceSlices (ctx context.Context , clientset kubernetes. Interface , owner resourceslice.Owner , addedDomainsCh <- chan string , removedDomainsCh <- chan string ) error {
101+ func ( m * ImexManager ) manageResourceSlices (ctx context.Context , owner resourceslice.Owner , addedDomainsCh <- chan string , removedDomainsCh <- chan string ) error {
91102 driverResources := resourceslice.DriverResources {}
92- controller , err := resourceslice .StartController (ctx , clientset , DriverName , owner , & driverResources )
103+ controller , err := resourceslice .StartController (ctx , m . clientset , DriverName , owner , & driverResources )
93104 if err != nil {
94105 return fmt .Errorf ("error starting resource slice controller: %w" , err )
95106 }
96107
108+ m .waitGroup .Add (1 )
97109 go func () {
110+ defer m .waitGroup .Done ()
98111 for {
99112 select {
100113 case addedDomain := <- addedDomainsCh :
@@ -118,6 +131,21 @@ func manageResourceSlices(ctx context.Context, clientset kubernetes.Interface, o
118131 return nil
119132}
120133
134+ // Stop stops a running ImexManager.
135+ func (m * ImexManager ) Stop () error {
136+ if m == nil {
137+ return nil
138+ }
139+
140+ m .waitGroup .Wait ()
141+ klog .Info ("Cleaning up all resourceSlices" )
142+ if err := m .cleanupResourceSlices (); err != nil {
143+ return fmt .Errorf ("error cleaning up resource slices: %w" , err )
144+ }
145+
146+ return nil
147+ }
148+
121149// DeepCopy will perform a deep copy of the provided DriverResources.
122150func (d DriverResources ) DeepCopy () resourceslice.DriverResources {
123151 driverResources := resourceslice.DriverResources {
@@ -130,7 +158,7 @@ func (d DriverResources) DeepCopy() resourceslice.DriverResources {
130158}
131159
132160// streamImexDomains returns two channels that streams imexDomans that are added and removed from nodes over time.
133- func streamImexDomains (ctx context.Context , clientset kubernetes. Interface ) (<- chan string , <- chan string , error ) {
161+ func ( m * ImexManager ) streamImexDomains (ctx context.Context ) (<- chan string , <- chan string , error ) {
134162 // Create channels to stream IMEX domain ids that are added / removed
135163 addedDomainCh := make (chan string )
136164 removedDomainCh := make (chan string )
@@ -147,7 +175,7 @@ func streamImexDomains(ctx context.Context, clientset kubernetes.Interface) (<-c
147175
148176 // Create a shared informer factory for nodes
149177 informerFactory := informers .NewSharedInformerFactoryWithOptions (
150- clientset ,
178+ m . clientset ,
151179 time .Minute * 10 , // Resync period
152180 informers .WithTweakListOptions (func (options * metav1.ListOptions ) {
153181 options .LabelSelector = labelSelector
@@ -206,7 +234,11 @@ func streamImexDomains(ctx context.Context, clientset kubernetes.Interface) (<-c
206234 }
207235
208236 // Start the informer and wait for it to sync
209- go informerFactory .Start (ctx .Done ())
237+ m .waitGroup .Add (1 )
238+ go func () {
239+ defer m .waitGroup .Done ()
240+ informerFactory .Start (ctx .Done ())
241+ }()
210242
211243 // Wait for the informer caches to sync
212244 if ! cache .WaitForCacheSync (ctx .Done (), nodeInformer .HasSynced ) {
@@ -259,3 +291,24 @@ func generateImexChannelPool(imexDomain string, numChannels int) resourceslice.P
259291
260292 return pool
261293}
294+
295+ // cleanupResourceSlices removes all resource slices created by the IMEX manager.
296+ func (m * ImexManager ) cleanupResourceSlices () error {
297+ // Delete all resource slices created by the IMEX manager
298+ ops := metav1.ListOptions {
299+ FieldSelector : fmt .Sprintf ("%s=%s" , resourceapi .ResourceSliceSelectorDriver , DriverName ),
300+ }
301+ l , err := m .clientset .ResourceV1alpha3 ().ResourceSlices ().List (context .Background (), ops )
302+ if err != nil {
303+ return fmt .Errorf ("error listing resource slices: %w" , err )
304+ }
305+
306+ for _ , rs := range l .Items {
307+ err := m .clientset .ResourceV1alpha3 ().ResourceSlices ().Delete (context .Background (), rs .Name , metav1.DeleteOptions {})
308+ if err != nil {
309+ return fmt .Errorf ("error deleting resource slice %s: %w" , rs .Name , err )
310+ }
311+ }
312+
313+ return nil
314+ }
0 commit comments