Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions operator/api/v1alpha1/deployment_policy_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,16 @@ type DeploymentBudget struct {
Count *int `json:"count,omitempty"`
}

// StrategyType names the kind of deployment strategy in use.
// It is a string type; the recognised values are the StrategyType* constants below.
type StrategyType string

const (
// StrategyTypeFixed is the StrategyType value "fixed".
StrategyTypeFixed StrategyType = "fixed"
// StrategyTypeLinear is the StrategyType value "linear".
StrategyTypeLinear StrategyType = "linear"
// StrategyTypeExponential is the StrategyType value "exponential".
StrategyTypeExponential StrategyType = "exponential"
// StrategyTypeUnknown is the StrategyType value "unknown".
// NOTE(review): presumably used when no concrete strategy can be determined — confirm against callers.
StrategyTypeUnknown StrategyType = "unknown"
)

const (
DefaultCompartmentName = "__default__"
)
Expand Down
7 changes: 4 additions & 3 deletions operator/internal/controller/skyhook_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2295,16 +2295,17 @@ func setPodResources(pod *corev1.Pod, res *v1alpha1.ResourceRequirements) {
}
}

// PartitionNodesIntoCompartments partitions nodes for each skyhook that uses deployment policies
// PartitionNodesIntoCompartments partitions nodes for each skyhook that uses deployment policies.
func partitionNodesIntoCompartments(clusterState *clusterState) error {
for _, skyhook := range clusterState.skyhooks {
// Skip skyhooks that don't have compartments (no deployment policy)
if len(skyhook.GetCompartments()) == 0 {
continue
}

for _, node := range skyhook.GetNodes() {
compartmentName, err := wrapper.AssignNodeToCompartment(node, skyhook.GetCompartments())
allNodes := skyhook.GetNodes()
for _, node := range allNodes {
compartmentName, err := wrapper.AssignNodeToCompartment(node, skyhook.GetCompartments(), allNodes)
if err != nil {
return fmt.Errorf("error assigning node %s: %w", node.GetNode().Name, err)
}
Expand Down
137 changes: 132 additions & 5 deletions operator/internal/wrapper/compartment.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package wrapper

import (
"fmt"
"math"
"sort"

"github.com/NVIDIA/skyhook/operator/api/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -58,11 +60,90 @@ func (c *Compartment) AddNode(node SkyhookNode) {
c.Nodes = append(c.Nodes, node)
}

// AssignNodeToCompartment assigns a single node to the appropriate compartment
func AssignNodeToCompartment(node SkyhookNode, compartments map[string]*Compartment) (string, error) {
// strategySafetyOrder ranks strategy types by how conservative their rollout is.
// A lower rank means a safer (less aggressive) strategy, so from safest to
// least safe the order is: Fixed (0), Linear (1), Exponential (2), Unknown (999).
//
// NOTE: indexing this map with a StrategyType that is not listed here yields
// the int zero value 0, which is the same rank as Fixed.
var strategySafetyOrder = map[v1alpha1.StrategyType]int{
v1alpha1.StrategyTypeFixed: 0,
v1alpha1.StrategyTypeLinear: 1,
v1alpha1.StrategyTypeExponential: 2,
v1alpha1.StrategyTypeUnknown: 999, // Unknown is least safe
}

// getStrategyType reports which kind of strategy a DeploymentStrategy carries.
//
// The Fixed, Linear and Exponential fields are inspected in that order and the
// first non-nil one decides the result. A nil strategy, or one with none of
// those fields set, yields StrategyTypeUnknown.
func getStrategyType(strategy *v1alpha1.DeploymentStrategy) v1alpha1.StrategyType {
	switch {
	case strategy == nil:
		// Must be checked first: the remaining cases dereference strategy.
		return v1alpha1.StrategyTypeUnknown
	case strategy.Fixed != nil:
		return v1alpha1.StrategyTypeFixed
	case strategy.Linear != nil:
		return v1alpha1.StrategyTypeLinear
	case strategy.Exponential != nil:
		return v1alpha1.StrategyTypeExponential
	default:
		return v1alpha1.StrategyTypeUnknown
	}
}

// strategyIsSafer reports whether strategy a is strictly safer than strategy b.
//
// Safety is decided by the rank stored in strategySafetyOrder, where a lower
// rank is safer: Fixed is safer than Linear, which is safer than Exponential.
// Two strategies with the same rank are not safer than each other, so the
// function returns false for them.
//
// A StrategyType that has no entry in strategySafetyOrder is ranked like
// StrategyTypeUnknown. A plain map index would give it the zero value 0 and
// silently treat it as being as safe as Fixed.
func strategyIsSafer(a, b v1alpha1.StrategyType) bool {
	rankOf := func(t v1alpha1.StrategyType) int {
		if rank, ok := strategySafetyOrder[t]; ok {
			return rank
		}
		return strategySafetyOrder[v1alpha1.StrategyTypeUnknown]
	}
	return rankOf(a) < rankOf(b)
}

// computeEffectiveCapacity returns the effective ceiling (maximum number of
// nodes) that a compartment's budget allows, given how many nodes matched the
// compartment.
//
// Resolution order:
//   - If budget.Count is set, it is returned as-is and matchedNodes is ignored.
//   - Otherwise, if budget.Percent is set, the result is
//     max(1, ceil(percent × matchedNodes / 100)), so a percent budget always
//     allows at least one node.
//   - If neither is set, 0 is returned; validation is expected to prevent this.
func computeEffectiveCapacity(budget v1alpha1.DeploymentBudget, matchedNodes int) int {
	if budget.Count != nil {
		return *budget.Count
	}
	if budget.Percent != nil {
		// Multiply before dividing. The product of two integer-valued floats
		// is exact, so the only rounding happens in the single division and an
		// exact multiple of 100 stays an exact integer. Dividing first (as in
		// percent/100 × matched) makes e.g. 7% of 100 nodes evaluate to
		// 7.000000000000001, which math.Ceil then rounds up to 8.
		scaled := float64(*budget.Percent) * float64(matchedNodes) / 100.0
		return max(1, int(math.Ceil(scaled)))
	}
	// Should not happen due to validation
	return 0
}

// compartmentMatch records one compartment that matched a node, together with
// the values used to rank it against other matching compartments.
type compartmentMatch struct {
// name is the name of the matching compartment.
name string
// strategyType is the kind of strategy the compartment uses.
strategyType v1alpha1.StrategyType
// capacity is the compartment's effective ceiling derived from its budget.
capacity int
}

// countMatchingNodes reports how many entries of allNodes carry labels that
// satisfy selector.
//
// It returns 0 and an error when selector cannot be converted into a
// labels.Selector; otherwise the error is nil.
func countMatchingNodes(allNodes []SkyhookNode, selector metav1.LabelSelector) (int, error) {
	compiled, err := metav1.LabelSelectorAsSelector(&selector)
	if err != nil {
		return 0, err
	}

	matched := 0
	for i := range allNodes {
		nodeLabels := labels.Set(allNodes[i].GetNode().Labels)
		if !compiled.Matches(nodeLabels) {
			continue
		}
		matched++
	}
	return matched, nil
}

// AssignNodeToCompartment assigns a single node to the appropriate compartment using overlap resolution.
// When a node matches multiple compartments, it resolves using:
// 1. Strategy safety order: Fixed is safer than Linear, which is safer than Exponential
// 2. Tie-break on same strategy: Choose compartment with smaller effective ceiling (window)
// 3. Final tie-break: Lexicographically by compartment name for determinism
// The allNodes parameter is used to compute effective capacity for percent-based budgets.
// Assignments are recalculated fresh on every reconcile based on current cluster state.
func AssignNodeToCompartment(node SkyhookNode, compartments map[string]*Compartment, allNodes []SkyhookNode) (string, error) {
nodeLabels := labels.Set(node.GetNode().Labels)

// Check all non-default compartments first
matches := []compartmentMatch{}

// Collect all matching compartments (excluding default)
for _, compartment := range compartments {
// Skip the default compartment - it's a fallback
if compartment.Name == v1alpha1.DefaultCompartmentName {
Expand All @@ -73,11 +154,57 @@ func AssignNodeToCompartment(node SkyhookNode, compartments map[string]*Compartm
if err != nil {
return "", fmt.Errorf("invalid selector for compartment %s: %w", compartment.Name, err)
}

if selector.Matches(nodeLabels) {
return compartment.Name, nil
// Count how many nodes in total match this compartment's selector
matchedCount, err := countMatchingNodes(allNodes, compartment.Selector)
if err != nil {
return "", fmt.Errorf("error counting matching nodes for compartment %s: %w", compartment.Name, err)
}

// Ensure at least 1 node for capacity calculation
if matchedCount == 0 {
matchedCount = 1
}

stratType := getStrategyType(compartment.Strategy)
capacity := computeEffectiveCapacity(compartment.Budget, matchedCount)

matches = append(matches, compartmentMatch{
name: compartment.Name,
strategyType: stratType,
capacity: capacity,
})
}
}

// No matches - assign to default
return v1alpha1.DefaultCompartmentName, nil
if len(matches) == 0 {
return v1alpha1.DefaultCompartmentName, nil
}

// Single match - return it
if len(matches) == 1 {
return matches[0].name, nil
}

// Multiple matches - apply overlap resolution
// Sort matches using the safety heuristic
sort.Slice(matches, func(i, j int) bool {
// 1. Strategy safety order: Fixed > Linear > Exponential
if matches[i].strategyType != matches[j].strategyType {
return strategyIsSafer(matches[i].strategyType, matches[j].strategyType)
}

// 2. Tie-break on same strategy: smaller window (capacity)
if matches[i].capacity != matches[j].capacity {
return matches[i].capacity < matches[j].capacity
}

// 3. Final tie-break: lexicographically by name for determinism
return matches[i].name < matches[j].name
})

// Return the safest compartment
return matches[0].name, nil
}
Loading