Skip to content

Commit 25cf101

Browse files
authored
Merge pull request #6912 from mszacillo/resource-models
Implementing maxAvailableComponentSets for resource models
2 parents ef8930f + 6b9dba0 commit 25cf101

File tree

2 files changed

+581
-10
lines changed

2 files changed

+581
-10
lines changed

pkg/estimator/client/general.go

Lines changed: 252 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ package client
1919
import (
2020
"context"
2121
"fmt"
22+
"maps"
2223
"math"
24+
"sort"
2325

2426
corev1 "k8s.io/api/core/v1"
2527
"k8s.io/apimachinery/pkg/api/resource"
@@ -54,7 +56,7 @@ func (ge *GeneralEstimator) MaxAvailableReplicas(_ context.Context, clusters []*
5456
}
5557

5658
func (ge *GeneralEstimator) maxAvailableReplicas(cluster *clusterv1alpha1.Cluster, replicaRequirements *workv1alpha2.ReplicaRequirements) int32 {
57-
//Note: resourceSummary must be deep-copied before using in the function to avoid modifying the original data structure.
59+
// Note: resourceSummary must be deep-copied before using in the function to avoid modifying the original data structure.
5860
resourceSummary := cluster.Status.ResourceSummary.DeepCopy()
5961
if resourceSummary == nil {
6062
return 0
@@ -149,8 +151,7 @@ func (ge *GeneralEstimator) maxAvailableComponentSets(cluster *clusterv1alpha1.C
149151
}
150152

151153
if features.FeatureGate.Enabled(features.CustomizedClusterResourceModeling) && len(cluster.Status.ResourceSummary.AllocatableModelings) > 0 {
152-
num, err := getMaximumSetsBasedOnResourceModels(cluster, components)
153-
if err != nil {
154+
if num, err := getMaximumSetsBasedOnResourceModels(cluster, components, podBound); err != nil {
154155
klog.Warningf("Failed to get maximum sets based on resource models, skipping: %v", err)
155156
} else if num < maxSets {
156157
maxSets = num
@@ -160,13 +161,254 @@ func (ge *GeneralEstimator) maxAvailableComponentSets(cluster *clusterv1alpha1.C
160161
return int32(maxSets) // #nosec G115: integer overflow conversion int64 -> int32
161162
}
162163

163-
// getMaximumSetsBasedOnResourceModels is a placeholder for future implementation.
164-
// It should refine the maximum sets based on cluster resource models, similar
165-
// to getMaximumReplicasBasedOnResourceModels but adapted to full component sets.
166-
func getMaximumSetsBasedOnResourceModels(_ *clusterv1alpha1.Cluster, _ []workv1alpha2.Component) (int64, error) {
167-
// TODO: implement logic based on cluster.Spec.ResourceModels
168-
// For now, just return MaxInt64 so it never reduces the upper bound.
169-
return math.MaxInt64, nil
164+
// getMaximumSetsBasedOnResourceModels computes the maximum number of full sets that can be
165+
// placed on a cluster using the cluster's ResourceModels. It expands one set into
166+
// replica kinds (demand + count) and performs a first-fit-decreasing placement onto model-grade nodes.
167+
// `upperBound` caps the search. We can set this using the podBound (allowedPods / podsPerSet)
168+
func getMaximumSetsBasedOnResourceModels(
169+
cluster *clusterv1alpha1.Cluster,
170+
components []workv1alpha2.Component,
171+
upperBound int64,
172+
) (int64, error) {
173+
if upperBound <= 0 {
174+
return 0, nil
175+
}
176+
177+
// Compressed one-set: per-kind (identical replicas grouped)
178+
oneSetKinds := expandKindsOneSet(components)
179+
if len(oneSetKinds) == 0 {
180+
// If there are no pods to schedule, just return upperBound
181+
return upperBound, nil
182+
}
183+
184+
// Use cluster "available" totals (allocatable - allocated - allocating) for normalized scoring
185+
// This reflects what the cluster can actually accept now
186+
totals := availableResourceMap(cluster.Status.ResourceSummary)
187+
188+
for i := range oneSetKinds {
189+
oneSetKinds[i].score = demandScoreNormalized(oneSetKinds[i].dem, totals)
190+
}
191+
sort.Slice(oneSetKinds, func(i, j int) bool {
192+
if oneSetKinds[i].score == oneSetKinds[j].score {
193+
return demandSum(oneSetKinds[i].dem) > demandSum(oneSetKinds[j].dem)
194+
}
195+
return oneSetKinds[i].score > oneSetKinds[j].score
196+
})
197+
198+
//Build model nodes from Spec.ResourceModels and Status.AllocatableModelings
199+
nodes, err := buildModelNodes(cluster)
200+
if err != nil {
201+
return -1, err
202+
}
203+
if len(nodes) == 0 {
204+
return 0, nil
205+
}
206+
207+
var sets int64
208+
for sets < upperBound {
209+
if !placeOneSet(oneSetKinds, nodes) {
210+
break
211+
}
212+
sets++
213+
}
214+
return sets, nil
215+
}
216+
217+
// placeOneSet attempts to place exactly ONE full set (all kinds with their per-set replica counts)
218+
// onto the provided working node capacities (in-place)
219+
// Returns true if successful
220+
func placeOneSet(orderedKinds []replicaKind, work []modelNode) bool {
221+
for _, k := range orderedKinds {
222+
remaining := k.count
223+
if remaining <= 0 {
224+
continue
225+
}
226+
// first-fit across nodes
227+
for n := range work {
228+
if remaining <= 0 {
229+
break
230+
}
231+
fit := maxFit(work[n].cap, k.dem)
232+
if fit <= 0 {
233+
continue
234+
}
235+
place := fit
236+
if place > remaining {
237+
place = remaining
238+
}
239+
consumeMul(work[n].cap, k.dem, place)
240+
remaining -= place
241+
}
242+
if remaining > 0 {
243+
return false
244+
}
245+
}
246+
return true
247+
}
248+
249+
// modelNode holds remaining capacity for a given node across all resource types
250+
type modelNode struct {
251+
cap map[corev1.ResourceName]int64
252+
}
253+
254+
// buildModelNodes constructs identical nodes for each model grade using its Min vector,
255+
// repeated AllocatableModelings[grade].Count times. Grades are indexed directly.
256+
func buildModelNodes(cluster *clusterv1alpha1.Cluster) ([]modelNode, error) {
257+
if cluster == nil {
258+
return nil, fmt.Errorf("nil cluster")
259+
}
260+
if cluster.Status.ResourceSummary == nil {
261+
return nil, fmt.Errorf("resource summary is nil")
262+
}
263+
spec := cluster.Spec.ResourceModels
264+
allocs := cluster.Status.ResourceSummary.AllocatableModelings
265+
if len(spec) == 0 {
266+
return nil, fmt.Errorf("no resource models defined")
267+
}
268+
269+
// Build capacity template per grade
270+
capsByGrade := make(map[uint]map[corev1.ResourceName]int64, len(spec))
271+
for _, m := range spec {
272+
tmpl := make(map[corev1.ResourceName]int64, len(m.Ranges))
273+
for _, r := range m.Ranges {
274+
tmpl[r.Name] = quantityAsInt64(r.Min)
275+
}
276+
capsByGrade[m.Grade] = tmpl
277+
}
278+
279+
// Accumulate counts by grade
280+
countByGrade := make(map[uint]int, len(allocs))
281+
for _, a := range allocs {
282+
if a.Count < 0 {
283+
return nil, fmt.Errorf("negative node count for grade %d", a.Grade)
284+
}
285+
countByGrade[a.Grade] += a.Count
286+
}
287+
288+
// Collect grades and sort, so that order of nodes is
289+
grades := make([]int, 0, len(capsByGrade))
290+
for g := range capsByGrade {
291+
grades = append(grades, int(g)) // #nosec G115: integer overflow conversion uint -> int
292+
}
293+
sort.Ints(grades)
294+
295+
// Emit nodes for grades present in both spec & status.
296+
var nodes []modelNode
297+
for _, grade := range grades {
298+
tmpl, cnt := capsByGrade[uint(grade)], countByGrade[uint(grade)] // #nosec G115: integer overflow conversion int -> uint
299+
if tmpl == nil || cnt == 0 {
300+
continue
301+
}
302+
for range cnt {
303+
capCopy := maps.Clone(tmpl)
304+
nodes = append(nodes, modelNode{cap: capCopy})
305+
}
306+
}
307+
return nodes, nil
308+
}
309+
310+
// replicaKind represents a single type of component, including replica demand and count
311+
type replicaKind struct {
312+
dem map[corev1.ResourceName]int64 // per-replica demand
313+
count int64 // how many replicas
314+
score float64 // ordering heuristic (higher first)
315+
}
316+
317+
// expandKindsOneSet flattens components into a slice of unique replica kinds.
318+
// Each entry holds the per-replica demand and how many replicas of that kind a set needs.
319+
func expandKindsOneSet(components []workv1alpha2.Component) []replicaKind {
320+
kinds := make([]replicaKind, 0, len(components))
321+
for _, c := range components {
322+
if c.ReplicaRequirements == nil || c.ReplicaRequirements.ResourceRequest == nil {
323+
continue
324+
}
325+
// normalize per-replica demand
326+
base := make(map[corev1.ResourceName]int64, len(c.ReplicaRequirements.ResourceRequest))
327+
for name, qty := range c.ReplicaRequirements.ResourceRequest {
328+
base[name] = quantityAsInt64(qty)
329+
}
330+
// skip zero-demand or non-positive replica count
331+
if allZero(base) || c.Replicas <= 0 {
332+
continue
333+
}
334+
335+
k := replicaKind{
336+
dem: base,
337+
count: int64(c.Replicas),
338+
// score is filled later once we know cluster-wide totals
339+
}
340+
kinds = append(kinds, k)
341+
}
342+
return kinds
343+
}
344+
345+
// demandScoreNormalized returns the "max utilization ratio" of a demand vector against total capacities
346+
// If a resource is missing/zero in total, treat it as maximally constrained
347+
func demandScoreNormalized(
348+
demand map[corev1.ResourceName]int64,
349+
total map[corev1.ResourceName]int64,
350+
) float64 {
351+
var maxRatio float64
352+
for res, req := range demand {
353+
if req <= 0 {
354+
continue
355+
}
356+
totalCap := float64(total[res])
357+
if totalCap <= 0 {
358+
return math.MaxFloat64
359+
}
360+
ratio := float64(req) / totalCap
361+
if ratio > maxRatio {
362+
maxRatio = ratio
363+
}
364+
}
365+
return maxRatio
366+
}
367+
368+
// demandSum is used as a tie-breaker when initial scores are equal
369+
func demandSum(m map[corev1.ResourceName]int64) int64 {
370+
var s int64
371+
for _, v := range m {
372+
if v > 0 {
373+
s += v
374+
}
375+
}
376+
return s
377+
}
378+
379+
// maxFit returns how many copies of `dem` fit in `cap` simultaneously
380+
func maxFit(capacity map[corev1.ResourceName]int64, dem map[corev1.ResourceName]int64) int64 {
381+
var limit int64 = math.MaxInt64
382+
for k, req := range dem {
383+
if req <= 0 {
384+
continue
385+
}
386+
avail := capacity[k]
387+
if avail <= 0 {
388+
return 0
389+
}
390+
bound := avail / req
391+
if bound < limit {
392+
limit = bound
393+
}
394+
}
395+
if limit == math.MaxInt64 {
396+
return 0
397+
}
398+
return limit
399+
}
400+
401+
// consumeMul subtracts mult * dem from cap
402+
func consumeMul(capacity map[corev1.ResourceName]int64, dem map[corev1.ResourceName]int64, mult int64) {
403+
if mult <= 0 {
404+
return
405+
}
406+
for k, req := range dem {
407+
if req <= 0 {
408+
continue
409+
}
410+
capacity[k] -= req * mult
411+
}
170412
}
171413

172414
// podsInSet computes the total number of pods in the CRD

0 commit comments

Comments
 (0)