From f2461505ff6097b011d1454b7272404426c168d4 Mon Sep 17 00:00:00 2001 From: Tommy Lam Date: Fri, 17 Oct 2025 15:15:41 -0700 Subject: [PATCH 1/2] feat: compartment status --- chart/templates/skyhook-crd.yaml | 79 +++++++--- operator/api/v1alpha1/skyhook_types.go | 27 +++- .../api/v1alpha1/zz_generated.deepcopy.go | 28 +++- .../bases/skyhook.nvidia.com_skyhooks.yaml | 79 +++++++--- .../internal/controller/cluster_state_v2.go | 146 ++++++++++++------ .../controller/cluster_state_v2_test.go | 130 ---------------- .../internal/controller/skyhook_controller.go | 7 +- operator/internal/wrapper/compartment.go | 30 +--- operator/internal/wrapper/compartment_test.go | 8 +- 9 files changed, 272 insertions(+), 262 deletions(-) diff --git a/chart/templates/skyhook-crd.yaml b/chart/templates/skyhook-crd.yaml index f9d5c854..22b4b7fa 100644 --- a/chart/templates/skyhook-crd.yaml +++ b/chart/templates/skyhook-crd.yaml @@ -498,38 +498,69 @@ spec: status: description: SkyhookStatus defines the observed state of Skyhook properties: - compartmentBatchStates: + compartmentStatuses: additionalProperties: - description: BatchProcessingState tracks the current state of batch - processing for a compartment + description: CompartmentStatus tracks the detailed state of a compartment properties: - completedNodes: - description: Total number of nodes that have completed successfully - (cumulative across all batches) + batchState: + description: BatchState tracks the batch processing state for + this compartment + properties: + completedNodes: + description: Total number of nodes that have completed successfully + (cumulative across all batches) + type: integer + consecutiveFailures: + description: Number of consecutive failures + type: integer + currentBatch: + description: Current batch number (starts at 1) + type: integer + failedNodes: + description: Total number of nodes that have failed (cumulative + across all batches) + type: integer + lastBatchFailed: + description: Whether the last batch failed (for slowdown + logic) + type: boolean + lastBatchSize: + description: Last batch size (for slowdown calculations) + type: integer + shouldStop: + description: Whether the strategy should stop processing + due to failures + type: boolean + type: object + ceiling: + description: Ceiling is the maximum number of nodes that can + be in progress at once type: integer - consecutiveFailures: - description: Number of consecutive failures + completed: + description: Completed is the number of nodes that have completed + successfully type: integer - currentBatch: - description: Current batch number (starts at 1) + inProgress: + description: InProgress is the number of nodes currently in + progress type: integer - failedNodes: - description: Total number of nodes that have failed (cumulative - across all batches) + matched: + description: Matched is the number of nodes that match this + compartment's selector type: integer - lastBatchFailed: - description: Whether the last batch failed (for slowdown logic) - type: boolean - lastBatchSize: - description: Last batch size (for slowdown calculations) + progressPercent: + description: ProgressPercent is the percentage of nodes completed + (0-100) type: integer - shouldStop: - description: Whether the strategy should stop processing due - to failures - type: boolean + required: + - ceiling + - completed + - inProgress + - matched + - progressPercent type: object - description: CompartmentBatchStates tracks batch processing state - per compartment + description: CompartmentStatuses tracks the detailed status of each + compartment type: object completeNodes: default: 0/0 diff --git a/operator/api/v1alpha1/skyhook_types.go b/operator/api/v1alpha1/skyhook_types.go index ec4d0f77..9375be25 100644 --- a/operator/api/v1alpha1/skyhook_types.go +++ b/operator/api/v1alpha1/skyhook_types.go @@ -280,6 +280,28 @@ const ( RESTART_ALL_SERVICES InterruptType = "restartAllServices" ) +// CompartmentStatus tracks the detailed state of a compartment +type CompartmentStatus struct { + // Matched is the number of nodes that match this compartment's selector + Matched int `json:"matched"` + + // Ceiling is the maximum number of nodes that can be in progress at once + Ceiling int `json:"ceiling"` + + // InProgress is the number of nodes currently in progress + InProgress int `json:"inProgress"` + + // Completed is the number of nodes that have completed successfully + Completed int `json:"completed"` + + // ProgressPercent is the percentage of nodes completed (0-100) + ProgressPercent int `json:"progressPercent"` + + // BatchState tracks the batch processing state for this compartment + // +optional + BatchState *BatchProcessingState `json:"batchState,omitempty"` +} + // SkyhookStatus defines the observed state of Skyhook type SkyhookStatus struct { @@ -316,8 +338,9 @@ type SkyhookStatus struct { // ConfigUpdates tracks config updates ConfigUpdates map[string][]string `json:"configUpdates,omitempty"` - // CompartmentBatchStates tracks batch processing state per compartment - CompartmentBatchStates map[string]BatchProcessingState `json:"compartmentBatchStates,omitempty"` + // CompartmentStatuses tracks the detailed status of each compartment + // +optional + CompartmentStatuses map[string]CompartmentStatus `json:"compartmentStatuses,omitempty"` // +kubebuilder:example=3 // +kubebuilder:default=0 diff --git a/operator/api/v1alpha1/zz_generated.deepcopy.go b/operator/api/v1alpha1/zz_generated.deepcopy.go index 0619a4ee..a19c6fbe 100644 --- a/operator/api/v1alpha1/zz_generated.deepcopy.go +++ b/operator/api/v1alpha1/zz_generated.deepcopy.go @@ -65,6 +65,26 @@ func (in *Compartment) DeepCopy() *Compartment { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *CompartmentStatus) DeepCopyInto(out *CompartmentStatus) { + *out = *in + if in.BatchState != nil { + in, out := &in.BatchState, &out.BatchState + *out = new(BatchProcessingState) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CompartmentStatus. +func (in *CompartmentStatus) DeepCopy() *CompartmentStatus { + if in == nil { + return nil + } + out := new(CompartmentStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DeploymentBudget) DeepCopyInto(out *DeploymentBudget) { *out = *in @@ -703,11 +723,11 @@ func (in *SkyhookStatus) DeepCopyInto(out *SkyhookStatus) { (*out)[key] = outVal } } - if in.CompartmentBatchStates != nil { - in, out := &in.CompartmentBatchStates, &out.CompartmentBatchStates - *out = make(map[string]BatchProcessingState, len(*in)) + if in.CompartmentStatuses != nil { + in, out := &in.CompartmentStatuses, &out.CompartmentStatuses + *out = make(map[string]CompartmentStatus, len(*in)) for key, val := range *in { - (*out)[key] = val + (*out)[key] = *val.DeepCopy() } } } diff --git a/operator/config/crd/bases/skyhook.nvidia.com_skyhooks.yaml b/operator/config/crd/bases/skyhook.nvidia.com_skyhooks.yaml index 54689a82..0aa52980 100644 --- a/operator/config/crd/bases/skyhook.nvidia.com_skyhooks.yaml +++ b/operator/config/crd/bases/skyhook.nvidia.com_skyhooks.yaml @@ -499,38 +499,69 @@ spec: status: description: SkyhookStatus defines the observed state of Skyhook properties: - compartmentBatchStates: + compartmentStatuses: additionalProperties: - description: BatchProcessingState tracks the current state of batch - processing for a compartment + description: CompartmentStatus tracks the detailed state of a compartment properties: - completedNodes: - description: Total number of nodes that have completed successfully - (cumulative across all batches) + batchState: + description: BatchState tracks the batch processing state for + this compartment + properties: + completedNodes: + description: Total number of nodes that have completed successfully + (cumulative across all batches) + type: integer + consecutiveFailures: + description: Number of consecutive failures + type: integer + currentBatch: + description: Current batch number (starts at 1) + type: integer + failedNodes: + description: Total number of nodes that have failed (cumulative + across all batches) + type: integer + lastBatchFailed: + description: Whether the last batch failed (for slowdown + logic) + type: boolean + lastBatchSize: + description: Last batch size (for slowdown calculations) + type: integer + shouldStop: + description: Whether the strategy should stop processing + due to failures + type: boolean + type: object + ceiling: + description: Ceiling is the maximum number of nodes that can + be in progress at once type: integer - consecutiveFailures: - description: Number of consecutive failures + completed: + description: Completed is the number of nodes that have completed + successfully type: integer - currentBatch: - description: Current batch number (starts at 1) + inProgress: + description: InProgress is the number of nodes currently in + progress type: integer - failedNodes: - description: Total number of nodes that have failed (cumulative - across all batches) + matched: + description: Matched is the number of nodes that match this + compartment's selector type: integer - lastBatchFailed: - description: Whether the last batch failed (for slowdown logic) - type: boolean - lastBatchSize: - description: Last batch size (for slowdown calculations) + progressPercent: + description: ProgressPercent is the percentage of nodes completed + (0-100) type: integer - shouldStop: - description: Whether the strategy should stop processing due - to failures - type: boolean + required: + - ceiling + - completed + - inProgress + - matched + - progressPercent type: object - description: CompartmentBatchStates tracks batch processing state - per compartment + description: CompartmentStatuses tracks the detailed status of each + compartment type: object completeNodes: default: 0/0 diff --git a/operator/internal/controller/cluster_state_v2.go b/operator/internal/controller/cluster_state_v2.go index 3b3c2e9b..1f8518c2 100644 --- a/operator/internal/controller/cluster_state_v2.go +++ b/operator/internal/controller/cluster_state_v2.go @@ -108,9 +108,9 @@ func BuildState(skyhooks *v1alpha1.SkyhookList, nodes *corev1.NodeList, deployme if skyhook.Spec.DeploymentPolicy == "" { // Load persisted batch state if it exists var defaultBatchState *v1alpha1.BatchProcessingState - if skyhook.Status.CompartmentBatchStates != nil { - if state, exists := skyhook.Status.CompartmentBatchStates[v1alpha1.DefaultCompartmentName]; exists { - defaultBatchState = &state + if skyhook.Status.CompartmentStatuses != nil { + if status, exists := skyhook.Status.CompartmentStatuses[v1alpha1.DefaultCompartmentName]; exists && status.BatchState != nil { + defaultBatchState = status.BatchState } } @@ -128,20 +128,20 @@ func BuildState(skyhooks *v1alpha1.SkyhookList, nodes *corev1.NodeList, deployme for _, deploymentPolicy := range deploymentPolicies.Items { if deploymentPolicy.Name == skyhook.Spec.DeploymentPolicy { for _, compartment := range deploymentPolicy.Spec.Compartments { - // Load persisted batch state if it exists + // Load persisted batch state from CompartmentStatuses if it exists var batchState *v1alpha1.BatchProcessingState - if skyhook.Status.CompartmentBatchStates != nil { - if state, exists := skyhook.Status.CompartmentBatchStates[compartment.Name]; exists { - batchState = &state + if skyhook.Status.CompartmentStatuses != nil { + if status, exists := skyhook.Status.CompartmentStatuses[compartment.Name]; exists && status.BatchState != nil { + batchState = status.BatchState } } ret.skyhooks[idx].AddCompartment(compartment.Name, wrapper.NewCompartmentWrapper(&compartment, batchState)) } // use policy default var defaultBatchState *v1alpha1.BatchProcessingState - if skyhook.Status.CompartmentBatchStates != nil { - if state, exists := skyhook.Status.CompartmentBatchStates[v1alpha1.DefaultCompartmentName]; exists { - defaultBatchState = &state + if skyhook.Status.CompartmentStatuses != nil { + if status, exists := skyhook.Status.CompartmentStatuses[v1alpha1.DefaultCompartmentName]; exists && status.BatchState != nil { + defaultBatchState = status.BatchState } } ret.skyhooks[idx].AddCompartment(v1alpha1.DefaultCompartmentName, wrapper.NewCompartmentWrapper(&v1alpha1.Compartment{ @@ -259,7 +259,6 @@ type SkyhookNodes interface { GetCompartments() map[string]*wrapper.Compartment AddCompartment(name string, compartment *wrapper.Compartment) AddCompartmentNode(name string, node wrapper.SkyhookNode) - PersistCompartmentBatchStates() bool AssignNodeToCompartment(node wrapper.SkyhookNode) (string, error) } @@ -537,36 +536,6 @@ func (np *NodePicker) selectNodesWithCompartments(s SkyhookNodes, compartments m return selectedNodes } -// PersistCompartmentBatchStates saves the current batch state for all compartments to the Skyhook status -func (s *skyhookNodes) PersistCompartmentBatchStates() bool { - compartments := s.GetCompartments() - if len(compartments) == 0 { - return false // No compartments, nothing to persist - } - - // Initialize the batch states map if needed - if s.skyhook.Status.CompartmentBatchStates == nil { - s.skyhook.Status.CompartmentBatchStates = make(map[string]v1alpha1.BatchProcessingState) - } - - changed := false - for _, compartment := range compartments { - // Always persist batch state to maintain cumulative counters - batchState := compartment.GetBatchState() - // Only persist if there's meaningful state (batch has started or there are nodes) - if batchState.CurrentBatch > 0 || len(compartment.GetNodes()) > 0 { - s.skyhook.Status.CompartmentBatchStates[compartment.GetName()] = batchState - changed = true - } - } - - if changed { - s.skyhook.Updated = true - } - - return changed -} - // updateTaintToleranceCondition updates the taint tolerance condition on the skyhook func (np *NodePicker) updateTaintToleranceCondition(s SkyhookNodes, nodesWithTaintTolerationIssue []string) { if len(nodesWithTaintTolerationIssue) > 0 { @@ -651,11 +620,14 @@ func evaluateCompletedBatches(skyhook SkyhookNodes) bool { // Update the compartment's batch state using strategy logic compartment.EvaluateAndUpdateBatchState(batchSize, successCount, failureCount) - // Persist the updated batch state to the skyhook status - if skyhook.GetSkyhook().Status.CompartmentBatchStates == nil { - skyhook.GetSkyhook().Status.CompartmentBatchStates = make(map[string]v1alpha1.BatchProcessingState) + // Persist the updated batch state to the skyhook status immediately + if skyhook.GetSkyhook().Status.CompartmentStatuses == nil { + skyhook.GetSkyhook().Status.CompartmentStatuses = make(map[string]v1alpha1.CompartmentStatus) } - skyhook.GetSkyhook().Status.CompartmentBatchStates[compartment.GetName()] = compartment.GetBatchState() + // Build and persist the compartment status with the updated batch state + newStatus := buildCompartmentStatus(compartment) + skyhook.GetSkyhook().Status.CompartmentStatuses[compartment.GetName()] = newStatus + skyhook.GetSkyhook().Updated = true changed = true } @@ -722,6 +694,73 @@ func UpdateSkyhookPauseStatus(skyhook SkyhookNodes) bool { return changed } +// compartmentStatusEqual compares two CompartmentStatus for equality +func compartmentStatusEqual(a, b v1alpha1.CompartmentStatus) bool { + if a.Matched != b.Matched || a.Ceiling != b.Ceiling || a.InProgress != b.InProgress || + a.Completed != b.Completed || a.ProgressPercent != b.ProgressPercent { + return false + } + + // Compare BatchState if present + if (a.BatchState == nil) != (b.BatchState == nil) { + return false + } + if a.BatchState != nil && b.BatchState != nil { + return *a.BatchState == *b.BatchState + } + return true +} + +// buildCompartmentStatus creates a CompartmentStatus for a given compartment +func buildCompartmentStatus(compartment *wrapper.Compartment) v1alpha1.CompartmentStatus { + matched := len(compartment.GetNodes()) + ceiling := wrapper.CalculateCeiling(compartment.Budget, matched) + + // Count inProgress and completed nodes + inProgress := 0 + completed := 0 + for _, node := range compartment.GetNodes() { + if node.Status() == v1alpha1.StatusInProgress { + inProgress++ + } + if node.IsComplete() { + completed++ + } + } + + // Calculate progress percentage + progressPercent := 0 + if matched > 0 { + progressPercent = (completed * 100) / matched + } + + // Get batch state + batchState := compartment.GetBatchState() + + // Copy batch state for status + var batchStateCopy *v1alpha1.BatchProcessingState + if compartment.Strategy != nil { + batchStateCopy = &v1alpha1.BatchProcessingState{ + CurrentBatch: batchState.CurrentBatch, + ConsecutiveFailures: batchState.ConsecutiveFailures, + CompletedNodes: batchState.CompletedNodes, + FailedNodes: batchState.FailedNodes, + ShouldStop: batchState.ShouldStop, + LastBatchSize: batchState.LastBatchSize, + LastBatchFailed: batchState.LastBatchFailed, + } + } + + return v1alpha1.CompartmentStatus{ + Matched: matched, + Ceiling: ceiling, + InProgress: inProgress, + Completed: completed, + ProgressPercent: progressPercent, + BatchState: batchStateCopy, + } +} + // ReportState collects the current state of the skyhook and reports it to the skyhook status for printer columns func (skyhook *skyhookNodes) ReportState() { CleanupRemovedNodes(skyhook) @@ -767,6 +806,21 @@ func (skyhook *skyhookNodes) ReportState() { } } + // Update compartment statuses if compartments exist + if len(skyhook.compartments) > 0 { + if skyhook.skyhook.Status.CompartmentStatuses == nil { + skyhook.skyhook.Status.CompartmentStatuses = make(map[string]v1alpha1.CompartmentStatus) + } + + for name, compartment := range skyhook.compartments { + newStatus := buildCompartmentStatus(compartment) + if existing, ok := skyhook.skyhook.Status.CompartmentStatuses[name]; !ok || !compartmentStatusEqual(existing, newStatus) { + skyhook.skyhook.Status.CompartmentStatuses[name] = newStatus + skyhook.skyhook.Updated = true + } + } + } + // reset metrics to zero ResetSkyhookMetricsToZero(skyhook) @@ -931,7 +985,7 @@ func (skyhook *skyhookNodes) AssignNodeToCompartment(node wrapper.SkyhookNode) ( } stratType := wrapper.GetStrategyType(compartment.Strategy) - capacity := wrapper.ComputeEffectiveCapacity(compartment.Budget, matchedCount) + capacity := wrapper.CalculateCeiling(compartment.Budget, matchedCount) matches = append(matches, compartmentMatch{ name: compartment.Name, diff --git a/operator/internal/controller/cluster_state_v2_test.go b/operator/internal/controller/cluster_state_v2_test.go index b2b7ee63..bb437bf1 100644 --- a/operator/internal/controller/cluster_state_v2_test.go +++ b/operator/internal/controller/cluster_state_v2_test.go @@ -562,136 +562,6 @@ var _ = Describe("CleanupRemovedNodes", func() { }) }) - Describe("PersistCompartmentBatchStates", func() { - var skyhook *wrapper.Skyhook - var sn *skyhookNodes - - BeforeEach(func() { - skyhook = &wrapper.Skyhook{ - Skyhook: &v1alpha1.Skyhook{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-skyhook", - }, - Status: v1alpha1.SkyhookStatus{}, - }, - } - - sn = &skyhookNodes{ - skyhook: skyhook, - nodes: []wrapper.SkyhookNode{}, - compartments: make(map[string]*wrapper.Compartment), - } - }) - - It("should return false when there are no compartments", func() { - result := sn.PersistCompartmentBatchStates() - Expect(result).To(BeFalse()) - Expect(skyhook.Updated).To(BeFalse()) - }) - - It("should persist batch state when compartment has CurrentBatch > 0", func() { - // Create a compartment with batch state - batchState := &v1alpha1.BatchProcessingState{ - CurrentBatch: 1, - CompletedNodes: 4, - FailedNodes: 1, - } - compartment := wrapper.NewCompartmentWrapper(&v1alpha1.Compartment{ - Name: "compartment1", - Budget: v1alpha1.DeploymentBudget{ - Count: kptr.To(10), - }, - Strategy: &v1alpha1.DeploymentStrategy{ - Fixed: &v1alpha1.FixedStrategy{InitialBatch: kptr.To(5)}, - }, - }, batchState) - - sn.AddCompartment("compartment1", compartment) - - result := sn.PersistCompartmentBatchStates() - - Expect(result).To(BeTrue()) - Expect(skyhook.Updated).To(BeTrue()) - Expect(skyhook.Status.CompartmentBatchStates).ToNot(BeNil()) - Expect(skyhook.Status.CompartmentBatchStates).To(HaveKey("compartment1")) - Expect(skyhook.Status.CompartmentBatchStates["compartment1"].CurrentBatch).To(Equal(1)) - Expect(skyhook.Status.CompartmentBatchStates["compartment1"].CompletedNodes).To(Equal(4)) - }) - - It("should persist batch state when compartment has nodes", func() { - // Create a compartment with nodes but no batch started yet - compartment := wrapper.NewCompartmentWrapper(&v1alpha1.Compartment{ - Name: "compartment1", - Budget: v1alpha1.DeploymentBudget{ - Count: kptr.To(10), - }, - Strategy: &v1alpha1.DeploymentStrategy{ - Fixed: &v1alpha1.FixedStrategy{InitialBatch: kptr.To(5)}, - }, - }, nil) - - // Add a node to the compartment - node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1"}} - skyhookNode, err := wrapper.NewSkyhookNode(node, skyhook.Skyhook) - Expect(err).NotTo(HaveOccurred()) - compartment.AddNode(skyhookNode) - - sn.AddCompartment("compartment1", compartment) - - result := sn.PersistCompartmentBatchStates() - - Expect(result).To(BeTrue()) - Expect(skyhook.Updated).To(BeTrue()) - Expect(skyhook.Status.CompartmentBatchStates).ToNot(BeNil()) - Expect(skyhook.Status.CompartmentBatchStates).To(HaveKey("compartment1")) - }) - - It("should persist multiple compartments with meaningful state", func() { - // Create multiple compartments - batchState1 := &v1alpha1.BatchProcessingState{ - CurrentBatch: 1, - CompletedNodes: 5, - FailedNodes: 0, - } - compartment1 := wrapper.NewCompartmentWrapper(&v1alpha1.Compartment{ - Name: "compartment1", - Budget: v1alpha1.DeploymentBudget{ - Count: kptr.To(10), - }, - Strategy: &v1alpha1.DeploymentStrategy{ - Fixed: &v1alpha1.FixedStrategy{InitialBatch: kptr.To(5)}, - }, - }, batchState1) - - batchState2 := &v1alpha1.BatchProcessingState{ - CurrentBatch: 2, - CompletedNodes: 8, - FailedNodes: 2, - } - compartment2 := wrapper.NewCompartmentWrapper(&v1alpha1.Compartment{ - Name: "compartment2", - Budget: v1alpha1.DeploymentBudget{ - Count: kptr.To(5), - }, - Strategy: &v1alpha1.DeploymentStrategy{ - Linear: &v1alpha1.LinearStrategy{}, - }, - }, batchState2) - - sn.AddCompartment("compartment1", compartment1) - sn.AddCompartment("compartment2", compartment2) - - result := sn.PersistCompartmentBatchStates() - - Expect(result).To(BeTrue()) - Expect(skyhook.Updated).To(BeTrue()) - Expect(skyhook.Status.CompartmentBatchStates).ToNot(BeNil()) - Expect(skyhook.Status.CompartmentBatchStates).To(HaveLen(2)) - Expect(skyhook.Status.CompartmentBatchStates["compartment1"].CurrentBatch).To(Equal(1)) - Expect(skyhook.Status.CompartmentBatchStates["compartment2"].CurrentBatch).To(Equal(2)) - }) - }) - Describe("IntrospectSkyhook", func() { var testSkyhook *v1alpha1.Skyhook var testNode *corev1.Node diff --git a/operator/internal/controller/skyhook_controller.go b/operator/internal/controller/skyhook_controller.go index 36e3c47a..94413db9 100644 --- a/operator/internal/controller/skyhook_controller.go +++ b/operator/internal/controller/skyhook_controller.go @@ -561,9 +561,6 @@ func (r *SkyhookReconciler) RunSkyhookPackages(ctx context.Context, clusterState selectedNode := nodePicker.SelectNodes(skyhook) - // Persist compartment batch states after node selection - skyhook.PersistCompartmentBatchStates() - for _, node := range selectedNode { if node.IsComplete() && !node.Changed() { @@ -2302,8 +2299,8 @@ func setPodResources(pod *corev1.Pod, res *v1alpha1.ResourceRequirements) { // PartitionNodesIntoCompartments partitions nodes for each skyhook that uses deployment policies. func partitionNodesIntoCompartments(clusterState *clusterState) error { for _, skyhook := range clusterState.skyhooks { - // Skip skyhooks that don't have compartments (no deployment policy) - if len(skyhook.GetCompartments()) == 0 { + // Skip skyhooks without a deployment policy (they use the default compartment created in BuildState) + if skyhook.GetSkyhook().Spec.DeploymentPolicy == "" { continue } diff --git a/operator/internal/wrapper/compartment.go b/operator/internal/wrapper/compartment.go index 4626932b..0b6eff97 100644 --- a/operator/internal/wrapper/compartment.go +++ b/operator/internal/wrapper/compartment.go @@ -66,16 +66,16 @@ func (c *Compartment) AddNode(node SkyhookNode) { c.Nodes = append(c.Nodes, node) } -func (c *Compartment) calculateCeiling() int { - if c.Budget.Count != nil { - return *c.Budget.Count +// CalculateCeiling is a public helper to calculate ceiling from budget and matched nodes +func CalculateCeiling(budget v1alpha1.DeploymentBudget, matched int) int { + if budget.Count != nil { + return *budget.Count } - if c.Budget.Percent != nil { - matched := len(c.Nodes) + if budget.Percent != nil { if matched == 0 { return 0 } - limit := float64(*c.Budget.Percent) / 100 + limit := float64(*budget.Percent) / 100 return max(1, int(float64(matched)*limit)) } return 0 @@ -120,7 +120,7 @@ func (c *Compartment) createNewBatch() []SkyhookNode { if c.Strategy != nil { batchSize = c.Strategy.CalculateBatchSize(len(c.Nodes), &c.BatchState) } else { - ceiling := c.calculateCeiling() + ceiling := CalculateCeiling(c.Budget, len(c.Nodes)) availableCapacity := ceiling - c.getInProgressCount() batchSize = max(0, availableCapacity) } @@ -248,19 +248,3 @@ func GetStrategyType(strategy *v1alpha1.DeploymentStrategy) v1alpha1.StrategyTyp func StrategyIsSafer(a, b v1alpha1.StrategyType) bool { return strategySafetyOrder[a] < strategySafetyOrder[b] } - -// ComputeEffectiveCapacity calculates the effective ceiling for a compartment's budget -// given the number of matched nodes -func ComputeEffectiveCapacity(budget v1alpha1.DeploymentBudget, matchedNodes int) int { - if budget.Count != nil { - return *budget.Count - } - if budget.Percent != nil { - // capacity = max(1, floor(percent/100 × matched)) - // Use floor for safer rollouts - never exceed the intended percentage - capacity := float64(*budget.Percent) / 100.0 * float64(matchedNodes) - return max(1, int(capacity)) - } - // Should not happen due to validation - return 0 -} diff --git a/operator/internal/wrapper/compartment_test.go b/operator/internal/wrapper/compartment_test.go index df2869ec..d6c873ed 100644 --- a/operator/internal/wrapper/compartment_test.go +++ b/operator/internal/wrapper/compartment_test.go @@ -39,7 +39,7 @@ var _ = Describe("Compartment", func() { compartment.Nodes = append(compartment.Nodes, nil) } - ceiling := compartment.calculateCeiling() + ceiling := CalculateCeiling(compartment.Budget, len(compartment.Nodes)) Expect(ceiling).To(Equal(3)) }) @@ -55,7 +55,7 @@ var _ = Describe("Compartment", func() { compartment.Nodes = append(compartment.Nodes, nil) } - ceiling := compartment.calculateCeiling() + ceiling := CalculateCeiling(compartment.Budget, len(compartment.Nodes)) Expect(ceiling).To(Equal(3)) // max(1, int(10 * 0.3)) = 3 }) @@ -71,7 +71,7 @@ var _ = Describe("Compartment", func() { compartment.Nodes = append(compartment.Nodes, nil) } - ceiling := compartment.calculateCeiling() + ceiling := CalculateCeiling(compartment.Budget, len(compartment.Nodes)) Expect(ceiling).To(Equal(1)) // max(1, int(2 * 0.3)) = max(1, 0) = 1 }) @@ -82,7 +82,7 @@ var _ = Describe("Compartment", func() { }, } - ceiling := compartment.calculateCeiling() + ceiling := CalculateCeiling(compartment.Budget, len(compartment.Nodes)) Expect(ceiling).To(Equal(0)) }) }) From 2b96a219afe1564528f2852b20c5702ede662ada Mon Sep 17 00:00:00 2001 From: Tommy Lam Date: Tue, 21 Oct 2025 12:10:10 -0700 Subject: [PATCH 2/2] add tests --- .../controller/cluster_state_v2_test.go | 484 ++++++++++++++++++ 1 file changed, 484 insertions(+) diff --git a/operator/internal/controller/cluster_state_v2_test.go b/operator/internal/controller/cluster_state_v2_test.go index bb437bf1..f62b62dc 100644 --- a/operator/internal/controller/cluster_state_v2_test.go +++ b/operator/internal/controller/cluster_state_v2_test.go @@ -1212,3 +1212,487 @@ var _ = Describe("CleanupRemovedNodes", func() { }) }) }) + +var _ = Describe("Compartment Status Tests", func() { + Describe("compartmentStatusEqual", func() { + It("should return true for equal compartment statuses without batch state", func() { + a := v1alpha1.CompartmentStatus{ + Matched: 10, + Ceiling: 5, + InProgress: 2, + Completed: 3, + ProgressPercent: 30, + BatchState: nil, + } + b := v1alpha1.CompartmentStatus{ + Matched: 10, + Ceiling: 5, + InProgress: 2, + Completed: 3, + ProgressPercent: 30, + BatchState: nil, + } + Expect(compartmentStatusEqual(a, b)).To(BeTrue()) + }) + + It("should return true for equal compartment statuses with batch state", func() { + batchState := v1alpha1.BatchProcessingState{ + CurrentBatch: 2, + ConsecutiveFailures: 0, + CompletedNodes: 5, + FailedNodes: 1, + ShouldStop: false, + LastBatchSize: 3, + LastBatchFailed: false, + } + a := v1alpha1.CompartmentStatus{ + Matched: 10, + Ceiling: 5, + InProgress: 2, + Completed: 5, + ProgressPercent: 50, + BatchState: &batchState, + } + batchStateCopy := batchState + b := v1alpha1.CompartmentStatus{ + Matched: 10, + Ceiling: 5, + InProgress: 2, + Completed: 5, + ProgressPercent: 50, + BatchState: &batchStateCopy, + } + Expect(compartmentStatusEqual(a, b)).To(BeTrue()) + }) + + It("should return false when matched count differs", func() { + a := v1alpha1.CompartmentStatus{ + Matched: 10, + Ceiling: 5, + InProgress: 2, + Completed: 3, + ProgressPercent: 30, + } + b := v1alpha1.CompartmentStatus{ + Matched: 15, + Ceiling: 5, + InProgress: 2, + Completed: 3, + ProgressPercent: 30, + } + Expect(compartmentStatusEqual(a, b)).To(BeFalse()) + }) + + It("should return false when ceiling differs", func() { + a := v1alpha1.CompartmentStatus{ + Matched: 10, + Ceiling: 5, + InProgress: 2, + Completed: 3, + ProgressPercent: 30, + } + b := v1alpha1.CompartmentStatus{ + Matched: 10, + Ceiling: 10, + InProgress: 2, + Completed: 3, + ProgressPercent: 30, + } + Expect(compartmentStatusEqual(a, b)).To(BeFalse()) + }) + + It("should return false when in progress count differs", func() { + a := v1alpha1.CompartmentStatus{ + Matched: 10, + Ceiling: 5, + InProgress: 2, + Completed: 3, + ProgressPercent: 30, + } + b := v1alpha1.CompartmentStatus{ + Matched: 10, + Ceiling: 5, + InProgress: 3, + Completed: 3, + ProgressPercent: 30, + } + Expect(compartmentStatusEqual(a, b)).To(BeFalse()) + }) + + It("should return false when completed count differs", func() { + a := v1alpha1.CompartmentStatus{ + Matched: 10, + Ceiling: 5, + InProgress: 2, + Completed: 3, + ProgressPercent: 30, + } + b := v1alpha1.CompartmentStatus{ + Matched: 10, + Ceiling: 5, + InProgress: 2, + Completed: 5, + ProgressPercent: 30, + } + Expect(compartmentStatusEqual(a, b)).To(BeFalse()) + }) + + It("should return false when progress percent differs", func() { + a := v1alpha1.CompartmentStatus{ + Matched: 10, + Ceiling: 5, + InProgress: 2, + Completed: 3, + ProgressPercent: 30, + } + b := v1alpha1.CompartmentStatus{ + Matched: 10, + Ceiling: 5, + InProgress: 2, + Completed: 3, + ProgressPercent: 50, + } + Expect(compartmentStatusEqual(a, b)).To(BeFalse()) + }) + + It("should return false when batch states differ", func() { + batchState1 := v1alpha1.BatchProcessingState{ + CurrentBatch: 2, + ConsecutiveFailures: 0, + CompletedNodes: 5, + FailedNodes: 1, + ShouldStop: false, + LastBatchSize: 3, + LastBatchFailed: false, + } + batchState2 := v1alpha1.BatchProcessingState{ + CurrentBatch: 2, + ConsecutiveFailures: 1, // Different + CompletedNodes: 5, + FailedNodes: 1, + ShouldStop: false, + LastBatchSize: 3, + LastBatchFailed: false, + } + a := v1alpha1.CompartmentStatus{ + Matched: 10, + Ceiling: 5, + InProgress: 2, + Completed: 5, + ProgressPercent: 50, + BatchState: &batchState1, + } + b := v1alpha1.CompartmentStatus{ + Matched: 10, + Ceiling: 5, + InProgress: 2, + Completed: 5, + ProgressPercent: 50, + BatchState: &batchState2, + } + Expect(compartmentStatusEqual(a, b)).To(BeFalse()) + }) + }) + + Describe("buildCompartmentStatus", func() { + It("should build status for compartment with no nodes", func() { + compartment := wrapper.NewCompartmentWrapper(&v1alpha1.Compartment{ + Name: "test-compartment", + Budget: v1alpha1.DeploymentBudget{ + Count: ptr(5), + }, + }, nil) + + status := buildCompartmentStatus(compartment) + + Expect(status.Matched).To(Equal(0)) + Expect(status.Ceiling).To(Equal(5)) // Count budget returns the count value even with 0 nodes + Expect(status.InProgress).To(Equal(0)) + Expect(status.Completed).To(Equal(0)) + Expect(status.ProgressPercent).To(Equal(0)) + Expect(status.BatchState).To(BeNil()) + }) + + It("should build status for compartment with strategy and batch state", func() { + skyhook := &v1alpha1.Skyhook{ + ObjectMeta: metav1.ObjectMeta{Name: "test-skyhook"}, + Spec: v1alpha1.SkyhookSpec{ + Packages: map[string]v1alpha1.Package{ + "test-package": { + PackageRef: v1alpha1.PackageRef{Name: "test-package", Version: "1.0.0"}, + Image: "test-image", + }, + }, + }, + } + + node1 := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1"}} + node2 := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node2"}} + + skyhookNode1, err := wrapper.NewSkyhookNode(node1, skyhook) + Expect(err).NotTo(HaveOccurred()) + skyhookNode2, err := wrapper.NewSkyhookNode(node2, skyhook) + Expect(err).NotTo(HaveOccurred()) + + skyhookNode1.SetStatus(v1alpha1.StatusComplete) + skyhookNode2.SetStatus(v1alpha1.StatusInProgress) + + compartment := wrapper.NewCompartmentWrapper(&v1alpha1.Compartment{ + Name: "test-compartment", + Budget: v1alpha1.DeploymentBudget{ + Percent: ptr(50), + }, + Strategy: &v1alpha1.DeploymentStrategy{ + Fixed: &v1alpha1.FixedStrategy{ + InitialBatch: ptr(1), + }, + }, + }, nil) + + compartment.AddNode(skyhookNode1) + compartment.AddNode(skyhookNode2) + + status := buildCompartmentStatus(compartment) + + Expect(status.Matched).To(Equal(2)) + Expect(status.Ceiling).To(Equal(1)) // 50% of 2 = 1 + Expect(status.InProgress).To(Equal(1)) + Expect(status.Completed).To(Equal(0)) // No nodes actually complete (packages not done) + Expect(status.ProgressPercent).To(Equal(0)) + Expect(status.BatchState).NotTo(BeNil()) + Expect(status.BatchState.CurrentBatch).To(Equal(1)) + Expect(status.BatchState.ConsecutiveFailures).To(Equal(0)) + }) + + It("should calculate 100% progress when all nodes are complete", func() { + skyhook := &v1alpha1.Skyhook{ + ObjectMeta: metav1.ObjectMeta{Name: "test-skyhook"}, + } + + node1 := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1"}} + node2 := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node2"}} + + skyhookNode1, err := wrapper.NewSkyhookNode(node1, skyhook) + Expect(err).NotTo(HaveOccurred()) + skyhookNode2, err := wrapper.NewSkyhookNode(node2, skyhook) + Expect(err).NotTo(HaveOccurred()) + + skyhookNode1.SetStatus(v1alpha1.StatusComplete) + skyhookNode2.SetStatus(v1alpha1.StatusComplete) + + compartment := wrapper.NewCompartmentWrapper(&v1alpha1.Compartment{ + Name: "test-compartment", + Budget: v1alpha1.DeploymentBudget{ + Count: ptr(2), + }, + }, nil) + + compartment.AddNode(skyhookNode1) + compartment.AddNode(skyhookNode2) + + status := buildCompartmentStatus(compartment) + + Expect(status.ProgressPercent).To(Equal(100)) + Expect(status.Completed).To(Equal(2)) + Expect(status.InProgress).To(Equal(0)) + }) + }) + + Describe("should persist compartment status to skyhook status", func() { + It("should persist compartment status to skyhook status in ReportState", func() { + skyhook := &v1alpha1.Skyhook{ + ObjectMeta: metav1.ObjectMeta{Name: "test-skyhook"}, + Spec: v1alpha1.SkyhookSpec{ + Packages: map[string]v1alpha1.Package{ + "test-package": { + PackageRef: v1alpha1.PackageRef{Name: "test-package", Version: "1.0.0"}, + Image: "test-image", + }, + }, + }, + Status: v1alpha1.SkyhookStatus{ + CompartmentStatuses: nil, + }, + } + + // Create nodes + node1 := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1"}} + node2 := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node2"}} + node3 := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node3"}} + + skyhookNode1, err := wrapper.NewSkyhookNode(node1, skyhook) + Expect(err).NotTo(HaveOccurred()) + skyhookNode2, err := wrapper.NewSkyhookNode(node2, skyhook) + Expect(err).NotTo(HaveOccurred()) + skyhookNode3, err := wrapper.NewSkyhookNode(node3, skyhook) + Expect(err).NotTo(HaveOccurred()) + + skyhookNode1.SetStatus(v1alpha1.StatusComplete) + skyhookNode2.SetStatus(v1alpha1.StatusInProgress) + skyhookNode3.SetStatus(v1alpha1.StatusUnknown) + + // Create compartments + compartment1 := wrapper.NewCompartmentWrapper(&v1alpha1.Compartment{ + Name: "compartment-1", + Budget: v1alpha1.DeploymentBudget{ + Percent: ptr(50), + }, + }, nil) + compartment1.AddNode(skyhookNode1) + compartment1.AddNode(skyhookNode2) + + compartment2 := wrapper.NewCompartmentWrapper(&v1alpha1.Compartment{ + Name: "compartment-2", + Budget: v1alpha1.DeploymentBudget{ + Percent: ptr(100), + }, + }, nil) + compartment2.AddNode(skyhookNode3) + + // Create skyhookNodes + skyhookNodes := &skyhookNodes{ + skyhook: wrapper.NewSkyhookWrapper(skyhook), + nodes: []wrapper.SkyhookNode{skyhookNode1, skyhookNode2, skyhookNode3}, + compartments: make(map[string]*wrapper.Compartment), + } + skyhookNodes.AddCompartment("compartment-1", compartment1) + skyhookNodes.AddCompartment("compartment-2", compartment2) + + // Call ReportState + skyhookNodes.ReportState() + + // Verify compartment statuses were persisted + Expect(skyhookNodes.skyhook.Status.CompartmentStatuses).NotTo(BeNil()) + Expect(skyhookNodes.skyhook.Status.CompartmentStatuses).To(HaveLen(2)) + + // Verify compartment-1 status + comp1Status, ok := skyhookNodes.skyhook.Status.CompartmentStatuses["compartment-1"] + Expect(ok).To(BeTrue()) + Expect(comp1Status.Matched).To(Equal(2)) + Expect(comp1Status.Ceiling).To(Equal(1)) // 50% of 2 = 1 + Expect(comp1Status.InProgress).To(Equal(1)) + Expect(comp1Status.Completed).To(Equal(0)) // No packages completed + Expect(comp1Status.ProgressPercent).To(Equal(0)) + + // Verify compartment-2 status + comp2Status, ok := skyhookNodes.skyhook.Status.CompartmentStatuses["compartment-2"] + Expect(ok).To(BeTrue()) + Expect(comp2Status.Matched).To(Equal(1)) + Expect(comp2Status.Ceiling).To(Equal(1)) + Expect(comp2Status.InProgress).To(Equal(0)) + Expect(comp2Status.Completed).To(Equal(0)) + Expect(comp2Status.ProgressPercent).To(Equal(0)) + + // Verify Updated flag was set + Expect(skyhookNodes.skyhook.Updated).To(BeTrue()) + }) + + It("should not update status when compartment status hasn't changed", func() { + skyhook := &v1alpha1.Skyhook{ + ObjectMeta: metav1.ObjectMeta{Name: "test-skyhook"}, + Status: v1alpha1.SkyhookStatus{ + CompartmentStatuses: make(map[string]v1alpha1.CompartmentStatus), + }, + } + + // Create a node + node1 := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1"}} + skyhookNode1, err := wrapper.NewSkyhookNode(node1, skyhook) + Expect(err).NotTo(HaveOccurred()) + skyhookNode1.SetStatus(v1alpha1.StatusComplete) + + // Create compartment + compartment := wrapper.NewCompartmentWrapper(&v1alpha1.Compartment{ + Name: "compartment-1", + Budget: v1alpha1.DeploymentBudget{ + Count: ptr(2), + }, + }, nil) + compartment.AddNode(skyhookNode1) + + // Pre-populate status with correct values + skyhook.Status.CompartmentStatuses["compartment-1"] = v1alpha1.CompartmentStatus{ + Matched: 1, + Ceiling: 2, + InProgress: 0, + Completed: 1, + ProgressPercent: 100, + BatchState: nil, + } + + // Create skyhookNodes + skyhookNodes := &skyhookNodes{ + skyhook: wrapper.NewSkyhookWrapper(skyhook), + nodes: []wrapper.SkyhookNode{skyhookNode1}, + compartments: make(map[string]*wrapper.Compartment), + } + skyhookNodes.AddCompartment("compartment-1", compartment) + + // Reset Updated flag + skyhookNodes.skyhook.Updated = false + + // Call ReportState + skyhookNodes.ReportState() + + // Verify Updated flag was NOT set since status didn't change + // Note: ReportState might set Updated for other fields, but we're checking + // that compartment status equality check is working + comp1Status := skyhookNodes.skyhook.Status.CompartmentStatuses["compartment-1"] + Expect(comp1Status.Matched).To(Equal(1)) + Expect(comp1Status.Completed).To(Equal(1)) + }) + + It("should persist batch state in compartment status", func() { + skyhook := &v1alpha1.Skyhook{ + ObjectMeta: metav1.ObjectMeta{Name: "test-skyhook"}, + Status: v1alpha1.SkyhookStatus{ + CompartmentStatuses: nil, + }, + } + + // Create nodes + node1 := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1"}} + node2 := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node2"}} + + skyhookNode1, err := wrapper.NewSkyhookNode(node1, skyhook) + Expect(err).NotTo(HaveOccurred()) + skyhookNode2, err := wrapper.NewSkyhookNode(node2, skyhook) + Expect(err).NotTo(HaveOccurred()) + + skyhookNode1.SetStatus(v1alpha1.StatusComplete) + skyhookNode2.SetStatus(v1alpha1.StatusInProgress) + + // Create compartment with strategy (to have batch state) + compartment := wrapper.NewCompartmentWrapper(&v1alpha1.Compartment{ + Name: "compartment-with-strategy", + Budget: v1alpha1.DeploymentBudget{ + Count: ptr(2), + }, + Strategy: &v1alpha1.DeploymentStrategy{ + Fixed: &v1alpha1.FixedStrategy{ + InitialBatch: ptr(1), + }, + }, + }, nil) + compartment.AddNode(skyhookNode1) + compartment.AddNode(skyhookNode2) + + // Create skyhookNodes + skyhookNodes := &skyhookNodes{ + skyhook: wrapper.NewSkyhookWrapper(skyhook), + nodes: []wrapper.SkyhookNode{skyhookNode1, skyhookNode2}, + compartments: make(map[string]*wrapper.Compartment), + } + skyhookNodes.AddCompartment("compartment-with-strategy", compartment) + + // Call ReportState + skyhookNodes.ReportState() + + // Verify batch state was persisted + Expect(skyhookNodes.skyhook.Status.CompartmentStatuses).NotTo(BeNil()) + compStatus := skyhookNodes.skyhook.Status.CompartmentStatuses["compartment-with-strategy"] + Expect(compStatus.BatchState).NotTo(BeNil()) + Expect(compStatus.BatchState.CurrentBatch).To(Equal(1)) + }) + }) +})