Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions pkg/peers/memberlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ const (
DefaultSuspicionMult = 4
// DefaultRetransmitMult is the multiplier for the number of retransmissions
DefaultRetransmitMult = 4
// DefaultReconcileInterval is the interval between peer reconciliation checks
DefaultReconcileInterval = 60 * time.Second
)

type MemberlistPeers struct {
Expand Down Expand Up @@ -225,6 +227,43 @@ func (m *MemberlistPeers) LocalPort() int {
return int(m.list.LocalNode().Port)
}

// ReconcilePeers joins any peers not already known to the memberlist.
// It compares the provided peer addresses with current members and only
// attempts to join truly unknown nodes, preventing split-brain scenarios.
func (m *MemberlistPeers) ReconcilePeers(ctx context.Context, peers []string) error {
if !m.started.Load() || m.list == nil {
return fmt.Errorf("memberlist not started")
}

log := klog.FromContext(ctx)

knownAddrs := make(map[string]struct{})
for _, member := range m.list.Members() {
addr := fmt.Sprintf("%s:%d", member.Addr.String(), member.Port)
knownAddrs[addr] = struct{}{}
}

var unknownPeers []string
for _, p := range peers {
if _, known := knownAddrs[p]; !known {
unknownPeers = append(unknownPeers, p)
}
}

if len(unknownPeers) == 0 {
return nil
}

log.Info("reconciling unknown peers", "count", len(unknownPeers), "peers", unknownPeers)
joined, err := m.list.Join(unknownPeers)
if err != nil {
log.Error(err, "failed to join some peers during reconciliation", "joined", joined)
return err
}
log.Info("successfully reconciled peers", "joined", joined)
return nil
}

// eventDelegate handles memberlist membership change events
type eventDelegate struct {
parent *MemberlistPeers
Expand Down
71 changes: 71 additions & 0 deletions pkg/peers/memberlist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,74 @@ func TestMemberlistPeers_Join_PartialFailure(t *testing.T) {

assert.True(t, peer.started.Load())
}

// TestMemberlistPeers_ReconcilePeers_JoinsUnknownNode tests that ReconcilePeers joins a node not in the memberlist
func TestMemberlistPeers_ReconcilePeers_JoinsUnknownNode(t *testing.T) {
peer1, port1 := createTestPeer(t, "reconcile-node-1")
peer2, port2 := createTestPeer(t, "reconcile-node-2")

ctx := context.Background()

// Start both nodes independently (no initial join — simulates split-brain)
err := peer1.Start(ctx, "127.0.0.1", port1, nil)
require.NoError(t, err)
defer func() { _ = peer1.Stop() }()

err = peer2.Start(ctx, "127.0.0.1", port2, nil)
require.NoError(t, err)
defer func() { _ = peer2.Stop() }()

// Verify they don't know each other
assert.Empty(t, peer1.GetPeers())
assert.Empty(t, peer2.GetPeers())

// Reconcile peer1 with peer2's address
err = peer1.ReconcilePeers(ctx, []string{fmt.Sprintf("127.0.0.1:%d", port2)})
require.NoError(t, err)

// Wait for gossip propagation
assert.Eventually(t, func() bool {
return len(peer1.GetPeers()) == 1
}, 5*time.Second, 100*time.Millisecond, "peer1 should discover peer2 after reconciliation")

assert.Eventually(t, func() bool {
return len(peer2.GetPeers()) == 1
}, 5*time.Second, 100*time.Millisecond, "peer2 should discover peer1 after reconciliation")
}

// TestMemberlistPeers_ReconcilePeers_SkipsKnownNode tests that ReconcilePeers skips already-known nodes
func TestMemberlistPeers_ReconcilePeers_SkipsKnownNode(t *testing.T) {
peer1, port1 := createTestPeer(t, "reconcile-known-1")
peer2, port2 := createTestPeer(t, "reconcile-known-2")

ctx := context.Background()

err := peer1.Start(ctx, "127.0.0.1", port1, nil)
require.NoError(t, err)
defer func() { _ = peer1.Stop() }()

err = peer2.Start(ctx, "127.0.0.1", port2, []string{fmt.Sprintf("127.0.0.1:%d", port1)})
require.NoError(t, err)
defer func() { _ = peer2.Stop() }()

// Wait for them to discover each other
assert.Eventually(t, func() bool {
return len(peer1.GetPeers()) == 1
}, 5*time.Second, 100*time.Millisecond)

// Reconcile with already-known peer — should be a no-op and return nil
err = peer1.ReconcilePeers(ctx, []string{fmt.Sprintf("127.0.0.1:%d", port2)})
assert.NoError(t, err)

// Peer count should remain 1
assert.Len(t, peer1.GetPeers(), 1)
}

// TestMemberlistPeers_ReconcilePeers_NotStarted tests that ReconcilePeers returns error when not started
func TestMemberlistPeers_ReconcilePeers_NotStarted(t *testing.T) {
peer := NewMemberlistPeers("not-started-reconcile")

err := peer.ReconcilePeers(context.Background(), []string{"127.0.0.1:9999"})
require.Error(t, err)
assert.Contains(t, err.Error(), "not started")
}
4 changes: 4 additions & 0 deletions pkg/peers/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,8 @@ type Peers interface {

// LocalPort returns the local node's port
LocalPort() int

// ReconcilePeers joins any peers not already known to the memberlist.
// It is safe to call periodically to heal split-brain scenarios.
ReconcilePeers(ctx context.Context, peers []string) error
}
76 changes: 76 additions & 0 deletions pkg/peers/reconcile.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
Copyright 2026.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package peers

import (
"context"
"fmt"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

"github.com/openkruise/agents/pkg/utils"
)

// reconcileInterval is a package-level variable for testability.
// Tests can override it to avoid waiting the full 60s between ticks.
var reconcileInterval = DefaultReconcileInterval

// RunPeerReconciliation starts a background goroutine that periodically lists
// peer pods from the Kubernetes API and reconciles them into the memberlist.
// This prevents split-brain scenarios when the initial memberlist join fails.
// If namespace or labelSelector is empty, the goroutine returns immediately.
func RunPeerReconciliation(ctx context.Context, client kubernetes.Interface, pm Peers,
namespace, labelSelector, localIP string, bindPort int) {
if namespace == "" || labelSelector == "" {
return
}

log := klog.FromContext(ctx)
ticker := time.NewTicker(reconcileInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
peerList, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
LabelSelector: labelSelector,
})
if err != nil {
log.Error(err, "failed to list peer pods during reconciliation")
continue
}
var peerAddrs []string
for _, peer := range peerList.Items {
ip := peer.Status.PodIP
if ip == "" || ip == localIP || utils.IsLoopbackIP(ip) {
continue
}
peerAddrs = append(peerAddrs, fmt.Sprintf("%s:%d", ip, bindPort))
}
if len(peerAddrs) > 0 {
if err = pm.ReconcilePeers(ctx, peerAddrs); err != nil {
log.V(4).Info("peer reconciliation completed with errors", "error", err)
}
}
}
}
}
160 changes: 160 additions & 0 deletions pkg/peers/reconcile_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
Copyright 2026.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package peers

