Skip to content

Commit d292c81

Browse files
authored
Merge pull request #453 from guptaNswati/liveness-probe-kp
Add liveness probe to kubelet-plugin
2 parents 4f95467 + 9ecf47f commit d292c81

File tree

12 files changed

+1045
-1
lines changed

12 files changed

+1045
-1
lines changed

cmd/compute-domain-kubelet-plugin/driver.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ type driver struct {
6363
pluginhelper *kubeletplugin.Helper
6464
state *DeviceState
6565
pulock *flock.Flock
66+
healthcheck *healthcheck
6667
}
6768

6869
func NewDriver(ctx context.Context, config *Config) (*driver, error) {
@@ -121,6 +122,12 @@ func NewDriver(ctx context.Context, config *Config) (*driver, error) {
121122
return nil, err
122123
}
123124

125+
healthcheck, err := startHealthcheck(ctx, config)
126+
if err != nil {
127+
return nil, fmt.Errorf("start healthcheck: %w", err)
128+
}
129+
driver.healthcheck = healthcheck
130+
124131
if err := driver.pluginhelper.PublishResources(ctx, resources); err != nil {
125132
return nil, err
126133
}
@@ -132,9 +139,15 @@ func (d *driver) Shutdown() error {
132139
if d == nil {
133140
return nil
134141
}
142+
135143
if err := d.state.computeDomainManager.Stop(); err != nil {
136144
return fmt.Errorf("error stopping ComputeDomainManager: %w", err)
137145
}
146+
147+
if d.healthcheck != nil {
148+
d.healthcheck.Stop()
149+
}
150+
138151
d.pluginhelper.Stop()
139152
return nil
140153
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Copyright 2025 The Kubernetes Authors.
3+
* Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package main
19+
20+
import (
21+
"context"
22+
"fmt"
23+
"net"
24+
"net/url"
25+
"path"
26+
"strconv"
27+
"sync"
28+
29+
"google.golang.org/grpc"
30+
"google.golang.org/grpc/codes"
31+
"google.golang.org/grpc/credentials/insecure"
32+
"google.golang.org/grpc/health/grpc_health_v1"
33+
"google.golang.org/grpc/status"
34+
"k8s.io/klog/v2"
35+
drapb "k8s.io/kubelet/pkg/apis/dra/v1beta1"
36+
registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
37+
)
38+
39+
type healthcheck struct {
40+
grpc_health_v1.UnimplementedHealthServer
41+
42+
server *grpc.Server
43+
wg sync.WaitGroup
44+
45+
regClient registerapi.RegistrationClient
46+
draClient drapb.DRAPluginClient
47+
}
48+
49+
func startHealthcheck(ctx context.Context, config *Config) (*healthcheck, error) {
50+
port := config.flags.healthcheckPort
51+
if port < 0 {
52+
return nil, nil
53+
}
54+
55+
addr := net.JoinHostPort("", strconv.Itoa(port))
56+
lis, err := net.Listen("tcp", addr)
57+
if err != nil {
58+
return nil, fmt.Errorf("failed to listen for healthcheck service at %s: %w", addr, err)
59+
}
60+
61+
regSockPath := (&url.URL{
62+
Scheme: "unix",
63+
// TODO: this needs to adapt when seamless upgrades
64+
// are enabled and the filename includes a uid.
65+
Path: path.Join(config.flags.kubeletRegistrarDirectoryPath, DriverName+"-reg.sock"),
66+
}).String()
67+
klog.V(6).Infof("connecting to registration socket path=%s", regSockPath)
68+
regConn, err := grpc.NewClient(
69+
regSockPath,
70+
grpc.WithTransportCredentials(insecure.NewCredentials()),
71+
)
72+
if err != nil {
73+
return nil, fmt.Errorf("connect to registration socket: %w", err)
74+
}
75+
76+
draSockPath := (&url.URL{
77+
Scheme: "unix",
78+
Path: path.Join(config.DriverPluginPath(), "dra.sock"),
79+
}).String()
80+
klog.V(6).Infof("connecting to DRA socket path=%s", draSockPath)
81+
draConn, err := grpc.NewClient(
82+
draSockPath,
83+
grpc.WithTransportCredentials(insecure.NewCredentials()),
84+
)
85+
if err != nil {
86+
return nil, fmt.Errorf("connect to DRA socket: %w", err)
87+
}
88+
89+
server := grpc.NewServer()
90+
healthcheck := &healthcheck{
91+
server: server,
92+
regClient: registerapi.NewRegistrationClient(regConn),
93+
draClient: drapb.NewDRAPluginClient(draConn),
94+
}
95+
grpc_health_v1.RegisterHealthServer(server, healthcheck)
96+
97+
healthcheck.wg.Add(1)
98+
go func() {
99+
defer healthcheck.wg.Done()
100+
klog.Infof("starting healthcheck service at %s", lis.Addr().String())
101+
if err := server.Serve(lis); err != nil {
102+
klog.Errorf("failed to serve healthcheck service on %s: %v", addr, err)
103+
}
104+
}()
105+
106+
return healthcheck, nil
107+
}
108+
109+
func (h *healthcheck) Stop() {
110+
if h.server != nil {
111+
klog.Info("Stopping healthcheck service")
112+
h.server.GracefulStop()
113+
}
114+
h.wg.Wait()
115+
}
116+
117+
// Check implements [grpc_health_v1.HealthServer].
118+
func (h *healthcheck) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
119+
knownServices := map[string]struct{}{"": {}, "liveness": {}}
120+
if _, known := knownServices[req.GetService()]; !known {
121+
return nil, status.Error(codes.NotFound, "unknown service")
122+
}
123+
124+
status := &grpc_health_v1.HealthCheckResponse{
125+
Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING,
126+
}
127+
128+
info, err := h.regClient.GetInfo(ctx, &registerapi.InfoRequest{})
129+
if err != nil {
130+
klog.ErrorS(err, "failed to call GetInfo")
131+
return status, nil
132+
}
133+
klog.V(6).Infof("Successfully invoked GetInfo: %v", info)
134+
135+
_, err = h.draClient.NodePrepareResources(ctx, &drapb.NodePrepareResourcesRequest{})
136+
if err != nil {
137+
klog.ErrorS(err, "failed to call NodePrepareResources")
138+
return status, nil
139+
}
140+
klog.V(6).Info("Successfully invoked NodePrepareResources")
141+
142+
status.Status = grpc_health_v1.HealthCheckResponse_SERVING
143+
return status, nil
144+
}

cmd/compute-domain-kubelet-plugin/main.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ type Flags struct {
5151
nvidiaCDIHookPath string
5252
kubeletRegistrarDirectoryPath string
5353
kubeletPluginsDirectoryPath string
54+
healthcheckPort int
5455
}
5556

5657
type Config struct {
@@ -131,6 +132,13 @@ func newApp() *cli.App {
131132
Destination: &flags.kubeletPluginsDirectoryPath,
132133
EnvVars: []string{"KUBELET_PLUGINS_DIRECTORY_PATH"},
133134
},
135+
&cli.IntFlag{
136+
Name: "healthcheck-port",
137+
Usage: "Port to start a gRPC healthcheck service. When positive, a literal port number. When zero, a random port is allocated. When negative, the healthcheck service is disabled.",
138+
Value: -1,
139+
Destination: &flags.healthcheckPort,
140+
EnvVars: []string{"HEALTHCHECK_PORT"},
141+
},
134142
}
135143
cliFlags = append(cliFlags, flags.kubeClientConfig.Flags()...)
136144
cliFlags = append(cliFlags, flags.featureGateConfig.Flags()...)

cmd/gpu-kubelet-plugin/driver.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type driver struct {
4343
pluginhelper *kubeletplugin.Helper
4444
state *DeviceState
4545
pulock *flock.Flock
46+
healthcheck *healthcheck
4647
}
4748

4849
func NewDriver(ctx context.Context, config *Config) (*driver, error) {
@@ -86,6 +87,12 @@ func NewDriver(ctx context.Context, config *Config) (*driver, error) {
8687
},
8788
}
8889

90+
healthcheck, err := startHealthcheck(ctx, config)
91+
if err != nil {
92+
return nil, fmt.Errorf("start healthcheck: %w", err)
93+
}
94+
driver.healthcheck = healthcheck
95+
8996
if err := driver.pluginhelper.PublishResources(ctx, resources); err != nil {
9097
return nil, err
9198
}
@@ -97,6 +104,11 @@ func (d *driver) Shutdown() error {
97104
if d == nil {
98105
return nil
99106
}
107+
108+
if d.healthcheck != nil {
109+
d.healthcheck.Stop()
110+
}
111+
100112
d.pluginhelper.Stop()
101113
return nil
102114
}

cmd/gpu-kubelet-plugin/health.go

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Copyright 2025 The Kubernetes Authors.
3+
* Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package main
19+
20+
import (
21+
"context"
22+
"fmt"
23+
"net"
24+
"net/url"
25+
"path"
26+
"strconv"
27+
"sync"
28+
29+
"google.golang.org/grpc"
30+
"google.golang.org/grpc/codes"
31+
"google.golang.org/grpc/credentials/insecure"
32+
"google.golang.org/grpc/health/grpc_health_v1"
33+
"google.golang.org/grpc/status"
34+
"k8s.io/klog/v2"
35+
drapb "k8s.io/kubelet/pkg/apis/dra/v1beta1"
36+
registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
37+
)
38+
39+
type healthcheck struct {
40+
grpc_health_v1.UnimplementedHealthServer
41+
42+
server *grpc.Server
43+
wg sync.WaitGroup
44+
45+
regClient registerapi.RegistrationClient
46+
draClient drapb.DRAPluginClient
47+
}
48+
49+
func startHealthcheck(ctx context.Context, config *Config) (*healthcheck, error) {
50+
port := config.flags.healthcheckPort
51+
if port < 0 {
52+
return nil, nil
53+
}
54+
55+
addr := net.JoinHostPort("", strconv.Itoa(port))
56+
lis, err := net.Listen("tcp", addr)
57+
if err != nil {
58+
return nil, fmt.Errorf("failed to listen for healthcheck service at %s: %w", addr, err)
59+
}
60+
61+
regSockPath := (&url.URL{
62+
Scheme: "unix",
63+
// TODO: this needs to adapt when seamless upgrades
64+
// are enabled and the filename includes a uid.
65+
Path: path.Join(config.flags.kubeletRegistrarDirectoryPath, DriverName+"-reg.sock"),
66+
}).String()
67+
klog.V(6).Infof("connecting to registration socket path=%s", regSockPath)
68+
regConn, err := grpc.NewClient(
69+
regSockPath,
70+
grpc.WithTransportCredentials(insecure.NewCredentials()),
71+
)
72+
if err != nil {
73+
return nil, fmt.Errorf("connect to registration socket: %w", err)
74+
}
75+
76+
draSockPath := (&url.URL{
77+
Scheme: "unix",
78+
Path: path.Join(config.DriverPluginPath(), "dra.sock"),
79+
}).String()
80+
klog.V(6).Infof("connecting to DRA socket path=%s", draSockPath)
81+
draConn, err := grpc.NewClient(
82+
draSockPath,
83+
grpc.WithTransportCredentials(insecure.NewCredentials()),
84+
)
85+
if err != nil {
86+
return nil, fmt.Errorf("connect to DRA socket: %w", err)
87+
}
88+
89+
server := grpc.NewServer()
90+
healthcheck := &healthcheck{
91+
server: server,
92+
regClient: registerapi.NewRegistrationClient(regConn),
93+
draClient: drapb.NewDRAPluginClient(draConn),
94+
}
95+
grpc_health_v1.RegisterHealthServer(server, healthcheck)
96+
97+
healthcheck.wg.Add(1)
98+
go func() {
99+
defer healthcheck.wg.Done()
100+
klog.Infof("starting healthcheck service at %s", lis.Addr().String())
101+
if err := server.Serve(lis); err != nil {
102+
klog.Errorf("failed to serve healthcheck service on %s: %v", addr, err)
103+
}
104+
}()
105+
106+
return healthcheck, nil
107+
}
108+
109+
func (h *healthcheck) Stop() {
110+
if h.server != nil {
111+
klog.Info("Stopping healthcheck service")
112+
h.server.GracefulStop()
113+
}
114+
h.wg.Wait()
115+
}
116+
117+
// Check implements [grpc_health_v1.HealthServer].
118+
func (h *healthcheck) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
119+
knownServices := map[string]struct{}{"": {}, "liveness": {}}
120+
if _, known := knownServices[req.GetService()]; !known {
121+
return nil, status.Error(codes.NotFound, "unknown service")
122+
}
123+
124+
status := &grpc_health_v1.HealthCheckResponse{
125+
Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING,
126+
}
127+
128+
info, err := h.regClient.GetInfo(ctx, &registerapi.InfoRequest{})
129+
if err != nil {
130+
klog.ErrorS(err, "failed to call GetInfo")
131+
return status, nil
132+
}
133+
klog.V(6).Infof("Successfully invoked GetInfo: %v", info)
134+
135+
_, err = h.draClient.NodePrepareResources(ctx, &drapb.NodePrepareResourcesRequest{})
136+
if err != nil {
137+
klog.ErrorS(err, "failed to call NodePrepareResources")
138+
return status, nil
139+
}
140+
klog.V(6).Info("Successfully invoked NodePrepareResources")
141+
142+
status.Status = grpc_health_v1.HealthCheckResponse_SERVING
143+
return status, nil
144+
}

0 commit comments

Comments
 (0)