Skip to content

Commit 25eb037

Browse files
committed
Fix concurrent Tier creation at the same priority
Signed-off-by: Dyanngg <[email protected]>
1 parent ba32aa4 commit 25eb037

File tree

5 files changed

+423
-5
lines changed

5 files changed

+423
-5
lines changed

pkg/apiserver/apiserver.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,8 @@ func installHandlers(c *ExtraConfig, s *genericapiserver.GenericAPIServer) {
310310

311311
// Get new NetworkPolicyValidator
312312
v := controllernetworkpolicy.NewNetworkPolicyValidator(c.networkPolicyController)
313+
// Set up Tier event handlers to notify validator when Tiers are actually created/deleted
314+
c.networkPolicyController.SetupTierEventHandlersForValidator(v)
313315
// Install handlers for NetworkPolicy related validation
314316
s.Handler.NonGoRestfulMux.HandleFunc("/validate/tier", webhook.HandlerForValidateFunc(v.Validate))
315317
s.Handler.NonGoRestfulMux.HandleFunc("/validate/acnp", webhook.HandlerForValidateFunc(v.Validate))

pkg/controller/networkpolicy/networkpolicy_controller.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,32 @@ func NewNetworkPolicyController(kubeClient clientset.Interface,
585585
return n
586586
}
587587

588+
// SetupTierEventHandlersForValidator sets up event handlers for Tier changes to notify the validator.
589+
// This enables the validator to track when Tiers are actually created/deleted and release priority reservations.
590+
func (n *NetworkPolicyController) SetupTierEventHandlersForValidator(validator *NetworkPolicyValidator) {
591+
for _, tierVal := range validator.tierValidators {
592+
if tv, ok := tierVal.(*tierValidator); ok {
593+
// Add event handler for Tier changes.
594+
// We don't need UpdateFunc for priority tracking since priority updates are not allowed.
595+
n.tierInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
596+
AddFunc: func(obj interface{}) {
597+
if tier, ok := obj.(*secv1beta1.Tier); ok {
598+
klog.V(4).InfoS("Tier created, notifying validator", "tier", tier.Name, "priority", tier.Spec.Priority)
599+
tv.OnTierCreate(tier)
600+
}
601+
},
602+
DeleteFunc: func(obj interface{}) {
603+
if tier, ok := obj.(*secv1beta1.Tier); ok {
604+
klog.V(4).InfoS("Tier deleted, notifying validator", "tier", tier.Name, "priority", tier.Spec.Priority)
605+
tv.OnTierDelete(tier)
606+
}
607+
},
608+
})
609+
klog.V(2).InfoS("Tier event handlers set up for validator")
610+
}
611+
}
612+
}
613+
588614
func (n *NetworkPolicyController) heartbeat(name string) {
589615
if n.heartbeatCh != nil {
590616
n.heartbeatCh <- heartbeat{

pkg/controller/networkpolicy/validate.go

Lines changed: 182 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
"regexp"
2323
"strconv"
2424
"strings"
25+
"sync"
26+
"time"
2527

2628
admv1 "k8s.io/api/admission/v1"
2729
authenticationv1 "k8s.io/api/authentication/v1"
@@ -59,6 +61,9 @@ type validator interface {
5961
// interface.
6062
type resourceValidator struct {
6163
networkPolicyController *NetworkPolicyController
64+
// priorityTracker is only used by tierValidator to track priority reservations.
65+
// Other validators leave this as nil.
66+
priorityTracker *tierPriorityTracker
6267
}
6368

6469
// antreaPolicyValidator implements the validator interface for Antrea-native
@@ -74,6 +79,124 @@ type groupValidator resourceValidator
7479
// adminPolicyValidator implements the validator interface for the AdminNetworkPolicy resource.
7580
type adminPolicyValidator resourceValidator
7681

82+
// tierPriorityTracker manages in-memory tracking of Tier priorities currently being validated/created
83+
// to prevent race conditions in priority overlap detection.
84+
type tierPriorityTracker struct {
85+
mu sync.Mutex
86+
// pendingPriorities tracks priorities that are currently being validated/created
87+
// Key is the priority value, value contains the reservation details
88+
pendingPriorities map[int32]*priorityReservation
89+
// timeout for how long to wait for pending operations
90+
validationTimeout time.Duration
91+
// creationTimeout for how long to wait for actual Tier creation in K8s
92+
creationTimeout time.Duration
93+
}
94+
95+
// priorityReservation describes one outstanding reservation of a Tier priority.
type priorityReservation struct {
	// tierName is the name of the Tier the priority is reserved for.
	tierName string
	// waitChan is closed once the reservation is released, waking any waiters.
	waitChan chan struct{}
	// createdAt records when the reservation was taken; it is used to decide
	// whether the reservation has expired.
	createdAt time.Time
}
103+
104+
func newTierPriorityTracker() *tierPriorityTracker {
105+
return &tierPriorityTracker{
106+
pendingPriorities: make(map[int32]*priorityReservation),
107+
validationTimeout: 30 * time.Second, // timeout for waiting for other validations
108+
creationTimeout: 60 * time.Second, // timeout for waiting for actual K8s creation
109+
}
110+
}
111+
112+
// reservePriorityForValidation attempts to reserve a priority for validation.
113+
// This reservation will persist until the Tier is actually created in K8s and detected by the informer.
114+
// Returns true if the reservation was successful, false if the priority is already taken.
115+
func (t *tierPriorityTracker) reservePriorityForValidation(priority int32, tierName string) bool {
116+
t.mu.Lock()
117+
defer t.mu.Unlock()
118+
119+
// Check if priority is already being processed
120+
if existing, exists := t.pendingPriorities[priority]; exists {
121+
// Check if the existing reservation has timed out
122+
if time.Since(existing.createdAt) > t.creationTimeout {
123+
klog.Warningf("Tier priority %d reservation for %s has timed out, allowing new reservation", priority, existing.tierName)
124+
close(existing.waitChan)
125+
delete(t.pendingPriorities, priority)
126+
} else {
127+
klog.V(4).InfoS("Priority is already reserved by another Tier", "priority", priority, "existingTier", existing.tierName)
128+
return false
129+
}
130+
}
131+
132+
// Reserve the priority
133+
reservation := &priorityReservation{
134+
tierName: tierName,
135+
waitChan: make(chan struct{}),
136+
createdAt: time.Now(),
137+
}
138+
t.pendingPriorities[priority] = reservation
139+
140+
klog.V(4).InfoS("Reserved priority for Tier validation", "priority", priority, "tier", tierName)
141+
return true
142+
}
143+
144+
// waitForPriorityAvailable waits for a priority to become available if it's currently reserved.
145+
// Returns true if the priority becomes available, false on timeout.
146+
func (t *tierPriorityTracker) waitForPriorityAvailable(priority int32) bool {
147+
t.mu.Lock()
148+
existing, exists := t.pendingPriorities[priority]
149+
if !exists {
150+
t.mu.Unlock()
151+
return true // Priority is not currently reserved for creation
152+
}
153+
klog.V(2).InfoS("Tier priority is currently reserved by another tier for creation", "priority", priority, "tier", existing.tierName)
154+
waitChan := existing.waitChan
155+
t.mu.Unlock()
156+
157+
// Wait for the existing operation to complete
158+
select {
159+
case <-waitChan:
160+
return true
161+
case <-time.After(t.validationTimeout):
162+
klog.Warningf("Timeout waiting for Tier priority %d to become available", priority)
163+
return false
164+
}
165+
}
166+
167+
// releasePriorityReservation releases a priority reservation when the Tier creation is complete.
168+
// This should be called when the informer detects the new Tier.
169+
func (t *tierPriorityTracker) releasePriorityReservation(priority int32, tierName string) {
170+
t.mu.Lock()
171+
defer t.mu.Unlock()
172+
173+
if existing, exists := t.pendingPriorities[priority]; exists {
174+
if existing.tierName == tierName {
175+
close(existing.waitChan)
176+
delete(t.pendingPriorities, priority)
177+
klog.V(4).InfoS("Released priority reservation for Tier", "priority", priority, "tier", tierName)
178+
} else {
179+
klog.Warningf("Attempted to release priority %d for Tier %s, but it's reserved by %s", priority, tierName, existing.tierName)
180+
}
181+
}
182+
}
183+
184+
// cleanupExpiredReservations removes reservations that have exceeded the creation timeout.
185+
// This should be called periodically to prevent memory leaks from failed creations.
186+
func (t *tierPriorityTracker) cleanupExpiredReservations() {
187+
t.mu.Lock()
188+
defer t.mu.Unlock()
189+
190+
now := time.Now()
191+
for priority, reservation := range t.pendingPriorities {
192+
if now.Sub(reservation.createdAt) > t.creationTimeout {
193+
klog.Warningf("Cleaning up expired priority %d reservation for Tier %s (age: %v)", priority, reservation.tierName, now.Sub(reservation.createdAt))
194+
close(reservation.waitChan)
195+
delete(t.pendingPriorities, priority)
196+
}
197+
}
198+
}
199+
77200
var (
78201
// reservedTierPriorities stores the reserved priority range from 251, 252, 254 and 255.
79202
// The priority 250 is reserved for default Tier but not part of this set in order to be
@@ -143,6 +266,7 @@ func NewNetworkPolicyValidator(networkPolicyController *NetworkPolicyController)
143266
// tv is an instance of tierValidator to validate Tier resource events.
144267
tv := tierValidator{
145268
networkPolicyController: networkPolicyController,
269+
priorityTracker: newTierPriorityTracker(),
146270
}
147271
// gv is an instance of groupValidator to validate ClusterGroup
148272
// resource events.
@@ -156,6 +280,10 @@ func NewNetworkPolicyValidator(networkPolicyController *NetworkPolicyController)
156280
vr.RegisterTierValidator(&tv)
157281
vr.RegisterGroupValidator(&gv)
158282
vr.RegisterAdminNetworkPolicyValidator(&av)
283+
284+
// Start cleanup routine for expired reservations from the tierValidator
285+
go tv.startCleanupRoutine()
286+
159287
return &vr
160288
}
161289

@@ -498,6 +626,38 @@ func (v *NetworkPolicyValidator) validateTier(curTier, oldTier *crdv1beta1.Tier,
498626
return warnings, reason, allowed
499627
}
500628

629+
// startCleanupRoutine starts a background routine to clean up expired priority reservations
630+
func (t *tierValidator) startCleanupRoutine() {
631+
if t.priorityTracker == nil {
632+
klog.ErrorS(nil, "Priority tracker is nil, cannot start cleanup routine")
633+
return
634+
}
635+
636+
ticker := time.NewTicker(30 * time.Second) // Check every 30 seconds
637+
defer ticker.Stop()
638+
639+
for range ticker.C {
640+
t.priorityTracker.cleanupExpiredReservations()
641+
}
642+
}
643+
644+
// OnTierCreate should be called when a new Tier is detected by the informer.
645+
// This releases the priority reservation for the created Tier.
646+
func (t *tierValidator) OnTierCreate(tier *crdv1beta1.Tier) {
647+
if tier != nil && t.priorityTracker != nil {
648+
t.priorityTracker.releasePriorityReservation(tier.Spec.Priority, tier.Name)
649+
}
650+
}
651+
652+
// OnTierDelete should be called when a Tier is deleted.
653+
// This ensures any stale reservations are cleaned up.
654+
func (t *tierValidator) OnTierDelete(tier *crdv1beta1.Tier) {
655+
if tier != nil && t.priorityTracker != nil {
656+
// In case there was a stale reservation, clean it up
657+
t.priorityTracker.releasePriorityReservation(tier.Spec.Priority, tier.Name)
658+
}
659+
}
660+
501661
func (v *antreaPolicyValidator) tierExists(name string) bool {
502662
_, err := v.networkPolicyController.tierLister.Get(name)
503663
return err == nil
@@ -997,15 +1157,32 @@ func (t *tierValidator) createValidate(curObj interface{}, userInfo authenticati
9971157
return nil, fmt.Sprintf("maximum number of Tiers supported: %d", maxSupportedTiers), false
9981158
}
9991159
curTier := curObj.(*crdv1beta1.Tier)
1160+
priority := curTier.Spec.Priority
1161+
tierName := curTier.Name
1162+
10001163
// Tier priority must not overlap reserved tier's priority.
1001-
if reservedTierPriorities.Has(curTier.Spec.Priority) {
1002-
return nil, fmt.Sprintf("tier %s priority %d is reserved", curTier.Name, curTier.Spec.Priority), false
1164+
if reservedTierPriorities.Has(priority) {
1165+
return nil, fmt.Sprintf("tier %s priority %d is reserved", tierName, priority), false
10031166
}
1004-
// Tier priority must not overlap existing tier's priority
1005-
trs, err := t.networkPolicyController.tierInformer.Informer().GetIndexer().ByIndex(PriorityIndex, strconv.FormatInt(int64(curTier.Spec.Priority), 10))
1167+
// The priorityTracker should always be available for tierValidator
1168+
if t.priorityTracker == nil {
1169+
return nil, "internal error: priority tracker not initialized", false
1170+
}
1171+
// First, wait for the priority to be available if it's currently reserved
1172+
if !t.priorityTracker.waitForPriorityAvailable(priority) {
1173+
return nil, fmt.Sprintf("timeout waiting for priority %d to become available", priority), false
1174+
}
1175+
// Check for priority overlap with existing tiers (while no other validation is in progress)
1176+
trs, err := t.networkPolicyController.tierInformer.Informer().GetIndexer().ByIndex(PriorityIndex, strconv.FormatInt(int64(priority), 10))
10061177
if err != nil || len(trs) > 0 {
1007-
return nil, fmt.Sprintf("tier %s priority %d overlaps with existing Tier", curTier.Name, curTier.Spec.Priority), false
1178+
return nil, fmt.Sprintf("tier %s priority %d overlaps with existing Tier", tierName, priority), false
1179+
}
1180+
// Now reserve the priority for this Tier creation
1181+
// This reservation will persist until the Tier is actually created and detected by the informer
1182+
if !t.priorityTracker.reservePriorityForValidation(priority, tierName) {
1183+
return nil, fmt.Sprintf("failed to reserve priority %d for Tier %s (already taken)", priority, tierName), false
10081184
}
1185+
klog.InfoS("Reserved priority for Tier creation", "priority", priority, "tier", tierName)
10091186
return nil, "", true
10101187
}
10111188

0 commit comments

Comments
 (0)