Skip to content

Commit 7a58f22

Browse files
committed
imp: add stale flows cleanup
clenaup will be done in case the controller will fail to delete flows cleanup logic will list all pods on a local node (using specific cache) for each bridge will check external ids related to rail-cni - pod-uuid=rail_pod_id if the pod doesn't exit it means that there are stale flows creating a cookie from the pod uuid removing the flows removing external-id from the bridge Signed-off-by: Michael Filanov <[email protected]>
1 parent 5e39470 commit 7a58f22

File tree

7 files changed

+614
-13
lines changed

7 files changed

+614
-13
lines changed

cmd/flowscontroller/main.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ import (
2121
"crypto/tls"
2222
"flag"
2323
"os"
24+
"time"
2425

2526
"github.com/Mellanox/spectrum-x-operator/internal/controller"
27+
"github.com/Mellanox/spectrum-x-operator/internal/staleflows"
2628
"github.com/Mellanox/spectrum-x-operator/pkg/exec"
2729
"github.com/Mellanox/spectrum-x-operator/pkg/filewatcher"
2830
"github.com/Mellanox/spectrum-x-operator/pkg/lib/netlink"
@@ -160,6 +162,14 @@ func main() {
160162
os.Exit(1)
161163
}
162164

165+
staleFlowsCleaner := &staleflows.Cleaner{
166+
Client: mgr.GetClient(),
167+
Flows: &controller.Flows{Exec: &exec.Exec{}, NetlinkLib: netlink.New()},
168+
CleanupInterval: 5 * time.Minute,
169+
}
170+
171+
staleFlowsCleaner.StartCleanupRoutine(context.Background())
172+
163173
//+kubebuilder:scaffold:builder
164174

165175
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {

internal/controller/flows.go

Lines changed: 84 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@ limitations under the License.
1616
package controller
1717

1818
import (
19+
"context"
1920
"fmt"
2021
"strings"
2122

2223
"github.com/Mellanox/spectrum-x-operator/pkg/exec"
2324
libnetlink "github.com/Mellanox/spectrum-x-operator/pkg/lib/netlink"
2425

2526
"go.uber.org/multierr"
27+
"sigs.k8s.io/controller-runtime/pkg/log"
2628
)
2729

2830
const (
@@ -38,6 +40,7 @@ type FlowsAPI interface {
3840
AddPodRailFlows(cookie uint64, vf, bridge, podIP, podMAC string) error
3941
DeletePodRailFlows(cookie uint64, podID string) error
4042
IsBridgeManagedByRailCNI(bridge, podID string) (bool, error)
43+
CleanupStaleFlowsForBridges(ctx context.Context, existingPodUIDs map[string]bool) error
4144
}
4245

4346
var _ FlowsAPI = &Flows{}
@@ -125,23 +128,29 @@ func (f *Flows) DeletePodRailFlows(cookie uint64, podID string) error {
125128
}
126129

127130
if val == RailPodID {
128-
flow := fmt.Sprintf(`ovs-ofctl del-flows %s cookie=0x%x/-1`, bridge, cookie)
129-
out, err := f.Exec.Execute(flow)
130-
if err != nil {
131-
errs = multierr.Append(errs, fmt.Errorf("failed to delete flows for bridge %s: %v, output: %s", bridge, err, out))
132-
continue
133-
}
134-
// clear external id
135-
_, err = f.Exec.Execute(fmt.Sprintf(`ovs-vsctl br-set-external-id %s %s ""`, bridge, podID))
136-
if err != nil {
137-
errs = multierr.Append(errs, fmt.Errorf("failed to clear external id for bridge %s: %v", bridge, err))
131+
if err := f.cleanBridgeFlows(bridge, podID, cookie); err != nil {
132+
errs = multierr.Append(errs, err)
138133
continue
139134
}
140135
}
141136
}
142137
return errs
143138
}
144139

140+
func (f *Flows) cleanBridgeFlows(bridge, podID string, cookie uint64) error {
141+
flow := fmt.Sprintf("ovs-ofctl del-flows %s cookie=0x%x/-1", bridge, cookie)
142+
out, err := f.Exec.Execute(flow)
143+
if err != nil {
144+
return fmt.Errorf("failed to delete flows for bridge %s: %v, output: %s", bridge, err, out)
145+
}
146+
// clear external id
147+
_, err = f.Exec.Execute(fmt.Sprintf("ovs-vsctl remove bridge %s external_ids %s", bridge, podID))
148+
if err != nil {
149+
return fmt.Errorf("failed to clear external id for bridge %s: %v", bridge, err)
150+
}
151+
return nil
152+
}
153+
145154
func (f *Flows) IsBridgeManagedByRailCNI(bridge, podID string) (bool, error) {
146155
out, err := f.Exec.Execute(fmt.Sprintf("ovs-vsctl br-get-external-id %s %s", bridge, podID))
147156
if err != nil {
@@ -168,3 +177,68 @@ func (f *Flows) getTorMac(torIP string) (string, error) {
168177

169178
return reply, nil
170179
}
180+
181+
func (f *Flows) CleanupStaleFlowsForBridges(ctx context.Context, existingPodUIDs map[string]bool) error {
182+
logr := log.FromContext(ctx)
183+
184+
// List all bridges once
185+
out, err := f.Exec.Execute("ovs-vsctl list-br")
186+
if err != nil {
187+
return fmt.Errorf("failed to list bridges: out: %s, err: %v", out, err)
188+
}
189+
190+
bridges := strings.Split(out, "\n")
191+
var errs error
192+
193+
for _, bridge := range bridges {
194+
// Get all external IDs for this bridge once
195+
externalIDsOut, err := f.Exec.Execute(fmt.Sprintf("ovs-vsctl br-get-external-id %s", bridge))
196+
if err != nil {
197+
errs = multierr.Append(errs,
198+
fmt.Errorf("failed to get external ids for bridge %s: out: %s, err: %v", bridge, externalIDsOut, err))
199+
continue
200+
}
201+
202+
// Collect all stale pods for this bridge
203+
var stalePods []struct {
204+
podID string
205+
cookie uint64
206+
}
207+
208+
// Parse external IDs to find pod IDs managed by rail CNI
209+
lines := strings.Split(externalIDsOut, "\n")
210+
for _, line := range lines {
211+
if strings.Contains(line, "="+RailPodID) {
212+
// Extract the key part (pod ID) before the =
213+
parts := strings.Split(line, "=")
214+
if len(parts) >= 2 {
215+
podID := parts[0]
216+
217+
// Check if this pod still exists
218+
if !existingPodUIDs[podID] {
219+
// Pod doesn't exist, add to cleanup list
220+
cookie := GenerateUint64FromString(podID)
221+
stalePods = append(stalePods, struct {
222+
podID string
223+
cookie uint64
224+
}{podID, cookie})
225+
}
226+
}
227+
}
228+
}
229+
230+
if len(stalePods) == 0 {
231+
continue
232+
}
233+
234+
for _, stalePod := range stalePods {
235+
logr.Info("Cleaning up stale flows for bridge", "bridge", bridge, "podID", stalePod.podID, "cookie", stalePod.cookie)
236+
if err := f.cleanBridgeFlows(bridge, stalePod.podID, stalePod.cookie); err != nil {
237+
errs = multierr.Append(errs, err)
238+
continue
239+
}
240+
}
241+
}
242+
243+
return errs
244+
}

internal/controller/flows_test.go

Lines changed: 71 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package controller
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"net"
2223
"strings"
@@ -230,8 +231,8 @@ var _ = Describe("Flows", func() {
230231
Return("rail_pod_id", nil)
231232
execMock.EXPECT().Execute("ovs-ofctl del-flows br-rail1 cookie=0x5/-1").Return("", nil)
232233
execMock.EXPECT().Execute("ovs-ofctl del-flows br-rail2 cookie=0x5/-1").Return("", nil)
233-
execMock.EXPECT().Execute(`ovs-vsctl br-set-external-id br-rail1 test-pod-uid ""`).Return("", nil)
234-
execMock.EXPECT().Execute(`ovs-vsctl br-set-external-id br-rail2 test-pod-uid ""`).Return("", nil)
234+
execMock.EXPECT().Execute("ovs-vsctl remove bridge br-rail1 external_ids test-pod-uid").Return("", nil)
235+
execMock.EXPECT().Execute("ovs-vsctl remove bridge br-rail2 external_ids test-pod-uid").Return("", nil)
235236
err := flows.DeletePodRailFlows(0x5, "test-pod-uid")
236237
Expect(err).Should(Succeed())
237238
})
@@ -273,7 +274,7 @@ var _ = Describe("Flows", func() {
273274
Return("rail_pod_id", nil)
274275
execMock.EXPECT().Execute("ovs-ofctl del-flows br-rail1 cookie=0x5/-1").Return("", nil)
275276
execMock.EXPECT().
276-
Execute(`ovs-vsctl br-set-external-id br-rail1 test-pod-uid ""`).
277+
Execute("ovs-vsctl remove bridge br-rail1 external_ids test-pod-uid").
277278
Return("", fmt.Errorf("failed to clear external id"))
278279

279280
err := flows.DeletePodRailFlows(0x5, "test-pod-uid")
@@ -321,4 +322,71 @@ var _ = Describe("Flows", func() {
321322
Expect(isManaged).Should(BeFalse())
322323
})
323324
})
325+
326+
Context("CleanupStaleFlowsForBridges", func() {
327+
It("should cleanup stale flows for bridges", func() {
328+
execMock.EXPECT().Execute("ovs-vsctl list-br").Return("br-rail1\nbr-rail2", nil)
329+
execMock.EXPECT().Execute("ovs-vsctl br-get-external-id br-rail1").
330+
Return("test-pod-id=rail_pod_id", nil)
331+
execMock.EXPECT().Execute("ovs-vsctl br-get-external-id br-rail2").
332+
Return("test-pod-id=rail_pod_id", nil)
333+
execMock.EXPECT().
334+
Execute(fmt.Sprintf("ovs-ofctl del-flows br-rail1 cookie=0x%x/-1", GenerateUint64FromString("test-pod-id"))).
335+
Return("", nil)
336+
execMock.EXPECT().
337+
Execute(fmt.Sprintf("ovs-ofctl del-flows br-rail2 cookie=0x%x/-1", GenerateUint64FromString("test-pod-id"))).
338+
Return("", nil)
339+
execMock.EXPECT().Execute("ovs-vsctl remove bridge br-rail1 external_ids test-pod-id").Return("", nil)
340+
execMock.EXPECT().Execute("ovs-vsctl remove bridge br-rail2 external_ids test-pod-id").Return("", nil)
341+
err := flows.CleanupStaleFlowsForBridges(context.Background(), map[string]bool{})
342+
Expect(err).Should(Succeed())
343+
})
344+
345+
It("should return error if failed to list bridges", func() {
346+
execMock.EXPECT().Execute("ovs-vsctl list-br").Return("", fmt.Errorf("failed to list bridges"))
347+
err := flows.CleanupStaleFlowsForBridges(context.Background(), map[string]bool{})
348+
Expect(err).Should(HaveOccurred())
349+
})
350+
351+
It("should return error if failed to get external id", func() {
352+
execMock.EXPECT().Execute("ovs-vsctl list-br").Return("br-rail1", nil)
353+
execMock.EXPECT().Execute("ovs-vsctl br-get-external-id br-rail1").
354+
Return("", fmt.Errorf("failed to get external id"))
355+
err := flows.CleanupStaleFlowsForBridges(context.Background(), map[string]bool{})
356+
Expect(err).Should(HaveOccurred())
357+
})
358+
359+
It("should return error if failed to delete flows", func() {
360+
execMock.EXPECT().Execute("ovs-vsctl list-br").Return("br-rail1", nil)
361+
execMock.EXPECT().Execute("ovs-vsctl br-get-external-id br-rail1").
362+
Return("test-pod-id=rail_pod_id", nil)
363+
execMock.EXPECT().
364+
Execute(fmt.Sprintf("ovs-ofctl del-flows br-rail1 cookie=0x%x/-1", GenerateUint64FromString("test-pod-id"))).
365+
Return("", fmt.Errorf("failed to delete flows"))
366+
367+
err := flows.CleanupStaleFlowsForBridges(context.Background(), map[string]bool{})
368+
Expect(err).Should(HaveOccurred())
369+
})
370+
371+
It("should return error if failed to clear external id", func() {
372+
execMock.EXPECT().Execute("ovs-vsctl list-br").Return("br-rail1", nil)
373+
execMock.EXPECT().Execute("ovs-vsctl br-get-external-id br-rail1").
374+
Return("test-pod-id=rail_pod_id", nil)
375+
execMock.EXPECT().
376+
Execute(fmt.Sprintf("ovs-ofctl del-flows br-rail1 cookie=0x%x/-1", GenerateUint64FromString("test-pod-id"))).
377+
Return("", nil)
378+
execMock.EXPECT().Execute("ovs-vsctl remove bridge br-rail1 external_ids test-pod-id").
379+
Return("", fmt.Errorf("failed to clear external id"))
380+
err := flows.CleanupStaleFlowsForBridges(context.Background(), map[string]bool{})
381+
Expect(err).Should(HaveOccurred())
382+
})
383+
384+
It("should succeed if there are no stale pods", func() {
385+
execMock.EXPECT().Execute("ovs-vsctl list-br").Return("br-rail1", nil)
386+
execMock.EXPECT().Execute("ovs-vsctl br-get-external-id br-rail1").
387+
Return("test-pod-id=rail_pod_id", nil)
388+
err := flows.CleanupStaleFlowsForBridges(context.Background(), map[string]bool{"test-pod-id": true})
389+
Expect(err).Should(Succeed())
390+
})
391+
})
324392
})

internal/controller/mock_flows.go

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
Copyright 2025, NVIDIA CORPORATION & AFFILIATES
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package staleflows
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"time"
23+
24+
"github.com/Mellanox/spectrum-x-operator/internal/controller"
25+
26+
corev1 "k8s.io/api/core/v1"
27+
"sigs.k8s.io/controller-runtime/pkg/client"
28+
"sigs.k8s.io/controller-runtime/pkg/log"
29+
)
30+
31+
type Cleaner struct {
32+
client.Client
33+
CleanupInterval time.Duration
34+
Flows controller.FlowsAPI
35+
}
36+
37+
// StartCleanupRoutine starts a background goroutine that periodically cleans up stale flows
38+
func (c *Cleaner) StartCleanupRoutine(ctx context.Context) {
39+
go func() {
40+
// Use configured interval or default to 5 minutes
41+
interval := c.CleanupInterval
42+
if interval == 0 {
43+
interval = 5 * time.Minute
44+
}
45+
46+
logr := log.FromContext(ctx).WithName("flow-cleanup")
47+
logr.Info("Starting flow cleanup routine", "interval", interval)
48+
49+
ticker := time.NewTicker(interval)
50+
defer ticker.Stop()
51+
52+
for {
53+
select {
54+
case <-ctx.Done():
55+
logr.Info("Stopping flow cleanup routine")
56+
return
57+
case <-ticker.C:
58+
if err := c.cleanupStaleFlows(ctx); err != nil {
59+
logr.Error(err, "Failed to cleanup stale flows")
60+
}
61+
}
62+
}
63+
}()
64+
}
65+
66+
// cleanupStaleFlows removes flows for pods that no longer exist
67+
func (c *Cleaner) cleanupStaleFlows(ctx context.Context) error {
68+
pods := &corev1.PodList{}
69+
// clinet cache is set to watch pods on the same node as the reconciler
70+
if err := c.List(ctx, pods); err != nil {
71+
return fmt.Errorf("failed to list pods: %w", err)
72+
}
73+
74+
if len(pods.Items) == 0 {
75+
return nil
76+
}
77+
78+
existingPods := make(map[string]bool, len(pods.Items))
79+
for _, pod := range pods.Items {
80+
existingPods[string(pod.UID)] = true
81+
}
82+
83+
if err := c.Flows.CleanupStaleFlowsForBridges(ctx, existingPods); err != nil {
84+
return fmt.Errorf("failed to cleanup stale flows: %w", err)
85+
}
86+
87+
return nil
88+
}

0 commit comments

Comments
 (0)