Skip to content

Commit 20e4e6b

Browse files
Merge pull request #167 from mladjan-gadzic/bugfixes
Bugfixes
2 parents cf3775b + 9e8319e commit 20e4e6b

File tree

14 files changed

+986
-73
lines changed

14 files changed

+986
-73
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -55,7 +55,7 @@ GOPROXY ?= $(shell go env GOPROXY)
5555
# Go tools
5656
GO = go
5757

58-
TIMEOUT = 15
58+
TIMEOUT = 30
5959
Q = $(if $(filter 1,$V),,@)
6060

6161
.PHONY: all

pkg/daemon/daemon.go

Lines changed: 111 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -149,22 +149,18 @@ func NewDaemon() (Daemon, error) {
149149
return nil, err
150150
}
151151

152-
// Reset guid pool with already allocated guids to avoid collisions
153-
err = syncGUIDPool(smClient, guidPool)
154-
if err != nil {
155-
return nil, err
156-
}
157-
158152
podWatcher := watcher.NewWatcher(podEventHandler, client)
159153
nadWatcher := watcher.NewWatcher(nadEventHandler, client)
154+
155+
// Return daemon fully formed
160156
return &daemon{
161157
config: daemonConfig,
162-
podWatcher: podWatcher,
163-
nadWatcher: nadWatcher,
164158
kubeClient: client,
165159
guidPool: guidPool,
166160
smClient: smClient,
167161
guidPodNetworkMap: make(map[string]string),
162+
podWatcher: podWatcher,
163+
nadWatcher: nadWatcher,
168164
nadCache: make(map[string]*v1.NetworkAttachmentDefinition),
169165
}, nil
170166
}
@@ -278,8 +274,8 @@ func (d *daemon) becomeLeader() error {
278274
log.Info().Msg("Becoming leader, initializing daemon logic")
279275

280276
// Initialize the GUID pool (rebuild state from existing pods)
281-
if err := d.initPool(); err != nil {
282-
log.Error().Msgf("initPool(): Leader could not init the guid pool: %v", err)
277+
if err := d.initGUIDPool(); err != nil {
278+
log.Error().Msgf("initGUIDPool(): Leader could not init the guid pool: %v", err)
283279
return fmt.Errorf("failed to initialize GUID pool as leader: %v", err)
284280
}
285281

@@ -410,14 +406,22 @@ func (d *daemon) processPodsForNetwork(
410406
}
411407

412408
// Verify if GUID already exist for given network ID and allocates new one if not
413-
func (d *daemon) allocatePodNetworkGUID(allocatedGUID, podNetworkID string, podUID types.UID) error {
409+
func (d *daemon) allocatePodNetworkGUID(allocatedGUID, podNetworkID string, podUID types.UID, targetPkey string) error {
410+
existingPkey, _ := d.guidPool.Get(allocatedGUID)
411+
if existingPkey != "" {
412+
// This happens when a GUID is being reallocated to a different PKey
413+
// (e.g., pod was rescheduled or network configuration changed)
414+
if err := d.removeStaleGUID(allocatedGUID, existingPkey); err != nil {
415+
log.Warn().Msgf("failed to remove stale GUID %s from pkey %s: %v", allocatedGUID, existingPkey, err)
416+
}
417+
}
414418
if mappedID, exist := d.guidPodNetworkMap[allocatedGUID]; exist {
415419
if podNetworkID != mappedID {
416420
return fmt.Errorf("failed to allocate requested guid %s, already allocated for %s",
417421
allocatedGUID, mappedID)
418422
}
419-
} else if err := d.guidPool.AllocateGUID(allocatedGUID); err != nil {
420-
return fmt.Errorf("failed to allocate GUID for pod ID %s, wit error: %v", podUID, err)
423+
} else if err := d.guidPool.AllocateGUID(allocatedGUID, targetPkey); err != nil {
424+
return fmt.Errorf("failed to allocate GUID for pod ID %s, with error: %v", podUID, err)
421425
} else {
422426
d.guidPodNetworkMap[allocatedGUID] = podNetworkID
423427
}
@@ -448,7 +452,7 @@ func (d *daemon) processNetworkGUID(
448452
return fmt.Errorf("failed to parse user allocated guid %s with error: %v", allocatedGUID, err)
449453
}
450454

451-
err = d.allocatePodNetworkGUID(allocatedGUID, podNetworkID, pi.pod.UID)
455+
err = d.allocatePodNetworkGUID(allocatedGUID, podNetworkID, pi.pod.UID, spec.PKey)
452456
if err != nil {
453457
return err
454458
}
@@ -458,7 +462,7 @@ func (d *daemon) processNetworkGUID(
458462
switch err {
459463
// If the guid pool is exhausted, need to sync with SM in case there are unsynced changes
460464
case guid.ErrGUIDPoolExhausted:
461-
err = syncGUIDPool(d.smClient, d.guidPool)
465+
err = d.syncWithSubnetManager()
462466
if err != nil {
463467
return err
464468
}
@@ -468,7 +472,7 @@ func (d *daemon) processNetworkGUID(
468472
}
469473

470474
allocatedGUID = guidAddr.String()
471-
err = d.allocatePodNetworkGUID(allocatedGUID, podNetworkID, pi.pod.UID)
475+
err = d.allocatePodNetworkGUID(allocatedGUID, podNetworkID, pi.pod.UID, spec.PKey)
472476
if err != nil {
473477
return err
474478
}
@@ -492,28 +496,53 @@ func (d *daemon) processNetworkGUID(
492496
return nil
493497
}
494498

495-
func syncGUIDPool(smClient plugins.SubnetManagerClient, guidPool guid.Pool) error {
496-
usedGuids, err := smClient.ListGuidsInUse()
499+
func (d *daemon) removeStaleGUID(allocatedGUID, existingPkey string) error {
500+
parsedPkey, err := utils.ParsePKey(existingPkey)
497501
if err != nil {
502+
log.Error().Msgf("failed to parse PKey %s with error: %v", existingPkey, err)
498503
return err
499504
}
500-
501-
// Reset guid pool with already allocated guids to avoid collisions
502-
err = guidPool.Reset(usedGuids)
505+
guidAddr, err := guid.ParseGUID(allocatedGUID)
503506
if err != nil {
507+
return fmt.Errorf("failed to parse user allocated guid %s with error: %v", allocatedGUID, err)
508+
}
509+
allocatedGUIDList := []net.HardwareAddr{guidAddr.HardWareAddress()}
510+
// Try to remove pKeys via subnet manager in backoff loop
511+
if err = wait.ExponentialBackoff(backoffValues, func() (bool, error) {
512+
log.Info().Msgf("removing guids of previous pods from pKey %s"+
513+
" with subnet manager %s", existingPkey,
514+
d.smClient.Name())
515+
if err = d.smClient.RemoveGuidsFromPKey(parsedPkey, allocatedGUIDList); err != nil {
516+
log.Warn().Msgf("failed to remove guids of removed pods from pKey %s"+
517+
" with subnet manager %s with error: %v", existingPkey,
518+
d.smClient.Name(), err)
519+
return false, nil //nolint:nilerr // retry pattern for exponential backoff
520+
}
521+
return true, nil
522+
}); err != nil {
523+
log.Warn().Msgf("failed to remove guids of removed pods from pKey %s"+
524+
" with subnet manager %s", existingPkey, d.smClient.Name())
525+
return err
526+
}
527+
528+
if err = d.guidPool.ReleaseGUID(allocatedGUID); err != nil {
529+
log.Warn().Msgf("failed to release guid \"%s\" with error: %v", allocatedGUID, err)
504530
return err
505531
}
532+
delete(d.guidPodNetworkMap, allocatedGUID)
533+
log.Info().Msgf("successfully released %s from pkey %s", allocatedGUID, existingPkey)
506534
return nil
507535
}
508536

509537
// Update and set Pod's network annotation.
510538
// If failed to update annotation, pod's GUID added into the list to be removed from Pkey.
511-
func (d *daemon) updatePodNetworkAnnotation(pi *podNetworkInfo, removedList *[]net.HardwareAddr) error {
539+
func (d *daemon) updatePodNetworkAnnotation(pi *podNetworkInfo, removedList *[]net.HardwareAddr, pkey string) error {
512540
if pi.ibNetwork.CNIArgs == nil {
513541
pi.ibNetwork.CNIArgs = &map[string]interface{}{}
514542
}
515543

516544
(*pi.ibNetwork.CNIArgs)[utils.InfiniBandAnnotation] = utils.ConfiguredInfiniBandPod
545+
(*pi.ibNetwork.CNIArgs)[utils.PkeyAnnotation] = pkey
517546

518547
netAnnotations, err := json.Marshal(pi.networks)
519548
if err != nil {
@@ -605,7 +634,7 @@ func (d *daemon) AddPeriodicUpdate() {
605634
// Update annotations for PODs that finished the previous steps successfully
606635
var removedGUIDList []net.HardwareAddr
607636
for _, pi := range passedPods {
608-
err = d.updatePodNetworkAnnotation(pi, &removedGUIDList)
637+
err = d.updatePodNetworkAnnotation(pi, &removedGUIDList, ibCniSpec.PKey)
609638
if err != nil {
610639
log.Error().Msgf("%v", err)
611640
}
@@ -712,7 +741,21 @@ func (d *daemon) DeletePeriodicUpdate() {
712741
continue
713742
}
714743

715-
guidList = append(guidList, podGUIDs...)
744+
// Process each GUID from the pod
745+
for _, guidAddr := range podGUIDs {
746+
podNetworkID := utils.GeneratePodNetworkID(pod, networkName)
747+
if guidPodEntry, exist := d.guidPodNetworkMap[guidAddr.String()]; exist {
748+
if podNetworkID == guidPodEntry {
749+
log.Info().Msgf("matched guid %s to pod %s, removing", guidAddr, guidPodEntry)
750+
guidList = append(guidList, guidAddr)
751+
} else {
752+
log.Warn().Msgf("guid %s is allocated to another pod %s not %s, not removing",
753+
guidAddr, guidPodEntry, podNetworkID)
754+
}
755+
} else {
756+
log.Warn().Msgf("guid %s is not allocated to any pod on delete", guidAddr)
757+
}
758+
}
716759
}
717760

718761
if ibCniSpec.PKey != "" && len(guidList) != 0 {
@@ -810,11 +853,12 @@ func (d *daemon) getCachedNAD(networkID string) (*v1.NetworkAttachmentDefinition
810853
return netAttInfo, nil
811854
}
812855

813-
// initPool check the guids that are already allocated by the running pods
814-
func (d *daemon) initPool() error {
856+
// initGUIDPool initializes the GUID pool by first populating guidPodNetworkMap with existing pods,
857+
// then syncing with subnet manager and cleaning up stale GUIDs
858+
func (d *daemon) initGUIDPool() error {
815859
log.Info().Msg("Initializing GUID pool.")
816860

817-
// Try to get pod list from k8s client in backoff loop
861+
// First populate guidPodNetworkMap with existing pods
818862
var pods *kapi.PodList
819863
if err := wait.ExponentialBackoff(backoffValues, func() (bool, error) {
820864
var err error
@@ -832,6 +876,9 @@ func (d *daemon) initPool() error {
832876
for index := range pods.Items {
833877
log.Debug().Msgf("checking pod for network annotations %v", pods.Items[index])
834878
pod := pods.Items[index]
879+
if utils.PodIsFinished(&pod) {
880+
continue
881+
}
835882
networks, err := netAttUtils.ParsePodNetworkAnnotation(&pod)
836883
if err != nil {
837884
continue
@@ -846,6 +893,7 @@ func (d *daemon) initPool() error {
846893
if err != nil {
847894
continue
848895
}
896+
849897
podNetworkID := string(pod.UID) + network.Name
850898
if _, exist := d.guidPodNetworkMap[podGUID]; exist {
851899
if podNetworkID != d.guidPodNetworkMap[podGUID] {
@@ -854,8 +902,8 @@ func (d *daemon) initPool() error {
854902
}
855903
continue
856904
}
857-
858-
if err = d.guidPool.AllocateGUID(podGUID); err != nil {
905+
podPkey, _ := utils.GetPodNetworkPkey(network)
906+
if err = d.guidPool.AllocateGUID(podGUID, podPkey); err != nil {
859907
err = fmt.Errorf("failed to allocate guid for running pod: %v", err)
860908
log.Error().Msgf("%v", err)
861909
continue
@@ -865,5 +913,39 @@ func (d *daemon) initPool() error {
865913
}
866914
}
867915

916+
// Now sync with subnet manager and clean up stale GUIDs
917+
return d.syncWithSubnetManager()
918+
}
919+
920+
// syncWithSubnetManager syncs the GUID pool with the subnet manager
921+
// This is used both during initialization and when the pool is exhausted at runtime
922+
func (d *daemon) syncWithSubnetManager() error {
923+
usedGuids, err := d.smClient.ListGuidsInUse()
924+
if err != nil {
925+
return err
926+
}
927+
928+
// Reset guid pool with already allocated guids to avoid collisions
929+
err = d.guidPool.Reset(usedGuids)
930+
if err != nil {
931+
return err
932+
}
933+
934+
// Remove stale GUIDs that are no longer in use by the subnet manager
935+
// This handles cleanup of GUIDs from deleted/finished pods
936+
for allocatedGUID, podNetworkID := range d.guidPodNetworkMap {
937+
if _, found := usedGuids[allocatedGUID]; !found {
938+
// If GUID is not found in the subnet manager's list of used GUIDs,
939+
// it means the pod was deleted/finished and we should clean it up
940+
log.Info().Msgf("removing stale GUID %s for pod network %s", allocatedGUID, podNetworkID)
941+
if err = d.guidPool.ReleaseGUID(allocatedGUID); err != nil {
942+
log.Warn().Msgf("failed to release stale guid \"%s\" with error: %v", allocatedGUID, err)
943+
} else {
944+
delete(d.guidPodNetworkMap, allocatedGUID)
945+
log.Info().Msgf("successfully cleaned up stale GUID %s", allocatedGUID)
946+
}
947+
}
948+
}
949+
868950
return nil
869951
}

pkg/daemon/daemon_suite_test.go

Lines changed: 29 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,29 @@
1+
// Copyright 2025 NVIDIA CORPORATION & AFFILIATES
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// SPDX-License-Identifier: Apache-2.0
16+
17+
package daemon_test
18+
19+
import (
20+
"testing"
21+
22+
. "github.com/onsi/ginkgo/v2"
23+
. "github.com/onsi/gomega"
24+
)
25+
26+
func TestDaemon(t *testing.T) {
27+
RegisterFailHandler(Fail)
28+
RunSpecs(t, "Daemon Suite")
29+
}

0 commit comments

Comments (0)