Skip to content

Commit 6589feb

Browse files
author
Jie Chen
authored
add prefix status in pool and different retry time when prefix not available (#268)
1 parent 5782342 commit 6589feb

File tree

9 files changed

+99
-40
lines changed

9 files changed

+99
-40
lines changed

mocks/amazon-vcp-resource-controller-k8s/pkg/pool/mock_pool.go

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/handler/warm.go

Lines changed: 11 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -29,10 +29,11 @@ import (
2929
)
3030

3131
const (
32-
RequeueAfterWhenWPEmpty = time.Millisecond * 600
33-
RequeueAfterWhenResourceCooling = time.Second * 20
34-
ReasonResourceAllocationFailed = "ResourceAllocationFailed"
35-
ReasonResourceAllocated = "ResourceAllocated"
32+
RequeueAfterWhenPrefixNotAvailable = time.Minute * 2
33+
RequeueAfterWhenWPEmpty = time.Millisecond * 600
34+
RequeueAfterWhenResourceCooling = time.Second * 20
35+
ReasonResourceAllocationFailed = "ResourceAllocationFailed"
36+
ReasonResourceAllocated = "ResourceAllocated"
3637
)
3738

3839
type warmResourceHandler struct {
@@ -100,6 +101,12 @@ func (w *warmResourceHandler) HandleCreate(_ int, pod *v1.Pod) (ctrl.Result, err
100101
return ctrl.Result{}, nil
101102
}
102103
return ctrl.Result{}, err
104+
case pool.ErrInsufficientCidrBlocks:
105+
log.V(1).Info("prefix is not available in subnet, will retry")
106+
w.APIWrapper.K8sAPI.BroadcastEvent(pod, ReasonResourceAllocationFailed,
107+
fmt.Sprintf("Warm pool for resource %s is currently empty because the specified subnet does not have enough "+
108+
"free cidr blocks, will retry in %s", w.resourceName, RequeueAfterWhenPrefixNotAvailable), v1.EventTypeWarning)
109+
return ctrl.Result{Requeue: true, RequeueAfter: RequeueAfterWhenPrefixNotAvailable}, nil
103110
default:
104111
return ctrl.Result{}, err
105112
}

pkg/pool/pool.go

Lines changed: 19 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,7 @@ var (
2929
ErrResourceAreBeingCooledDown = fmt.Errorf("cannot assign resource now, resources are being cooled down")
3030
ErrResourcesAreBeingCreated = fmt.Errorf("cannot assign resource now, resources are being created")
3131
ErrWarmPoolEmpty = fmt.Errorf("warm pool is empty")
32+
ErrInsufficientCidrBlocks = fmt.Errorf("InsufficientCidrBlocks: The specified subnet does not have enough free cidr blocks to satisfy the request")
3233
ErrResourceAlreadyAssigned = fmt.Errorf("resource is already assigned to the requestor")
3334
ErrResourceDoesntExist = fmt.Errorf("requested resource doesn't exist in used pool")
3435
ErrIncorrectResourceOwner = fmt.Errorf("resource doesn't belong to the requestor")
@@ -42,7 +43,7 @@ type Pool interface {
4243
AssignResource(requesterID string) (resourceID string, shouldReconcile bool, err error)
4344
FreeResource(requesterID string, resourceID string) (shouldReconcile bool, err error)
4445
GetAssignedResource(requesterID string) (resourceID string, ownsResource bool)
45-
UpdatePool(job *worker.WarmPoolJob, didSucceed bool) (shouldReconcile bool)
46+
UpdatePool(job *worker.WarmPoolJob, didSucceed bool, prefixAvailable bool) (shouldReconcile bool)
4647
ReSync(resources []string)
4748
ReconcilePool() *worker.WarmPoolJob
4849
ProcessCoolDownQueue() bool
@@ -77,6 +78,8 @@ type pool struct {
7778
reSyncRequired bool
7879
// isPDPool indicates whether the pool is for prefix IP provider or secondary IP provider
7980
isPDPool bool
81+
// prefixAvailable indicates whether subnet has any prefix available
82+
prefixAvailable bool
8083
}
8184

8285
// Resource represents a secondary IPv4 address or a prefix-deconstructed IPv4 address, uniquely identified by GroupID and ResourceID
@@ -245,9 +248,14 @@ func (p *pool) AssignResource(requesterID string) (resourceID string, shouldReco
245248
return "", false, ErrResourcesAreBeingCreated
246249
}
247250

248-
// Caller can retry in 600 ms [Average time to create and attach a new ENI] or less
249-
// Different from above check because here we want to perform reconciliation
250251
if len(p.warmResources) == 0 {
252+
// If prefix is not available in subnet, caller can retry in 2 min [Action required from user to change subnet]
253+
if p.isPDPool && !p.prefixAvailable {
254+
return "", false, ErrInsufficientCidrBlocks
255+
}
256+
257+
// Caller can retry in 600 ms [Average time to assign a new secondary IP or prefix] or less
258+
// Different from above check because here we want to perform reconciliation
251259
return "", true, ErrWarmPoolEmpty
252260
}
253261

@@ -309,7 +317,7 @@ func (p *pool) FreeResource(requesterID string, resourceID string) (shouldReconc
309317
}
310318

311319
// UpdatePool updates the warm pool with the result of the asynchronous job executed by the provider
312-
func (p *pool) UpdatePool(job *worker.WarmPoolJob, didSucceed bool) (shouldReconcile bool) {
320+
func (p *pool) UpdatePool(job *worker.WarmPoolJob, didSucceed bool, prefixAvailable bool) (shouldReconcile bool) {
313321
p.lock.Lock()
314322
defer p.lock.Unlock()
315323

@@ -322,6 +330,13 @@ func (p *pool) UpdatePool(job *worker.WarmPoolJob, didSucceed bool) (shouldRecon
322330
log.Error(fmt.Errorf("warm pool job failed: %v", job), "operation failed")
323331
}
324332

333+
if p.isPDPool {
334+
p.prefixAvailable = prefixAvailable
335+
if !p.prefixAvailable {
336+
log.Error(fmt.Errorf("warm pool job failed: %v", job), "prefix is not available in subnet")
337+
}
338+
}
339+
325340
if job.Resources != nil && len(job.Resources) > 0 {
326341
// Add the resources to the warm pool
327342
for _, resourceGroup := range job.Resources {

pkg/pool/pool_test.go

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -221,7 +221,7 @@ func TestPool_UpdatePool_OperationCreate_Succeed(t *testing.T) {
221221
Operations: worker.OperationCreate,
222222
Resources: createdResources,
223223
ResourceCount: 2,
224-
}, true)
224+
}, true, false)
225225

226226
warmResources := make(map[string][]Resource)
227227
warmResources[res3] = []Resource{{GroupID: res3, ResourceID: res3}}
@@ -239,7 +239,7 @@ func TestPool_UpdatePool_OperationCreate_Failed(t *testing.T) {
239239
Operations: worker.OperationCreate,
240240
Resources: nil,
241241
ResourceCount: 2,
242-
}, false)
242+
}, false, false)
243243

244244
assert.True(t, shouldReconcile)
245245
assert.True(t, warmPool.reSyncRequired)
@@ -253,7 +253,7 @@ func TestPool_UpdatePool_OperationDelete_Succeed(t *testing.T) {
253253
Operations: worker.OperationDeleted,
254254
Resources: nil,
255255
ResourceCount: 1,
256-
}, true)
256+
}, true, false)
257257

