Skip to content

Commit 97b02f0

Browse files
authored
Merge pull request #332 from jgehrcke/jp/bump-to-dra-lib-033
Update DRA lib to v0.33.0, and k8s lib to v1.33.0
2 parents d17cfcf + b8de9db commit 97b02f0

File tree

813 files changed

+61403
-24003
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

813 files changed

+61403
-24003
lines changed

cmd/compute-domain-kubelet-plugin/driver.go

Lines changed: 77 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,13 @@ import (
2323
"sync"
2424
"time"
2525

26+
resourceapi "k8s.io/api/resource/v1beta1"
2627
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28+
"k8s.io/apimachinery/pkg/types"
2729
coreclientset "k8s.io/client-go/kubernetes"
2830
"k8s.io/dynamic-resource-allocation/kubeletplugin"
31+
"k8s.io/dynamic-resource-allocation/resourceslice"
2932
"k8s.io/klog/v2"
30-
drapbv1 "k8s.io/kubelet/pkg/apis/dra/v1beta1"
3133

3234
"github.com/NVIDIA/k8s-dra-driver-gpu/pkg/workqueue"
3335
)
@@ -38,63 +40,65 @@ const ErrorRetryMaxTimeout = 45 * time.Second
3840

3941
// permanentError defines an error indicating that it is permanent.
4042
// By default, every error will be retried up to ErrorRetryMaxTimeout.
41-
// Errors marked as permament will not be retried.
43+
// Errors marked as permanent will not be retried.
4244
type permanentError struct{ error }
4345

4446
func isPermanentError(err error) bool {
4547
return errors.As(err, &permanentError{})
4648
}
4749

48-
var _ drapbv1.DRAPluginServer = &driver{}
49-
5050
type driver struct {
5151
sync.Mutex
52-
client coreclientset.Interface
53-
plugin kubeletplugin.DRAPlugin
54-
state *DeviceState
52+
client coreclientset.Interface
53+
pluginhelper *kubeletplugin.Helper
54+
state *DeviceState
5555
}
5656

