Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions chart/templates/skyhook-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,39 @@ spec:
status:
description: SkyhookStatus defines the observed state of Skyhook
properties:
compartmentBatchStates:
additionalProperties:
description: BatchProcessingState tracks the current state of batch
processing for a compartment
properties:
completedNodes:
description: Total number of nodes that have completed successfully
(cumulative across all batches)
type: integer
consecutiveFailures:
description: Number of consecutive failures
type: integer
currentBatch:
description: Current batch number (starts at 1)
type: integer
failedNodes:
description: Total number of nodes that have failed (cumulative
across all batches)
type: integer
lastBatchFailed:
description: Whether the last batch failed (for slowdown logic)
type: boolean
lastBatchSize:
description: Last batch size (for slowdown calculations)
type: integer
shouldStop:
description: Whether the strategy should stop processing due
to failures
type: boolean
type: object
description: CompartmentBatchStates tracks batch processing state
per compartment
type: object
completeNodes:
default: 0/0
description: |-
Expand Down
197 changes: 192 additions & 5 deletions operator/api/v1alpha1/deployment_policy_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@
* limitations under the License.
*/

/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/

package v1alpha1

import (
Expand Down Expand Up @@ -242,6 +237,198 @@ func (s *DeploymentStrategy) Validate() error {
return nil
}

// BatchProcessingState tracks the current state of batch processing for a compartment.
// The strategies' CalculateBatchSize methods read it to size the next batch, and
// DeploymentStrategy.EvaluateBatchResult records each batch outcome into it.
type BatchProcessingState struct {
	// Current batch number (starts at 1).
	// Incremented by EvaluateBatchResult after every evaluated (non-empty) batch.
	CurrentBatch int `json:"currentBatch,omitempty"`
	// Number of consecutive failures.
	// Incremented when a batch falls below the batch threshold and reset to 0 by the next passing batch.
	ConsecutiveFailures int `json:"consecutiveFailures,omitempty"`
	// Total number of nodes that have completed successfully (cumulative across all batches).
	// NOTE(review): not modified by EvaluateBatchResult; it expects the caller to have updated this already.
	CompletedNodes int `json:"completedNodes,omitempty"`
	// Total number of nodes that have failed (cumulative across all batches).
	// NOTE(review): not modified by EvaluateBatchResult; it expects the caller to have updated this already.
	FailedNodes int `json:"failedNodes,omitempty"`
	// Whether the strategy should stop processing due to failures.
	// Set when consecutive failures reach the failure threshold while overall progress is below the safety limit.
	ShouldStop bool `json:"shouldStop,omitempty"`
	// Last batch size (for slowdown calculations).
	// A value of 0 is treated by the linear and exponential strategies as "no batch evaluated yet".
	LastBatchSize int `json:"lastBatchSize,omitempty"`
	// Whether the last batch failed (for slowdown logic)
	LastBatchFailed bool `json:"lastBatchFailed,omitempty"`
}

// CalculateBatchSize returns the size of the next batch by delegating to the
// configured strategy. Fixed is consulted first, then Linear, then Exponential;
// when none of them is set the batch size is 1.
func (s *DeploymentStrategy) CalculateBatchSize(totalNodes int, state *BatchProcessingState) int {
	if fixed := s.Fixed; fixed != nil {
		return fixed.CalculateBatchSize(totalNodes, state)
	}
	if linear := s.Linear; linear != nil {
		return linear.CalculateBatchSize(totalNodes, state)
	}
	if exponential := s.Exponential; exponential != nil {
		return exponential.CalculateBatchSize(totalNodes, state)
	}
	// No strategy configured: fall back to one node per batch.
	return 1
}

// EvaluateBatchResult scores a finished batch against the active strategy's
// thresholds and records the outcome in state.
//
// successCount is the number of nodes that succeeded within this batch of
// batchSize nodes. failureCount is accepted for symmetry but is not used by the
// calculation. state.CompletedNodes and state.FailedNodes are read as cumulative
// totals and are expected to have been updated by the caller beforehand; they
// are not modified here.
//
// An empty batch (batchSize == 0) leaves state untouched. Otherwise
// LastBatchSize, LastBatchFailed and ConsecutiveFailures are updated,
// ShouldStop may be set, and CurrentBatch is advanced by one.
func (s *DeploymentStrategy) EvaluateBatchResult(state *BatchProcessingState, batchSize int, successCount int, failureCount int, totalNodes int) {
	// Nothing to score, and the percentage below would divide by zero.
	if batchSize == 0 {
		return
	}

	// Whole-number share of this batch that succeeded.
	batchSuccessPct := successCount * 100 / batchSize

	// Whole-number share of all nodes handled so far; stays 0 when totalNodes is not positive.
	overallPct := 0
	if totalNodes > 0 {
		overallPct = (state.CompletedNodes + state.FailedNodes) * 100 / totalNodes
	}

	failed := batchSuccessPct < s.getBatchThreshold()
	state.LastBatchSize, state.LastBatchFailed = batchSize, failed

	if !failed {
		// A passing batch breaks the failure streak.
		state.ConsecutiveFailures = 0
	} else {
		state.ConsecutiveFailures++
		// Only halt while the rollout is still below the safety limit; once
		// progress has reached it, repeated failures no longer stop processing.
		if overallPct < s.getSafetyLimit() && state.ConsecutiveFailures >= s.getFailureThreshold() {
			state.ShouldStop = true
		}
	}

	state.CurrentBatch++
}

// getBatchThreshold returns the batch threshold of the active strategy.
// Fixed takes precedence over Linear, which takes precedence over Exponential.
// It returns 100 when no strategy is configured, and also when the active
// strategy leaves BatchThreshold unset (nil) instead of panicking on the
// nil pointer.
func (s *DeploymentStrategy) getBatchThreshold() int {
	const fallback = 100

	var configured *int
	switch {
	case s.Fixed != nil:
		configured = s.Fixed.BatchThreshold
	case s.Linear != nil:
		configured = s.Linear.BatchThreshold
	case s.Exponential != nil:
		configured = s.Exponential.BatchThreshold
	}

	if configured == nil {
		return fallback
	}
	return *configured
}

// getSafetyLimit returns the safety limit of the active strategy.
// Fixed takes precedence over Linear, which takes precedence over Exponential.
// It returns 50 when no strategy is configured, and also when the active
// strategy leaves SafetyLimit unset (nil) instead of panicking on the
// nil pointer.
func (s *DeploymentStrategy) getSafetyLimit() int {
	const fallback = 50

	var configured *int
	switch {
	case s.Fixed != nil:
		configured = s.Fixed.SafetyLimit
	case s.Linear != nil:
		configured = s.Linear.SafetyLimit
	case s.Exponential != nil:
		configured = s.Exponential.SafetyLimit
	}

	if configured == nil {
		return fallback
	}
	return *configured
}

// getFailureThreshold returns the failure threshold of the active strategy.
// Fixed takes precedence over Linear, which takes precedence over Exponential.
// It returns 3 when no strategy is configured, and also when the active
// strategy leaves FailureThreshold unset (nil) instead of panicking on the
// nil pointer.
func (s *DeploymentStrategy) getFailureThreshold() int {
	const fallback = 3

	var configured *int
	switch {
	case s.Fixed != nil:
		configured = s.Fixed.FailureThreshold
	case s.Linear != nil:
		configured = s.Linear.FailureThreshold
	case s.Exponential != nil:
		configured = s.Exponential.FailureThreshold
	}

	if configured == nil {
		return fallback
	}
	return *configured
}

// CalculateBatchSize returns the fixed batch size (InitialBatch), reduced to the
// number of nodes not yet processed when fewer remain.
//
// It returns 0 when totalNodes is 0, matching the linear and exponential
// strategies. For a non-empty compartment the result is never below 1, even
// when every node has already been processed.
//
// NOTE(review): assumes InitialBatch is non-nil by the time this is called
// (for example through defaulting/validation) — confirm.
func (s *FixedStrategy) CalculateBatchSize(totalNodes int, state *BatchProcessingState) int {
	// An empty compartment has nothing to schedule.
	if totalNodes == 0 {
		return 0
	}

	// The fixed strategy never grows or shrinks; it only respects what is left.
	batchSize := *s.InitialBatch
	if remaining := totalNodes - (state.CompletedNodes + state.FailedNodes); remaining < batchSize {
		batchSize = remaining
	}
	return max(1, batchSize)
}

// CalculateBatchSize returns the next batch size for the linear strategy.
//
// The first batch (no previous batch recorded in state) uses InitialBatch.
// Afterwards the previous size moves by Delta: down, but not below 1, when the
// last batch failed while overall progress is still under SafetyLimit percent;
// up in every other case. The result is limited to the nodes not yet processed
// and is at least 1, except that an empty compartment (totalNodes == 0) yields 0.
func (s *LinearStrategy) CalculateBatchSize(totalNodes int, state *BatchProcessingState) int {
	// Guards the percentage division below.
	if totalNodes == 0 {
		return 0
	}

	handled := state.CompletedNodes + state.FailedNodes
	previous := state.LastBatchSize

	var next int
	switch {
	case previous <= 0:
		// No batch has been evaluated yet.
		next = *s.InitialBatch
	case state.LastBatchFailed && handled*100/totalNodes < *s.SafetyLimit:
		// Back off after a failure early in the rollout.
		next = max(1, previous-*s.Delta)
	default:
		// Healthy, or already past the safety limit: keep ramping up.
		next = previous + *s.Delta
	}

	if left := totalNodes - handled; next > left {
		next = left
	}
	return max(1, next)
}

// CalculateBatchSize returns the next batch size for the exponential strategy.
//
// The first batch (no previous batch recorded in state), or any call with a
// non-positive GrowthFactor, uses InitialBatch. Otherwise the previous size is
// divided by GrowthFactor (not below 1) when the last batch failed while overall
// progress is still under SafetyLimit percent, and multiplied by GrowthFactor in
// every other case, capped at totalNodes. The result is then limited to the
// nodes not yet processed and is at least 1, except that an empty compartment
// (totalNodes == 0) yields 0.
func (s *ExponentialStrategy) CalculateBatchSize(totalNodes int, state *BatchProcessingState) int {
	// Guards the percentage division below.
	if totalNodes == 0 {
		return 0
	}

	processedNodes := state.CompletedNodes + state.FailedNodes

	var batchSize int
	if state.LastBatchSize > 0 && *s.GrowthFactor > 0 {
		factor := *s.GrowthFactor
		progressPercent := (processedNodes * 100) / totalNodes

		switch {
		case state.LastBatchFailed && progressPercent < *s.SafetyLimit:
			// Slow down: divide by growth factor.
			batchSize = max(1, state.LastBatchSize/factor)
		case state.LastBatchSize > totalNodes/factor:
			// The product would exceed totalNodes. Cap without multiplying so a
			// large factor cannot overflow int and wrap to a negative size.
			batchSize = totalNodes
		default:
			// Normal growth: multiply by growth factor.
			batchSize = state.LastBatchSize * factor
		}

		// Cap at total nodes to prevent unreasonably large batch sizes.
		if batchSize > totalNodes {
			batchSize = totalNodes
		}
	} else {
		// First batch: use initial batch size.
		batchSize = *s.InitialBatch
	}

	if remaining := totalNodes - processedNodes; batchSize > remaining {
		batchSize = remaining
	}
	return max(1, batchSize)
}

// Validate validates the Compartment
func (c *Compartment) Validate() error {
// Validate compartment budget
Expand Down
3 changes: 3 additions & 0 deletions operator/api/v1alpha1/skyhook_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,9 @@ type SkyhookStatus struct {
// ConfigUpdates tracks config updates
ConfigUpdates map[string][]string `json:"configUpdates,omitempty"`

// CompartmentBatchStates tracks batch processing state per compartment
CompartmentBatchStates map[string]BatchProcessingState `json:"compartmentBatchStates,omitempty"`

// +kubebuilder:example=3
// +kubebuilder:default=0
// NodesInProgress displays the number of nodes that are currently in progress and is
Expand Down
26 changes: 24 additions & 2 deletions operator/api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions operator/config/crd/bases/skyhook.nvidia.com_skyhooks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,39 @@ spec:
status:
description: SkyhookStatus defines the observed state of Skyhook
properties:
compartmentBatchStates:
additionalProperties:
description: BatchProcessingState tracks the current state of batch
processing for a compartment
properties:
completedNodes:
description: Total number of nodes that have completed successfully
(cumulative across all batches)
type: integer
consecutiveFailures:
description: Number of consecutive failures
type: integer
currentBatch:
description: Current batch number (starts at 1)
type: integer
failedNodes:
description: Total number of nodes that have failed (cumulative
across all batches)
type: integer
lastBatchFailed:
description: Whether the last batch failed (for slowdown logic)
type: boolean
lastBatchSize:
description: Last batch size (for slowdown calculations)
type: integer
shouldStop:
description: Whether the strategy should stop processing due
to failures
type: boolean
type: object
description: CompartmentBatchStates tracks batch processing state
per compartment
type: object
completeNodes:
default: 0/0
description: |-
Expand Down
Loading