Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ RUN go mod download
ADD ./ ./
RUN make all

FROM nvcr.io/nvidia/distroless/go:v3.1.13
FROM nvcr.io/nvidia/distroless/go:v3.2.0
LABEL org.opencontainers.image.source=https://nvcr.io/nvidia/cloud-native/multus-cni
WORKDIR /
# Copy the built binary and plugins from the builder stage
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ GOPROXY ?= $(shell go env GOPROXY)
# Go tools
GO = go

TIMEOUT = 15
TIMEOUT = 30
Q = $(if $(filter 1,$V),,@)

.PHONY: all
Expand Down Expand Up @@ -88,7 +88,7 @@ $(GOVERALLS): | $(BIN_DIR); $(info building goveralls...)
$(call go-install-tool,$(GOVERALLS),github.com/mattn/goveralls@$(GOVERALLS_VERSION)

ENVTEST := $(BIN_DIR)/setup-envtest
ENVTEST_VERSION := latest
ENVTEST_VERSION := release-0.21
ENVTEST_K8S_VERSION := 1.30.0
.PHONY: envtest
envtest: $(ENVTEST) ## Download envtest if necessary
Expand Down
140 changes: 111 additions & 29 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,22 +149,18 @@ func NewDaemon() (Daemon, error) {
return nil, err
}

// Reset guid pool with already allocated guids to avoid collisions
err = syncGUIDPool(smClient, guidPool)
if err != nil {
return nil, err
}

podWatcher := watcher.NewWatcher(podEventHandler, client)
nadWatcher := watcher.NewWatcher(nadEventHandler, client)

// Return daemon fully formed
return &daemon{
config: daemonConfig,
podWatcher: podWatcher,
nadWatcher: nadWatcher,
kubeClient: client,
guidPool: guidPool,
smClient: smClient,
guidPodNetworkMap: make(map[string]string),
podWatcher: podWatcher,
nadWatcher: nadWatcher,
nadCache: make(map[string]*v1.NetworkAttachmentDefinition),
}, nil
}
Expand Down Expand Up @@ -278,8 +274,8 @@ func (d *daemon) becomeLeader() error {
log.Info().Msg("Becoming leader, initializing daemon logic")

// Initialize the GUID pool (rebuild state from existing pods)
if err := d.initPool(); err != nil {
log.Error().Msgf("initPool(): Leader could not init the guid pool: %v", err)
if err := d.initGUIDPool(); err != nil {
log.Error().Msgf("initGUIDPool(): Leader could not init the guid pool: %v", err)
return fmt.Errorf("failed to initialize GUID pool as leader: %v", err)
}

Expand Down Expand Up @@ -410,14 +406,22 @@ func (d *daemon) processPodsForNetwork(
}

// Verify if GUID already exist for given network ID and allocates new one if not
func (d *daemon) allocatePodNetworkGUID(allocatedGUID, podNetworkID string, podUID types.UID) error {
func (d *daemon) allocatePodNetworkGUID(allocatedGUID, podNetworkID string, podUID types.UID, targetPkey string) error {
existingPkey, _ := d.guidPool.Get(allocatedGUID)
if existingPkey != "" {
// This happens when a GUID is being reallocated to a different PKey
// (e.g., pod was rescheduled or network configuration changed)
if err := d.removeStaleGUID(allocatedGUID, existingPkey); err != nil {
log.Warn().Msgf("failed to remove stale GUID %s from pkey %s: %v", allocatedGUID, existingPkey, err)
}
}
if mappedID, exist := d.guidPodNetworkMap[allocatedGUID]; exist {
if podNetworkID != mappedID {
return fmt.Errorf("failed to allocate requested guid %s, already allocated for %s",
allocatedGUID, mappedID)
}
} else if err := d.guidPool.AllocateGUID(allocatedGUID); err != nil {
return fmt.Errorf("failed to allocate GUID for pod ID %s, wit error: %v", podUID, err)
} else if err := d.guidPool.AllocateGUID(allocatedGUID, targetPkey); err != nil {
return fmt.Errorf("failed to allocate GUID for pod ID %s, with error: %v", podUID, err)
} else {
d.guidPodNetworkMap[allocatedGUID] = podNetworkID
}
Expand Down Expand Up @@ -448,7 +452,7 @@ func (d *daemon) processNetworkGUID(
return fmt.Errorf("failed to parse user allocated guid %s with error: %v", allocatedGUID, err)
}

err = d.allocatePodNetworkGUID(allocatedGUID, podNetworkID, pi.pod.UID)
err = d.allocatePodNetworkGUID(allocatedGUID, podNetworkID, pi.pod.UID, spec.PKey)
if err != nil {
return err
}
Expand All @@ -458,7 +462,7 @@ func (d *daemon) processNetworkGUID(
switch err {
// If the guid pool is exhausted, need to sync with SM in case there are unsynced changes
case guid.ErrGUIDPoolExhausted:
err = syncGUIDPool(d.smClient, d.guidPool)
err = d.syncWithSubnetManager()
if err != nil {
return err
}
Expand All @@ -468,7 +472,7 @@ func (d *daemon) processNetworkGUID(
}

allocatedGUID = guidAddr.String()
err = d.allocatePodNetworkGUID(allocatedGUID, podNetworkID, pi.pod.UID)
err = d.allocatePodNetworkGUID(allocatedGUID, podNetworkID, pi.pod.UID, spec.PKey)
if err != nil {
return err
}
Expand All @@ -492,28 +496,53 @@ func (d *daemon) processNetworkGUID(
return nil
}

func syncGUIDPool(smClient plugins.SubnetManagerClient, guidPool guid.Pool) error {
usedGuids, err := smClient.ListGuidsInUse()
func (d *daemon) removeStaleGUID(allocatedGUID, existingPkey string) error {
parsedPkey, err := utils.ParsePKey(existingPkey)
if err != nil {
log.Error().Msgf("failed to parse PKey %s with error: %v", existingPkey, err)
return err
}

// Reset guid pool with already allocated guids to avoid collisions
err = guidPool.Reset(usedGuids)
guidAddr, err := guid.ParseGUID(allocatedGUID)
if err != nil {
return fmt.Errorf("failed to parse user allocated guid %s with error: %v", allocatedGUID, err)
}
allocatedGUIDList := []net.HardwareAddr{guidAddr.HardWareAddress()}
// Try to remove pKeys via subnet manager in backoff loop
if err = wait.ExponentialBackoff(backoffValues, func() (bool, error) {
log.Info().Msgf("removing guids of previous pods from pKey %s"+
" with subnet manager %s", existingPkey,
d.smClient.Name())
if err = d.smClient.RemoveGuidsFromPKey(parsedPkey, allocatedGUIDList); err != nil {
log.Warn().Msgf("failed to remove guids of removed pods from pKey %s"+
" with subnet manager %s with error: %v", existingPkey,
d.smClient.Name(), err)
return false, nil //nolint:nilerr // retry pattern for exponential backoff
}
return true, nil
}); err != nil {
log.Warn().Msgf("failed to remove guids of removed pods from pKey %s"+
" with subnet manager %s", existingPkey, d.smClient.Name())
return err
}

if err = d.guidPool.ReleaseGUID(allocatedGUID); err != nil {
log.Warn().Msgf("failed to release guid \"%s\" with error: %v", allocatedGUID, err)
return err
}
delete(d.guidPodNetworkMap, allocatedGUID)
log.Info().Msgf("successfully released %s from pkey %s", allocatedGUID, existingPkey)
return nil
}

// Update and set Pod's network annotation.
// If failed to update annotation, pod's GUID added into the list to be removed from Pkey.
func (d *daemon) updatePodNetworkAnnotation(pi *podNetworkInfo, removedList *[]net.HardwareAddr) error {
func (d *daemon) updatePodNetworkAnnotation(pi *podNetworkInfo, removedList *[]net.HardwareAddr, pkey string) error {
if pi.ibNetwork.CNIArgs == nil {
pi.ibNetwork.CNIArgs = &map[string]interface{}{}
}

(*pi.ibNetwork.CNIArgs)[utils.InfiniBandAnnotation] = utils.ConfiguredInfiniBandPod
(*pi.ibNetwork.CNIArgs)[utils.PkeyAnnotation] = pkey

netAnnotations, err := json.Marshal(pi.networks)
if err != nil {
Expand Down Expand Up @@ -605,7 +634,7 @@ func (d *daemon) AddPeriodicUpdate() {
// Update annotations for PODs that finished the previous steps successfully
var removedGUIDList []net.HardwareAddr
for _, pi := range passedPods {
err = d.updatePodNetworkAnnotation(pi, &removedGUIDList)
err = d.updatePodNetworkAnnotation(pi, &removedGUIDList, ibCniSpec.PKey)
if err != nil {
log.Error().Msgf("%v", err)
}
Expand Down Expand Up @@ -712,7 +741,21 @@ func (d *daemon) DeletePeriodicUpdate() {
continue
}

guidList = append(guidList, podGUIDs...)
// Process each GUID from the pod
for _, guidAddr := range podGUIDs {
podNetworkID := utils.GeneratePodNetworkID(pod, networkName)
if guidPodEntry, exist := d.guidPodNetworkMap[guidAddr.String()]; exist {
if podNetworkID == guidPodEntry {
log.Info().Msgf("matched guid %s to pod %s, removing", guidAddr, guidPodEntry)
guidList = append(guidList, guidAddr)
} else {
log.Warn().Msgf("guid %s is allocated to another pod %s not %s, not removing",
guidAddr, guidPodEntry, podNetworkID)
}
} else {
log.Warn().Msgf("guid %s is not allocated to any pod on delete", guidAddr)
}
}
}

if ibCniSpec.PKey != "" && len(guidList) != 0 {
Expand Down Expand Up @@ -810,11 +853,12 @@ func (d *daemon) getCachedNAD(networkID string) (*v1.NetworkAttachmentDefinition
return netAttInfo, nil
}

// initPool check the guids that are already allocated by the running pods
func (d *daemon) initPool() error {
// initGUIDPool initializes the GUID pool by first populating guidPodNetworkMap with existing pods,
// then syncing with subnet manager and cleaning up stale GUIDs
func (d *daemon) initGUIDPool() error {
log.Info().Msg("Initializing GUID pool.")

// Try to get pod list from k8s client in backoff loop
// First populate guidPodNetworkMap with existing pods
var pods *kapi.PodList
if err := wait.ExponentialBackoff(backoffValues, func() (bool, error) {
var err error
Expand All @@ -832,6 +876,9 @@ func (d *daemon) initPool() error {
for index := range pods.Items {
log.Debug().Msgf("checking pod for network annotations %v", pods.Items[index])
pod := pods.Items[index]
if utils.PodIsFinished(&pod) {
continue
}
networks, err := netAttUtils.ParsePodNetworkAnnotation(&pod)
if err != nil {
continue
Expand All @@ -846,6 +893,7 @@ func (d *daemon) initPool() error {
if err != nil {
continue
}

podNetworkID := string(pod.UID) + network.Name
if _, exist := d.guidPodNetworkMap[podGUID]; exist {
if podNetworkID != d.guidPodNetworkMap[podGUID] {
Expand All @@ -854,8 +902,8 @@ func (d *daemon) initPool() error {
}
continue
}

if err = d.guidPool.AllocateGUID(podGUID); err != nil {
podPkey, _ := utils.GetPodNetworkPkey(network)
if err = d.guidPool.AllocateGUID(podGUID, podPkey); err != nil {
err = fmt.Errorf("failed to allocate guid for running pod: %v", err)
log.Error().Msgf("%v", err)
continue
Expand All @@ -865,5 +913,39 @@ func (d *daemon) initPool() error {
}
}

// Now sync with subnet manager and clean up stale GUIDs
return d.syncWithSubnetManager()
}

// syncWithSubnetManager syncs the GUID pool with the subnet manager
// This is used both during initialization and when the pool is exhausted at runtime
func (d *daemon) syncWithSubnetManager() error {
usedGuids, err := d.smClient.ListGuidsInUse()
if err != nil {
return err
}

// Reset guid pool with already allocated guids to avoid collisions
err = d.guidPool.Reset(usedGuids)
if err != nil {
return err
}

// Remove stale GUIDs that are no longer in use by the subnet manager
// This handles cleanup of GUIDs from deleted/finished pods
for allocatedGUID, podNetworkID := range d.guidPodNetworkMap {
if _, found := usedGuids[allocatedGUID]; !found {
// If GUID is not found in the subnet manager's list of used GUIDs,
// it means the pod was deleted/finished and we should clean it up
log.Info().Msgf("removing stale GUID %s for pod network %s", allocatedGUID, podNetworkID)
if err = d.guidPool.ReleaseGUID(allocatedGUID); err != nil {
log.Warn().Msgf("failed to release stale guid \"%s\" with error: %v", allocatedGUID, err)
} else {
delete(d.guidPodNetworkMap, allocatedGUID)
log.Info().Msgf("successfully cleaned up stale GUID %s", allocatedGUID)
}
}
}

return nil
}
29 changes: 29 additions & 0 deletions pkg/daemon/daemon_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2025 NVIDIA CORPORATION & AFFILIATES
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

package daemon_test

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestDaemon(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Daemon Suite")
}
Loading
Loading