Skip to content

Commit a4566a9

Browse files
committed
publish health status
Signed-off-by: Swati Gupta <[email protected]>
1 parent 5069e1c commit a4566a9

File tree

4 files changed

+107
-28
lines changed

4 files changed

+107
-28
lines changed

cmd/gpu-kubelet-plugin/allocatable.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,16 @@ func (d *AllocatableDevice) IsHealthy() bool {
8181
return d.Health == Healthy
8282
}
8383

84+
func (d *AllocatableDevice) GetUUID() string {
85+
if d.Gpu != nil {
86+
return d.Gpu.UUID
87+
}
88+
if d.Mig != nil {
89+
return d.Mig.UUID
90+
}
91+
return ""
92+
}
93+
8494
func (d AllocatableDevices) GpuUUIDs() []string {
8595
var uuids []string
8696
for _, device := range d {

cmd/gpu-kubelet-plugin/device_health.go

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -35,23 +35,20 @@ type deviceHealthMonitor struct {
3535
wg sync.WaitGroup
3636
}
3737

38-
func newDeviceHealthMonitor(ctx context.Context, config *Config) (*deviceHealthMonitor, error) {
39-
containerDriverRoot := root(config.flags.containerDriverRoot)
40-
nvdevlib, err := newDeviceLib(containerDriverRoot)
41-
if err != nil {
42-
return nil, fmt.Errorf("failed to create device library: %w", err)
43-
}
44-
38+
func newDeviceHealthMonitor(ctx context.Context, config *Config, allocatable AllocatableDevices, nvdevlib *deviceLib) (*deviceHealthMonitor, error) {
39+
klog.Info("[SWATI DEBUG] initializing NVML..")
4540
if err := nvdevlib.Init(); err != nil {
4641
return nil, fmt.Errorf("failed to initialize NVML: %w", err)
4742
}
4843
//defer nvdevlib.alwaysShutdown()
4944

50-
allocatable, err := nvdevlib.enumerateAllPossibleDevices(config)
51-
if err != nil {
52-
return nil, fmt.Errorf("error enumerating all possible devices: %w", err)
53-
}
45+
//klog.Info("[SWATI DEBUG] getting all devices..")
46+
//allocatable, err := nvdevlib.enumerateAllPossibleDevices(config)
47+
//if err != nil {
48+
// return nil, fmt.Errorf("error enumerating all possible devices: %w", err)
49+
//}
5450

51+
klog.Info("[SWATI DEBUG] creating NVML events")
5552
eventSet, err := nvdevlib.nvmllib.EventSetCreate()
5653
if err != nvml.SUCCESS {
5754
return nil, fmt.Errorf("failed to create event set: %w", err)
@@ -65,6 +62,7 @@ func newDeviceHealthMonitor(ctx context.Context, config *Config) (*deviceHealthM
6562
stop: make(chan struct{}),
6663
}
6764

65+
klog.Info("[SWATI DEBUG] registering NVML events")
6866
if err := monitor.registerDevicesForEvents(); err != nil {
6967
monitor.eventSet.Free()
7068
return nil, fmt.Errorf("failed to register devices for health monitoring: %w", err)
@@ -93,6 +91,7 @@ func (m *deviceHealthMonitor) registerDevicesForEvents() error {
9391
}
9492

9593
func (m *deviceHealthMonitor) start() {
94+
klog.Info("[SWATI DEBUG] starting health monitor")
9695
m.wg.Add(1)
9796
go m.run()
9897
}
@@ -101,6 +100,7 @@ func (m *deviceHealthMonitor) Stop() {
101100
if m == nil {
102101
return
103102
}
103+
klog.Info("[SWATI DEBUG] stopping health monitor")
104104
close(m.stop)
105105
m.wg.Wait()
106106
close(m.unhealthy)
@@ -116,7 +116,7 @@ func (m *deviceHealthMonitor) run() {
116116

117117
uuidToDeviceMap := make(map[string]*AllocatableDevice)
118118
for _, device := range m.allocatable {
119-
uuid := device.getUUID()
119+
uuid := device.GetUUID()
120120
if uuid != "" {
121121
uuidToDeviceMap[uuid] = device
122122
}
@@ -132,6 +132,7 @@ func (m *deviceHealthMonitor) run() {
132132
default:
133133
event, err := m.eventSet.Wait(5000)
134134
if err == nvml.ERROR_TIMEOUT {
135+
klog.Info("[SWATI DEBUG] timedout")
135136
continue
136137
}
137138
if err != nvml.SUCCESS {
@@ -145,11 +146,11 @@ func (m *deviceHealthMonitor) run() {
145146
// Process health events
146147
switch event.EventType {
147148
case nvml.EventTypeXidCriticalError:
148-
klog.Warningf("Critical XID error detected on device")
149+
klog.Warningf("Critical XID error detected on device: %+v", event)
149150
case nvml.EventTypeDoubleBitEccError:
150-
klog.Warningf("Double-bit ECC error detected on device")
151+
klog.Warningf("Double-bit ECC error detected on device: %+v", event)
151152
case nvml.EventTypeSingleBitEccError:
152-
klog.Infof("Single-bit ECC error detected on device")
153+
klog.Infof("Single-bit ECC error detected on device:%+v", event)
153154
default:
154155
continue
155156
}
@@ -184,13 +185,3 @@ func (m *deviceHealthMonitor) run() {
184185
func (m *deviceHealthMonitor) Unhealthy() <-chan *AllocatableDevice {
185186
return m.unhealthy
186187
}
187-
188-
func (d *AllocatableDevice) getUUID() string {
189-
if d.Gpu != nil {
190-
return d.Gpu.UUID
191-
}
192-
if d.Mig != nil {
193-
return d.Mig.UUID
194-
}
195-
return ""
196-
}

cmd/gpu-kubelet-plugin/device_state.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@ import (
2323
"sync"
2424

2525
resourceapi "k8s.io/api/resource/v1"
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2627
"k8s.io/apimachinery/pkg/runtime"
28+
metav1apply "k8s.io/client-go/applyconfigurations/meta/v1"
29+
resourceapply "k8s.io/client-go/applyconfigurations/resource/v1"
2730
"k8s.io/dynamic-resource-allocation/kubeletplugin"
2831
"k8s.io/klog/v2"
2932
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
@@ -285,8 +288,11 @@ func (s *DeviceState) prepareDevices(ctx context.Context, claim *resourceapi.Res
285288
Config: configapi.DefaultMigDeviceConfig(),
286289
})
287290

291+
// Swati: Add resourceclaim status update
288292
// Look through the configs and figure out which one will be applied to
289293
// each device allocation result based on their order of precedence and type.
294+
resourceClaimStatus := resourceapply.ResourceClaimStatus()
295+
var deviceStatuses []*resourceapply.AllocatedDeviceStatusApplyConfiguration
290296
configResultsMap := make(map[runtime.Object][]*resourceapi.DeviceRequestAllocationResult)
291297
for _, result := range claim.Status.Allocation.Devices.Results {
292298
if result.Driver != DriverName {
@@ -296,6 +302,41 @@ func (s *DeviceState) prepareDevices(ctx context.Context, claim *resourceapi.Res
296302
if !exists {
297303
return nil, fmt.Errorf("requested device is not allocatable: %v", result.Device)
298304
}
305+
// Swati add health check
306+
klog.Info("[SWATI DEBUG] adding device status")
307+
deviceStatus := resourceapply.AllocatedDeviceStatus().
308+
WithDevice(result.Device).
309+
WithDriver(result.Driver).
310+
WithPool(result.Pool)
311+
312+
if device.Health == Unhealthy {
313+
deviceStatus = deviceStatus.WithConditions(
314+
metav1apply.Condition().
315+
WithType("Ready").
316+
WithStatus(metav1.ConditionFalse).
317+
WithReason("Unhealthy").
318+
WithMessage(fmt.Sprintf("Device %s is not healthy", result.Device)).
319+
WithLastTransitionTime(metav1.Now()),
320+
)
321+
klog.Warningf("Device %s is unhealthy, marking as not ready", result.Device)
322+
} else {
323+
deviceStatus = deviceStatus.WithConditions(
324+
metav1apply.Condition().
325+
WithType("Ready").
326+
WithStatus(metav1.ConditionTrue).
327+
WithReason("Healthy").
328+
WithMessage("Device is healthy and ready").
329+
WithLastTransitionTime(metav1.Now()),
330+
)
331+
klog.Infof("Device %s is healthy, marking as ready", result.Device)
332+
}
333+
deviceStatuses = append(deviceStatuses, deviceStatus)
334+
335+
// Only proceed with config mapping for healthy devices
336+
if device.Health == Unhealthy {
337+
continue
338+
}
339+
299340
for _, c := range slices.Backward(configs) {
300341
if slices.Contains(c.Requests, result.Request) {
301342
if _, ok := c.Config.(*configapi.GpuConfig); ok && device.Type() != GpuDeviceType {
@@ -319,7 +360,25 @@ func (s *DeviceState) prepareDevices(ctx context.Context, claim *resourceapi.Res
319360
}
320361
}
321362
}
363+
resourceClaimStatus = resourceClaimStatus.WithDevices(deviceStatuses...)
364+
365+
// Update the resource claim status
366+
resourceClaimApply := resourceapply.ResourceClaim(claim.Name, claim.Namespace).WithStatus(resourceClaimStatus)
367+
_, err = s.config.clientsets.Resource.ResourceClaims(claim.Namespace).ApplyStatus(ctx,
368+
resourceClaimApply,
369+
metav1.ApplyOptions{FieldManager: DriverName, Force: true},
370+
)
371+
372+
if err != nil {
373+
klog.Infof("failed to update status for claim %s/%s : %v", claim.Namespace, claim.Name, err)
374+
} else {
375+
klog.Infof("update status for claim %s/%s", claim.Namespace, claim.Name)
376+
}
322377

378+
// If no healthy devices are available for configuration, return
379+
if len(configResultsMap) == 0 {
380+
return nil, fmt.Errorf("no healthy devices available for allocation")
381+
}
323382
// Normalize, validate, and apply all configs associated with devices that
324383
// need to be prepared. Track device group configs generated from applying the
325384
// config to the set of device allocation results.
@@ -550,6 +609,21 @@ func GetOpaqueDeviceConfigs(
550609
return resultConfigs, nil
551610
}
552611

612+
func (s *DeviceState) MarkDeviceUnhealthy(unhealthyDevice *AllocatableDevice) {
613+
s.Lock()
614+
defer s.Unlock()
615+
616+
uuid := unhealthyDevice.GetUUID()
617+
device, ok := s.allocatable[uuid]
618+
if !ok {
619+
klog.Warningf("Attempted to mark unknown device as unhealthy: %s", uuid)
620+
return
621+
}
622+
623+
device.Health = Unhealthy
624+
klog.Infof("Marked device:%s unhealthy", uuid)
625+
}
626+
553627
// TODO: Dynamic MIG is not yet supported with structured parameters.
554628
// Refactor this to allow for the allocation of statically partitioned MIG
555629
// devices.

cmd/gpu-kubelet-plugin/driver.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,11 @@ func NewDriver(ctx context.Context, config *Config) (*driver, error) {
9494
}
9595
driver.healthcheck = healthcheck
9696

97-
deviceHealthMonitor, err := newDeviceHealthMonitor(ctx, config)
97+
deviceHealthMonitor, err := newDeviceHealthMonitor(ctx, config, state.allocatable, state.nvdevlib)
9898
if err != nil {
9999
return nil, fmt.Errorf("start deviceHealthMonitor: %w", err)
100100
}
101+
klog.Info("[SWATI DEBUGS] Started device health monitor")
101102
driver.deviceHealthMonitor = deviceHealthMonitor
102103

103104
if err := driver.pluginhelper.PublishResources(ctx, resources); err != nil {
@@ -191,6 +192,7 @@ func (d *driver) nodeUnprepareResource(ctx context.Context, claimNs kubeletplugi
191192
}
192193

193194
func (d *driver) handleHealthNotifications(ctx context.Context, nodeName string) {
195+
klog.Info("[SWATI DEBUG] handling Health Notifications")
194196
for {
195197
select {
196198
case <-ctx.Done():
@@ -202,23 +204,25 @@ func (d *driver) handleHealthNotifications(ctx context.Context, nodeName string)
202204
return
203205
}
204206

205-
uuid := device.getUUID()
207+
uuid := device.GetUUID()
206208
klog.Warningf("Received unhealthy notification for device: %s", uuid)
207209

208210
// Mark device as unhealthy in state
209-
//d.state.MarkUnhealthy(device)
211+
d.state.MarkDeviceUnhealthy(device)
210212

211213
// Rebuild resource slice with only healthy devices
212214
var resourceSlice resourceslice.Slice
213215
for _, dev := range d.state.allocatable {
214216
if dev.IsHealthy() {
217+
klog.Infof("[SWATI DEBUG] device is healthy, added to resoureslice: %v", dev)
215218
resourceSlice.Devices = append(resourceSlice.Devices, dev.GetDevice())
216219
} else {
217220
klog.Errorf("device:%v with uuid:%s is unhealthy", uuid, dev)
218221
}
219222
}
220223

221224
// Republish updated resources
225+
klog.Info("[SWATI DEBUG] rebulishing resourceslice with healthy devices")
222226
resources := resourceslice.DriverResources{
223227
Pools: map[string]resourceslice.Pool{
224228
nodeName: {Slices: []resourceslice.Slice{resourceSlice}},

0 commit comments

Comments
 (0)