Skip to content

Commit f383f36

Browse files
committed
only persist key info
1 parent eb0312a commit f383f36

File tree

6 files changed

+92
-108
lines changed

6 files changed

+92
-108
lines changed

operator/api/v1alpha1/deployment_policy_types.go

Lines changed: 20 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -239,17 +239,13 @@ type BatchProcessingState struct {
239239
CurrentBatch int `json:"currentBatch,omitempty"`
240240
// Number of consecutive failures
241241
ConsecutiveFailures int `json:"consecutiveFailures,omitempty"`
242-
// Total number of nodes processed so far
243-
ProcessedNodes int `json:"processedNodes,omitempty"`
244-
// Number of successful nodes in current batch
245-
SuccessfulInBatch int `json:"successfulInBatch,omitempty"`
246-
// Number of failed nodes in current batch
247-
FailedInBatch int `json:"failedInBatch,omitempty"`
242+
// Total number of nodes that have completed successfully (cumulative across all batches)
243+
CompletedNodes int `json:"completedNodes,omitempty"`
244+
// Total number of nodes that have failed (cumulative across all batches)
245+
FailedNodes int `json:"failedNodes,omitempty"`
248246
// Whether the strategy should stop processing due to failures
249247
ShouldStop bool `json:"shouldStop,omitempty"`
250-
// Names of nodes in the current batch (persisted across reconciles)
251-
CurrentBatchNodes []string `json:"currentBatchNodes,omitempty"`
252-
// Last successful batch size (for slowdown calculations)
248+
// Last batch size (for slowdown calculations)
253249
LastBatchSize int `json:"lastBatchSize,omitempty"`
254250
// Whether the last batch failed (for slowdown logic)
255251
LastBatchFailed bool `json:"lastBatchFailed,omitempty"`
@@ -271,19 +267,22 @@ func (s *DeploymentStrategy) CalculateBatchSize(totalNodes int, state *BatchProc
271267

272268
// EvaluateBatchResult evaluates the result of a batch and updates state
273269
func (s *DeploymentStrategy) EvaluateBatchResult(state *BatchProcessingState, batchSize int, successCount int, failureCount int, totalNodes int) {
274-
state.SuccessfulInBatch = successCount
275-
state.FailedInBatch = failureCount
276-
state.ProcessedNodes += batchSize
270+
// Note: successCount and failureCount are deltas from the current batch
271+
// CompletedNodes and FailedNodes are already updated in EvaluateCurrentBatch before this is called
277272

278273
// Avoid divide by zero
279274
if batchSize == 0 {
280275
return
281276
}
282277

278+
// Calculate success percentage for this batch
283279
successPercentage := (successCount * 100) / batchSize
280+
281+
// Calculate overall progress percentage
282+
processedNodes := state.CompletedNodes + state.FailedNodes
284283
var progressPercent int
285284
if totalNodes > 0 {
286-
progressPercent = (state.ProcessedNodes * 100) / totalNodes
285+
progressPercent = (processedNodes * 100) / totalNodes
287286
}
288287

289288
if successPercentage >= s.getBatchThreshold() {
@@ -346,7 +345,8 @@ func (s *DeploymentStrategy) getFailureThreshold() int {
346345
func (s *FixedStrategy) CalculateBatchSize(totalNodes int, state *BatchProcessingState) int {
347346
// Fixed strategy doesn't change batch size, but respects remaining nodes
348347
batchSize := *s.InitialBatch
349-
remaining := totalNodes - state.ProcessedNodes
348+
processedNodes := state.CompletedNodes + state.FailedNodes
349+
remaining := totalNodes - processedNodes
350350
if batchSize > remaining {
351351
batchSize = remaining
352352
}
@@ -362,7 +362,8 @@ func (s *LinearStrategy) CalculateBatchSize(totalNodes int, state *BatchProcessi
362362
}
363363

364364
// Check if we should slow down due to last batch failure
365-
progressPercent := (state.ProcessedNodes * 100) / totalNodes
365+
processedNodes := state.CompletedNodes + state.FailedNodes
366+
progressPercent := (processedNodes * 100) / totalNodes
366367
if state.LastBatchFailed && progressPercent < *s.SafetyLimit && state.LastBatchSize > 0 {
367368
// Slow down: reduce by delta from last batch size
368369
batchSize = max(1, state.LastBatchSize-*s.Delta)
@@ -371,7 +372,7 @@ func (s *LinearStrategy) CalculateBatchSize(totalNodes int, state *BatchProcessi
371372
batchSize = *s.InitialBatch + (state.CurrentBatch-1)*(*s.Delta)
372373
}
373374

374-
remaining := totalNodes - state.ProcessedNodes
375+
remaining := totalNodes - processedNodes
375376
if batchSize > remaining {
376377
batchSize = remaining
377378
}
@@ -387,7 +388,8 @@ func (s *ExponentialStrategy) CalculateBatchSize(totalNodes int, state *BatchPro
387388
}
388389

389390
// Check if we should slow down due to last batch failure
390-
progressPercent := (state.ProcessedNodes * 100) / totalNodes
391+
processedNodes := state.CompletedNodes + state.FailedNodes
392+
progressPercent := (processedNodes * 100) / totalNodes
391393
if state.LastBatchFailed && progressPercent < *s.SafetyLimit && state.LastBatchSize > 0 && *s.GrowthFactor > 0 {
392394
// Slow down: divide last batch size by growth factor
393395
batchSize = max(1, state.LastBatchSize / *s.GrowthFactor)
@@ -405,7 +407,7 @@ func (s *ExponentialStrategy) CalculateBatchSize(totalNodes int, state *BatchPro
405407
}
406408
}
407409

408-
remaining := totalNodes - state.ProcessedNodes
410+
remaining := totalNodes - processedNodes
409411
if batchSize > remaining {
410412
batchSize = remaining
411413
}

operator/api/v1alpha1/zz_generated.deepcopy.go

Lines changed: 1 addition & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

operator/config/crd/bases/skyhook.nvidia.com_skyhooks.yaml

Lines changed: 8 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -504,37 +504,30 @@ spec:
504504
description: BatchProcessingState tracks the current state of batch
505505
processing for a compartment
506506
properties:
507+
completedNodes:
508+
description: Total number of nodes that have completed successfully
509+
(cumulative across all batches)
510+
type: integer
507511
consecutiveFailures:
508512
description: Number of consecutive failures
509513
type: integer
510514
currentBatch:
511515
description: Current batch number (starts at 1)
512516
type: integer
513-
currentBatchNodes:
514-
description: Names of nodes in the current batch (persisted
515-
across reconciles)
516-
items:
517-
type: string
518-
type: array
519-
failedInBatch:
520-
description: Number of failed nodes in current batch
517+
failedNodes:
518+
description: Total number of nodes that have failed (cumulative
519+
across all batches)
521520
type: integer
522521
lastBatchFailed:
523522
description: Whether the last batch failed (for slowdown logic)
524523
type: boolean
525524
lastBatchSize:
526-
description: Last successful batch size (for slowdown calculations)
527-
type: integer
528-
processedNodes:
529-
description: Total number of nodes processed so far
525+
description: Last batch size (for slowdown calculations)
530526
type: integer
531527
shouldStop:
532528
description: Whether the strategy should stop processing due
533529
to failures
534530
type: boolean
535-
successfulInBatch:
536-
description: Number of successful nodes in current batch
537-
type: integer
538531
type: object
539532
description: CompartmentBatchStates tracks batch processing state
540533
per compartment

operator/internal/controller/cluster_state_v2.go

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -479,9 +479,11 @@ func PersistCompartmentBatchStates(skyhook SkyhookNodes) bool {
479479

480480
changed := false
481481
for _, compartment := range compartments {
482-
// Only persist if there are nodes in the current batch
483-
if len(compartment.GetBatchState().CurrentBatchNodes) > 0 {
484-
skyhook.GetSkyhook().Status.CompartmentBatchStates[compartment.GetName()] = compartment.GetBatchState()
482+
// Persist batch state so the cumulative counters are retained
483+
batchState := compartment.GetBatchState()
484+
// Only persist if there's meaningful state (batch has started or there are nodes)
485+
if batchState.CurrentBatch > 0 || len(compartment.GetNodes()) > 0 {
486+
skyhook.GetSkyhook().Status.CompartmentBatchStates[compartment.GetName()] = batchState
485487
changed = true
486488
}
487489
}

operator/internal/wrapper/compartment.go

Lines changed: 40 additions & 55 deletions
Original file line number | Diff line number | Diff line change
@@ -100,24 +100,23 @@ func (c *Compartment) GetNodesForNextBatch() []SkyhookNode {
100100
return nil
101101
}
102102

103-
if len(c.BatchState.CurrentBatchNodes) > 0 {
104-
return c.getCurrentBatchNodes()
103+
// If there's a batch in progress (nodes are InProgress), don't start a new one
104+
if c.getInProgressCount() > 0 {
105+
return c.getInProgressNodes()
105106
}
106107

108+
// No batch in progress, create a new one
107109
return c.createNewBatch()
108110
}
109111

110-
func (c *Compartment) getCurrentBatchNodes() []SkyhookNode {
111-
currentBatchNodes := make([]SkyhookNode, 0)
112-
for _, nodeName := range c.BatchState.CurrentBatchNodes {
113-
for _, node := range c.Nodes {
114-
if node.GetNode().Name == nodeName {
115-
currentBatchNodes = append(currentBatchNodes, node)
116-
break
117-
}
112+
func (c *Compartment) getInProgressNodes() []SkyhookNode {
113+
inProgressNodes := make([]SkyhookNode, 0)
114+
for _, node := range c.Nodes {
115+
if node.Status() == v1alpha1.StatusInProgress {
116+
inProgressNodes = append(inProgressNodes, node)
118117
}
119118
}
120-
return currentBatchNodes
119+
return inProgressNodes
121120
}
122121

123122
func (c *Compartment) createNewBatch() []SkyhookNode {
@@ -154,66 +153,54 @@ func (c *Compartment) createNewBatch() []SkyhookNode {
154153
}
155154
}
156155

157-
nodeNames := make([]string, len(selectedNodes))
158-
for i, node := range selectedNodes {
159-
nodeNames[i] = node.GetNode().Name
160-
}
161-
c.BatchState.CurrentBatchNodes = nodeNames
162-
163156
return selectedNodes
164157
}
165158

166159
// IsBatchComplete checks if the current batch has reached terminal states
160+
// A batch is complete when there are no nodes in InProgress status
167161
func (c *Compartment) IsBatchComplete() bool {
168-
if len(c.BatchState.CurrentBatchNodes) == 0 {
169-
return true // No batch in progress
170-
}
171-
172-
// Check if all batch nodes have reached terminal states
173-
for _, nodeName := range c.BatchState.CurrentBatchNodes {
174-
for _, node := range c.Nodes {
175-
if node.GetNode().Name == nodeName {
176-
if node.Status() == v1alpha1.StatusInProgress {
177-
return false // Still processing
178-
}
179-
break
180-
}
181-
}
182-
}
183-
return true // All nodes are Complete or Erroring
162+
return c.getInProgressCount() == 0
184163
}
185164

186165
// EvaluateCurrentBatch evaluates the current batch result if it's complete
166+
// Uses delta-based tracking: compares current state to last checkpoint
187167
func (c *Compartment) EvaluateCurrentBatch() (bool, int, int) {
188168
if !c.IsBatchComplete() {
189169
return false, 0, 0 // Batch not complete yet
190170
}
191171

192-
if len(c.BatchState.CurrentBatchNodes) == 0 {
193-
return false, 0, 0 // No batch to evaluate
172+
// If this is the first batch (nothing has been processed yet), skip evaluation
173+
// The batch will be started in the next reconcile
174+
if c.BatchState.CurrentBatch == 0 {
175+
c.BatchState.CurrentBatch = 1
176+
return false, 0, 0
194177
}
195178

196-
successCount := 0
197-
failureCount := 0
198-
199-
// Count successes and failures from the batch nodes
200-
for _, nodeName := range c.BatchState.CurrentBatchNodes {
201-
for _, node := range c.Nodes {
202-
if node.GetNode().Name == nodeName {
203-
if node.IsComplete() {
204-
successCount++
205-
} else if node.Status() == v1alpha1.StatusErroring {
206-
failureCount++
207-
}
208-
break
209-
}
179+
// Count current state in the compartment
180+
currentCompleted := 0
181+
currentFailed := 0
182+
for _, node := range c.Nodes {
183+
if node.IsComplete() {
184+
currentCompleted++
185+
} else if node.Status() == v1alpha1.StatusErroring {
186+
currentFailed++
210187
}
211188
}
212189

213-
// Clear the current batch since we're evaluating it
214-
c.BatchState.CurrentBatchNodes = nil
190+
// Calculate delta from last checkpoint
191+
deltaCompleted := currentCompleted - c.BatchState.CompletedNodes
192+
deltaFailed := currentFailed - c.BatchState.FailedNodes
193+
194+
// Only evaluate if there's actually a change (batch was processed)
195+
if deltaCompleted == 0 && deltaFailed == 0 {
196+
return false, 0, 0
197+
}
198+
199+
// Update checkpoints
200+
c.BatchState.CompletedNodes = currentCompleted
201+
c.BatchState.FailedNodes = currentFailed
215202

216-
return true, successCount, failureCount
203+
return true, deltaCompleted, deltaFailed
217204
}
218205

219206
// EvaluateAndUpdateBatchState evaluates a completed batch and updates the persistent state
@@ -223,10 +210,8 @@ func (c *Compartment) EvaluateAndUpdateBatchState(batchSize int, successCount in
223210
c.Strategy.EvaluateBatchResult(&c.BatchState, batchSize, successCount, failureCount, len(c.Nodes))
224211
} else {
225212
// No strategy: just update basic counters
226-
c.BatchState.ProcessedNodes += batchSize
227-
c.BatchState.SuccessfulInBatch = successCount
228-
c.BatchState.FailedInBatch = failureCount
229213
c.BatchState.CurrentBatch++
214+
c.BatchState.LastBatchSize = batchSize
230215
}
231216
}
232217

0 commit comments

Comments
 (0)