@@ -18,6 +18,7 @@ package main
1818
1919import (
2020 "context"
21+ "errors"
2122 "fmt"
2223 "sync"
2324 "time"
@@ -31,6 +32,15 @@ import (
3132 "github.com/NVIDIA/k8s-dra-driver-gpu/pkg/workqueue"
3233)
3334
35+ // ErrorRetryMaxTimeout is the max amount of time we retry when errors are
36+ // returned before giving up.
37+ const ErrorRetryMaxTimeout = 45 * time .Second
38+
39+ // permanentError defines an error indicating that it is permanent.
40+ // By default, every error will be retried up to ErrorRetryMaxTimeout.
41+ // Errors marked as permanent will not be retried.
42+ type permanentError struct { error }
43+
3444var _ drapbv1.DRAPluginServer = & driver {}
3545
3646type driver struct {
@@ -98,19 +108,19 @@ func (d *driver) NodePrepareResources(ctx context.Context, req *drapbv1.NodePrep
98108 preparedResources := & drapbv1.NodePrepareResourcesResponse {Claims : map [string ]* drapbv1.NodePrepareResourceResponse {}}
99109
100110 var wg sync.WaitGroup
101- ctx , cancel := context .WithTimeout (ctx , 45 * time . Second )
111+ ctx , cancel := context .WithTimeout (ctx , ErrorRetryMaxTimeout )
102112 workQueue := workqueue .New (workqueue .DefaultControllerRateLimiter ())
103113
104114 for _ , claim := range req .Claims {
105115 wg .Add (1 )
106116 workQueue .EnqueueRaw (claim , func (ctx context.Context , obj any ) error {
107- prepared := d .nodePrepareResource (ctx , claim )
108- if prepared .Error != "" {
109- return fmt .Errorf ("%s" , prepared .Error )
117+ done , prepared := d .nodePrepareResource (ctx , claim )
118+ if done {
119+ preparedResources .Claims [claim .UID ] = prepared
120+ wg .Done ()
121+ return nil
110122 }
111- preparedResources .Claims [claim .UID ] = prepared
112- wg .Done ()
113- return nil
123+ return fmt .Errorf ("%s" , prepared .Error )
114124 })
115125 }
116126
@@ -129,19 +139,19 @@ func (d *driver) NodeUnprepareResources(ctx context.Context, req *drapbv1.NodeUn
129139 unpreparedResources := & drapbv1.NodeUnprepareResourcesResponse {Claims : map [string ]* drapbv1.NodeUnprepareResourceResponse {}}
130140
131141 var wg sync.WaitGroup
132- ctx , cancel := context .WithTimeout (ctx , 45 * time . Second )
142+ ctx , cancel := context .WithTimeout (ctx , ErrorRetryMaxTimeout )
133143 workQueue := workqueue .New (workqueue .DefaultControllerRateLimiter ())
134144
135145 for _ , claim := range req .Claims {
136146 wg .Add (1 )
137147 workQueue .EnqueueRaw (claim , func (ctx context.Context , obj any ) error {
138- unprepared := d .nodeUnprepareResource (ctx , claim )
139- if unprepared .Error != "" {
140- return fmt .Errorf ("%s" , unprepared .Error )
148+ done , unprepared := d .nodeUnprepareResource (ctx , claim )
149+ if done {
150+ unpreparedResources .Claims [claim .UID ] = unprepared
151+ wg .Done ()
152+ return nil
141153 }
142- unpreparedResources .Claims [claim .UID ] = unprepared
143- wg .Done ()
144- return nil
154+ return fmt .Errorf ("%s" , unprepared .Error )
145155 })
146156 }
147157
@@ -155,7 +165,7 @@ func (d *driver) NodeUnprepareResources(ctx context.Context, req *drapbv1.NodeUn
155165 return unpreparedResources , nil
156166}
157167
158- func (d * driver ) nodePrepareResource (ctx context.Context , claim * drapbv1.Claim ) * drapbv1.NodePrepareResourceResponse {
168+ func (d * driver ) nodePrepareResource (ctx context.Context , claim * drapbv1.Claim ) ( bool , * drapbv1.NodePrepareResourceResponse ) {
159169 d .Lock ()
160170 defer d .Unlock ()
161171
@@ -164,33 +174,43 @@ func (d *driver) nodePrepareResource(ctx context.Context, claim *drapbv1.Claim)
164174 claim .Name ,
165175 metav1.GetOptions {})
166176 if err != nil {
167- return & drapbv1.NodePrepareResourceResponse {
177+ ret := & drapbv1.NodePrepareResourceResponse {
168178 Error : fmt .Sprintf ("failed to fetch ResourceClaim %s in namespace %s" , claim .Name , claim .Namespace ),
169179 }
180+ return isPermanentError (err ), ret
170181 }
171182
172183 prepared , err := d .state .Prepare (ctx , resourceClaim )
173184 if err != nil {
174- return & drapbv1.NodePrepareResourceResponse {
185+ ret := & drapbv1.NodePrepareResourceResponse {
175186 Error : fmt .Sprintf ("error preparing devices for claim %v: %v" , claim .UID , err ),
176187 }
188+ return isPermanentError (err ), ret
177189 }
178190
179191 klog .Infof ("Returning newly prepared devices for claim '%v': %v" , claim .UID , prepared )
180- return & drapbv1.NodePrepareResourceResponse {Devices : prepared }
192+ return true , & drapbv1.NodePrepareResourceResponse {Devices : prepared }
181193}
182194
183- func (d * driver ) nodeUnprepareResource (ctx context.Context , claim * drapbv1.Claim ) * drapbv1.NodeUnprepareResourceResponse {
195+ func (d * driver ) nodeUnprepareResource (ctx context.Context , claim * drapbv1.Claim ) ( bool , * drapbv1.NodeUnprepareResourceResponse ) {
184196 d .Lock ()
185197 defer d .Unlock ()
186198
187199 if err := d .state .Unprepare (ctx , claim .UID ); err != nil {
188- return & drapbv1.NodeUnprepareResourceResponse {
200+ ret := & drapbv1.NodeUnprepareResourceResponse {
189201 Error : fmt .Sprintf ("error unpreparing devices for claim %v: %v" , claim .UID , err ),
190202 }
203+ return isPermanentError (err ), ret
191204 }
192205
193- return & drapbv1.NodeUnprepareResourceResponse {}
206+ return true , & drapbv1.NodeUnprepareResourceResponse {}
207+ }
208+
209+ func isPermanentError (err error ) bool {
210+ if errors .As (err , & permanentError {}) {
211+ return true
212+ }
213+ return false
194214}
195215
196216// TODO: implement loop to remove CDI files from the CDI path for claimUIDs
0 commit comments