258258
// Assert resources are not added back to the warm pool
259259
assert.Zero(t, len(warmPool.warmResources))
@@ -268,7 +268,7 @@ func TestPool_UpdatePool_OperationDelete_Failed(t *testing.T) {
268268
Operations: worker.OperationDeleted,
269269
Resources: failedResources,
270270
ResourceCount: 2,
271-
}, false)
271+
}, false, false)
272272

273273
failed := make(map[string][]Resource)
274274
failed[res3] = []Resource{{GroupID: res3, ResourceID: res3}}

pkg/provider/ip/provider.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -361,7 +361,7 @@ func (p *ipv4Provider) DeletePrivateIPv4AndUpdatePool(job *worker.WarmPoolJob) {
361361
// updatePoolAndReconcileIfRequired updates the resource pool and reconcile again and submit a new job if required
362362
func (p *ipv4Provider) updatePoolAndReconcileIfRequired(resourcePool pool.Pool, job *worker.WarmPoolJob, didSucceed bool) {
363363
// Update the pool to add the created/failed resource to the warm pool and decrement the pending count
364-
shouldReconcile := resourcePool.UpdatePool(job, didSucceed)
364+
shouldReconcile := resourcePool.UpdatePool(job, didSucceed, false)
365365

366366
if shouldReconcile {
367367
job := resourcePool.ReconcilePool()

pkg/provider/ip/provider_test.go

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -130,7 +130,7 @@ func TestIpv4Provider_updatePoolAndReconcileIfRequired_NoFurtherReconcile(t *tes
130130

131131
job := &worker.WarmPoolJob{Operations: worker.OperationCreate}
132132

133-
mockPool.EXPECT().UpdatePool(job, true).Return(false)
133+
mockPool.EXPECT().UpdatePool(job, true, false).Return(false)
134134

135135
provider.updatePoolAndReconcileIfRequired(mockPool, job, true)
136136
}
@@ -147,7 +147,7 @@ func TestIpv4Provider_updatePoolAndReconcileIfRequired_ReconcileRequired(t *test
147147

148148
job := &worker.WarmPoolJob{Operations: worker.OperationCreate}
149149

150-
mockPool.EXPECT().UpdatePool(job, true).Return(true)
150+
mockPool.EXPECT().UpdatePool(job, true, false).Return(true)
151151
mockPool.EXPECT().ReconcilePool().Return(job)
152152
mockWorker.EXPECT().SubmitJob(job)
153153

@@ -179,7 +179,7 @@ func TestIpv4Provider_DeletePrivateIPv4AndUpdatePool(t *testing.T) {
179179
Resources: []string{},
180180
ResourceCount: 2,
181181
NodeName: nodeName,
182-
}, true).Return(false)
182+
}, true, false).Return(false)
183183

184184
ipv4Provider.DeletePrivateIPv4AndUpdatePool(deleteJob)
185185
}
@@ -210,7 +210,7 @@ func TestIpv4Provider_DeletePrivateIPv4AndUpdatePool_SomeResourceFail(t *testing
210210
Resources: failedResources,
211211
ResourceCount: 2,
212212
NodeName: nodeName,
213-
}, true).Return(false)
213+
}, true, false).Return(false)
214214

