Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,12 @@ func installHandlers(c *ExtraConfig, s *genericapiserver.GenericAPIServer) {
}()
return nil
})

// Start the NetworkPolicyValidator background routines
s.AddPostStartHook("start-validator-routines", func(context genericapiserver.PostStartHookContext) error {
go v.Run(context.Done())
return nil
})
}

if features.DefaultFeatureGate.Enabled(features.Egress) || features.DefaultFeatureGate.Enabled(features.ServiceExternalIP) {
Expand Down
24 changes: 24 additions & 0 deletions pkg/controller/networkpolicy/networkpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,30 @@ func NewNetworkPolicyController(kubeClient clientset.Interface,
return n
}

// SetupTierEventHandlersForValidator sets up event handlers for Tier changes to notify the validator.
// This enables the validator to track when Tiers are actually created/deleted and release priority reservations.
func (n *NetworkPolicyController) SetupTierEventHandlersForValidator(validator *NetworkPolicyValidator) {
for _, tv := range validator.tierValidators {
// Add event handler for Tier changes.
// We don't need UpdateFunc for priority tracking since priority updates are not allowed.
n.tierInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if tier, ok := obj.(*secv1beta1.Tier); ok {
klog.V(4).InfoS("Tier created, notifying validator", "tier", tier.Name, "priority", tier.Spec.Priority)
tv.OnTierCreate(tier)
}
},
DeleteFunc: func(obj interface{}) {
if tier, ok := obj.(*secv1beta1.Tier); ok {
klog.V(4).InfoS("Tier deleted, notifying validator", "tier", tier.Name, "priority", tier.Spec.Priority)
tv.OnTierDelete(tier)
}
},
})
klog.V(2).InfoS("Tier event handlers set up for validator")
}
}

