diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index c8adca722e1..26dbbc6eda0 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -329,6 +329,12 @@ func installHandlers(c *ExtraConfig, s *genericapiserver.GenericAPIServer) { }() return nil }) + + // Start the NetworkPolicyValidator background routines + s.AddPostStartHook("start-validator-routines", func(context genericapiserver.PostStartHookContext) error { + go v.Run(context.Done()) + return nil + }) } if features.DefaultFeatureGate.Enabled(features.Egress) || features.DefaultFeatureGate.Enabled(features.ServiceExternalIP) { diff --git a/pkg/controller/networkpolicy/networkpolicy_controller.go b/pkg/controller/networkpolicy/networkpolicy_controller.go index beed358eab3..a483715d431 100644 --- a/pkg/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/controller/networkpolicy/networkpolicy_controller.go @@ -585,6 +585,30 @@ func NewNetworkPolicyController(kubeClient clientset.Interface, return n } +// SetupTierEventHandlersForValidator sets up event handlers for Tier changes to notify the validator. +// This enables the validator to track when Tiers are actually created/deleted and release priority reservations. +func (n *NetworkPolicyController) SetupTierEventHandlersForValidator(validator *NetworkPolicyValidator) { + for _, tv := range validator.tierValidators { + // Add event handler for Tier changes. + // We don't need UpdateFunc for priority tracking since priority updates are not allowed. + n.tierInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + if tier, ok := obj.(*secv1beta1.Tier); ok { + klog.V(4).InfoS("Tier created, notifying validator", "tier", tier.Name, "priority", tier.Spec.Priority) + tv.OnTierCreate(tier) + } + }, + DeleteFunc: func(obj interface{}) { + if tier, ok := obj.(*secv1beta1.Tier); ok { + klog.V(4).InfoS("Tier deleted, notifying validator", "tier", tier.Name, "priority", tier.Spec.Priority) + tv.OnTierDelete(tier) + } + }, + }) + klog.V(2).InfoS("Tier event handlers set up for validator") + } +} + func (n *NetworkPolicyController) heartbeat(name string) { if n.heartbeatCh != nil { n.heartbeatCh <- heartbeat{ diff --git a/pkg/controller/networkpolicy/validate.go b/pkg/controller/networkpolicy/validate.go index 91be6b0bd21..b4130f8322c 100644 --- a/pkg/controller/networkpolicy/validate.go +++ b/pkg/controller/networkpolicy/validate.go @@ -22,6 +22,8 @@ import ( "regexp" "strconv" "strings" + "sync" + "time" admv1 "k8s.io/api/admission/v1" authenticationv1 "k8s.io/api/authentication/v1" @@ -65,15 +67,105 @@ type resourceValidator struct { // policies. type antreaPolicyValidator resourceValidator -// tierValidator implements the validator interface for Tier resources. -type tierValidator resourceValidator - // groupValidator implements the validator interface for the ClusterGroup resource. type groupValidator resourceValidator // adminPolicyValidator implements the validator interface for the AdminNetworkPolicy resource. type adminPolicyValidator resourceValidator +// tierValidator implements the validator interface for Tier resources. +type tierValidator struct { + networkPolicyController *NetworkPolicyController + priorityTracker *tierPriorityTracker +} + +// tierPriorityTracker manages in-memory tracking of Tier priorities currently being validated/created +// to prevent race conditions in priority overlap detection. 
+type tierPriorityTracker struct {
+	mu sync.Mutex
+	// pendingPriorities tracks priorities that are currently being validated or created.
+	// The key is the priority value; the value holds the reservation details.
+	pendingPriorities map[int32]*priorityReservation
+	// creationTimeout is how long to wait for the actual Tier creation in K8s before a reservation expires.
+	creationTimeout time.Duration
+}
+
+// priorityReservation tracks a single priority reservation.
+type priorityReservation struct {
+	tierName string
+	// createdAt records when this reservation was made.
+	createdAt time.Time
+}
+
+func newTierPriorityTracker() *tierPriorityTracker {
+	return &tierPriorityTracker{
+		pendingPriorities: make(map[int32]*priorityReservation),
+		creationTimeout:   30 * time.Second, // timeout for waiting for the actual creation in K8s
+	}
+}
+
+// reservePriorityForValidation attempts to reserve a priority for validation.
+// The reservation persists until the Tier is actually created in K8s and detected by the informer.
+// It returns true if the reservation was successful, false if the priority is already taken.
+func (t *tierPriorityTracker) reservePriorityForValidation(priority int32, tierName string) bool {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+
+	// Check whether the priority is already reserved.
+	if existing, exists := t.pendingPriorities[priority]; exists {
+		// Allow a new reservation if the existing one has timed out.
+		if time.Since(existing.createdAt) > t.creationTimeout {
+			klog.V(2).InfoS("Tier priority reservation has timed out, allowing new reservation", "priority", priority, "tier", existing.tierName)
+			delete(t.pendingPriorities, priority)
+		} else {
+			klog.V(4).InfoS("Priority is already reserved by another Tier", "priority", priority, "existingTier", existing.tierName)
+			return false
+		}
+	}
+
+	// Reserve the priority.
+	reservation := &priorityReservation{
+		tierName:  tierName,
+		createdAt: time.Now(),
+	}
+	t.pendingPriorities[priority] = reservation
+
+	klog.V(4).InfoS("Reserved priority for Tier validation", "priority", priority, "tier", tierName)
+	return true
+}
+
+// releasePriorityReservation releases a priority reservation once the Tier creation is complete.
+// It should be called when the informer detects the new Tier.
+func (t *tierPriorityTracker) releasePriorityReservation(priority int32, tierName string) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+
+	if existing, exists := t.pendingPriorities[priority]; exists {
+		if existing.tierName == tierName {
+			delete(t.pendingPriorities, priority)
+			klog.V(4).InfoS("Released priority reservation for Tier", "priority", priority, "tier", tierName)
+		} else {
+			klog.InfoS("Attempted to release priority for a different Tier", "priority", priority, "tier", tierName, "existingTier", existing.tierName)
+		}
+	}
+}
+
+// cleanupExpiredReservations removes reservations that have exceeded the creation timeout.
+// It should be called periodically so that failed creations do not leak reservations.
+func (t *tierPriorityTracker) cleanupExpiredReservations() {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+
+	now := time.Now()
+	for priority, reservation := range t.pendingPriorities {
+		if now.Sub(reservation.createdAt) > t.creationTimeout {
+			klog.V(4).InfoS("Cleaning up expired priority reservation for Tier",
+				"priority", priority, "tier", reservation.tierName, "age", now.Sub(reservation.createdAt))
+			delete(t.pendingPriorities, priority)
+		}
+	}
+}
+
 var (
 	// reservedTierPriorities stores the reserved priority range from 251, 252, 254 and 255. 
// The priority 250 is reserved for default Tier but not part of this set in order to be @@ -98,7 +190,7 @@ func (v *NetworkPolicyValidator) RegisterAntreaPolicyValidator(a validator) { // RegisterTierValidator registers a Tier validator to the resource registry. // A new validator must be registered by calling this function before the Run // phase of the APIServer. -func (v *NetworkPolicyValidator) RegisterTierValidator(t validator) { +func (v *NetworkPolicyValidator) RegisterTierValidator(t *tierValidator) { v.tierValidators = append(v.tierValidators, t) } @@ -121,7 +213,7 @@ type NetworkPolicyValidator struct { antreaPolicyValidators []validator // tierValidators maintains a list of validator objects which // implement the validator interface for Tier resources. - tierValidators []validator + tierValidators []*tierValidator // groupValidators maintains a list of validator objects which // implement the validator interface for ClusterGroup resources. groupValidators []validator @@ -143,6 +235,7 @@ func NewNetworkPolicyValidator(networkPolicyController *NetworkPolicyController) // tv is an instance of tierValidator to validate Tier resource events. tv := tierValidator{ networkPolicyController: networkPolicyController, + priorityTracker: newTierPriorityTracker(), } // gv is an instance of groupValidator to validate ClusterGroup // resource events. @@ -156,9 +249,21 @@ func NewNetworkPolicyValidator(networkPolicyController *NetworkPolicyController) vr.RegisterTierValidator(&tv) vr.RegisterGroupValidator(&gv) vr.RegisterAdminNetworkPolicyValidator(&av) + + // Set up Tier event handlers to notify validator when Tiers are actually created/deleted + networkPolicyController.SetupTierEventHandlersForValidator(&vr) return &vr } +// Run starts the background routines for the NetworkPolicyValidator. +func (v *NetworkPolicyValidator) Run(stopCh <-chan struct{}) { + // Start cleanup routine for expired reservations from the tierValidator + for _, tv := range v.tierValidators { + go tv.startCleanupRoutine(stopCh) + break // We only expect one tierValidator + } +} + // Validate function validates a Group, ClusterGroup, Tier or Antrea Policy object func (v *NetworkPolicyValidator) Validate(ar *admv1.AdmissionReview) *admv1.AdmissionResponse { var result *metav1.Status @@ -498,6 +603,44 @@ func (v *NetworkPolicyValidator) validateTier(curTier, oldTier *crdv1beta1.Tier, return warnings, reason, allowed } +// startCleanupRoutine starts a background routine to clean up expired priority reservations +func (t *tierValidator) startCleanupRoutine(stopCh <-chan struct{}) { + if t.priorityTracker == nil { + klog.ErrorS(nil, "Priority tracker is nil, cannot start cleanup routine") + return + } + + ticker := time.NewTicker(30 * time.Second) // Check every 30 seconds + defer ticker.Stop() + + for { + select { + case <-ticker.C: + t.priorityTracker.cleanupExpiredReservations() + case <-stopCh: + klog.InfoS("Stopping Tier priority tracker cleanup routine") + return + } + } +} + +// OnTierCreate should be called when a new Tier is detected by the informer. +// This releases the priority reservation for the created Tier. +func (t *tierValidator) OnTierCreate(tier *crdv1beta1.Tier) { + if tier != nil && t.priorityTracker != nil { + t.priorityTracker.releasePriorityReservation(tier.Spec.Priority, tier.Name) + } +} + +// OnTierDelete should be called when a Tier is deleted. +// This ensures any stale reservations are cleaned up. 
+func (t *tierValidator) OnTierDelete(tier *crdv1beta1.Tier) { + if tier != nil && t.priorityTracker != nil { + // In case there was a stale reservation, clean it up + t.priorityTracker.releasePriorityReservation(tier.Spec.Priority, tier.Name) + } +} + func (v *antreaPolicyValidator) tierExists(name string) bool { _, err := v.networkPolicyController.tierLister.Get(name) return err == nil @@ -997,15 +1140,30 @@ func (t *tierValidator) createValidate(curObj interface{}, userInfo authenticati return nil, fmt.Sprintf("maximum number of Tiers supported: %d", maxSupportedTiers), false } curTier := curObj.(*crdv1beta1.Tier) + priority := curTier.Spec.Priority + tierName := curTier.Name + // Tier priority must not overlap reserved tier's priority. - if reservedTierPriorities.Has(curTier.Spec.Priority) { - return nil, fmt.Sprintf("tier %s priority %d is reserved", curTier.Name, curTier.Spec.Priority), false + if reservedTierPriorities.Has(priority) { + return nil, fmt.Sprintf("tier %s priority %d is reserved", tierName, priority), false + } + // The priorityTracker should always be available for tierValidator + if t.priorityTracker == nil { + return nil, "internal error: priority tracker not initialized", false } - // Tier priority must not overlap existing tier's priority - trs, err := t.networkPolicyController.tierInformer.Informer().GetIndexer().ByIndex(PriorityIndex, strconv.FormatInt(int64(curTier.Spec.Priority), 10)) + + // Check for priority overlap with existing tiers first + trs, err := t.networkPolicyController.tierInformer.Informer().GetIndexer().ByIndex(PriorityIndex, strconv.FormatInt(int64(priority), 10)) if err != nil || len(trs) > 0 { - return nil, fmt.Sprintf("tier %s priority %d overlaps with existing Tier", curTier.Name, curTier.Spec.Priority), false + return nil, fmt.Sprintf("tier %s priority %d overlaps with existing Tier", tierName, priority), false + } + + // Attempt to reserve the priority for this Tier creation + // This will fail immediately if the priority is already reserved by another validation + if !t.priorityTracker.reservePriorityForValidation(priority, tierName) { + return nil, fmt.Sprintf("tier priority %d is currently being validated by another request, please retry", priority), false } + klog.InfoS("Reserved priority for Tier creation", "priority", priority, "tier", tierName) return nil, "", true } diff --git a/pkg/controller/networkpolicy/validate_test.go b/pkg/controller/networkpolicy/validate_test.go index eb67dd6b6e3..bac37d6e80b 100644 --- a/pkg/controller/networkpolicy/validate_test.go +++ b/pkg/controller/networkpolicy/validate_test.go @@ -16,9 +16,12 @@ package networkpolicy import ( "fmt" + "sync" "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" admv1 "k8s.io/api/admission/v1" authenticationv1 "k8s.io/api/authentication/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -2873,6 +2876,110 @@ func TestValidateTier(t *testing.T) { } } +func TestTierPriorityTracker(t *testing.T) { + t.Run("ReservePriorityForValidation", func(t *testing.T) { + tracker := newTierPriorityTracker() + + // Test successful reservation + success := tracker.reservePriorityForValidation(100, "tier1") + assert.True(t, success, "Should successfully reserve priority 100") + + // Test reservation conflict - same priority should fail + success = tracker.reservePriorityForValidation(100, "tier2") + assert.False(t, success, "Should fail to reserve already reserved priority 100") + + // Test different priority should succeed + success = 
tracker.reservePriorityForValidation(200, "tier3") + assert.True(t, success, "Should successfully reserve different priority 200") + }) + + t.Run("ReleasePriorityReservation", func(t *testing.T) { + tracker := newTierPriorityTracker() + + // Reserve a priority + success := tracker.reservePriorityForValidation(100, "tier1") + require.True(t, success) + + // Release the reservation + tracker.releasePriorityReservation(100, "tier1") + + // Should be able to reserve again + success = tracker.reservePriorityForValidation(100, "tier2") + assert.True(t, success, "Should be able to reserve priority after release") + + // Test releasing wrong tier name (should not release) + tracker.releasePriorityReservation(100, "wrong-tier") + success = tracker.reservePriorityForValidation(100, "tier3") + assert.False(t, success, "Should not release reservation for wrong tier name") + }) + + t.Run("CleanupExpiredReservations", func(t *testing.T) { + tracker := newTierPriorityTracker() + tracker.creationTimeout = 50 * time.Millisecond // Short timeout for testing + + // Reserve a priority + success := tracker.reservePriorityForValidation(100, "tier1") + require.True(t, success) + + // Wait for expiration + time.Sleep(100 * time.Millisecond) + + // Cleanup expired reservations + tracker.cleanupExpiredReservations() + + // Should be able to reserve again after cleanup + success = tracker.reservePriorityForValidation(100, "tier2") + assert.True(t, success, "Should be able to reserve priority after cleanup") + }) + + t.Run("ConcurrentReservations", func(t *testing.T) { + tracker := newTierPriorityTracker() + + var wg sync.WaitGroup + var results sync.Map + + // Try to reserve the same priority from multiple goroutines + for i := 0; i < 5; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + tierName := fmt.Sprintf("tier%d", id) + success := tracker.reservePriorityForValidation(100, tierName) + results.Store(id, success) + }(i) + } + + wg.Wait() + + // Count successful reservations + successCount := 0 + results.Range(func(key, value interface{}) bool { + if value.(bool) { + successCount++ + } + return true + }) + + assert.Equal(t, 1, successCount, "Only one goroutine should successfully reserve the priority") + }) + + t.Run("TimeoutExpiredReservation", func(t *testing.T) { + tracker := newTierPriorityTracker() + tracker.creationTimeout = 50 * time.Millisecond + + // Reserve a priority + success := tracker.reservePriorityForValidation(100, "tier1") + require.True(t, success) + + // Wait for expiration + time.Sleep(100 * time.Millisecond) + + // Try to reserve the same priority with a new tier - should succeed due to timeout + success = tracker.reservePriorityForValidation(100, "tier2") + assert.True(t, success, "Should allow new reservation when existing one has timed out") + }) +} + func TestValidateAdminNetworkPolicy(t *testing.T) { tests := []struct { name string diff --git a/test/e2e/antreapolicy_test.go b/test/e2e/antreapolicy_test.go index 5f4eca551d4..4d60757bc91 100644 --- a/test/e2e/antreapolicy_test.go +++ b/test/e2e/antreapolicy_test.go @@ -345,6 +345,56 @@ func testCreateValidationInvalidTier(t *testing.T) { failOnError(k8sUtils.DeleteTier(tr.Name), t) } +func testCreateValidationTwoTierOfSamePrioritySimultaneous(t *testing.T) { + // This test verifies that the race condition fix prevents two Tiers with the same priority + // from being created simultaneously. Only one should succeed. 
+	const testPriority = int32(25)
+	tier1Name := "concurrent-tier-1"
+	tier2Name := "concurrent-tier-2"
+
+	// Use channels to coordinate simultaneous creation attempts
+	startCh := make(chan struct{})
+	result1Ch := make(chan error, 1)
+	result2Ch := make(chan error, 1)
+
+	// Start first Tier creation in a goroutine
+	go func() {
+		<-startCh // Wait for signal to start
+		_, err := k8sUtils.CreateTier(tier1Name, testPriority)
+		result1Ch <- err
+	}()
+	// Start second Tier creation in another goroutine
+	go func() {
+		<-startCh // Wait for signal to start
+		_, err := k8sUtils.CreateTier(tier2Name, testPriority)
+		result2Ch <- err
+	}()
+
+	// Signal both goroutines to start simultaneously and collect results
+	close(startCh)
+	err1 := <-result1Ch
+	err2 := <-result2Ch
+
+	// Exactly one creation should succeed and the other should fail
+	if err1 == nil && err2 == nil {
+		k8sUtils.DeleteTier(tier1Name)
+		k8sUtils.DeleteTier(tier2Name)
+		failOnError(fmt.Errorf("both Tiers were created with the same priority when applied simultaneously"), t)
+	} else if err1 != nil && err2 != nil {
+		// Both failed - this is unexpected
+		failOnError(fmt.Errorf("both Tier creations failed: tier1 error: %v, tier2 error: %v", err1, err2), t)
+	} else {
+		// One succeeded, one failed - this is the expected behavior
+		t.Logf("Tier creation results: tier1 error: %v, tier2 error: %v", err1, err2)
+		// Clean up the successfully created tier
+		if err1 == nil {
+			failOnError(k8sUtils.DeleteTier(tier1Name), t)
+		} else {
+			failOnError(k8sUtils.DeleteTier(tier2Name), t)
+		}
+	}
+}
+
 func testCreateValidationInvalidCG(t *testing.T) {
 	invalidErr := fmt.Errorf("ClusterGroup using podSelecter and serviceReference together created")
 	cgBuilder := &ClusterGroupSpecBuilder{}
@@ -4577,6 +4627,7 @@ func TestAntreaPolicy(t *testing.T) {
 	t.Run("Case=CreateInvalidACNP", func(t *testing.T) { testCreateValidationInvalidACNP(t) })
 	t.Run("Case=CreateInvalidANNP", func(t *testing.T) { testCreateValidationInvalidANNP(t) })
 	t.Run("Case=CreateInvalidTier", func(t *testing.T) { testCreateValidationInvalidTier(t) })
+	t.Run("Case=CreateTwoTierOfSamePrioritySimultaneous", func(t *testing.T) { testCreateValidationTwoTierOfSamePrioritySimultaneous(t) })
 	t.Run("Case=CreateInvalidClusterGroup", func(t *testing.T) { testCreateValidationInvalidCG(t) })
 	t.Run("Case=CreateInvalidGroup", func(t *testing.T) { testCreateValidationInvalidGroup(t) })
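
Reviewer-facing sketch, not part of the patch: the snippet below illustrates the reservation lifecycle that the new tierPriorityTracker is meant to implement, using only functions added in validate.go. The example name, the sample priority, and the tier names are hypothetical; because the tracker is unexported, a snippet like this would have to live in the networkpolicy package, for example next to validate_test.go.

// Illustrative sketch only, assuming it sits in package networkpolicy (e.g. in validate_test.go, which already imports fmt).
func Example_tierPriorityTrackerLifecycle() {
	tracker := newTierPriorityTracker()

	// Admission path (createValidate): the first CREATE reserves priority 25 and is allowed.
	fmt.Println(tracker.reservePriorityForValidation(25, "tier-a"))

	// A concurrent CREATE with the same priority is denied while the reservation is pending.
	fmt.Println(tracker.reservePriorityForValidation(25, "tier-b"))

	// Informer path (OnTierCreate): once the Tier is observed, the reservation is released
	// and the PriorityIndex lookup takes over overlap detection.
	tracker.releasePriorityReservation(25, "tier-a")

	// The priority can now be reserved again; in the real flow this request would already
	// have been rejected by the PriorityIndex check, since tier-a exists with priority 25.
	fmt.Println(tracker.reservePriorityForValidation(25, "tier-c"))

	// Output:
	// true
	// false
	// true
}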