215215
ipv4Provider.DeletePrivateIPv4AndUpdatePool(&deleteJob)
216216
}
@@ -240,7 +240,7 @@ func TestIPv4Provider_CreatePrivateIPv4AndUpdatePool(t *testing.T) {
240240
Resources: createdResources,
241241
ResourceCount: 2,
242242
NodeName: nodeName,
243-
}, true).Return(false)
243+
}, true, false).Return(false)
244244

245245
ipv4Provider.CreatePrivateIPv4AndUpdatePool(createJob)
246246
}
@@ -270,7 +270,7 @@ func TestIPv4Provider_CreatePrivateIPv4AndUpdatePool_Fail(t *testing.T) {
270270
Resources: createdResources,
271271
ResourceCount: 2,
272272
NodeName: nodeName,
273-
}, false).Return(false)
273+
}, false, false).Return(false)
274274

275275
ipv4Provider.CreatePrivateIPv4AndUpdatePool(createJob)
276276
}

pkg/provider/prefix/provider.go

Lines changed: 20 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -290,21 +290,34 @@ func (p *ipv4PrefixProvider) CreateIPv4PrefixAndUpdatePool(job *worker.WarmPoolJ
290290
p.log.Error(utils.ErrNotFound, utils.ErrMsgProviderAndPoolNotFound, "node name", job.NodeName)
291291
return
292292
}
293-
didSucceed := true
293+
294+
// For successful jobs or non-retryable errors, do not re-sync or reconcile the pool.
295+
notRetry := true
296+
// If subnet has sufficient cidr blocks, prefixAvailable is true, otherwise false.
297+
prefixAvailable := true
298+
294299
resources, err := instanceResource.eniManager.CreateIPV4Resource(job.ResourceCount, config.ResourceTypeIPv4Prefix, p.apiWrapper.EC2API,
295300
p.log)
296301

297302
if err != nil {
298303
p.log.Error(err, "failed to create all/some of the IPv4 prefixes", "created resources", resources)
299-
didSucceed = false
304+
305+
// For retryable errors, set notRetry as false to re-sync and reconcile the pool.
306+
if utils.ShouldRetryOnError(err) {
307+
notRetry = false
308+
}
309+
300310
//TODO: This adds a dependency on EC2 API error. Refactor later.
301311
if strings.HasPrefix(err.Error(), utils.InsufficientCidrBlocksReason) {
312+
// Prefix not available in the subnet, set status and pass it to pool. Note this is a non-retryable error.
313+
prefixAvailable = false
314+
// Send node event to inform user of insufficient CIDR blocks error
302315
utils.SendNodeEvent(p.apiWrapper.K8sAPI, job.NodeName, utils.InsufficientCidrBlocksReason,
303316
utils.ErrInsufficientCidrBlocks.Error(), v1.EventTypeWarning, p.log)
304317
}
305318
}
306319
job.Resources = resources
307-
p.updatePoolAndReconcileIfRequired(instanceResource.resourcePool, job, didSucceed)
320+
p.updatePoolAndReconcileIfRequired(instanceResource.resourcePool, job, notRetry, prefixAvailable)
308321
}
309322

