Skip to content

Commit 9913136

Browse files
authored
Merge pull request #82 from DmytroLinkin/periodic_update_network
Periodic update network
2 parents 41bb817 + a9a762c commit 9913136

File tree

2 files changed

+105
-61
lines changed

2 files changed

+105
-61
lines changed

pkg/daemon/codes.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,4 @@ package daemon
22

33
const (
44
ErrUnknown = iota // Default value
5-
ErrGUIDAlreadyAllocated
6-
ErrNetworkNotConfigured
7-
ErrNotIBSriovNetwork
85
)

pkg/daemon/daemon.go

Lines changed: 105 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"k8s.io/apimachinery/pkg/util/wait"
2020

2121
"github.com/Mellanox/ib-kubernetes/pkg/config"
22-
"github.com/Mellanox/ib-kubernetes/pkg/errcode"
2322
"github.com/Mellanox/ib-kubernetes/pkg/guid"
2423
k8sClient "github.com/Mellanox/ib-kubernetes/pkg/k8s-client"
2524
"github.com/Mellanox/ib-kubernetes/pkg/sm"
@@ -55,6 +54,12 @@ type networksMap struct {
5554
theMap map[types.UID][]*v1.NetworkSelectionElement
5655
}
5756

57+
// Exponential backoff ~26 sec + 6 * <api call time>
58+
// NOTE: the k8s client has a built-in exponential backoff, which ib-kubernetes doesn't use.
59+
// If the client's backoff is configured, the total retry time may increase dramatically.
60+
// NOTE: the ufm client has a default timeout of 30 seconds on request operations.
61+
// backoffValues is the retry policy handed to wait.ExponentialBackoff by the
// retry loops in this file.
var backoffValues = wait.Backoff{
	Duration: 1 * time.Second, // base delay
	Factor:   1.6,             // multiplier applied to the delay on each iteration
	Jitter:   0.1,             // random fraction added to each delay
	Steps:    6,               // upper bound on the number of iterations
}
62+
5863
// Return networks mapped to the pod. If mapping not exist it is created
5964
func (n *networksMap) getPodNetworks(pod *kapi.Pod) ([]*v1.NetworkSelectionElement, error) {
6065
var err error
@@ -107,8 +112,17 @@ func NewDaemon() (Daemon, error) {
107112
return nil, err
108113
}
109114

110-
if err := smClient.Validate(); err != nil {
111-
return nil, err
115+
// Try to validate if subnet manager is reachable in backoff loop
116+
var validateErr error
117+
if err := wait.ExponentialBackoff(backoffValues, func() (bool, error) {
118+
if err := smClient.Validate(); err != nil {
119+
log.Warn().Msgf("%v", err)
120+
validateErr = err
121+
return false, nil
122+
}
123+
return true, nil
124+
}); err != nil {
125+
return nil, validateErr
112126
}
113127

114128
podWatcher := watcher.NewWatcher(podEventHandler, client)
@@ -155,9 +169,18 @@ func (d *daemon) getIbSriovNetwork(networkID string) (string, *utils.IbSriovCniS
155169
return "", nil, fmt.Errorf("failed to parse network id %s with error: %v", networkID, err)
156170
}
157171

158-
netAttInfo, err := d.kubeClient.GetNetworkAttachmentDefinition(networkNamespace, networkName)
159-
if err != nil {
160-
return "", nil, fmt.Errorf("failed to get networkName attachment %s with error: %v", networkName, err)
172+
// Try to get net-attach-def in backoff loop
173+
var netAttInfo *v1.NetworkAttachmentDefinition
174+
if err = wait.ExponentialBackoff(backoffValues, func() (bool, error) {
175+
netAttInfo, err = d.kubeClient.GetNetworkAttachmentDefinition(networkNamespace, networkName)
176+
if err != nil {
177+
log.Warn().Msgf("failed to get networkName attachment %s with error %v",
178+
networkName, err)
179+
return false, nil
180+
}
181+
return true, nil
182+
}); err != nil {
183+
return "", nil, fmt.Errorf("failed to get networkName attachment %s", networkName)
161184
}
162185
log.Debug().Msgf("networkName attachment %v", netAttInfo)
163186

@@ -170,7 +193,7 @@ func (d *daemon) getIbSriovNetwork(networkID string) (string, *utils.IbSriovCniS
170193

171194
ibCniSpec, err := utils.GetIbSriovCniFromNetwork(networkSpec)
172195
if err != nil {
173-
return "", nil, errcode.Errorf(ErrNotIBSriovNetwork,
196+
return "", nil, fmt.Errorf(
174197
"failed to get InfiniBand SR-IOV CNI spec from network attachment %+v, with error %v",
175198
networkSpec, err)
176199
}
@@ -202,8 +225,8 @@ func getPodNetworkInfo(netName string, pod *kapi.Pod, netMap networksMap) (*podN
202225
func (d *daemon) allocatePodNetworkGUID(allocatedGUID, podNetworkID string, podUID types.UID) error {
203226
if mappedID, exist := d.guidPodNetworkMap[allocatedGUID]; exist {
204227
if podNetworkID != mappedID {
205-
return errcode.Errorf(ErrGUIDAlreadyAllocated,
206-
"failed to allocate requested guid %s, already allocated for %s", allocatedGUID, mappedID)
228+
return fmt.Errorf("failed to allocate requested guid %s, already allocated for %s",
229+
allocatedGUID, mappedID)
207230
}
208231
} else if err := d.guidPool.AllocateGUID(allocatedGUID); err != nil {
209232
return fmt.Errorf("failed to allocate GUID for pod ID %s, wit error: %v", podUID, err)
@@ -277,11 +300,20 @@ func (d *daemon) updatePodNetworkAnnotation(pi *podNetworkInfo, removedList *[]n
277300

278301
pi.pod.Annotations[v1.NetworkAttachmentAnnot] = string(netAnnotations)
279302

280-
if err := d.kubeClient.SetAnnotationsOnPod(pi.pod, pi.pod.Annotations); err != nil {
281-
if kerrors.IsNotFound(err) {
282-
return fmt.Errorf("failed to update pod annotations with err: %v", err)
303+
// Try to set pod's annotations in backoff loop
304+
if err = wait.ExponentialBackoff(backoffValues, func() (bool, error) {
305+
if err = d.kubeClient.SetAnnotationsOnPod(pi.pod, pi.pod.Annotations); err != nil {
306+
if kerrors.IsNotFound(err) {
307+
return false, err
308+
}
309+
log.Warn().Msgf("failed to update pod annotations with err: %v", err)
310+
return false, nil
283311
}
284312

313+
return true, nil
314+
}); err != nil {
315+
log.Error().Msgf("failed to update pod annotations")
316+
285317
if err = d.guidPool.ReleaseGUID(pi.addr.String()); err != nil {
286318
log.Warn().Msgf("failed to release guid \"%s\" from removed pod \"%s\" in namespace "+
287319
"\"%s\" with error: %v", pi.addr.String(), pi.pod.Name, pi.pod.Namespace, err)
@@ -319,29 +351,22 @@ func (d *daemon) AddPeriodicUpdate() {
319351
log.Info().Msgf("processing network networkID %s", networkID)
320352
networkName, ibCniSpec, err := d.getIbSriovNetwork(networkID)
321353
if err != nil {
322-
if errcode.GetCode(err) == ErrNotIBSriovNetwork {
323-
// Not an IB SR-IOV network
324-
addMap.UnSafeRemove(networkID)
325-
}
326-
log.Error().Msgf("%v", err)
354+
addMap.UnSafeRemove(networkID)
355+
log.Error().Msgf("droping network: %v", err)
327356
continue
328357
}
329358

330359
var guidList []net.HardwareAddr
331360
var passedPods []*podNetworkInfo
332-
var failedPods []*kapi.Pod
333361
for _, pod := range pods {
334362
log.Debug().Msgf("pod namespace %s name %s", pod.Namespace, pod.Name)
335-
pi, err := getPodNetworkInfo(networkName, pod, netMap)
363+
var pi *podNetworkInfo
364+
pi, err = getPodNetworkInfo(networkName, pod, netMap)
336365
if err != nil {
337-
failedPods = append(failedPods, pod)
338366
log.Error().Msgf("%v", err)
339367
continue
340368
}
341-
if err := d.processNetworkGUID(networkName, ibCniSpec, pi); err != nil {
342-
if errcode.GetCode(err) != ErrGUIDAlreadyAllocated {
343-
failedPods = append(failedPods, pod)
344-
}
369+
if err = d.processNetworkGUID(networkName, ibCniSpec, pi); err != nil {
345370
log.Error().Msgf("%v", err)
346371
continue
347372
}
@@ -352,46 +377,57 @@ func (d *daemon) AddPeriodicUpdate() {
352377

353378
// Get configured PKEY for network and add the relevant POD GUIDs as members of the PKey via Subnet Manager
354379
if ibCniSpec.PKey != "" && len(guidList) != 0 {
355-
pKey, err := utils.ParsePKey(ibCniSpec.PKey)
380+
var pKey int
381+
pKey, err = utils.ParsePKey(ibCniSpec.PKey)
356382
if err != nil {
357383
log.Error().Msgf("failed to parse PKey %s with error: %v", ibCniSpec.PKey, err)
358384
continue
359385
}
360386

361-
if err = d.smClient.AddGuidsToPKey(pKey, guidList); err != nil {
362-
log.Error().Msgf("failed to config pKey with subnet manager %s with error: %v",
363-
d.smClient.Name(), err)
387+
// Try to add pKeys via subnet manager in backoff loop
388+
if err = wait.ExponentialBackoff(backoffValues, func() (bool, error) {
389+
if err = d.smClient.AddGuidsToPKey(pKey, guidList); err != nil {
390+
log.Warn().Msgf("failed to config pKey with subnet manager %s with error : %v",
391+
d.smClient.Name(), err)
392+
return false, nil
393+
}
394+
return true, nil
395+
}); err != nil {
396+
log.Error().Msgf("failed to config pKey with subnet manager %s", d.smClient.Name())
364397
continue
365398
}
366399
}
367400

368401
// Update annotations for PODs that finished the previous steps successfully
369402
var removedGUIDList []net.HardwareAddr
370403
for _, pi := range passedPods {
371-
err := d.updatePodNetworkAnnotation(pi, &removedGUIDList)
404+
err = d.updatePodNetworkAnnotation(pi, &removedGUIDList)
372405
if err != nil {
373-
failedPods = append(failedPods, pi.pod)
374406
log.Error().Msgf("%v", err)
375-
continue
376407
}
377408
}
378409

379410
if ibCniSpec.PKey != "" && len(removedGUIDList) != 0 {
380411
// Already check the parse above
381412
pKey, _ := utils.ParsePKey(ibCniSpec.PKey)
382-
if pkeyErr := d.smClient.RemoveGuidsFromPKey(pKey, removedGUIDList); pkeyErr != nil {
383-
log.Warn().Msgf("failed to remove guids of removed pods from pKey %s with subnet manager %s with error: %v",
384-
ibCniSpec.PKey, d.smClient.Name(), pkeyErr)
413+
414+
// Try to remove pKeys via subnet manager in backoff loop
415+
if err = wait.ExponentialBackoff(backoffValues, func() (bool, error) {
416+
if err = d.smClient.RemoveGuidsFromPKey(pKey, removedGUIDList); err != nil {
417+
log.Warn().Msgf("failed to remove guids of removed pods from pKey %s"+
418+
" with subnet manager %s with error: %v", ibCniSpec.PKey,
419+
d.smClient.Name(), err)
420+
return false, nil
421+
}
422+
return true, nil
423+
}); err != nil {
424+
log.Warn().Msgf("failed to remove guids of removed pods from pKey %s"+
425+
" with subnet manager %s", ibCniSpec.PKey, d.smClient.Name())
385426
continue
386427
}
387428
}
388429

389-
if len(failedPods) == 0 {
390-
addMap.UnSafeRemove(networkID)
391-
} else {
392-
// Requeue faild pods for the network, they will be processed in the next AddPeriodicUpdate iteration
393-
addMap.UnSafeSet(networkID, failedPods)
394-
}
430+
addMap.UnSafeRemove(networkID)
395431
}
396432
log.Info().Msg("add periodic update finished")
397433
}
@@ -410,7 +446,7 @@ func getPodGUIDForNetwork(pod *kapi.Pod, networkName string) (net.HardwareAddr,
410446
}
411447

412448
if !utils.IsPodNetworkConfiguredWithInfiniBand(network) {
413-
return nil, errcode.Errorf(ErrNetworkNotConfigured, "network %+v is not InfiniBand configured", network)
449+
return nil, fmt.Errorf("network %+v is not InfiniBand configured", network)
414450
}
415451

416452
allocatedGUID, netErr := utils.GetPodNetworkGUID(network)
@@ -446,20 +482,17 @@ func (d *daemon) DeletePeriodicUpdate() {
446482

447483
networkName, ibCniSpec, err := d.getIbSriovNetwork(networkID)
448484
if err != nil {
449-
log.Error().Msgf("%v", err)
485+
deleteMap.UnSafeRemove(networkID)
486+
log.Warn().Msgf("droping network: %v", err)
450487
continue
451488
}
452489

453490
var guidList []net.HardwareAddr
454-
var failedPods []*kapi.Pod
455491
var guidAddr net.HardwareAddr
456492
for _, pod := range pods {
457493
log.Debug().Msgf("pod namespace %s name %s", pod.Namespace, pod.Name)
458494
guidAddr, err = getPodGUIDForNetwork(pod, networkName)
459495
if err != nil {
460-
if errcode.GetCode(err) != ErrNetworkNotConfigured {
461-
failedPods = append(failedPods, pod)
462-
}
463496
log.Error().Msgf("%v", err)
464497
continue
465498
}
@@ -474,9 +507,18 @@ func (d *daemon) DeletePeriodicUpdate() {
474507
continue
475508
}
476509

477-
if pkeyErr = d.smClient.RemoveGuidsFromPKey(pKey, guidList); pkeyErr != nil {
478-
log.Error().Msgf("failed to config pKey with subnet manager %s with error: %v",
479-
d.smClient.Name(), pkeyErr)
510+
// Try to remove pKeys via subnet manager on backoff loop
511+
if err = wait.ExponentialBackoff(backoffValues, func() (bool, error) {
512+
if err = d.smClient.RemoveGuidsFromPKey(pKey, guidList); err != nil {
513+
log.Warn().Msgf("failed to remove guids of removed pods from pKey %s"+
514+
" with subnet manager %s with error: %v", ibCniSpec.PKey,
515+
d.smClient.Name(), err)
516+
return false, nil
517+
}
518+
return true, nil
519+
}); err != nil {
520+
log.Warn().Msgf("failed to remove guids of removed pods from pKey %s"+
521+
" with subnet manager %s", ibCniSpec.PKey, d.smClient.Name())
480522
continue
481523
}
482524
}
@@ -489,11 +531,7 @@ func (d *daemon) DeletePeriodicUpdate() {
489531

490532
delete(d.guidPodNetworkMap, guidAddr.String())
491533
}
492-
if len(failedPods) == 0 {
493-
deleteMap.UnSafeRemove(networkID)
494-
} else {
495-
deleteMap.UnSafeSet(networkID, failedPods)
496-
}
534+
deleteMap.UnSafeRemove(networkID)
497535
}
498536

499537
log.Info().Msg("delete periodic update finished")
@@ -502,10 +540,19 @@ func (d *daemon) DeletePeriodicUpdate() {
502540
// initPool checks the GUIDs that are already allocated by the running pods
503541
func (d *daemon) initPool() error {
504542
log.Info().Msg("Initializing GUID pool.")
505-
pods, err := d.kubeClient.GetPods(kapi.NamespaceAll)
506-
if err != nil {
507-
err = fmt.Errorf("failed to get pods from kubernetes: %v", err)
508-
log.Err(err)
543+
544+
// Try to get pod list from k8s client in backoff loop
545+
var pods *kapi.PodList
546+
if err := wait.ExponentialBackoff(backoffValues, func() (bool, error) {
547+
var err error
548+
if pods, err = d.kubeClient.GetPods(kapi.NamespaceAll); err != nil {
549+
log.Warn().Msgf("failed to get pods from kubernetes: %v", err)
550+
return false, nil
551+
}
552+
return true, nil
553+
}); err != nil {
554+
err = fmt.Errorf("failed to get pods from kubernetes")
555+
log.Error().Msgf("%v", err)
509556
return err
510557
}
511558

0 commit comments

Comments
 (0)