import (
"context"
"net"
"sync"
"testing"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"

"github.com/stretchr/testify/assert"
)

// mockPeers implements Peers interface for testing.
type mockPeers struct {
reconcileFunc func(ctx context.Context, peers []string) error
}

func (m *mockPeers) Start(_ context.Context, _ string, _ int, _ []string) error {
return nil
}

func (m *mockPeers) Stop() error { return nil }

func (m *mockPeers) GetPeers() []Peer { return nil }

func (m *mockPeers) GetAllMembers() []Peer { return nil }

func (m *mockPeers) WaitForPeers(_ context.Context, _ int) error { return nil }

func (m *mockPeers) LocalAddr() net.IP { return nil }

func (m *mockPeers) LocalPort() int { return 0 }

func (m *mockPeers) ReconcilePeers(ctx context.Context, peers []string) error {
if m.reconcileFunc != nil {
return m.reconcileFunc(ctx, peers)
}
return nil
}

// TestRunPeerReconciliation_EmptyNamespace verifies that an empty namespace causes
// an immediate return without starting the ticker loop.
func TestRunPeerReconciliation_EmptyNamespace(t *testing.T) {
RunPeerReconciliation(context.Background(), nil, &mockPeers{}, "", "x=y", "10.0.0.1", 7946)
// If we reach here without hanging, the early return works.
}

// TestRunPeerReconciliation_EmptyLabelSelector verifies that an empty labelSelector
// causes an immediate return.
func TestRunPeerReconciliation_EmptyLabelSelector(t *testing.T) {
RunPeerReconciliation(context.Background(), nil, &mockPeers{}, "ns", "", "10.0.0.1", 7946)
}

