Skip to content

Commit 06ea908

Browse files
authored
Merge pull request #394 from klueska/migrate-checkpoint
Migrate checkpoint format from the format in 25.3.0-rc.2 to the latest
2 parents 049413b + b1c8038 commit 06ea908

File tree

4 files changed

+192
-2
lines changed

4 files changed

+192
-2
lines changed

cmd/compute-domain-kubelet-plugin/checkpoint.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"encoding/json"
5+
"fmt"
56

67
resourceapi "k8s.io/api/resource/v1beta1"
78
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
@@ -45,7 +46,31 @@ func (cp *Checkpoint) MarshalCheckpoint() ([]byte, error) {
4546
}
4647

4748
func (cp *Checkpoint) UnmarshalCheckpoint(data []byte) error {
48-
return json.Unmarshal(data, cp)
49+
// If we can unmarshal to a Checkpoint directly we are done
50+
if err := json.Unmarshal(data, cp); err == nil {
51+
return nil
52+
}
53+
54+
// Otherwise attempt to unmarshal to a Checkpoint2503RC2
55+
// TODO: Remove this one release cycle following the v25.3.0 release
56+
var cp2503rc2 Checkpoint2503RC2
57+
if err := cp2503rc2.UnmarshalCheckpoint(data); err != nil {
58+
return fmt.Errorf("unable to unmarshal as %T or %T", *cp, cp2503rc2)
59+
}
60+
61+
// If that succeeded, verify its checksum...
62+
if err := cp2503rc2.VerifyChecksum(); err != nil {
63+
return fmt.Errorf("error verifying checksum for %T: %w", cp2503rc2, err)
64+
}
65+
66+
// And then convert it to the v1 checkpoint format and try again
67+
cpconv := cp2503rc2.ToV1()
68+
data, err := cpconv.MarshalCheckpoint()
69+
if err != nil {
70+
return fmt.Errorf("error remarshalling %T after conversion from %T: %w", cpconv, cp2503rc2, err)
71+
}
72+
73+
return cp.UnmarshalCheckpoint(data)
4974
}
5075

