Skip to content

Commit 5069e1c

Browse files
committed
preliminary device health monitor
Signed-off-by: Swati Gupta <[email protected]>
1 parent 70a9f20 commit 5069e1c

File tree

4 files changed

+276
-9
lines changed

4 files changed

+276
-9
lines changed

cmd/gpu-kubelet-plugin/allocatable.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,19 @@ import (
2222
resourceapi "k8s.io/api/resource/v1"
2323
)
2424

25+
const (
26+
// Healthy means that the device is healthy
27+
Healthy = "Healthy"
28+
// Unhealthy means that the device is unhealthy
29+
Unhealthy = "Unhealthy"
30+
)
31+
2532
type AllocatableDevices map[string]*AllocatableDevice
2633

2734
type AllocatableDevice struct {
28-
Gpu *GpuInfo
29-
Mig *MigDeviceInfo
35+
Gpu *GpuInfo
36+
Mig *MigDeviceInfo
37+
Health string // from device-plugin
3038
}
3139

3240
func (d AllocatableDevice) Type() string {
@@ -69,6 +77,10 @@ func (d *AllocatableDevice) GetDevice() resourceapi.Device {
6977
panic("unexpected type for AllocatableDevice")
7078
}
7179

80+
func (d *AllocatableDevice) IsHealthy() bool {
81+
return d.Health == Healthy
82+
}
83+
7284
func (d AllocatableDevices) GpuUUIDs() []string {
7385
var uuids []string
7486
for _, device := range d {
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
/*
2+
* Copyright 2025 The Kubernetes Authors.
3+
* Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package main
19+
20+
import (
21+
"context"
22+
"fmt"
23+
"sync"
24+
25+
"github.com/NVIDIA/go-nvml/pkg/nvml"
26+
"k8s.io/klog/v2"
27+
)
28+
29+
type deviceHealthMonitor struct {
30+
nvdevlib *deviceLib
31+
allocatable AllocatableDevices
32+
eventSet nvml.EventSet
33+
unhealthy chan *AllocatableDevice
34+
stop chan struct{}
35+
wg sync.WaitGroup
36+
}
37+
38+
func newDeviceHealthMonitor(ctx context.Context, config *Config) (*deviceHealthMonitor, error) {
39+
containerDriverRoot := root(config.flags.containerDriverRoot)
40+
nvdevlib, err := newDeviceLib(containerDriverRoot)
41+
if err != nil {
42+
return nil, fmt.Errorf("failed to create device library: %w", err)
43+
}
44+
45+
if err := nvdevlib.Init(); err != nil {
46+
return nil, fmt.Errorf("failed to initialize NVML: %w", err)
47+
}
48+
//defer nvdevlib.alwaysShutdown()
49+
50+
allocatable, err := nvdevlib.enumerateAllPossibleDevices(config)
51+
if err != nil {
52+
return nil, fmt.Errorf("error enumerating all possible devices: %w", err)
53+
}
54+
55+
eventSet, err := nvdevlib.nvmllib.EventSetCreate()
56+
if err != nvml.SUCCESS {
57+
return nil, fmt.Errorf("failed to create event set: %w", err)
58+
}
59+
60+
monitor := &deviceHealthMonitor{
61+
nvdevlib: nvdevlib,
62+
allocatable: allocatable,
63+
eventSet: eventSet,
64+
unhealthy: make(chan *AllocatableDevice, len(allocatable)),
65+
stop: make(chan struct{}),
66+
}
67+
68+
if err := monitor.registerDevicesForEvents(); err != nil {
69+
monitor.eventSet.Free()
70+
return nil, fmt.Errorf("failed to register devices for health monitoring: %w", err)
71+
}
72+
73+
monitor.start()
74+
return monitor, nil
75+
}
76+
77+
func (m *deviceHealthMonitor) registerDevicesForEvents() error {
78+
nvmllib := m.nvdevlib.nvmllib
79+
eventMask := uint64(nvml.EventTypeXidCriticalError | nvml.EventTypeDoubleBitEccError | nvml.EventTypeSingleBitEccError)
80+
81+
for _, uuid := range m.allocatable.UUIDs() {
82+
gpu, err := nvmllib.DeviceGetHandleByUUID(uuid)
83+
if err != nvml.SUCCESS {
84+
klog.Infof("Unable to get NVML handle for UUID %s: %v; skipping health check for this device", uuid, err)
85+
continue
86+
}
87+
88+
if err := gpu.RegisterEvents(eventMask, m.eventSet); err != nvml.SUCCESS {
89+
klog.Infof("Failed to register events for device %s: %v; skipping health check for this device", uuid, err)
90+
}
91+
}
92+
return nil
93+
}
94+
95+
func (m *deviceHealthMonitor) start() {
96+
m.wg.Add(1)
97+
go m.run()
98+
}
99+
100+
func (m *deviceHealthMonitor) Stop() {
101+
if m == nil {
102+
return
103+
}
104+
close(m.stop)
105+
m.wg.Wait()
106+
close(m.unhealthy)
107+
m.eventSet.Free()
108+
109+
if m.nvdevlib != nil {
110+
m.nvdevlib.alwaysShutdown()
111+
}
112+
}
113+
114+
func (m *deviceHealthMonitor) run() {
115+
defer m.wg.Done()
116+
117+
uuidToDeviceMap := make(map[string]*AllocatableDevice)
118+
for _, device := range m.allocatable {
119+
uuid := device.getUUID()
120+
if uuid != "" {
121+
uuidToDeviceMap[uuid] = device
122+
}
123+
}
124+
125+
klog.Info("Starting event-driven GPU health monitor...")
126+
127+
for {
128+
select {
129+
case <-m.stop:
130+
klog.Info("Stopping event-driven GPU health monitor...")
131+
return
132+
default:
133+
event, err := m.eventSet.Wait(5000)
134+
if err == nvml.ERROR_TIMEOUT {
135+
continue
136+
}
137+
if err != nvml.SUCCESS {
138+
klog.Infof("Error waiting for event: %v; Marking all devices as unhealthy", err)
139+
for _, dev := range m.allocatable {
140+
m.unhealthy <- dev
141+
}
142+
continue
143+
}
144+
145+
// Process health events
146+
switch event.EventType {
147+
case nvml.EventTypeXidCriticalError:
148+
klog.Warningf("Critical XID error detected on device")
149+
case nvml.EventTypeDoubleBitEccError:
150+
klog.Warningf("Double-bit ECC error detected on device")
151+
case nvml.EventTypeSingleBitEccError:
152+
klog.Infof("Single-bit ECC error detected on device")
153+
default:
154+
continue
155+
}
156+
157+
eventUUID, err := event.Device.GetUUID()
158+
if err != nvml.SUCCESS {
159+
klog.Infof("Failed to determine uuid for event %v: %v; Marking all devices as unhealthy.", event, err)
160+
for _, dev := range m.allocatable {
161+
m.unhealthy <- dev
162+
}
163+
continue
164+
}
165+
166+
device, exists := uuidToDeviceMap[eventUUID]
167+
if !exists {
168+
continue
169+
}
170+
171+
// Send notification to driver
172+
klog.Infof("Sending unhealthy notification for device %s due to event type %v", eventUUID, event.EventType)
173+
select {
174+
case m.unhealthy <- device:
175+
// Successfully sent notification
176+
default:
177+
// Channel full, log and continue
178+
klog.Warningf("Health notification channel full, dropping event for device %s", eventUUID)
179+
}
180+
}
181+
}
182+
}
183+
184+
func (m *deviceHealthMonitor) Unhealthy() <-chan *AllocatableDevice {
185+
return m.unhealthy
186+
}
187+
188+
func (d *AllocatableDevice) getUUID() string {
189+
if d.Gpu != nil {
190+
return d.Gpu.UUID
191+
}
192+
if d.Mig != nil {
193+
return d.Mig.UUID
194+
}
195+
return ""
196+
}

cmd/gpu-kubelet-plugin/driver.go

Lines changed: 62 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,12 @@ import (
3939
const DriverPrepUprepFlockFileName = "pu.lock"
4040

4141
type driver struct {
42-
client coreclientset.Interface
43-
pluginhelper *kubeletplugin.Helper
44-
state *DeviceState
45-
pulock *flock.Flock
46-
healthcheck *healthcheck
42+
client coreclientset.Interface
43+
pluginhelper *kubeletplugin.Helper
44+
state *DeviceState
45+
pulock *flock.Flock
46+
healthcheck *healthcheck
47+
deviceHealthMonitor *deviceHealthMonitor
4748
}
4849

4950
func NewDriver(ctx context.Context, config *Config) (*driver, error) {
@@ -93,10 +94,18 @@ func NewDriver(ctx context.Context, config *Config) (*driver, error) {
9394
}
9495
driver.healthcheck = healthcheck
9596

97+
deviceHealthMonitor, err := newDeviceHealthMonitor(ctx, config)
98+
if err != nil {
99+
return nil, fmt.Errorf("start deviceHealthMonitor: %w", err)
100+
}
101+
driver.deviceHealthMonitor = deviceHealthMonitor
102+
96103
if err := driver.pluginhelper.PublishResources(ctx, resources); err != nil {
97104
return nil, err
98105
}
99106

107+
go driver.handleHealthNotifications(ctx, config.flags.nodeName)
108+
100109
return driver, nil
101110
}
102111

@@ -109,6 +118,10 @@ func (d *driver) Shutdown() error {
109118
d.healthcheck.Stop()
110119
}
111120

121+
if d.deviceHealthMonitor != nil {
122+
d.deviceHealthMonitor.Stop()
123+
}
124+
112125
d.pluginhelper.Stop()
113126
return nil
114127
}
@@ -177,6 +190,50 @@ func (d *driver) nodeUnprepareResource(ctx context.Context, claimNs kubeletplugi
177190
return nil
178191
}
179192

193+
func (d *driver) handleHealthNotifications(ctx context.Context, nodeName string) {
194+
for {
195+
select {
196+
case <-ctx.Done():
197+
klog.Info("Stopping health notification handler")
198+
return
199+
case device, ok := <-d.deviceHealthMonitor.Unhealthy():
200+
if !ok {
201+
klog.Info("Health monitor channel closed")
202+
return
203+
}
204+
205+
uuid := device.getUUID()
206+
klog.Warningf("Received unhealthy notification for device: %s", uuid)
207+
208+
// Mark device as unhealthy in state
209+
//d.state.MarkUnhealthy(device)
210+
211+
// Rebuild resource slice with only healthy devices
212+
var resourceSlice resourceslice.Slice
213+
for _, dev := range d.state.allocatable {
214+
if dev.IsHealthy() {
215+
resourceSlice.Devices = append(resourceSlice.Devices, dev.GetDevice())
216+
} else {
217+
klog.Errorf("device:%v with uuid:%s is unhealthy", uuid, dev)
218+
}
219+
}
220+
221+
// Republish updated resources
222+
resources := resourceslice.DriverResources{
223+
Pools: map[string]resourceslice.Pool{
224+
nodeName: {Slices: []resourceslice.Slice{resourceSlice}},
225+
},
226+
}
227+
228+
if err := d.pluginhelper.PublishResources(ctx, resources); err != nil {
229+
klog.Errorf("Failed to publish resources after health change: %v", err)
230+
} else {
231+
klog.Infof("Successfully republished resources after marking device %s unhealthy", uuid)
232+
}
233+
}
234+
}
235+
}
236+
180237
// TODO: implement loop to remove CDI files from the CDI path for claimUIDs
181238
// that have been removed from the AllocatedClaims map.
182239
// func (d *driver) cleanupCDIFiles(wg *sync.WaitGroup) chan error {

cmd/gpu-kubelet-plugin/nvlib.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,8 @@ func (l deviceLib) enumerateGpusAndMigDevices(config *Config) (AllocatableDevice
129129
}
130130

131131
deviceInfo := &AllocatableDevice{
132-
Gpu: gpuInfo,
132+
Gpu: gpuInfo,
133+
Health: Healthy,
133134
}
134135
devices[gpuInfo.CanonicalName()] = deviceInfo
135136

@@ -140,7 +141,8 @@ func (l deviceLib) enumerateGpusAndMigDevices(config *Config) (AllocatableDevice
140141

141142
for _, migDeviceInfo := range migs {
142143
deviceInfo := &AllocatableDevice{
143-
Mig: migDeviceInfo,
144+
Mig: migDeviceInfo,
145+
Health: Healthy,
144146
}
145147
devices[migDeviceInfo.CanonicalName()] = deviceInfo
146148
}

0 commit comments

Comments
 (0)