Skip to content

Commit 7875a1b

Browse files
refactor: extract health checking into HealthProvider interface
Extract device health checking logic into a dedicated HealthProvider interface with proper lifecycle management using WaitGroups and context.

- Add HealthProvider interface (Start/Stop/Health methods)
- Implement nvmlHealthProvider with WaitGroup coordination
- Update ResourceManager to return HealthProvider instead of CheckHealth
- Update device plugin to use HealthProvider
- Add no-op implementation for Tegra devices

This refactoring improves code modularity and testability without changing existing behavior. It prepares the foundation for future device recovery features.

Signed-off-by: Carlos Eduardo Arango Gutierrez <[email protected]>
1 parent 0635d3e commit 7875a1b

File tree

6 files changed

+491
-235
lines changed

6 files changed

+491
-235
lines changed

internal/plugin/server.go

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,10 @@ type nvidiaDevicePlugin struct {
6161

6262
socket string
6363
server *grpc.Server
64-
health chan *rm.Device
65-
stop chan interface{}
64+
65+
// Health monitoring
66+
healthProvider rm.HealthProvider
67+
cancel context.CancelFunc
6668

6769
imexChannels imex.Channels
6870

@@ -90,11 +92,9 @@ func (o *options) devicePluginForResource(ctx context.Context, resourceManager r
9092
mps: mpsOptions,
9193

9294
socket: getPluginSocketPath(resourceManager.Resource()),
93-
// These will be reinitialized every
94-
// time the plugin server is restarted.
95+
// server and healthProvider will be reinitialized every time
96+
// the plugin server is restarted.
9597
server: nil,
96-
health: nil,
97-
stop: nil,
9898
}
9999
return &plugin, nil
100100
}
@@ -108,15 +108,11 @@ func getPluginSocketPath(resource spec.ResourceName) string {
108108

109109
func (plugin *nvidiaDevicePlugin) initialize() {
110110
plugin.server = grpc.NewServer([]grpc.ServerOption{}...)
111-
plugin.health = make(chan *rm.Device)
112-
plugin.stop = make(chan interface{})
113111
}
114112

115113
func (plugin *nvidiaDevicePlugin) cleanup() {
116-
close(plugin.stop)
117114
plugin.server = nil
118-
plugin.health = nil
119-
plugin.stop = nil
115+
plugin.healthProvider = nil
120116
}
121117

122118
// Devices returns the full set of devices associated with the plugin.
@@ -148,13 +144,14 @@ func (plugin *nvidiaDevicePlugin) Start(kubeletSocket string) error {
148144
}
149145
klog.Infof("Registered device plugin for '%s' with Kubelet", plugin.rm.Resource())
150146

151-
go func() {
152-
// TODO: add MPS health check
153-
err := plugin.rm.CheckHealth(plugin.stop, plugin.health)
154-
if err != nil {
155-
klog.Errorf("Failed to start health check: %v; continuing with health checks disabled", err)
156-
}
157-
}()
147+
// Initialize and start health provider
148+
plugin.ctx, plugin.cancel = context.WithCancel(context.Background())
149+
plugin.healthProvider = plugin.rm.HealthProvider()
150+
151+
// TODO: add MPS health check
152+
if err := plugin.healthProvider.Start(plugin.ctx); err != nil {
153+
klog.Errorf("Failed to start health provider: %v; continuing with health checks disabled", err)
154+
}
158155

159156
return nil
160157
}
@@ -164,6 +161,15 @@ func (plugin *nvidiaDevicePlugin) Stop() error {
164161
if plugin == nil || plugin.server == nil {
165162
return nil
166163
}
164+
165+
// Stop health monitoring
166+
if plugin.healthProvider != nil {
167+
plugin.healthProvider.Stop()
168+
}
169+
if plugin.cancel != nil {
170+
plugin.cancel()
171+
}
172+
167173
klog.Infof("Stopping to serve '%s' on %s", plugin.rm.Resource(), plugin.socket)
168174
plugin.server.Stop()
169175
if err := os.Remove(plugin.socket); err != nil && !os.IsNotExist(err) {
@@ -269,13 +275,19 @@ func (plugin *nvidiaDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.D
269275
return err
270276
}
271277

278+
// If health provider not available, wait for context cancellation
279+
if plugin.healthProvider == nil {
280+
<-plugin.ctx.Done()
281+
return nil
282+
}
283+
272284
for {
273285
select {
274-
case <-plugin.stop:
286+
case <-plugin.ctx.Done():
275287
return nil
276-
case d := <-plugin.health:
277-
// FIXME: there is no way to recover from the Unhealthy state.
278-
d.Health = pluginapi.Unhealthy
288+
case d := <-plugin.healthProvider.Health():
289+
// Device became unhealthy
290+
// Device.Health already set to Unhealthy by health provider
279291
klog.Infof("'%s' device marked unhealthy: %s", plugin.rm.Resource(), d.ID)
280292
if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: plugin.apiDevices()}); err != nil {
281293
return nil

0 commit comments

Comments
 (0)