5176
func (cp *Checkpoint) VerifyChecksum() error {
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package main
2+
3+
import (
4+
"encoding/json"
5+
6+
"k8s.io/dynamic-resource-allocation/kubeletplugin"
7+
drapbv1 "k8s.io/kubelet/pkg/apis/dra/v1beta1"
8+
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
9+
)
10+
11+
// Legacy structs and methods to help with conversion as necessary.
12+
type PreparedDeviceList2503RC2 []PreparedDevice2503RC2
13+
type PreparedDevices2503RC2 []*PreparedDeviceGroup2503RC2
14+
type PreparedClaims2503RC2 map[string]PreparedDevices2503RC2
15+
16+
type PreparedDevice2503RC2 struct {
17+
Channel *PreparedComputeDomainChannel2503RC2 `json:"channel"`
18+
Daemon *PreparedComputeDomainDaemon2503RC2 `json:"daemon"`
19+
}
20+
21+
type PreparedComputeDomainChannel2503RC2 struct {
22+
Info *ComputeDomainChannelInfo `json:"info"`
23+
Device *drapbv1.Device `json:"device"`
24+
}
25+
26+
type PreparedComputeDomainDaemon2503RC2 struct {
27+
Info *ComputeDomainDaemonInfo `json:"info"`
28+
Device *drapbv1.Device `json:"device"`
29+
}
30+
31+
type PreparedDeviceGroup2503RC2 struct {
32+
Devices PreparedDeviceList2503RC2 `json:"devices"`
33+
ConfigState DeviceConfigState `json:"configState"`
34+
}
35+
36+
type Checkpoint2503RC2 struct {
37+
Checksum checksum.Checksum `json:"checksum"`
38+
V1 *Checkpoint2503RC2V1 `json:"v1,omitempty"`
39+
}
40+
41+
type Checkpoint2503RC2V1 struct {
42+
PreparedClaims PreparedClaims2503RC2 `json:"preparedClaims,omitempty"`
43+
}
44+
45+
func (cp *Checkpoint2503RC2) MarshalCheckpoint() ([]byte, error) {
46+
cp.Checksum = 0
47+
out, err := json.Marshal(*cp)
48+
if err != nil {
49+
return nil, err
50+
}
51+
cp.Checksum = checksum.New(out)
52+
return json.Marshal(*cp)
53+
}
54+
55+
func (cp *Checkpoint2503RC2) UnmarshalCheckpoint(data []byte) error {
56+
return json.Unmarshal(data, cp)
57+
}
58+
59+
func (cp *Checkpoint2503RC2) VerifyChecksum() error {
60+
ck := cp.Checksum
61+
cp.Checksum = 0
62+
defer func() {
63+
cp.Checksum = ck
64+
}()
65+
out, err := json.Marshal(*cp)
66+
if err != nil {
67+
return err
68+
}
69+
return ck.Verify(out)
70+
}
71+
72+
// ToV1 converts a PreparedDevice2503RC2 to a PreparedDevice.
73+
func (d *PreparedDevice2503RC2) ToV1() PreparedDevice {
74+
device := PreparedDevice{}
75+
if d.Channel != nil {
76+
device.Channel = d.Channel.ToV1()
77+
}
78+
if d.Daemon != nil {
79+
device.Daemon = d.Daemon.ToV1()
80+
}
81+
return device
82+
}
83+
84+
// ToV1 converts a PreparedComputeDomainChannel2503RC2 to a PreparedComputeDomainChannel.
85+
func (c *PreparedComputeDomainChannel2503RC2) ToV1() *PreparedComputeDomainChannel {
86+
channel := &PreparedComputeDomainChannel{}
87+
if c.Info != nil {
88+
channel.Info = c.Info
89+
}
90+
if c.Device != nil {
91+
channel.Device = &kubeletplugin.Device{
92+
Requests: c.Device.RequestNames,
93+
PoolName: c.Device.PoolName,
94+
DeviceName: c.Device.DeviceName,
95+
CDIDeviceIDs: c.Device.CDIDeviceIDs,
96+
}
97+
}
98+
return channel
99+
}
100+
101+
// ToV1 converts a PreparedComputeDomainDaemon2503RC2 to a PreparedComputeDomainDaemon.
102+
func (d *PreparedComputeDomainDaemon2503RC2) ToV1() *PreparedComputeDomainDaemon {
103+
daemon := &PreparedComputeDomainDaemon{}
104+
if d.Info != nil {
105+
daemon.Info = d.Info
106+
}
107+
if d.Device != nil {
108+
daemon.Device = &kubeletplugin.Device{
109+
Requests: d.Device.RequestNames,
110+
PoolName: d.Device.PoolName,
111+
DeviceName: d.Device.DeviceName,
112+
CDIDeviceIDs: d.Device.CDIDeviceIDs,
113+
}
114+
}
115+
return daemon
116+
}
117+
118+
// ToV1 converts a PreparedDeviceGroup2503RC2 to a PreparedDeviceGroup.
119+
func (g *PreparedDeviceGroup2503RC2) ToV1() *PreparedDeviceGroup {
120+
group := &PreparedDeviceGroup{
121+
Devices: make(PreparedDeviceList, 0, len(g.Devices)),
122+
ConfigState: g.ConfigState,
123+
}
124+
for _, d := range g.Devices {
125+
group.Devices = append(group.Devices, d.ToV1())
126+
}
127+
return group
128+
}
129+
130+
// ToV1 converts a Checkpoint2503RC2 to a Checkpoint.
131+
func (cp *Checkpoint2503RC2) ToV1() *Checkpoint {
132+
cpv1 := newCheckpoint()
133+
for k, v := range cp.V1.PreparedClaims {
134+
pds := make(PreparedDevices, 0, len(v))
135+
for _, pd := range v {
136+
pds = append(pds, pd.ToV1())
137+
}
138+
cpv1.V1.PreparedClaims[k] = PreparedClaim{
139+
PreparedDevices: pds,
140+
}
141+
}
142+
return cpv1
143+
}

cmd/compute-domain-kubelet-plugin/device_state.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"sync"
2424

2525
resourceapi "k8s.io/api/resource/v1beta1"
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2627
"k8s.io/apimachinery/pkg/runtime"
2728
"k8s.io/dynamic-resource-allocation/kubeletplugin"
2829
"k8s.io/klog/v2"
@@ -194,6 +195,27 @@ func (s *DeviceState) Unprepare(ctx context.Context, claimRef kubeletplugin.Name
194195
return nil
195196
}
196197

198+
// If pc.Status.Allocation is 'nil', attempt to pull the status from the
199+
// API server. This should only ever happen if we have unmarshaled from a
200+
// legacy checkpoint format that did not include the Status field.
201+
//
202+
// TODO: Remove this one release cycle following the v25.3.0 release
203+
if pc.Status.Allocation == nil {
204+
klog.Infof("PreparedClaim status was unset in Checkpoint for ResourceClaim %s: attempting to pull it from API server", claimRef.String())
205+
claim, err := s.config.clientsets.Core.ResourceV1beta1().ResourceClaims(claimRef.Namespace).Get(
206+
ctx,
207+
claimRef.Name,
208+
metav1.GetOptions{})
209+
210+
if err != nil {
211+
return permanentError{fmt.Errorf("failed to fetch ResourceClaim %s: %w", claimRef.String(), err)}
212+
}
213+
if claim.Status.Allocation == nil {
214+
return permanentError{fmt.Errorf("no allocation set in ResourceClaim %s", claim.String())}
215+
}
216+
pc.Status = claim.Status
217+
}
218+
197219
if err := s.unprepareDevices(ctx, &pc.Status); err != nil {
198220
return fmt.Errorf("unprepare devices failed: %w", err)
199221
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ require (
2222
k8s.io/component-base v0.33.0
2323
k8s.io/dynamic-resource-allocation v0.33.0
2424
k8s.io/klog/v2 v2.130.1
25+
k8s.io/kubelet v0.33.0
2526
k8s.io/kubernetes v1.33.0
2627
k8s.io/mount-utils v0.33.0
2728
k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738
@@ -84,7 +85,6 @@ require (
8485
gopkg.in/inf.v0 v0.9.1 // indirect
8586
gopkg.in/yaml.v3 v3.0.1 // indirect
8687
k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff // indirect
87-
k8s.io/kubelet v0.33.0 // indirect
8888
sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect
8989
sigs.k8s.io/randfill v1.0.0 // indirect
9090
sigs.k8s.io/structured-merge-diff/v4 v4.6.0 // indirect

0 commit comments

Comments
 (0)