func (n *NetworkPolicyController) heartbeat(name string) {
if n.heartbeatCh != nil {
n.heartbeatCh <- heartbeat{
Expand Down
178 changes: 168 additions & 10 deletions pkg/controller/networkpolicy/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"time"

admv1 "k8s.io/api/admission/v1"
authenticationv1 "k8s.io/api/authentication/v1"
Expand Down Expand Up @@ -65,15 +67,105 @@ type resourceValidator struct {
// policies.
type antreaPolicyValidator resourceValidator

// tierValidator implements the validator interface for Tier resources.
type tierValidator resourceValidator

// groupValidator implements the validator interface for the ClusterGroup resource.
type groupValidator resourceValidator

// adminPolicyValidator implements the validator interface for the AdminNetworkPolicy resource.
type adminPolicyValidator resourceValidator

// tierValidator implements the validator interface for Tier resources.
type tierValidator struct {
networkPolicyController *NetworkPolicyController
priorityTracker *tierPriorityTracker
}

// tierPriorityTracker manages in-memory tracking of Tier priorities currently being validated/created
// to prevent race conditions in priority overlap detection.
type tierPriorityTracker struct {
mu sync.Mutex
// pendingPriorities tracks priorities that are currently being validated/created
// Key is the priority value, value contains the reservation details
pendingPriorities map[int32]*priorityReservation
// creationTimeout for how long to wait for actual Tier creation in K8s
creationTimeout time.Duration
}

// priorityReservation tracks a single priority reservation
type priorityReservation struct {
tierName string
// createdAt tracks when this reservation was made
createdAt time.Time
}

func newTierPriorityTracker() *tierPriorityTracker {
return &tierPriorityTracker{
pendingPriorities: make(map[int32]*priorityReservation),
creationTimeout: 30 * time.Second, // timeout for waiting for actual K8s creation
}
}

// reservePriorityForValidation attempts to reserve a priority for validation.
// This reservation will persist until the Tier is actually created in K8s and detected by the informer.
// Returns true if the reservation was successful, false if the priority is already taken.
func (t *tierPriorityTracker) reservePriorityForValidation(priority int32, tierName string) bool {
t.mu.Lock()
defer t.mu.Unlock()

// Check if priority is already being processed
if existing, exists := t.pendingPriorities[priority]; exists {
// Check if the existing reservation has timed out
if time.Since(existing.createdAt) > t.creationTimeout {
klog.V(2).InfoS("Tier priority reservation has timed out, allowing new reservation", "priority", priority, "tier", existing.tierName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason for logging this as V(2) when the other logs (which seem to be of equal importance) are logged with V(4)?

delete(t.pendingPriorities, priority)
} else {
klog.V(4).InfoS("Priority is already reserved by another Tier", "priority", priority, "existingTier", existing.tierName)
return false
}
}

// Reserve the priority
reservation := &priorityReservation{
tierName: tierName,
createdAt: time.Now(),
}
t.pendingPriorities[priority] = reservation

klog.V(4).InfoS("Reserved priority for Tier validation", "priority", priority, "tier", tierName)
return true
}

// releasePriorityReservation releases a priority reservation when the Tier creation is complete.
// This should be called when the informer detects the new Tier.
func (t *tierPriorityTracker) releasePriorityReservation(priority int32, tierName string) {
t.mu.Lock()
defer t.mu.Unlock()

if existing, exists := t.pendingPriorities[priority]; exists {
if existing.tierName == tierName {
delete(t.pendingPriorities, priority)
klog.V(4).InfoS("Released priority reservation for Tier", "priority", priority, "tier", tierName)
} else {
klog.InfoS("Attempted to release priority for a different Tier", "priority", priority, "tier", tierName, "existingTier", existing.tierName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with this log, but can you think of an example situation where this could happen?

}
}
}

// cleanupExpiredReservations removes reservations that have exceeded the creation timeout.
// This should be called periodically to prevent memory leaks from failed creations.
func (t *tierPriorityTracker) cleanupExpiredReservations() {
t.mu.Lock()
defer t.mu.Unlock()

now := time.Now()
for priority, reservation := range t.pendingPriorities {
if now.Sub(reservation.createdAt) > t.creationTimeout {
klog.V(4).InfoS("Cleaning up expired priority reservation for Tier",
"priority", priority, "tier", reservation.tierName, "age", now.Sub(reservation.createdAt))
delete(t.pendingPriorities, priority)
}
}
}

var (
// reservedTierPriorities stores the reserved priority range from 251, 252, 254 and 255.
// The priority 250 is reserved for default Tier but not part of this set in order to be
Expand All @@ -98,7 +190,7 @@ func (v *NetworkPolicyValidator) RegisterAntreaPolicyValidator(a validator) {
// RegisterTierValidator registers a Tier validator to the resource registry.
// A new validator must be registered by calling this function before the Run
// phase of the APIServer.
func (v *NetworkPolicyValidator) RegisterTierValidator(t validator) {
func (v *NetworkPolicyValidator) RegisterTierValidator(t *tierValidator) {
v.tierValidators = append(v.tierValidators, t)
}

Expand All @@ -121,7 +213,7 @@ type NetworkPolicyValidator struct {
antreaPolicyValidators []validator
// tierValidators maintains a list of validator objects which
// implement the validator interface for Tier resources.
tierValidators []validator
tierValidators []*tierValidator
// groupValidators maintains a list of validator objects which
// implement the validator interface for ClusterGroup resources.
groupValidators []validator
Expand All @@ -143,6 +235,7 @@ func NewNetworkPolicyValidator(networkPolicyController *NetworkPolicyController)
// tv is an instance of tierValidator to validate Tier resource events.
tv := tierValidator{
networkPolicyController: networkPolicyController,
priorityTracker: newTierPriorityTracker(),
}
// gv is an instance of groupValidator to validate ClusterGroup
// resource events.
Expand All @@ -156,9 +249,21 @@ func NewNetworkPolicyValidator(networkPolicyController *NetworkPolicyController)
vr.RegisterTierValidator(&tv)
vr.RegisterGroupValidator(&gv)
vr.RegisterAdminNetworkPolicyValidator(&av)

// Set up Tier event handlers to notify validator when Tiers are actually created/deleted
networkPolicyController.SetupTierEventHandlersForValidator(&vr)
return &vr
}

// Run starts the background routines for the NetworkPolicyValidator.
func (v *NetworkPolicyValidator) Run(stopCh <-chan struct{}) {
// Start cleanup routine for expired reservations from the tierValidator
for _, tv := range v.tierValidators {
go tv.startCleanupRoutine(stopCh)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you call Run in a goroutine in s.AddPostStartHook, so you expect it to be blocking, so I wouldn't call tv.startCleanupRoutine in yet another goroutine

BTW, would it make sense to just replace v.tierValidators with a single v.tierValidator?

break // We only expect one tierValidator
}
}

// Validate function validates a Group, ClusterGroup, Tier or Antrea Policy object
func (v *NetworkPolicyValidator) Validate(ar *admv1.AdmissionReview) *admv1.AdmissionResponse {
var result *metav1.Status
Expand Down Expand Up @@ -498,6 +603,44 @@ func (v *NetworkPolicyValidator) validateTier(curTier, oldTier *crdv1beta1.Tier,
return warnings, reason, allowed
}

// startCleanupRoutine starts a background routine to clean up expired priority reservations
func (t *tierValidator) startCleanupRoutine(stopCh <-chan struct{}) {
if t.priorityTracker == nil {
klog.ErrorS(nil, "Priority tracker is nil, cannot start cleanup routine")
return
}
Comment on lines +608 to +611
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would remove that, it shouldn't be possible by design


ticker := time.NewTicker(30 * time.Second) // Check every 30 seconds
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be meaningful to use t.creationTimeout here instead of hardcoding 30s?

defer ticker.Stop()

for {
select {
case <-ticker.C:
t.priorityTracker.cleanupExpiredReservations()
case <-stopCh:
klog.InfoS("Stopping Tier priority tracker cleanup routine")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you have a Stopping log, I think you should have a corresponding Starting log at the beginning of the function

return
}
}
}

// OnTierCreate should be called when a new Tier is detected by the informer.
// This releases the priority reservation for the created Tier.
func (t *tierValidator) OnTierCreate(tier *crdv1beta1.Tier) {
if tier != nil && t.priorityTracker != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these nilness checks seem overly defensive here, I think there is no confusion that these values cannot possibly be nil

t.priorityTracker.releasePriorityReservation(tier.Spec.Priority, tier.Name)
}
}

// OnTierDelete should be called when a Tier is deleted.
// This ensures any stale reservations are cleaned up.
func (t *tierValidator) OnTierDelete(tier *crdv1beta1.Tier) {
if tier != nil && t.priorityTracker != nil {
// In case there was a stale reservation, clean it up
t.priorityTracker.releasePriorityReservation(tier.Spec.Priority, tier.Name)
}
}

func (v *antreaPolicyValidator) tierExists(name string) bool {
_, err := v.networkPolicyController.tierLister.Get(name)
return err == nil
Expand Down Expand Up @@ -997,15 +1140,30 @@ func (t *tierValidator) createValidate(curObj interface{}, userInfo authenticati
return nil, fmt.Sprintf("maximum number of Tiers supported: %d", maxSupportedTiers), false
}
curTier := curObj.(*crdv1beta1.Tier)
priority := curTier.Spec.Priority
tierName := curTier.Name

// Tier priority must not overlap reserved tier's priority.
if reservedTierPriorities.Has(curTier.Spec.Priority) {
return nil, fmt.Sprintf("tier %s priority %d is reserved", curTier.Name, curTier.Spec.Priority), false
if reservedTierPriorities.Has(priority) {
return nil, fmt.Sprintf("tier %s priority %d is reserved", tierName, priority), false
}
// The priorityTracker should always be available for tierValidator
if t.priorityTracker == nil {
Comment on lines +1150 to +1151
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

return nil, "internal error: priority tracker not initialized", false
}
// Tier priority must not overlap existing tier's priority
trs, err := t.networkPolicyController.tierInformer.Informer().GetIndexer().ByIndex(PriorityIndex, strconv.FormatInt(int64(curTier.Spec.Priority), 10))

// Check for priority overlap with existing tiers first
trs, err := t.networkPolicyController.tierInformer.Informer().GetIndexer().ByIndex(PriorityIndex, strconv.FormatInt(int64(priority), 10))
if err != nil || len(trs) > 0 {
return nil, fmt.Sprintf("tier %s priority %d overlaps with existing Tier", curTier.Name, curTier.Spec.Priority), false
return nil, fmt.Sprintf("tier %s priority %d overlaps with existing Tier", tierName, priority), false
}

// Attempt to reserve the priority for this Tier creation
// This will fail immediately if the priority is already reserved by another validation
if !t.priorityTracker.reservePriorityForValidation(priority, tierName) {
return nil, fmt.Sprintf("tier priority %d is currently being validated by another request, please retry", priority), false
}
klog.InfoS("Reserved priority for Tier creation", "priority", priority, "tier", tierName)
return nil, "", true
}

Expand Down
Loading
Loading