From 25eb037095cef6ad33bfe78b967c1566debf46f2 Mon Sep 17 00:00:00 2001 From: Dyanngg Date: Fri, 5 Sep 2025 13:27:11 -0700 Subject: [PATCH 1/3] Fix concurrent Tier creation at the same priority Signed-off-by: Dyanngg --- pkg/apiserver/apiserver.go | 2 + .../networkpolicy/networkpolicy_controller.go | 26 +++ pkg/controller/networkpolicy/validate.go | 187 +++++++++++++++++- pkg/controller/networkpolicy/validate_test.go | 162 +++++++++++++++ test/e2e/antreapolicy_test.go | 51 +++++ 5 files changed, 423 insertions(+), 5 deletions(-) diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index c8adca722e1..728750a2003 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -310,6 +310,8 @@ func installHandlers(c *ExtraConfig, s *genericapiserver.GenericAPIServer) { // Get new NetworkPolicyValidator v := controllernetworkpolicy.NewNetworkPolicyValidator(c.networkPolicyController) + // Set up Tier event handlers to notify validator when Tiers are actually created/deleted + c.networkPolicyController.SetupTierEventHandlersForValidator(v) // Install handlers for NetworkPolicy related validation s.Handler.NonGoRestfulMux.HandleFunc("/validate/tier", webhook.HandlerForValidateFunc(v.Validate)) s.Handler.NonGoRestfulMux.HandleFunc("/validate/acnp", webhook.HandlerForValidateFunc(v.Validate)) diff --git a/pkg/controller/networkpolicy/networkpolicy_controller.go b/pkg/controller/networkpolicy/networkpolicy_controller.go index beed358eab3..01d61870148 100644 --- a/pkg/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/controller/networkpolicy/networkpolicy_controller.go @@ -585,6 +585,32 @@ func NewNetworkPolicyController(kubeClient clientset.Interface, return n } +// SetupTierEventHandlersForValidator sets up event handlers for Tier changes to notify the validator. +// This enables the validator to track when Tiers are actually created/deleted and release priority reservations. +func (n *NetworkPolicyController) SetupTierEventHandlersForValidator(validator *NetworkPolicyValidator) { + for _, tierVal := range validator.tierValidators { + if tv, ok := tierVal.(*tierValidator); ok { + // Add event handler for Tier changes. + // We don't need UpdateFunc for priority tracking since priority updates are not allowed. + n.tierInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + if tier, ok := obj.(*secv1beta1.Tier); ok { + klog.V(4).InfoS("Tier created, notifying validator", "tier", tier.Name, "priority", tier.Spec.Priority) + tv.OnTierCreate(tier) + } + }, + DeleteFunc: func(obj interface{}) { + if tier, ok := obj.(*secv1beta1.Tier); ok { + klog.V(4).InfoS("Tier deleted, notifying validator", "tier", tier.Name, "priority", tier.Spec.Priority) + tv.OnTierDelete(tier) + } + }, + }) + klog.V(2).InfoS("Tier event handlers set up for validator") + } + } +} + func (n *NetworkPolicyController) heartbeat(name string) { if n.heartbeatCh != nil { n.heartbeatCh <- heartbeat{ diff --git a/pkg/controller/networkpolicy/validate.go b/pkg/controller/networkpolicy/validate.go index 91be6b0bd21..1ce08def9bc 100644 --- a/pkg/controller/networkpolicy/validate.go +++ b/pkg/controller/networkpolicy/validate.go @@ -22,6 +22,8 @@ import ( "regexp" "strconv" "strings" + "sync" + "time" admv1 "k8s.io/api/admission/v1" authenticationv1 "k8s.io/api/authentication/v1" @@ -59,6 +61,9 @@ type validator interface { // interface. type resourceValidator struct { networkPolicyController *NetworkPolicyController + // priorityTracker is only used by tierValidator to track priority reservations. + // Other validators leave this as nil. + priorityTracker *tierPriorityTracker } // antreaPolicyValidator implements the validator interface for Antrea-native @@ -74,6 +79,124 @@ type groupValidator resourceValidator // adminPolicyValidator implements the validator interface for the AdminNetworkPolicy resource. type adminPolicyValidator resourceValidator +// tierPriorityTracker manages in-memory tracking of Tier priorities currently being validated/created +// to prevent race conditions in priority overlap detection. +type tierPriorityTracker struct { + mu sync.Mutex + // pendingPriorities tracks priorities that are currently being validated/created + // Key is the priority value, value contains the reservation details + pendingPriorities map[int32]*priorityReservation + // timeout for how long to wait for pending operations + validationTimeout time.Duration + // creationTimeout for how long to wait for actual Tier creation in K8s + creationTimeout time.Duration +} + +// priorityReservation tracks a single priority reservation +type priorityReservation struct { + tierName string + // waitChan is closed when this reservation is released + waitChan chan struct{} + // createdAt tracks when this reservation was made + createdAt time.Time +} + +func newTierPriorityTracker() *tierPriorityTracker { + return &tierPriorityTracker{ + pendingPriorities: make(map[int32]*priorityReservation), + validationTimeout: 30 * time.Second, // timeout for waiting for other validations + creationTimeout: 60 * time.Second, // timeout for waiting for actual K8s creation + } +} + +// reservePriorityForValidation attempts to reserve a priority for validation. +// This reservation will persist until the Tier is actually created in K8s and detected by the informer. +// Returns true if the reservation was successful, false if the priority is already taken. +func (t *tierPriorityTracker) reservePriorityForValidation(priority int32, tierName string) bool { + t.mu.Lock() + defer t.mu.Unlock() + + // Check if priority is already being processed + if existing, exists := t.pendingPriorities[priority]; exists { + // Check if the existing reservation has timed out + if time.Since(existing.createdAt) > t.creationTimeout { + klog.Warningf("Tier priority %d reservation for %s has timed out, allowing new reservation", priority, existing.tierName) + close(existing.waitChan) + delete(t.pendingPriorities, priority) + } else { + klog.V(4).InfoS("Priority is already reserved by another Tier", "priority", priority, "existingTier", existing.tierName) + return false + } + } + + // Reserve the priority + reservation := &priorityReservation{ + tierName: tierName, + waitChan: make(chan struct{}), + createdAt: time.Now(), + } + t.pendingPriorities[priority] = reservation + + klog.V(4).InfoS("Reserved priority for Tier validation", "priority", priority, "tier", tierName) + return true +} + +// waitForPriorityAvailable waits for a priority to become available if it's currently reserved. +// Returns true if the priority becomes available, false on timeout. +func (t *tierPriorityTracker) waitForPriorityAvailable(priority int32) bool { + t.mu.Lock() + existing, exists := t.pendingPriorities[priority] + if !exists { + t.mu.Unlock() + return true // Priority is not currently reserved for creation + } + klog.V(2).InfoS("Tier priority is currently reserved by another tier for creation", "priority", priority, "tier", existing.tierName) + waitChan := existing.waitChan + t.mu.Unlock() + + // Wait for the existing operation to complete + select { + case <-waitChan: + return true + case <-time.After(t.validationTimeout): + klog.Warningf("Timeout waiting for Tier priority %d to become available", priority) + return false + } +} + +// releasePriorityReservation releases a priority reservation when the Tier creation is complete. +// This should be called when the informer detects the new Tier. +func (t *tierPriorityTracker) releasePriorityReservation(priority int32, tierName string) { + t.mu.Lock() + defer t.mu.Unlock() + + if existing, exists := t.pendingPriorities[priority]; exists { + if existing.tierName == tierName { + close(existing.waitChan) + delete(t.pendingPriorities, priority) + klog.V(4).InfoS("Released priority reservation for Tier", "priority", priority, "tier", tierName) + } else { + klog.Warningf("Attempted to release priority %d for Tier %s, but it's reserved by %s", priority, tierName, existing.tierName) + } + } +} + +// cleanupExpiredReservations removes reservations that have exceeded the creation timeout. +// This should be called periodically to prevent memory leaks from failed creations. +func (t *tierPriorityTracker) cleanupExpiredReservations() { + t.mu.Lock() + defer t.mu.Unlock() + + now := time.Now() + for priority, reservation := range t.pendingPriorities { + if now.Sub(reservation.createdAt) > t.creationTimeout { + klog.Warningf("Cleaning up expired priority %d reservation for Tier %s (age: %v)", priority, reservation.tierName, now.Sub(reservation.createdAt)) + close(reservation.waitChan) + delete(t.pendingPriorities, priority) + } + } +} + var ( // reservedTierPriorities stores the reserved priority range from 251, 252, 254 and 255. // The priority 250 is reserved for default Tier but not part of this set in order to be @@ -143,6 +266,7 @@ func NewNetworkPolicyValidator(networkPolicyController *NetworkPolicyController) // tv is an instance of tierValidator to validate Tier resource events. tv := tierValidator{ networkPolicyController: networkPolicyController, + priorityTracker: newTierPriorityTracker(), } // gv is an instance of groupValidator to validate ClusterGroup // resource events. @@ -156,6 +280,10 @@ func NewNetworkPolicyValidator(networkPolicyController *NetworkPolicyController) vr.RegisterTierValidator(&tv) vr.RegisterGroupValidator(&gv) vr.RegisterAdminNetworkPolicyValidator(&av) + + // Start cleanup routine for expired reservations from the tierValidator + go tv.startCleanupRoutine() + return &vr } @@ -498,6 +626,38 @@ func (v *NetworkPolicyValidator) validateTier(curTier, oldTier *crdv1beta1.Tier, return warnings, reason, allowed } +// startCleanupRoutine starts a background routine to clean up expired priority reservations +func (t *tierValidator) startCleanupRoutine() { + if t.priorityTracker == nil { + klog.ErrorS(nil, "Priority tracker is nil, cannot start cleanup routine") + return + } + + ticker := time.NewTicker(30 * time.Second) // Check every 30 seconds + defer ticker.Stop() + + for range ticker.C { + t.priorityTracker.cleanupExpiredReservations() + } +} + +// OnTierCreate should be called when a new Tier is detected by the informer. +// This releases the priority reservation for the created Tier. +func (t *tierValidator) OnTierCreate(tier *crdv1beta1.Tier) { + if tier != nil && t.priorityTracker != nil { + t.priorityTracker.releasePriorityReservation(tier.Spec.Priority, tier.Name) + } +} + +// OnTierDelete should be called when a Tier is deleted. +// This ensures any stale reservations are cleaned up. +func (t *tierValidator) OnTierDelete(tier *crdv1beta1.Tier) { + if tier != nil && t.priorityTracker != nil { + // In case there was a stale reservation, clean it up + t.priorityTracker.releasePriorityReservation(tier.Spec.Priority, tier.Name) + } +} + func (v *antreaPolicyValidator) tierExists(name string) bool { _, err := v.networkPolicyController.tierLister.Get(name) return err == nil @@ -997,15 +1157,32 @@ func (t *tierValidator) createValidate(curObj interface{}, userInfo authenticati return nil, fmt.Sprintf("maximum number of Tiers supported: %d", maxSupportedTiers), false } curTier := curObj.(*crdv1beta1.Tier) + priority := curTier.Spec.Priority + tierName := curTier.Name + // Tier priority must not overlap reserved tier's priority. - if reservedTierPriorities.Has(curTier.Spec.Priority) { - return nil, fmt.Sprintf("tier %s priority %d is reserved", curTier.Name, curTier.Spec.Priority), false + if reservedTierPriorities.Has(priority) { + return nil, fmt.Sprintf("tier %s priority %d is reserved", tierName, priority), false } - // Tier priority must not overlap existing tier's priority - trs, err := t.networkPolicyController.tierInformer.Informer().GetIndexer().ByIndex(PriorityIndex, strconv.FormatInt(int64(curTier.Spec.Priority), 10)) + // The priorityTracker should always be available for tierValidator + if t.priorityTracker == nil { + return nil, "internal error: priority tracker not initialized", false + } + // First, wait for the priority to be available if it's currently reserved + if !t.priorityTracker.waitForPriorityAvailable(priority) { + return nil, fmt.Sprintf("timeout waiting for priority %d to become available", priority), false + } + // Check for priority overlap with existing tiers (while no other validation is in progress) + trs, err := t.networkPolicyController.tierInformer.Informer().GetIndexer().ByIndex(PriorityIndex, strconv.FormatInt(int64(priority), 10)) if err != nil || len(trs) > 0 { - return nil, fmt.Sprintf("tier %s priority %d overlaps with existing Tier", curTier.Name, curTier.Spec.Priority), false + return nil, fmt.Sprintf("tier %s priority %d overlaps with existing Tier", tierName, priority), false + } + // Now reserve the priority for this Tier creation + // This reservation will persist until the Tier is actually created and detected by the informer + if !t.priorityTracker.reservePriorityForValidation(priority, tierName) { + return nil, fmt.Sprintf("failed to reserve priority %d for Tier %s (already taken)", priority, tierName), false } + klog.InfoS("Reserved priority for Tier creation", "priority", priority, "tier", tierName) return nil, "", true } diff --git a/pkg/controller/networkpolicy/validate_test.go b/pkg/controller/networkpolicy/validate_test.go index eb67dd6b6e3..6db9a64cc17 100644 --- a/pkg/controller/networkpolicy/validate_test.go +++ b/pkg/controller/networkpolicy/validate_test.go @@ -16,9 +16,12 @@ package networkpolicy import ( "fmt" + "sync" "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" admv1 "k8s.io/api/admission/v1" authenticationv1 "k8s.io/api/authentication/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -2873,6 +2876,165 @@ func TestValidateTier(t *testing.T) { } } +func TestTierPriorityTracker(t *testing.T) { + t.Run("ReservePriorityForValidation", func(t *testing.T) { + tracker := newTierPriorityTracker() + + // Test successful reservation + success := tracker.reservePriorityForValidation(100, "tier1") + assert.True(t, success, "Should successfully reserve priority 100") + + // Test reservation conflict - same priority should fail + success = tracker.reservePriorityForValidation(100, "tier2") + assert.False(t, success, "Should fail to reserve already reserved priority 100") + + // Test different priority should succeed + success = tracker.reservePriorityForValidation(200, "tier3") + assert.True(t, success, "Should successfully reserve different priority 200") + }) + + t.Run("WaitForPriorityAvailable", func(t *testing.T) { + tracker := newTierPriorityTracker() + tracker.validationTimeout = 100 * time.Millisecond // Short timeout for testing + + // Test available priority + available := tracker.waitForPriorityAvailable(100) + assert.True(t, available, "Should return true for available priority") + + // Reserve a priority + tracker.reservePriorityForValidation(200, "tier1") + + // Test waiting for reserved priority (should timeout) + start := time.Now() + available = tracker.waitForPriorityAvailable(200) + elapsed := time.Since(start) + + assert.False(t, available, "Should return false when waiting times out") + assert.GreaterOrEqual(t, elapsed, 100*time.Millisecond, "Should wait for at least the timeout duration") + }) + + t.Run("ReleasePriorityReservation", func(t *testing.T) { + tracker := newTierPriorityTracker() + + // Reserve a priority + success := tracker.reservePriorityForValidation(100, "tier1") + require.True(t, success) + + // Release the reservation + tracker.releasePriorityReservation(100, "tier1") + + // Should be able to reserve again + success = tracker.reservePriorityForValidation(100, "tier2") + assert.True(t, success, "Should be able to reserve priority after release") + + // Test releasing wrong tier name (should not release) + tracker.releasePriorityReservation(100, "wrong-tier") + success = tracker.reservePriorityForValidation(100, "tier3") + assert.False(t, success, "Should not release reservation for wrong tier name") + }) + + t.Run("CleanupExpiredReservations", func(t *testing.T) { + tracker := newTierPriorityTracker() + tracker.creationTimeout = 50 * time.Millisecond // Short timeout for testing + + // Reserve a priority + success := tracker.reservePriorityForValidation(100, "tier1") + require.True(t, success) + + // Wait for expiration + time.Sleep(100 * time.Millisecond) + + // Cleanup expired reservations + tracker.cleanupExpiredReservations() + + // Should be able to reserve again after cleanup + success = tracker.reservePriorityForValidation(100, "tier2") + assert.True(t, success, "Should be able to reserve priority after cleanup") + }) + + t.Run("ConcurrentReservations", func(t *testing.T) { + tracker := newTierPriorityTracker() + tracker.validationTimeout = 200 * time.Millisecond + + var wg sync.WaitGroup + var results sync.Map + + // Try to reserve the same priority from multiple goroutines + for i := 0; i < 5; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + tierName := fmt.Sprintf("tier%d", id) + success := tracker.reservePriorityForValidation(100, tierName) + results.Store(id, success) + }(i) + } + + wg.Wait() + + // Count successful reservations + successCount := 0 + results.Range(func(key, value interface{}) bool { + if value.(bool) { + successCount++ + } + return true + }) + + assert.Equal(t, 1, successCount, "Only one goroutine should successfully reserve the priority") + }) + + t.Run("ConcurrentWaitAndRelease", func(t *testing.T) { + tracker := newTierPriorityTracker() + tracker.validationTimeout = 500 * time.Millisecond + + // Reserve a priority + success := tracker.reservePriorityForValidation(100, "tier1") + require.True(t, success) + + var wg sync.WaitGroup + waitResult := make(chan bool, 1) + + // Start waiting in another goroutine + wg.Add(1) + go func() { + defer wg.Done() + available := tracker.waitForPriorityAvailable(100) + waitResult <- available + }() + // Release after a short delay + go func() { + time.Sleep(100 * time.Millisecond) + tracker.releasePriorityReservation(100, "tier1") + }() + wg.Wait() + + // The waiting should succeed after release + select { + case result := <-waitResult: + assert.True(t, result, "Wait should succeed after priority is released") + case <-time.After(1 * time.Second): + t.Fatal("Wait operation timed out") + } + }) + + t.Run("TimeoutExpiredReservation", func(t *testing.T) { + tracker := newTierPriorityTracker() + tracker.creationTimeout = 50 * time.Millisecond + + // Reserve a priority + success := tracker.reservePriorityForValidation(100, "tier1") + require.True(t, success) + + // Wait for expiration + time.Sleep(100 * time.Millisecond) + + // Try to reserve the same priority with a new tier - should succeed due to timeout + success = tracker.reservePriorityForValidation(100, "tier2") + assert.True(t, success, "Should allow new reservation when existing one has timed out") + }) +} + func TestValidateAdminNetworkPolicy(t *testing.T) { tests := []struct { name string diff --git a/test/e2e/antreapolicy_test.go b/test/e2e/antreapolicy_test.go index 5f4eca551d4..4d60757bc91 100644 --- a/test/e2e/antreapolicy_test.go +++ b/test/e2e/antreapolicy_test.go @@ -345,6 +345,56 @@ func testCreateValidationInvalidTier(t *testing.T) { failOnError(k8sUtils.DeleteTier(tr.Name), t) } +func testCreateValidationTwoTierOfSamePrioritySimultaneous(t *testing.T) { + // This test verifies that the race condition fix prevents two Tiers with the same priority + // from being created simultaneously. Only one should succeed. + const testPriority = int32(25) + tier1Name := "concurrent-tier-1" + tier2Name := "concurrent-tier-2" + + // Use channels to coordinate simultaneous creation attempts + startCh := make(chan struct{}) + result1Ch := make(chan error, 1) + result2Ch := make(chan error, 1) + + // Start first Tier creation in a goroutine + go func() { + <-startCh // Wait for signal to start + _, err := k8sUtils.CreateTier(tier1Name, testPriority) + result1Ch <- err + }() + // Start second Tier creation in another goroutine + go func() { + <-startCh // Wait for signal to start + _, err := k8sUtils.CreateTier(tier2Name, testPriority) + result2Ch <- err + }() + + // Signal both goroutines to start simultaneously and collect results + close(startCh) + err1 := <-result1Ch + err2 := <-result2Ch + + // Exactly one should succeed and one should fail + if err1 == nil && err2 == nil { + k8sUtils.DeleteTier(tier1Name) + k8sUtils.DeleteTier(tier2Name) + failOnError(fmt.Errorf("Tiers were both created with the same priority when applied simultaneously"), t) + } else if err1 != nil && err2 != nil { + // Both failed - this is unexpected + failOnError(fmt.Errorf("Both Tier creations failed: tier1 error: %v, tier2 error:%v", err1, err2), t) + } else { + // One succeeded, one failed - this is the expected behavior + t.Logf("Tier creation results: tier1 error: %v, tier2 error: %v", err1, err2) + // Clean up the successfully created tier + if err1 == nil { + failOnError(k8sUtils.DeleteTier(tier1Name), t) + } else { + failOnError(k8sUtils.DeleteTier(tier2Name), t) + } + } +} + func testCreateValidationInvalidCG(t *testing.T) { invalidErr := fmt.Errorf("ClusterGroup using podSelecter and serviceReference together created") cgBuilder := &ClusterGroupSpecBuilder{} @@ -4577,6 +4627,7 @@ func TestAntreaPolicy(t *testing.T) { t.Run("Case=CreateInvalidACNP", func(t *testing.T) { testCreateValidationInvalidACNP(t) }) t.Run("Case=CreateInvalidANNP", func(t *testing.T) { testCreateValidationInvalidANNP(t) }) t.Run("Case=CreateInvalidTier", func(t *testing.T) { testCreateValidationInvalidTier(t) }) + t.Run("Case=CreateTwoTierOfSamePrioritySimultaneous", func(t *testing.T) { testCreateValidationTwoTierOfSamePrioritySimultaneous(t) }) t.Run("Case=CreateInvalidClusterGroup", func(t *testing.T) { testCreateValidationInvalidCG(t) }) t.Run("Case=CreateInvalidGroup", func(t *testing.T) { testCreateValidationInvalidGroup(t) }) From 123fb5ab6a3d97b7618df76cbb1de185354dbd9d Mon Sep 17 00:00:00 2001 From: Dyanngg Date: Tue, 16 Sep 2025 13:52:07 -0700 Subject: [PATCH 2/3] Addressing comments Signed-off-by: Dyanngg --- pkg/apiserver/apiserver.go | 8 +++-- pkg/controller/networkpolicy/validate.go | 39 +++++++++++++++++------- 2 files changed, 34 insertions(+), 13 deletions(-) diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 728750a2003..26dbbc6eda0 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -310,8 +310,6 @@ func installHandlers(c *ExtraConfig, s *genericapiserver.GenericAPIServer) { // Get new NetworkPolicyValidator v := controllernetworkpolicy.NewNetworkPolicyValidator(c.networkPolicyController) - // Set up Tier event handlers to notify validator when Tiers are actually created/deleted - c.networkPolicyController.SetupTierEventHandlersForValidator(v) // Install handlers for NetworkPolicy related validation s.Handler.NonGoRestfulMux.HandleFunc("/validate/tier", webhook.HandlerForValidateFunc(v.Validate)) s.Handler.NonGoRestfulMux.HandleFunc("/validate/acnp", webhook.HandlerForValidateFunc(v.Validate)) @@ -331,6 +329,12 @@ func installHandlers(c *ExtraConfig, s *genericapiserver.GenericAPIServer) { }() return nil }) + + // Start the NetworkPolicyValidator background routines + s.AddPostStartHook("start-validator-routines", func(context genericapiserver.PostStartHookContext) error { + go v.Run(context.Done()) + return nil + }) } if features.DefaultFeatureGate.Enabled(features.Egress) || features.DefaultFeatureGate.Enabled(features.ServiceExternalIP) { diff --git a/pkg/controller/networkpolicy/validate.go b/pkg/controller/networkpolicy/validate.go index 1ce08def9bc..061af49290e 100644 --- a/pkg/controller/networkpolicy/validate.go +++ b/pkg/controller/networkpolicy/validate.go @@ -104,7 +104,7 @@ type priorityReservation struct { func newTierPriorityTracker() *tierPriorityTracker { return &tierPriorityTracker{ pendingPriorities: make(map[int32]*priorityReservation), - validationTimeout: 30 * time.Second, // timeout for waiting for other validations + validationTimeout: 5 * time.Second, // timeout for waiting for other validations creationTimeout: 60 * time.Second, // timeout for waiting for actual K8s creation } } @@ -120,7 +120,7 @@ func (t *tierPriorityTracker) reservePriorityForValidation(priority int32, tierN if existing, exists := t.pendingPriorities[priority]; exists { // Check if the existing reservation has timed out if time.Since(existing.createdAt) > t.creationTimeout { - klog.Warningf("Tier priority %d reservation for %s has timed out, allowing new reservation", priority, existing.tierName) + klog.V(2).InfoS("Tier priority reservation for has timed out, allowing new reservation", "priority", priority, "tier", existing.tierName) close(existing.waitChan) delete(t.pendingPriorities, priority) } else { @@ -159,7 +159,7 @@ func (t *tierPriorityTracker) waitForPriorityAvailable(priority int32) bool { case <-waitChan: return true case <-time.After(t.validationTimeout): - klog.Warningf("Timeout waiting for Tier priority %d to become available", priority) + klog.InfoS("Timeout waiting for Tier priority to become available", "priority", priority) return false } } @@ -176,7 +176,7 @@ func (t *tierPriorityTracker) releasePriorityReservation(priority int32, tierNam delete(t.pendingPriorities, priority) klog.V(4).InfoS("Released priority reservation for Tier", "priority", priority, "tier", tierName) } else { - klog.Warningf("Attempted to release priority %d for Tier %s, but it's reserved by %s", priority, tierName, existing.tierName) + klog.InfoS("Attempted to release priority for a different Tier", "priority", priority, "tier", tierName, "existingTier", existing.tierName) } } } @@ -190,7 +190,8 @@ func (t *tierPriorityTracker) cleanupExpiredReservations() { now := time.Now() for priority, reservation := range t.pendingPriorities { if now.Sub(reservation.createdAt) > t.creationTimeout { - klog.Warningf("Cleaning up expired priority %d reservation for Tier %s (age: %v)", priority, reservation.tierName, now.Sub(reservation.createdAt)) + klog.V(4).InfoS("Cleaning up expired priority reservation for Tier", + "priority", priority, "tier", reservation.tierName, "age", now.Sub(reservation.createdAt)) close(reservation.waitChan) delete(t.pendingPriorities, priority) } @@ -281,12 +282,22 @@ func NewNetworkPolicyValidator(networkPolicyController *NetworkPolicyController) vr.RegisterGroupValidator(&gv) vr.RegisterAdminNetworkPolicyValidator(&av) - // Start cleanup routine for expired reservations from the tierValidator - go tv.startCleanupRoutine() - + // Set up Tier event handlers to notify validator when Tiers are actually created/deleted + networkPolicyController.SetupTierEventHandlersForValidator(&vr) return &vr } +// Run starts the background routines for the NetworkPolicyValidator. +func (v *NetworkPolicyValidator) Run(stopCh <-chan struct{}) { + // Start cleanup routine for expired reservations from the tierValidator + for _, val := range v.tierValidators { + if tv, ok := val.(*tierValidator); ok { + go tv.startCleanupRoutine(stopCh) + break + } + } +} + // Validate function validates a Group, ClusterGroup, Tier or Antrea Policy object func (v *NetworkPolicyValidator) Validate(ar *admv1.AdmissionReview) *admv1.AdmissionResponse { var result *metav1.Status @@ -627,7 +638,7 @@ func (v *NetworkPolicyValidator) validateTier(curTier, oldTier *crdv1beta1.Tier, } // startCleanupRoutine starts a background routine to clean up expired priority reservations -func (t *tierValidator) startCleanupRoutine() { +func (t *tierValidator) startCleanupRoutine(stopCh <-chan struct{}) { if t.priorityTracker == nil { klog.ErrorS(nil, "Priority tracker is nil, cannot start cleanup routine") return @@ -636,8 +647,14 @@ func (t *tierValidator) startCleanupRoutine() { ticker := time.NewTicker(30 * time.Second) // Check every 30 seconds defer ticker.Stop() - for range ticker.C { - t.priorityTracker.cleanupExpiredReservations() + for { + select { + case <-ticker.C: + t.priorityTracker.cleanupExpiredReservations() + case <-stopCh: + klog.InfoS("Stopping Tier priority tracker cleanup routine") + return + } } } From 47aa97a4c81b62e8622ab18de6ecade585ef7fc8 Mon Sep 17 00:00:00 2001 From: Dyanngg Date: Wed, 17 Sep 2025 15:46:13 -0700 Subject: [PATCH 3/3] Addressing additional comments Signed-off-by: Dyanngg --- .../networkpolicy/networkpolicy_controller.go | 38 +++++----- pkg/controller/networkpolicy/validate.go | 74 +++++-------------- pkg/controller/networkpolicy/validate_test.go | 55 -------------- 3 files changed, 37 insertions(+), 130 deletions(-) diff --git a/pkg/controller/networkpolicy/networkpolicy_controller.go b/pkg/controller/networkpolicy/networkpolicy_controller.go index 01d61870148..a483715d431 100644 --- a/pkg/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/controller/networkpolicy/networkpolicy_controller.go @@ -588,26 +588,24 @@ func NewNetworkPolicyController(kubeClient clientset.Interface, // SetupTierEventHandlersForValidator sets up event handlers for Tier changes to notify the validator. // This enables the validator to track when Tiers are actually created/deleted and release priority reservations. func (n *NetworkPolicyController) SetupTierEventHandlersForValidator(validator *NetworkPolicyValidator) { - for _, tierVal := range validator.tierValidators { - if tv, ok := tierVal.(*tierValidator); ok { - // Add event handler for Tier changes. - // We don't need UpdateFunc for priority tracking since priority updates are not allowed. - n.tierInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - if tier, ok := obj.(*secv1beta1.Tier); ok { - klog.V(4).InfoS("Tier created, notifying validator", "tier", tier.Name, "priority", tier.Spec.Priority) - tv.OnTierCreate(tier) - } - }, - DeleteFunc: func(obj interface{}) { - if tier, ok := obj.(*secv1beta1.Tier); ok { - klog.V(4).InfoS("Tier deleted, notifying validator", "tier", tier.Name, "priority", tier.Spec.Priority) - tv.OnTierDelete(tier) - } - }, - }) - klog.V(2).InfoS("Tier event handlers set up for validator") - } + for _, tv := range validator.tierValidators { + // Add event handler for Tier changes. + // We don't need UpdateFunc for priority tracking since priority updates are not allowed. + n.tierInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + if tier, ok := obj.(*secv1beta1.Tier); ok { + klog.V(4).InfoS("Tier created, notifying validator", "tier", tier.Name, "priority", tier.Spec.Priority) + tv.OnTierCreate(tier) + } + }, + DeleteFunc: func(obj interface{}) { + if tier, ok := obj.(*secv1beta1.Tier); ok { + klog.V(4).InfoS("Tier deleted, notifying validator", "tier", tier.Name, "priority", tier.Spec.Priority) + tv.OnTierDelete(tier) + } + }, + }) + klog.V(2).InfoS("Tier event handlers set up for validator") } } diff --git a/pkg/controller/networkpolicy/validate.go b/pkg/controller/networkpolicy/validate.go index 061af49290e..b4130f8322c 100644 --- a/pkg/controller/networkpolicy/validate.go +++ b/pkg/controller/networkpolicy/validate.go @@ -61,24 +61,24 @@ type validator interface { // interface. type resourceValidator struct { networkPolicyController *NetworkPolicyController - // priorityTracker is only used by tierValidator to track priority reservations. - // Other validators leave this as nil. - priorityTracker *tierPriorityTracker } // antreaPolicyValidator implements the validator interface for Antrea-native // policies. type antreaPolicyValidator resourceValidator -// tierValidator implements the validator interface for Tier resources. -type tierValidator resourceValidator - // groupValidator implements the validator interface for the ClusterGroup resource. type groupValidator resourceValidator // adminPolicyValidator implements the validator interface for the AdminNetworkPolicy resource. type adminPolicyValidator resourceValidator +// tierValidator implements the validator interface for Tier resources. +type tierValidator struct { + networkPolicyController *NetworkPolicyController + priorityTracker *tierPriorityTracker +} + // tierPriorityTracker manages in-memory tracking of Tier priorities currently being validated/created // to prevent race conditions in priority overlap detection. type tierPriorityTracker struct { @@ -86,8 +86,6 @@ type tierPriorityTracker struct { // pendingPriorities tracks priorities that are currently being validated/created // Key is the priority value, value contains the reservation details pendingPriorities map[int32]*priorityReservation - // timeout for how long to wait for pending operations - validationTimeout time.Duration // creationTimeout for how long to wait for actual Tier creation in K8s creationTimeout time.Duration } @@ -95,8 +93,6 @@ type tierPriorityTracker struct { // priorityReservation tracks a single priority reservation type priorityReservation struct { tierName string - // waitChan is closed when this reservation is released - waitChan chan struct{} // createdAt tracks when this reservation was made createdAt time.Time } @@ -104,8 +100,7 @@ type priorityReservation struct { func newTierPriorityTracker() *tierPriorityTracker { return &tierPriorityTracker{ pendingPriorities: make(map[int32]*priorityReservation), - validationTimeout: 5 * time.Second, // timeout for waiting for other validations - creationTimeout: 60 * time.Second, // timeout for waiting for actual K8s creation + creationTimeout: 30 * time.Second, // timeout for waiting for actual K8s creation } } @@ -120,8 +115,7 @@ func (t *tierPriorityTracker) reservePriorityForValidation(priority int32, tierN if existing, exists := t.pendingPriorities[priority]; exists { // Check if the existing reservation has timed out if time.Since(existing.createdAt) > t.creationTimeout { - klog.V(2).InfoS("Tier priority reservation for has timed out, allowing new reservation", "priority", priority, "tier", existing.tierName) - close(existing.waitChan) + klog.V(2).InfoS("Tier priority reservation has timed out, allowing new reservation", "priority", priority, "tier", existing.tierName) delete(t.pendingPriorities, priority) } else { klog.V(4).InfoS("Priority is already reserved by another Tier", "priority", priority, "existingTier", existing.tierName) @@ -132,7 +126,6 @@ func (t *tierPriorityTracker) reservePriorityForValidation(priority int32, tierN // Reserve the priority reservation := &priorityReservation{ tierName: tierName, - waitChan: make(chan struct{}), createdAt: time.Now(), } t.pendingPriorities[priority] = reservation @@ -141,29 +134,6 @@ func (t *tierPriorityTracker) reservePriorityForValidation(priority int32, tierN return true } -// waitForPriorityAvailable waits for a priority to become available if it's currently reserved. -// Returns true if the priority becomes available, false on timeout. -func (t *tierPriorityTracker) waitForPriorityAvailable(priority int32) bool { - t.mu.Lock() - existing, exists := t.pendingPriorities[priority] - if !exists { - t.mu.Unlock() - return true // Priority is not currently reserved for creation - } - klog.V(2).InfoS("Tier priority is currently reserved by another tier for creation", "priority", priority, "tier", existing.tierName) - waitChan := existing.waitChan - t.mu.Unlock() - - // Wait for the existing operation to complete - select { - case <-waitChan: - return true - case <-time.After(t.validationTimeout): - klog.InfoS("Timeout waiting for Tier priority to become available", "priority", priority) - return false - } -} - // releasePriorityReservation releases a priority reservation when the Tier creation is complete. // This should be called when the informer detects the new Tier. func (t *tierPriorityTracker) releasePriorityReservation(priority int32, tierName string) { @@ -172,7 +142,6 @@ func (t *tierPriorityTracker) releasePriorityReservation(priority int32, tierNam if existing, exists := t.pendingPriorities[priority]; exists { if existing.tierName == tierName { - close(existing.waitChan) delete(t.pendingPriorities, priority) klog.V(4).InfoS("Released priority reservation for Tier", "priority", priority, "tier", tierName) } else { @@ -192,7 +161,6 @@ func (t *tierPriorityTracker) cleanupExpiredReservations() { if now.Sub(reservation.createdAt) > t.creationTimeout { klog.V(4).InfoS("Cleaning up expired priority reservation for Tier", "priority", priority, "tier", reservation.tierName, "age", now.Sub(reservation.createdAt)) - close(reservation.waitChan) delete(t.pendingPriorities, priority) } } @@ -222,7 +190,7 @@ func (v *NetworkPolicyValidator) RegisterAntreaPolicyValidator(a validator) { // RegisterTierValidator registers a Tier validator to the resource registry. // A new validator must be registered by calling this function before the Run // phase of the APIServer. -func (v *NetworkPolicyValidator) RegisterTierValidator(t validator) { +func (v *NetworkPolicyValidator) RegisterTierValidator(t *tierValidator) { v.tierValidators = append(v.tierValidators, t) } @@ -245,7 +213,7 @@ type NetworkPolicyValidator struct { antreaPolicyValidators []validator // tierValidators maintains a list of validator objects which // implement the validator interface for Tier resources. - tierValidators []validator + tierValidators []*tierValidator // groupValidators maintains a list of validator objects which // implement the validator interface for ClusterGroup resources. groupValidators []validator @@ -290,11 +258,9 @@ func NewNetworkPolicyValidator(networkPolicyController *NetworkPolicyController) // Run starts the background routines for the NetworkPolicyValidator. func (v *NetworkPolicyValidator) Run(stopCh <-chan struct{}) { // Start cleanup routine for expired reservations from the tierValidator - for _, val := range v.tierValidators { - if tv, ok := val.(*tierValidator); ok { - go tv.startCleanupRoutine(stopCh) - break - } + for _, tv := range v.tierValidators { + go tv.startCleanupRoutine(stopCh) + break // We only expect one tierValidator } } @@ -1185,19 +1151,17 @@ func (t *tierValidator) createValidate(curObj interface{}, userInfo authenticati if t.priorityTracker == nil { return nil, "internal error: priority tracker not initialized", false } - // First, wait for the priority to be available if it's currently reserved - if !t.priorityTracker.waitForPriorityAvailable(priority) { - return nil, fmt.Sprintf("timeout waiting for priority %d to become available", priority), false - } - // Check for priority overlap with existing tiers (while no other validation is in progress) + + // Check for priority overlap with existing tiers first trs, err := t.networkPolicyController.tierInformer.Informer().GetIndexer().ByIndex(PriorityIndex, strconv.FormatInt(int64(priority), 10)) if err != nil || len(trs) > 0 { return nil, fmt.Sprintf("tier %s priority %d overlaps with existing Tier", tierName, priority), false } - // Now reserve the priority for this Tier creation - // This reservation will persist until the Tier is actually created and detected by the informer + + // Attempt to reserve the priority for this Tier creation + // This will fail immediately if the priority is already reserved by another validation if !t.priorityTracker.reservePriorityForValidation(priority, tierName) { - return nil, fmt.Sprintf("failed to reserve priority %d for Tier %s (already taken)", priority, tierName), false + return nil, fmt.Sprintf("tier priority %d is currently being validated by another request, please retry", priority), false } klog.InfoS("Reserved priority for Tier creation", "priority", priority, "tier", tierName) return nil, "", true diff --git a/pkg/controller/networkpolicy/validate_test.go b/pkg/controller/networkpolicy/validate_test.go index 6db9a64cc17..bac37d6e80b 100644 --- a/pkg/controller/networkpolicy/validate_test.go +++ b/pkg/controller/networkpolicy/validate_test.go @@ -2893,26 +2893,6 @@ func TestTierPriorityTracker(t *testing.T) { assert.True(t, success, "Should successfully reserve different priority 200") }) - t.Run("WaitForPriorityAvailable", func(t *testing.T) { - tracker := newTierPriorityTracker() - tracker.validationTimeout = 100 * time.Millisecond // Short timeout for testing - - // Test available priority - available := tracker.waitForPriorityAvailable(100) - assert.True(t, available, "Should return true for available priority") - - // Reserve a priority - tracker.reservePriorityForValidation(200, "tier1") - - // Test waiting for reserved priority (should timeout) - start := time.Now() - available = tracker.waitForPriorityAvailable(200) - elapsed := time.Since(start) - - assert.False(t, available, "Should return false when waiting times out") - assert.GreaterOrEqual(t, elapsed, 100*time.Millisecond, "Should wait for at least the timeout duration") - }) - t.Run("ReleasePriorityReservation", func(t *testing.T) { tracker := newTierPriorityTracker() @@ -2954,7 +2934,6 @@ func TestTierPriorityTracker(t *testing.T) { t.Run("ConcurrentReservations", func(t *testing.T) { tracker := newTierPriorityTracker() - tracker.validationTimeout = 200 * time.Millisecond var wg sync.WaitGroup var results sync.Map @@ -2984,40 +2963,6 @@ func TestTierPriorityTracker(t *testing.T) { assert.Equal(t, 1, successCount, "Only one goroutine should successfully reserve the priority") }) - t.Run("ConcurrentWaitAndRelease", func(t *testing.T) { - tracker := newTierPriorityTracker() - tracker.validationTimeout = 500 * time.Millisecond - - // Reserve a priority - success := tracker.reservePriorityForValidation(100, "tier1") - require.True(t, success) - - var wg sync.WaitGroup - waitResult := make(chan bool, 1) - - // Start waiting in another goroutine - wg.Add(1) - go func() { - defer wg.Done() - available := tracker.waitForPriorityAvailable(100) - waitResult <- available - }() - // Release after a short delay - go func() { - time.Sleep(100 * time.Millisecond) - tracker.releasePriorityReservation(100, "tier1") - }() - wg.Wait() - - // The waiting should succeed after release - select { - case result := <-waitResult: - assert.True(t, result, "Wait should succeed after priority is released") - case <-time.After(1 * time.Second): - t.Fatal("Wait operation timed out") - } - }) - t.Run("TimeoutExpiredReservation", func(t *testing.T) { tracker := newTierPriorityTracker() tracker.creationTimeout = 50 * time.Millisecond