Skip to content

Commit c0d7b2d

Browse files
Merge pull request #176 from winsopc/ib-k8s-nad-add-support
NAD: add-only support without pod restart
2 parents 7475f50 + f1a6744 commit c0d7b2d

File tree

7 files changed

+372
-34
lines changed

7 files changed

+372
-34
lines changed

deployment/ib-kubernetes.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ rules:
1515
verbs: ["get", "list", "patch", "watch"]
1616
- apiGroups: ["k8s.cni.cncf.io"]
1717
resources: ["*"]
18-
verbs: ["get"]
18+
verbs: ["get", "list", "watch"]
1919
# Leader election permissions
2020
- apiGroups: ["coordination.k8s.io"]
2121
resources: ["leases"]

pkg/daemon/daemon.go

Lines changed: 83 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,15 @@ type Daemon interface {
5555

5656
// daemon holds the daemon configuration together with the pod and NAD
// watchers, the Kubernetes client, the GUID pool, the subnet manager client
// and the in-memory GUID and NAD bookkeeping maps.
type daemon struct {
5757
config config.DaemonConfig
58-
watcher watcher.Watcher
58+
podWatcher watcher.Watcher
59+
nadWatcher watcher.Watcher // NAD watcher for network definition changes
5960
kubeClient k8sClient.Client
6061
guidPool guid.Pool
6162
smClient plugins.SubnetManagerClient
6263
guidPodNetworkMap map[string]string // allocated guid mapped to the pod and network
64+
65+
// NAD add-only cache, written by ProcessNADChanges and getCachedNAD; no code
// in view removes entries from it.
// NOTE(review): this is a plain map with no lock, yet ProcessNADChanges and
// AddPeriodicUpdate (via getIbSriovNetwork -> getCachedNAD) are started on
// separate goroutines in runLeaderLogic — confirm access is serialized or
// guard it with a mutex.
66+
nadCache map[string]*v1.NetworkAttachmentDefinition // network ID -> NAD
6367
}
6468

6569
// Temporary struct used to process pods' networks
@@ -109,6 +113,7 @@ func NewDaemon() (Daemon, error) {
109113
}
110114

111115
podEventHandler := resEvenHandler.NewPodEventHandler()
116+
nadEventHandler := resEvenHandler.NewNADEventHandler()
112117
client, err := k8sClient.NewK8sClient()
113118
if err != nil {
114119
return nil, err
@@ -151,13 +156,16 @@ func NewDaemon() (Daemon, error) {
151156
}
152157

153158
podWatcher := watcher.NewWatcher(podEventHandler, client)
159+
nadWatcher := watcher.NewWatcher(nadEventHandler, client)
154160
return &daemon{
155161
config: daemonConfig,
156-
watcher: podWatcher,
162+
podWatcher: podWatcher,
163+
nadWatcher: nadWatcher,
157164
kubeClient: client,
158165
guidPool: guidPool,
159166
smClient: smClient,
160167
guidPodNetworkMap: make(map[string]string),
168+
nadCache: make(map[string]*v1.NetworkAttachmentDefinition),
161169
}, nil
162170
}
163171

@@ -289,11 +297,14 @@ func (d *daemon) runLeaderLogic() {
289297

290298
go wait.Until(d.AddPeriodicUpdate, time.Duration(d.config.PeriodicUpdate)*time.Second, stopPeriodicsChan)
291299
go wait.Until(d.DeletePeriodicUpdate, time.Duration(d.config.PeriodicUpdate)*time.Second, stopPeriodicsChan)
300+
go wait.Until(d.ProcessNADChanges, time.Duration(d.config.PeriodicUpdate)*time.Second, stopPeriodicsChan)
292301
defer close(stopPeriodicsChan)
293302

294-
// Run Watcher in background, calling watcherStopFunc() will stop the watcher
295-
watcherStopFunc := d.watcher.RunBackground()
296-
defer watcherStopFunc()
303+
// Run both watchers in background
304+
podWatcherStopFunc := d.podWatcher.RunBackground()
305+
nadWatcherStopFunc := d.nadWatcher.RunBackground()
306+
defer podWatcherStopFunc()
307+
defer nadWatcherStopFunc()
297308

298309
// Run until interrupted by os signals
299310
sigChan := make(chan os.Signal, 1)
@@ -303,26 +314,16 @@ func (d *daemon) runLeaderLogic() {
303314
}
304315

305316
// If network identified by networkID is IbSriov return network name and spec
306-
//
307-
//nolint:nilerr
308317
func (d *daemon) getIbSriovNetwork(networkID string) (string, *utils.IbSriovCniSpec, error) {
309-
networkNamespace, networkName, err := utils.ParseNetworkID(networkID)
318+
_, networkName, err := utils.ParseNetworkID(networkID)
310319
if err != nil {
311320
return "", nil, fmt.Errorf("failed to parse network id %s with error: %v", networkID, err)
312321
}
313322

314-
// Try to get net-attach-def in backoff loop
315-
var netAttInfo *v1.NetworkAttachmentDefinition
316-
if err = wait.ExponentialBackoff(backoffValues, func() (bool, error) {
317-
netAttInfo, err = d.kubeClient.GetNetworkAttachmentDefinition(networkNamespace, networkName)
318-
if err != nil {
319-
log.Warn().Msgf("failed to get networkName attachment %s with error %v",
320-
networkName, err)
321-
return false, nil
322-
}
323-
return true, nil
324-
}); err != nil {
325-
return "", nil, fmt.Errorf("failed to get networkName attachment %s", networkName)
323+
// Try to get net-attach-def from cache first, then fallback to API
324+
netAttInfo, err := d.getCachedNAD(networkID)
325+
if err != nil {
326+
return "", nil, fmt.Errorf("failed to get network attachment %s: %v", networkName, err)
326327
}
327328
log.Debug().Msgf("networkName attachment %v", netAttInfo)
328329

@@ -551,7 +552,7 @@ func (d *daemon) updatePodNetworkAnnotation(pi *podNetworkInfo, removedList *[]n
551552
//nolint:nilerr
552553
func (d *daemon) AddPeriodicUpdate() {
553554
log.Info().Msgf("running periodic add update")
554-
addMap, _ := d.watcher.GetHandler().GetResults()
555+
addMap, _ := d.podWatcher.GetHandler().GetResults()
555556
addMap.Lock()
556557
defer addMap.Unlock()
557558
// Contains ALL pods' networks
@@ -569,12 +570,10 @@ func (d *daemon) AddPeriodicUpdate() {
569570
if len(pods) == 0 {
570571
continue
571572
}
572-
573-
log.Info().Msgf("processing network networkID %s", networkID)
574573
networkName, ibCniSpec, err := d.getIbSriovNetwork(networkID)
575574
if err != nil {
576-
addMap.UnSafeRemove(networkID)
577-
log.Error().Msgf("droping network: %v", err)
575+
// Do not drop the network; keep for next periodic run when NAD becomes available
576+
log.Warn().Msgf("NAD not ready for network %s: %v (will retry)", networkID, err)
578577
continue
579578
}
580579

@@ -678,7 +677,7 @@ func getAllPodGUIDsForNetwork(pod *kapi.Pod, networkName string) ([]net.Hardware
678677
//nolint:nilerr
679678
func (d *daemon) DeletePeriodicUpdate() {
680679
log.Info().Msg("running delete periodic update")
681-
_, deleteMap := d.watcher.GetHandler().GetResults()
680+
_, deleteMap := d.podWatcher.GetHandler().GetResults()
682681
deleteMap.Lock()
683682
defer deleteMap.Unlock()
684683
for networkID, podsInterface := range deleteMap.Items {
@@ -753,6 +752,64 @@ func (d *daemon) DeletePeriodicUpdate() {
753752
log.Info().Msg("delete periodic update finished")
754753
}
755754

755+
// ProcessNADChanges processes NAD add events
756+
func (d *daemon) ProcessNADChanges() {
757+
log.Debug().Msg("Processing NAD changes...")
758+
759+
nadHandler := d.nadWatcher.GetHandler().(*resEvenHandler.NADEventHandler)
760+
addedNADs, _ := nadHandler.GetResults()
761+
762+
// Process NAD add events only
763+
addedNADs.Lock()
764+
for networkID, nad := range addedNADs.Items {
765+
nadObj := nad.(*v1.NetworkAttachmentDefinition)
766+
767+
// Add-only: cache the NAD; ignore updates/deletes
768+
d.nadCache[networkID] = nadObj
769+
770+
log.Info().Msgf("Successfully processed NAD event: %s", networkID)
771+
772+
// Remove processed item
773+
addedNADs.UnSafeRemove(networkID)
774+
}
775+
addedNADs.Unlock()
776+
777+
log.Debug().Msg("NAD changes processing completed")
778+
}
779+
780+
// getCachedNAD retrieves NAD from cache, falling back to API if not cached
781+
func (d *daemon) getCachedNAD(networkID string) (*v1.NetworkAttachmentDefinition, error) {
782+
// First check cache
783+
if nad, exists := d.nadCache[networkID]; exists {
784+
return nad, nil
785+
}
786+
787+
// Fall back to API call (existing behavior)
788+
networkNamespace, networkName, err := utils.ParseNetworkID(networkID)
789+
if err != nil {
790+
return nil, fmt.Errorf("failed to parse network id %s with error: %v", networkID, err)
791+
}
792+
793+
var netAttInfo *v1.NetworkAttachmentDefinition
794+
if err = wait.ExponentialBackoff(backoffValues, func() (bool, error) {
795+
var getErr error
796+
netAttInfo, getErr = d.kubeClient.GetNetworkAttachmentDefinition(networkNamespace, networkName)
797+
if getErr != nil {
798+
log.Warn().Msgf("failed to get network attachment %s with error %v", networkName, getErr)
799+
// keep retrying until backoff exhausted
800+
return false, nil
801+
}
802+
return true, nil
803+
}); err != nil {
804+
return nil, fmt.Errorf("failed to get network attachment %s", networkName)
805+
}
806+
807+
// Cache the result
808+
d.nadCache[networkID] = netAttInfo
809+
810+
return netAttInfo, nil
811+
}
812+
756813
// initPool check the guids that are already allocated by the running pods
757814
func (d *daemon) initPool() error {
758815
log.Info().Msg("Initializing GUID pool.")

pkg/k8s-client/client.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type Client interface {
4040
GetNetworkAttachmentDefinition(namespace, name string) (*netapi.NetworkAttachmentDefinition, error)
4141
GetRestClient() rest.Interface
4242
GetCoordinationV1() coordv1client.CoordinationV1Interface
43+
GetNetClient() netclient.K8sCniCncfIoV1Interface
4344
}
4445

4546
type client struct {
@@ -120,3 +121,8 @@ func (c *client) GetRestClient() rest.Interface {
120121
func (c *client) GetCoordinationV1() coordv1client.CoordinationV1Interface {
121122
return c.clientset.CoordinationV1()
122123
}
124+
125+
// GetNetClient returns the client's netClient field, the
// netclient.K8sCniCncfIoV1Interface used to access network attachment
// definitions (added for the NAD watcher).
126+
func (c *client) GetNetClient() netclient.K8sCniCncfIoV1Interface {
127+
return c.netClient
128+
}

pkg/k8s-client/mocks/Client.go

Lines changed: 28 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/watcher/handler/nad.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
// Copyright 2025 NVIDIA CORPORATION & AFFILIATES
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// SPDX-License-Identifier: Apache-2.0
16+
17+
package handler
18+
19+
import (
20+
"encoding/json"
21+
"fmt"
22+
"sync"
23+
24+
v1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"
25+
"github.com/rs/zerolog/log"
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27+
"k8s.io/apimachinery/pkg/runtime"
28+
29+
"github.com/Mellanox/ib-kubernetes/pkg/utils"
30+
)
31+
32+
// NADEventHandler collects NetworkAttachmentDefinition add events for
// InfiniBand SR-IOV networks. Update and delete events are ignored.
type NADEventHandler struct {
33+
addedNADs *utils.SynchronizedMap // Network ID -> NAD for add events, exposed to the consumer through GetResults
34+
nadCache sync.Map // Network ID ("namespace_name") -> NAD for every NAD stored by OnAdd; nothing in this file removes entries
35+
}
36+
37+
// NewNADEventHandler creates a new NAD event handler
38+
func NewNADEventHandler() ResourceEventHandler {
39+
return &NADEventHandler{
40+
addedNADs: utils.NewSynchronizedMap(),
41+
nadCache: sync.Map{},
42+
}
43+
}
44+
45+
// GetResourceObject returns the NAD object type for the watcher
46+
func (n *NADEventHandler) GetResourceObject() runtime.Object {
47+
return &v1.NetworkAttachmentDefinition{
48+
TypeMeta: metav1.TypeMeta{
49+
Kind: "NetworkAttachmentDefinition",
50+
APIVersion: "k8s.cni.cncf.io/v1",
51+
},
52+
}
53+
}
54+
55+
// OnAdd handles NAD creation events
56+
func (n *NADEventHandler) OnAdd(obj interface{}, _ bool) {
57+
nad := obj.(*v1.NetworkAttachmentDefinition)
58+
log.Info().Msgf("NAD add event: namespace %s name %s", nad.Namespace, nad.Name)
59+
60+
// Only handle InfiniBand SR-IOV networks
61+
if !n.isInfiniBandNetwork(nad) {
62+
log.Debug().Msgf("NAD %s/%s is not an InfiniBand network", nad.Namespace, nad.Name)
63+
return
64+
}
65+
66+
networkID := fmt.Sprintf("%s_%s", nad.Namespace, nad.Name)
67+
68+
// Cache the NAD for future reference
69+
n.nadCache.Store(networkID, nad)
70+
71+
// Add to processing queue
72+
n.addedNADs.Set(networkID, nad)
73+
74+
log.Info().Msgf("Successfully processed NAD add event: %s", networkID)
75+
}
76+
77+
// OnUpdate is a no-op for add-only support: the body is empty, so a NAD
// stored by OnAdd is not refreshed when the object changes.
78+
func (n *NADEventHandler) OnUpdate(oldObj, newObj interface{}) {}
79+
80+
// OnDelete is a no-op for add-only support: the body is empty, so a NAD
// stored by OnAdd stays in the handler's maps after the object is deleted.
81+
func (n *NADEventHandler) OnDelete(obj interface{}) {}
82+
83+
// GetResults returns the results maps for processing by the daemon.
// The first value is the map of added NADs keyed by network ID; the second
// value is always nil for this handler, so callers must not dereference it.
84+
func (n *NADEventHandler) GetResults() (*utils.SynchronizedMap, *utils.SynchronizedMap) {
85+
return n.addedNADs, nil
86+
}
87+
88+
// GetNADFromCache retrieves a cached NAD by network ID
89+
func (n *NADEventHandler) GetNADFromCache(networkID string) (*v1.NetworkAttachmentDefinition, bool) {
90+
if nad, ok := n.nadCache.Load(networkID); ok {
91+
return nad.(*v1.NetworkAttachmentDefinition), true
92+
}
93+
return nil, false
94+
}
95+
96+
// isInfiniBandNetwork checks if the NAD is for InfiniBand SR-IOV
97+
func (n *NADEventHandler) isInfiniBandNetwork(nad *v1.NetworkAttachmentDefinition) bool {
98+
// Parse the network configuration
99+
var networkConfig map[string]interface{}
100+
if err := json.Unmarshal([]byte(nad.Spec.Config), &networkConfig); err != nil {
101+
log.Error().Msgf("Failed to parse NAD config for %s/%s: %v", nad.Namespace, nad.Name, err)
102+
return false
103+
}
104+
105+
// Check if this is an ib-sriov network
106+
if cniType, ok := networkConfig["type"]; ok {
107+
return cniType == utils.InfiniBandSriovCni
108+
}
109+
110+
return false
111+
}

0 commit comments

Comments
 (0)