Skip to content

Commit 6beddd6

Browse files
klueska and ArangoGutierrez
authored and committed
Add more synchronization around the shutdown of the IMEX manager
Signed-off-by: Kevin Klues <[email protected]>
Signed-off-by: Carlos Eduardo Arango Gutierrez <[email protected]>
1 parent c50aa21 commit 6beddd6

File tree

2 files changed

+69
-36
lines changed

2 files changed

+69
-36
lines changed

cmd/nvidia-dra-controller/imex.go

Lines changed: 52 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@ package main
1919
import (
2020
"context"
2121
"fmt"
22-
"os"
23-
"os/signal"
24-
"syscall"
22+
"sync"
2523
"time"
2624

2725
v1 "k8s.io/api/core/v1"
@@ -43,25 +41,30 @@ const (
4341
ImexChannelLimit = 128
4442
)
4543

44+
type ImexManager struct {
45+
waitGroup sync.WaitGroup
46+
clientset kubernetes.Interface
47+
}
48+
4649
type DriverResources resourceslice.DriverResources
4750

48-
func StartIMEXManager(ctx context.Context, config *Config) error {
51+
func StartIMEXManager(ctx context.Context, config *Config) (*ImexManager, error) {
4952
// Build a client set config
5053
csconfig, err := config.flags.kubeClientConfig.NewClientSetConfig()
5154
if err != nil {
52-
return fmt.Errorf("error creating client set config: %w", err)
55+
return nil, fmt.Errorf("error creating client set config: %w", err)
5356
}
5457

5558
// Create a new clientset
5659
clientset, err := kubernetes.NewForConfig(csconfig)
5760
if err != nil {
58-
return fmt.Errorf("error creating dynamic client: %w", err)
61+
return nil, fmt.Errorf("error creating dynamic client: %w", err)
5962
}
6063

6164
// Fetch the current Pod object
6265
pod, err := clientset.CoreV1().Pods(config.flags.namespace).Get(ctx, config.flags.podName, metav1.GetOptions{})
6366
if err != nil {
64-
return fmt.Errorf("error fetching pod: %w", err)
67+
return nil, fmt.Errorf("error fetching pod: %w", err)
6568
}
6669

6770
// Set the owner of the ResourceSlices we will create
@@ -72,36 +75,39 @@ func StartIMEXManager(ctx context.Context, config *Config) error {
7275
UID: pod.UID,
7376
}
7477

78+
// Create the manager itself
79+
m := &ImexManager{
80+
clientset: clientset,
81+
}
82+
7583
// Stream added/removed IMEX domains from nodes over time
7684
klog.Info("Start streaming IMEX domains from nodes...")
77-
addedDomainsCh, removedDomainsCh, err := streamImexDomains(ctx, clientset)
85+
addedDomainsCh, removedDomainsCh, err := m.streamImexDomains(ctx)
7886
if err != nil {
79-
return fmt.Errorf("error streaming IMEX domains: %w", err)
87+
return nil, fmt.Errorf("error streaming IMEX domains: %w", err)
8088
}
8189

8290
// Add/Remove resource slices from IMEX domains as they come and go
8391
klog.Info("Start publishing IMEX channels to ResourceSlices...")
84-
err = manageResourceSlices(ctx, clientset, owner, addedDomainsCh, removedDomainsCh)
92+
err = m.manageResourceSlices(ctx, owner, addedDomainsCh, removedDomainsCh)
8593
if err != nil {
86-
return fmt.Errorf("error managing resource slices: %w", err)
94+
return nil, fmt.Errorf("error managing resource slices: %w", err)
8795
}
8896

89-
return nil
97+
return m, nil
9098
}
9199

92100
// manageResourceSlices reacts to added and removed IMEX domains and triggers the creation / removal of resource slices accordingly.
93-
func manageResourceSlices(ctx context.Context, clientset kubernetes.Interface, owner resourceslice.Owner, addedDomainsCh <-chan string, removedDomainsCh <-chan string) error {
101+
func (m *ImexManager) manageResourceSlices(ctx context.Context, owner resourceslice.Owner, addedDomainsCh <-chan string, removedDomainsCh <-chan string) error {
94102
driverResources := resourceslice.DriverResources{}
95-
controller, err := resourceslice.StartController(ctx, clientset, DriverName, owner, &driverResources)
103+
controller, err := resourceslice.StartController(ctx, m.clientset, DriverName, owner, &driverResources)
96104
if err != nil {
97105
return fmt.Errorf("error starting resource slice controller: %w", err)
98106
}
99107

100-
// Setup signal catching
101-
sigs := make(chan os.Signal, 1)
102-
signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
103-
108+
m.waitGroup.Add(1)
104109
go func() {
110+
defer m.waitGroup.Done()
105111
for {
106112
select {
107113
case addedDomain := <-addedDomainsCh:
@@ -116,12 +122,7 @@ func manageResourceSlices(ctx context.Context, clientset kubernetes.Interface, o
116122
delete(newDriverResources.Pools, removedDomain)
117123
controller.Update(&newDriverResources)
118124
driverResources = newDriverResources
119-
case <-sigs:
120-
controller.Stop()
121-
err = cleanupImexChannels(ctx, clientset)
122-
if err != nil {
123-
klog.Errorf("error cleaning up resource slices: %v", err)
124-
}
125+
case <-ctx.Done():
125126
return
126127
}
127128
}
@@ -130,6 +131,21 @@ func manageResourceSlices(ctx context.Context, clientset kubernetes.Interface, o
130131
return nil
131132
}
132133

134+
// Stop stops a running ImexManager.
135+
func (m *ImexManager) Stop() error {
136+
if m == nil {
137+
return nil
138+
}
139+
140+
m.waitGroup.Wait()
141+
klog.Info("Cleaning up all resourceSlices")
142+
if err := m.cleanupResourceSlices(); err != nil {
143+
return fmt.Errorf("error cleaning up resource slices: %w", err)
144+
}
145+
146+
return nil
147+
}
148+
133149
// DeepCopy will perform a deep copy of the provided DriverResources.
134150
func (d DriverResources) DeepCopy() resourceslice.DriverResources {
135151
driverResources := resourceslice.DriverResources{
@@ -142,7 +158,7 @@ func (d DriverResources) DeepCopy() resourceslice.DriverResources {
142158
}
143159

144160
// streamImexDomains returns two channels that streams imexDomans that are added and removed from nodes over time.
145-
func streamImexDomains(ctx context.Context, clientset kubernetes.Interface) (<-chan string, <-chan string, error) {
161+
func (m *ImexManager) streamImexDomains(ctx context.Context) (<-chan string, <-chan string, error) {
146162
// Create channels to stream IMEX domain ids that are added / removed
147163
addedDomainCh := make(chan string)
148164
removedDomainCh := make(chan string)
@@ -159,7 +175,7 @@ func streamImexDomains(ctx context.Context, clientset kubernetes.Interface) (<-c
159175

160176
// Create a shared informer factory for nodes
161177
informerFactory := informers.NewSharedInformerFactoryWithOptions(
162-
clientset,
178+
m.clientset,
163179
time.Minute*10, // Resync period
164180
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
165181
options.LabelSelector = labelSelector
@@ -218,7 +234,11 @@ func streamImexDomains(ctx context.Context, clientset kubernetes.Interface) (<-c
218234
}
219235

220236
// Start the informer and wait for it to sync
221-
go informerFactory.Start(ctx.Done())
237+
m.waitGroup.Add(1)
238+
go func() {
239+
defer m.waitGroup.Done()
240+
informerFactory.Start(ctx.Done())
241+
}()
222242

223243
// Wait for the informer caches to sync
224244
if !cache.WaitForCacheSync(ctx.Done(), nodeInformer.HasSynced) {
@@ -272,22 +292,21 @@ func generateImexChannelPool(imexDomain string, numChannels int) resourceslice.P
272292
return pool
273293
}
274294

275-
// cleanupImexChannels removes all resource slices created by the IMEX manager.
276-
func cleanupImexChannels(ctx context.Context, clientset kubernetes.Interface) error {
295+
// cleanupResourceSlices removes all resource slices created by the IMEX manager.
296+
func (m *ImexManager) cleanupResourceSlices() error {
277297
// Delete all resource slices created by the IMEX manager
278298
ops := metav1.ListOptions{
279299
FieldSelector: fmt.Sprintf("%s=%s", resourceapi.ResourceSliceSelectorDriver, DriverName),
280300
}
281-
l, err := clientset.ResourceV1alpha3().ResourceSlices().List(ctx, ops)
301+
l, err := m.clientset.ResourceV1alpha3().ResourceSlices().List(context.Background(), ops)
282302
if err != nil {
283303
return fmt.Errorf("error listing resource slices: %w", err)
284304
}
285305

286306
for _, rs := range l.Items {
287-
klog.Info("Deleting resource slice: ", rs.Name)
288-
err := clientset.ResourceV1alpha3().ResourceSlices().Delete(ctx, rs.Name, metav1.DeleteOptions{})
307+
err := m.clientset.ResourceV1alpha3().ResourceSlices().Delete(context.Background(), rs.Name, metav1.DeleteOptions{})
289308
if err != nil {
290-
return fmt.Errorf("error deleting resource slice %s: %w", rs.Name, err)
309+
return fmt.Errorf("Error deleting resource slice %s: %w", rs.Name, err)
291310
}
292311
}
293312

cmd/nvidia-dra-controller/main.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@
1717
package main
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"net"
2223
"net/http"
2324
"net/http/pprof"
2425
"os"
26+
"os/signal"
2527
"path"
28+
"syscall"
2629

2730
"github.com/prometheus/client_golang/prometheus"
2831
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -132,7 +135,6 @@ func newApp() *cli.App {
132135
return flags.loggingConfig.Apply()
133136
},
134137
Action: func(c *cli.Context) error {
135-
ctx := c.Context
136138
mux := http.NewServeMux()
137139
flags.deviceClasses = sets.New[string](c.StringSlice("device-classes")...)
138140

@@ -154,14 +156,26 @@ func newApp() *cli.App {
154156
}
155157
}
156158

159+
sigs := make(chan os.Signal, 1)
160+
signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
161+
162+
var imexManager *ImexManager
163+
ctx, cancel := context.WithCancel(c.Context)
164+
defer func() {
165+
cancel()
166+
if err := imexManager.Stop(); err != nil {
167+
klog.Errorf("error stopping IMEX manager: %v", err)
168+
}
169+
}()
170+
157171
if flags.deviceClasses.Has(ImexChannelType) {
158-
err = StartIMEXManager(ctx, config)
172+
imexManager, err = StartIMEXManager(ctx, config)
159173
if err != nil {
160174
return fmt.Errorf("start IMEX manager: %w", err)
161175
}
162176
}
163177

164-
<-ctx.Done()
178+
<-sigs
165179

166180
return nil
167181
},

0 commit comments

Comments
 (0)