310323
// DeleteIPv4PrefixAndUpdatePool executes the Delete IPv4 Prefix workflow for the list of prefixes provided in the warm pool job
@@ -324,7 +337,7 @@ func (p *ipv4PrefixProvider) DeleteIPv4PrefixAndUpdatePool(job *worker.WarmPoolJ
324337
didSucceed = false
325338
}
326339
job.Resources = failedResources
327-
p.updatePoolAndReconcileIfRequired(instanceResource.resourcePool, job, didSucceed)
340+
p.updatePoolAndReconcileIfRequired(instanceResource.resourcePool, job, didSucceed, true)
328341
}
329342

330343
func (p *ipv4PrefixProvider) ReSyncPool(job *worker.WarmPoolJob) {
@@ -363,9 +376,10 @@ func (p *ipv4PrefixProvider) ProcessDeleteQueue(job *worker.WarmPoolJob) (ctrl.R
363376
}
364377

365378
// updatePoolAndReconcileIfRequired updates the resource pool and reconcile again and submit a new job if required
366-
func (p *ipv4PrefixProvider) updatePoolAndReconcileIfRequired(resourcePool pool.Pool, job *worker.WarmPoolJob, didSucceed bool) {
379+
func (p *ipv4PrefixProvider) updatePoolAndReconcileIfRequired(resourcePool pool.Pool, job *worker.WarmPoolJob, didSucceed bool,
380+
prefixAvailable bool) {
367381
// Update the pool to add the created/failed resource to the warm pool and decrement the pending count
368-
shouldReconcile := resourcePool.UpdatePool(job, didSucceed)
382+
shouldReconcile := resourcePool.UpdatePool(job, didSucceed, prefixAvailable)
369383

370384
if shouldReconcile {
371385
job := resourcePool.ReconcilePool()

pkg/provider/prefix/provider_test.go

Lines changed: 19 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -135,9 +135,9 @@ func TestIpv4PrefixProvider_updatePoolAndReconcileIfRequired_NoFurtherReconcile(
135135

136136
job := &worker.WarmPoolJob{Operations: worker.OperationCreate}
137137

138-
mockPool.EXPECT().UpdatePool(job, true).Return(false)
138+
mockPool.EXPECT().UpdatePool(job, true, true).Return(false)
139139

140-
provider.updatePoolAndReconcileIfRequired(mockPool, job, true)
140+
provider.updatePoolAndReconcileIfRequired(mockPool, job, true, true)
141141
}
142142

143143
// TestIpv4Provider_updatePoolAndReconcileIfRequired_ReconcileRequired tests pool is updated and reconciliation is
@@ -152,11 +152,11 @@ func TestIpv4Provider_updatePoolAndReconcileIfRequired_ReconcileRequired(t *test
152152

153153
job := &worker.WarmPoolJob{Operations: worker.OperationCreate}
154154

155-
mockPool.EXPECT().UpdatePool(job, true).Return(true)
155+
mockPool.EXPECT().UpdatePool(job, true, true).Return(true)
156156
mockPool.EXPECT().ReconcilePool().Return(job)
157157
mockWorker.EXPECT().SubmitJob(job)
158158

159-
provider.updatePoolAndReconcileIfRequired(mockPool, job, true)
159+
provider.updatePoolAndReconcileIfRequired(mockPool, job, true, true)
160160
}
161161

162162
// TestIpv4PrefixProvider_DeleteIPv4PrefixAndUpdatePool tests job with empty resources is passed back if some resource
@@ -184,7 +184,7 @@ func TestIpv4PrefixProvider_DeleteIPv4PrefixAndUpdatePool(t *testing.T) {
184184
Resources: []string{},
185185
ResourceCount: 2,
186186
NodeName: nodeName,
187-
}, true).Return(false)
187+
}, true, true).Return(false)
188188

189189
provider.DeleteIPv4PrefixAndUpdatePool(deleteJob)
190190
}
@@ -215,7 +215,7 @@ func TestIpv4PrefixProvider_DeletePrivateIPv4AndUpdatePool_SomeResourceFail(t *t
215215
Resources: failedResources,
216216
ResourceCount: 2,
217217
NodeName: nodeName,
218-
}, true).Return(false)
218+
}, true, true).Return(false)
219219