// TestRunPeerReconciliation_CancelledContext verifies that a cancelled context
// causes an immediate return from the select loop.
func TestRunPeerReconciliation_CancelledContext(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel() // cancel immediately

RunPeerReconciliation(ctx, nil, &mockPeers{}, "ns", "x=y", "10.0.0.1", 7946)
}

// TestRunPeerReconciliation_FullFlow verifies the reconciliation flow:
// - Self IP, loopback IP, and empty PodIP are filtered out
// - Valid peer IPs are formatted as ip:port and passed to ReconcilePeers
// - A short test interval is used to avoid waiting 60s
func TestRunPeerReconciliation_FullFlow(t *testing.T) {
// Override the reconcile interval for fast testing
oldInterval := reconcileInterval
reconcileInterval = 10 * time.Millisecond
t.Cleanup(func() { reconcileInterval = oldInterval })

localIP := "10.0.0.1"
bindPort := 7946

// Set up fake K8s client with a mix of pods.
// Pods must carry the label that matches the selector used in RunPeerReconciliation.
labels := map[string]string{"app": "peer"}
fakeClient := fake.NewSimpleClientset()
pods := []*corev1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "self", Labels: labels}, Status: corev1.PodStatus{PodIP: localIP}},
{ObjectMeta: metav1.ObjectMeta{Name: "peer-a", Labels: labels}, Status: corev1.PodStatus{PodIP: "10.0.0.2"}},
{ObjectMeta: metav1.ObjectMeta{Name: "peer-b", Labels: labels}, Status: corev1.PodStatus{PodIP: "10.0.0.3"}},
{ObjectMeta: metav1.ObjectMeta{Name: "loopback", Labels: labels}, Status: corev1.PodStatus{PodIP: "127.0.0.1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "no-ip", Labels: labels}, Status: corev1.PodStatus{PodIP: ""}},
}
for _, pod := range pods {
_, err := fakeClient.CoreV1().Pods("ns").Create(context.Background(), pod, metav1.CreateOptions{})
assert.NoError(t, err)
}

// Track what ReconcilePeers receives
var (
mu sync.Mutex
gotAddresses []string
)
reconciled := make(chan struct{})

mock := &mockPeers{
reconcileFunc: func(_ context.Context, peers []string) error {
mu.Lock()
gotAddresses = peers
mu.Unlock()
// Signal first reconciliation via channel (safe to close once)
select {
case <-reconciled:
default:
close(reconciled)
}
return nil
},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go RunPeerReconciliation(ctx, fakeClient, mock, "ns", "app=peer", localIP, bindPort)

// Wait for first tick to fire (10ms ticker), with timeout guard
select {
case <-reconciled:
case <-time.After(5 * time.Second):
t.Fatal("timeout waiting for first reconciliation")
}

mu.Lock()
defer mu.Unlock()

// Should not include self
assert.NotContains(t, gotAddresses, "10.0.0.1:7946")
// Should not include loopback
assert.NotContains(t, gotAddresses, "127.0.0.1:7946")
// Should not include empty-ip pod
assert.NotContains(t, gotAddresses, ":7946")
// Should include valid peers
assert.Contains(t, gotAddresses, "10.0.0.2:7946")
assert.Contains(t, gotAddresses, "10.0.0.3:7946")
// Should have exactly 2 valid peers
assert.Len(t, gotAddresses, 2)
}
3 changes: 2 additions & 1 deletion pkg/proxy/ext_proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,15 @@ func (s *Server) handleRequestHeaders(requestHeaders *extProcPb.ProcessingReques
}
log.Info("request mapped", "sandboxID", sandboxID, "sandboxPort", sandboxPort, "extraHeaders", extraHeaders)

errorMsg := fmt.Sprintf("healthy sandbox %s not found", sandboxID)
route, ok := s.LoadRoute(sandboxID)
if !ok {
log.Info("route not found", "sandboxID", sandboxID)
errorMsg := fmt.Sprintf("sandbox %s route not found", sandboxID)
return s.logAndCreateErrorResponse(http.StatusBadGateway, errorMsg, log)
}
if route.State != agentsv1alpha1.SandboxStateRunning {
log.Info("sandbox is not running", "sandboxID", sandboxID, "route", route)
errorMsg := fmt.Sprintf("sandbox %s is not running (state: %s)", sandboxID, route.State)
return s.logAndCreateErrorResponse(http.StatusBadGateway, errorMsg, log)
}
if extraHeaders == nil {
Expand Down
Loading
Loading