5757
func NewDriver(ctx context.Context, config *Config) (*driver, error) {
58-
driver := &driver{
59-
client: config.clientsets.Core,
60-
}
61-
6258
state, err := NewDeviceState(ctx, config)
6359
if err != nil {
6460
return nil, err
6561
}
66-
driver.state = state
6762

68-
plugin, err := kubeletplugin.Start(
63+
driver := &driver{
64+
client: config.clientsets.Core,
65+
state: state,
66+
}
67+
68+
helper, err := kubeletplugin.Start(
6969
ctx,
70-
[]any{driver},
70+
driver,
7171
kubeletplugin.KubeClient(driver.client),
7272
kubeletplugin.NodeName(config.flags.nodeName),
7373
kubeletplugin.DriverName(DriverName),
74-
kubeletplugin.RegistrarSocketPath(PluginRegistrationPath),
75-
kubeletplugin.PluginSocketPath(DriverPluginSocketPath),
76-
kubeletplugin.KubeletPluginSocketPath(DriverPluginSocketPath))
74+
)
7775
if err != nil {
7876
return nil, err
7977
}
80-
driver.plugin = plugin
78+
driver.pluginhelper = helper
8179

8280
// Enumerate the set of ComputeDomain daemon devices and publish them
83-
var resources kubeletplugin.Resources
81+
var resourceSlice resourceslice.Slice
8482
for _, device := range state.allocatable {
8583
// Explicitly exclude ComputeDomain channels from being advertised here. They
8684
// are instead advertised in as a network resource from the control plane.
8785
if device.Type() == ComputeDomainChannelType && device.Channel.ID != 0 {
8886
continue
8987
}
90-
resources.Devices = append(resources.Devices, device.GetDevice())
88+
resourceSlice.Devices = append(resourceSlice.Devices, device.GetDevice())
89+
}
90+
91+
resources := resourceslice.DriverResources{
92+
Pools: map[string]resourceslice.Pool{
93+
config.flags.nodeName: {Slices: []resourceslice.Slice{resourceSlice}},
94+
},
9195
}
9296

9397
if err := state.computeDomainManager.Start(ctx); err != nil {
9498
return nil, err
9599
}
96100

97-
if err := plugin.PublishResources(ctx, resources); err != nil {
101+
if err := driver.pluginhelper.PublishResources(ctx, resources); err != nil {
98102
return nil, err
99103
}
100104

@@ -108,28 +112,28 @@ func (d *driver) Shutdown() error {
108112
if err := d.state.computeDomainManager.Stop(); err != nil {
109113
return fmt.Errorf("error stopping ComputeDomainManager: %w", err)
110114
}
111-
d.plugin.Stop()
115+
d.pluginhelper.Stop()
112116
return nil
113117
}
114118

115-
func (d *driver) NodePrepareResources(ctx context.Context, req *drapbv1.NodePrepareResourcesRequest) (*drapbv1.NodePrepareResourcesResponse, error) {
116-
klog.Infof("NodePrepareResource is called: number of claims: %d", len(req.Claims))
117-
preparedResources := &drapbv1.NodePrepareResourcesResponse{Claims: map[string]*drapbv1.NodePrepareResourceResponse{}}
119+
func (d *driver) PrepareResourceClaims(ctx context.Context, claims []*resourceapi.ResourceClaim) (map[types.UID]kubeletplugin.PrepareResult, error) {
120+
klog.Infof("PrepareResourceClaims called with %d claim(s)", len(claims))
118121

119122
var wg sync.WaitGroup
120123
ctx, cancel := context.WithTimeout(ctx, ErrorRetryMaxTimeout)
121124
workQueue := workqueue.New(workqueue.DefaultControllerRateLimiter())
125+
results := make(map[types.UID]kubeletplugin.PrepareResult)
122126

123-
for _, claim := range req.Claims {
127+
for _, claim := range claims {
124128
wg.Add(1)
125129
workQueue.EnqueueRaw(claim, func(ctx context.Context, obj any) error {
126-
done, prepared := d.nodePrepareResource(ctx, claim)
130+
done, res := d.nodePrepareResource(ctx, claim)
127131
if done {
128-
preparedResources.Claims[claim.UID] = prepared
132+
results[claim.UID] = res
129133
wg.Done()
130134
return nil
131135
}
132-
return fmt.Errorf("%s", prepared.Error)
136+
return fmt.Errorf("%w", res.Err)
133137
})
134138
}
135139

@@ -139,28 +143,27 @@ func (d *driver) NodePrepareResources(ctx context.Context, req *drapbv1.NodePrep
139143
}()
140144

141145
workQueue.Run(ctx)
142-
143-
return preparedResources, nil
146+
return results, nil
144147
}
145148

146-
func (d *driver) NodeUnprepareResources(ctx context.Context, req *drapbv1.NodeUnprepareResourcesRequest) (*drapbv1.NodeUnprepareResourcesResponse, error) {
147-
klog.Infof("NodeUnprepareResource is called: number of claims: %d", len(req.Claims))
148-
unpreparedResources := &drapbv1.NodeUnprepareResourcesResponse{Claims: map[string]*drapbv1.NodeUnprepareResourceResponse{}}
149+
func (d *driver) UnprepareResourceClaims(ctx context.Context, claimRefs []kubeletplugin.NamespacedObject) (map[types.UID]error, error) {
150+
klog.Infof("UnprepareResourceClaims called with %d claim(s)", len(claimRefs))
149151

150152
var wg sync.WaitGroup
151153
ctx, cancel := context.WithTimeout(ctx, ErrorRetryMaxTimeout)
152154
workQueue := workqueue.New(workqueue.DefaultControllerRateLimiter())
155+
results := make(map[types.UID]error)
153156

154-
for _, claim := range req.Claims {
157+
for _, claim := range claimRefs {
155158
wg.Add(1)
156159
workQueue.EnqueueRaw(claim, func(ctx context.Context, obj any) error {
157-
done, unprepared := d.nodeUnprepareResource(ctx, claim)
160+
done, err := d.nodeUnprepareResource(ctx, claim)
158161
if done {
159-
unpreparedResources.Claims[claim.UID] = unprepared
162+
results[claim.UID] = err
160163
wg.Done()
161164
return nil
162165
}
163-
return fmt.Errorf("%s", unprepared.Error)
166+
return fmt.Errorf("%w", err)
164167
})
165168
}
166169

@@ -171,73 +174,70 @@ func (d *driver) NodeUnprepareResources(ctx context.Context, req *drapbv1.NodeUn
171174

172175
workQueue.Run(ctx)
173176

174-
return unpreparedResources, nil
177+
return results, nil
175178
}
176179

177-
func (d *driver) nodePrepareResource(ctx context.Context, claim *drapbv1.Claim) (bool, *drapbv1.NodePrepareResourceResponse) {
180+
func (d *driver) nodePrepareResource(ctx context.Context, claim *resourceapi.ResourceClaim) (bool, kubeletplugin.PrepareResult) {
178181
d.Lock()
179182
defer d.Unlock()
180183

181-
resourceClaim, err := d.client.ResourceV1beta1().ResourceClaims(claim.Namespace).Get(
182-
ctx,
183-
claim.Name,
184-
metav1.GetOptions{})
185-
if err != nil {
186-
ret := &drapbv1.NodePrepareResourceResponse{
187-
Error: fmt.Sprintf("failed to fetch ResourceClaim %s in namespace %s", claim.Name, claim.Namespace),
184+
if claim.Status.Allocation == nil {
185+
res := kubeletplugin.PrepareResult{
186+
Err: fmt.Errorf("no allocation set in ResourceClaim %s in namespace %s", claim.Name, claim.Namespace),
188187
}
189-
return isPermanentError(err), ret
188+
return true, res
190189
}
191190

192-
if resourceClaim.Status.Allocation == nil {
193-
ret := &drapbv1.NodePrepareResourceResponse{
194-
Error: fmt.Sprintf("no allocation set in ResourceClaim %s in namespace %s", claim.Name, claim.Namespace),
191+
devs, err := d.state.Prepare(ctx, claim)
192+
if err != nil {
193+
res := kubeletplugin.PrepareResult{
194+
Err: fmt.Errorf("error preparing devices for claim %v: %w", claim.UID, err),
195195
}
196-
return true, ret
196+
return isPermanentError(err), res
197197
}
198198

199-
prepared, err := d.state.Prepare(ctx, resourceClaim)
200-
if err != nil {
201-
ret := &drapbv1.NodePrepareResourceResponse{
202-
Error: fmt.Sprintf("error preparing devices for claim %v: %v", claim.UID, err),
199+
// TODO: change return type of state.Prepare()
200+
var prepDevs []kubeletplugin.Device
201+
for _, d := range devs {
202+
device := kubeletplugin.Device{
203+
Requests: d.RequestNames,
204+
PoolName: d.PoolName,
205+
DeviceName: d.DeviceName,
206+
CDIDeviceIDs: d.CDIDeviceIDs,
203207
}
204-
return isPermanentError(err), ret
208+
prepDevs = append(prepDevs, device)
205209
}
206210

207-
klog.Infof("Returning newly prepared devices for claim '%v': %v", claim.UID, prepared)
208-
return true, &drapbv1.NodePrepareResourceResponse{Devices: prepared}
211+
klog.Infof("Returning newly prepared devices for claim '%v': %v", claim.UID, prepDevs)
212+
return true, kubeletplugin.PrepareResult{Devices: prepDevs}
209213
}
210214

211-
func (d *driver) nodeUnprepareResource(ctx context.Context, claim *drapbv1.Claim) (bool, *drapbv1.NodeUnprepareResourceResponse) {
215+
func (d *driver) nodeUnprepareResource(ctx context.Context, claimRef kubeletplugin.NamespacedObject) (bool, error) {
212216
d.Lock()
213217
defer d.Unlock()
214218

215-
resourceClaim, err := d.client.ResourceV1beta1().ResourceClaims(claim.Namespace).Get(
219+
// Fetching the resource claim should not be needed (and not be done) in the
220+
// unprepare code path. Any state required during unprepare can be stored
221+
// via checkpointing.
222+
claim, err := d.client.ResourceV1beta1().ResourceClaims(claimRef.Namespace).Get(
216223
ctx,
217-
claim.Name,
224+
claimRef.Name,
218225
metav1.GetOptions{})
226+
219227
if err != nil {
220-
ret := &drapbv1.NodeUnprepareResourceResponse{
221-
Error: fmt.Sprintf("failed to fetch ResourceClaim %s in namespace %s", claim.Name, claim.Namespace),
222-
}
223-
return isPermanentError(err), ret
228+
return isPermanentError(err), fmt.Errorf("failed to fetch ResourceClaim %s in namespace %s: %w", claimRef.Name, claimRef.Namespace, err)
224229
}
225230

226-
if resourceClaim.Status.Allocation == nil {
227-
ret := &drapbv1.NodeUnprepareResourceResponse{
228-
Error: fmt.Sprintf("no allocation set in ResourceClaim %s in namespace %s", claim.Name, claim.Namespace),
229-
}
230-
return true, ret
231+
if claim.Status.Allocation == nil {
232+
return true, fmt.Errorf("no allocation set in ResourceClaim %s in namespace %s", claim.Name, claim.Namespace)
231233
}
232234

233-
if err := d.state.Unprepare(ctx, resourceClaim); err != nil {
234-
ret := &drapbv1.NodeUnprepareResourceResponse{
235-
Error: fmt.Sprintf("error unpreparing devices for claim %v: %v", claim.UID, err),
236-
}
237-
return isPermanentError(err), ret
235+
if err := d.state.Unprepare(ctx, claim); err != nil {
236+
return isPermanentError(err), fmt.Errorf("error unpreparing devices for claim '%v': %w", claim.UID, err)
238237
}
239238

240-
return true, &drapbv1.NodeUnprepareResourceResponse{}
239+
klog.Infof("unprepared devices for claim '%v'", claim.UID)
240+
return true, nil
241241
}
242242

243243
// TODO: implement loop to remove CDI files from the CDI path for claimUIDs

cmd/compute-domain-kubelet-plugin/main.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,8 @@ import (
3232
)
3333

3434
const (
35-
DriverName = "compute-domain.nvidia.com"
36-
37-
PluginRegistrationPath = "/var/lib/kubelet/plugins_registry/" + DriverName + ".sock"
35+
DriverName = "compute-domain.nvidia.com"
3836
DriverPluginPath = "/var/lib/kubelet/plugins/" + DriverName
39-
DriverPluginSocketPath = DriverPluginPath + "/plugin.sock"
4037
DriverPluginCheckpointFile = "checkpoint.json"
4138
)
4239

0 commit comments

Comments
 (0)