220220
prefixProvider.DeleteIPv4PrefixAndUpdatePool(&deleteJob)
221221
}
@@ -245,7 +245,7 @@ func TestIPv4PrefixProvider_CreateIPv4PrefixAndUpdatePool(t *testing.T) {
245245
Resources: createdResources,
246246
ResourceCount: 2,
247247
NodeName: nodeName,
248-
}, true).Return(false)
248+
}, true, true).Return(false)
249249

250250
prefixProvider.CreateIPv4PrefixAndUpdatePool(createJob)
251251
}
@@ -256,7 +256,9 @@ func TestIPv4PrefixProvider_CreateIPv4PrefixAndUpdatePool_Fail(t *testing.T) {
256256
ctrl := gomock.NewController(t)
257257
defer ctrl.Finish()
258258

259-
prefixProvider := getMockIPv4PrefixProvider()
259+
mockWorker := mock_worker.NewMockWorker(ctrl)
260+
prefixProvider := ipv4PrefixProvider{instanceProviderAndPool: map[string]*ResourceProviderAndPool{}, workerPool: mockWorker,
261+
log: zap.New(zap.UseDevMode(true)).WithName("prefix provider")}
260262
mockPool := mock_pool.NewMockPool(ctrl)
261263
mockManager := mock_eni.NewMockENIManager(ctrl)
262264
prefixProvider.putInstanceProviderAndPool(nodeName, mockPool, mockManager, nodeCapacity, true)
@@ -276,7 +278,11 @@ func TestIPv4PrefixProvider_CreateIPv4PrefixAndUpdatePool_Fail(t *testing.T) {
276278
Resources: createdResources,
277279
ResourceCount: 3,
278280
NodeName: nodeName,
279-
}, false).Return(false)
281+
}, false, true).Return(true)
282+
283+
job := &worker.WarmPoolJob{Operations: worker.OperationCreate}
284+
mockPool.EXPECT().ReconcilePool().Return(job)
285+
mockWorker.EXPECT().SubmitJob(job)
280286

281287
prefixProvider.CreateIPv4PrefixAndUpdatePool(createJob)
282288
}
@@ -307,12 +313,15 @@ func TestIPv4PrefixProvider_CreateIPv4PrefixAndUpdatePool_Fail_InsufficientCidrB
307313

308314
mockManager.EXPECT().CreateIPV4Resource(3, config.ResourceTypeIPv4Prefix, nil, gomock.Any()).Return(createdResources,
309315
fmt.Errorf("InsufficientCidrBlocks: The specified subnet does not have enough free cidr blocks to satisfy the request. Status"))
316+
317+
// Since the InsufficientCidrBlocks error is not retryable, didSucceed is left as true so that the pool is not re-synced or reconciled.
318+
// Also set prefixAvailable as false to notify pool
310319
mockPool.EXPECT().UpdatePool(&worker.WarmPoolJob{
311320
Operations: worker.OperationCreate,
312321
Resources: createdResources,
313322
ResourceCount: 3,
314323
NodeName: nodeName,
315-
}, false).Return(false)
324+
}, true, false).Return(false)
316325

317326
mockK8sWrapper.EXPECT().GetNode(nodeName).Return(node, nil).Times(1)
318327
mockK8sWrapper.EXPECT().BroadcastEvent(node, utils.InsufficientCidrBlocksReason, utils.ErrInsufficientCidrBlocks.Error(), v1.EventTypeWarning).Times(1)

pkg/utils/errors.go

Lines changed: 15 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,9 +1,23 @@
11
package utils
22

3-
import "errors"
3+
import (
4+
"errors"
5+
"strings"
6+
)
47

58
var (
69
ErrNotFound = errors.New("resource was not found")
710
ErrInsufficientCidrBlocks = errors.New("InsufficientCidrBlocks: The specified subnet does not have enough free cidr blocks to satisfy the request")
811
ErrMsgProviderAndPoolNotFound = "cannot find the instance provider and pool from the cache"
12+
NotRetryErrors = []string{InsufficientCidrBlocksReason}
913
)
14+
15+
// ShouldRetryOnError returns true if the error is retryable, else returns false
16+
func ShouldRetryOnError(err error) bool {
17+
for _, e := range NotRetryErrors {
18+
if strings.HasPrefix(err.Error(), e) {
19+
return false
20+
}
21+
}
22+
return true
23+
}

0 commit comments

Comments
 (0)