Skip to content

Commit 4b5aa6b

Browse files
authored
Merge pull request #6902 from zhzhuang-zju/components
multi-components scheduling: refactor the logic of node resource plugin
2 parents ae55186 + 035afa6 commit 4b5aa6b

File tree

2 files changed

+76
-96
lines changed

2 files changed

+76
-96
lines changed

pkg/estimator/server/framework/plugins/noderesource/noderesource.go

Lines changed: 73 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ const (
3939
Name = "NodeResourceEstimator"
4040
// nodeResourceEstimator is enabled by default.
4141
enabled = true
42+
43+
// noNodeConstraint represents the value when there is no node resource constraint.
44+
noNodeConstraint = math.MaxInt32
4245
)
4346

4447
// nodeResourceEstimator is to estimate how many replicas/sets allowed by the node resources for a given pb.ReplicaRequirements.
@@ -67,7 +70,7 @@ func (pl *nodeResourceEstimator) Name() string {
6770
func (pl *nodeResourceEstimator) Estimate(ctx context.Context, snapshot *schedcache.Snapshot, requirements *pb.ReplicaRequirements) (int32, *framework.Result) {
6871
if !pl.enabled {
6972
klog.V(5).Info("Estimator Plugin", "name", Name, "enabled", pl.enabled)
70-
return math.MaxInt32, framework.NewResult(framework.Noopperation, fmt.Sprintf("%s is disabled", pl.Name()))
73+
return noNodeConstraint, framework.NewResult(framework.Noopperation, fmt.Sprintf("%s is disabled", pl.Name()))
7174
}
7275

7376
allNodes, err := snapshot.NodeInfos().List()
@@ -107,24 +110,27 @@ func (pl *nodeResourceEstimator) nodeMaxAvailableReplica(node *schedulerframewor
107110
return int32(rest.MaxDivided(rl)) // #nosec G115: integer overflow conversion int64 -> int32
108111
}
109112

110-
// EstimateComponents the sets allowed by the node resources for a given pb.Component.
113+
// EstimateComponents estimates the maximum number of complete component sets that can be scheduled.
114+
// It returns the number of sets that can fit on the available node resources.
111115
func (pl *nodeResourceEstimator) EstimateComponents(_ context.Context, snapshot *schedcache.Snapshot, components []pb.Component) (int32, *framework.Result) {
112116
if !pl.enabled {
113117
klog.V(5).Info("Estimator Plugin", "name", Name, "enabled", pl.enabled)
114-
return math.MaxInt32, framework.NewResult(framework.Noopperation, fmt.Sprintf("%s is disabled", pl.Name()))
118+
return noNodeConstraint, framework.NewResult(framework.Noopperation, fmt.Sprintf("%s is disabled", pl.Name()))
115119
}
116120

117121
if len(components) == 0 {
118-
return 0, framework.AsResult(fmt.Errorf("no components specified"))
122+
klog.V(5).Infof("%s: received empty components list", pl.Name())
123+
return noNodeConstraint, framework.NewResult(framework.Noopperation, fmt.Sprintf("%s received empty components list", pl.Name()))
119124
}
120125

121-
nodes, err := getNodeRestResource(snapshot)
126+
nodes, err := getNodesAvailableResources(snapshot)
122127
if err != nil {
123128
return 0, framework.AsResult(err)
124129
}
125130

126131
var sets int32
127-
for canAssignOneComponentSets(newTasks(components), nodes) {
132+
// Keep scheduling full component sets until one fails to fit.
133+
for scheduleComponentSet(components, nodes) {
128134
sets++
129135
}
130136

@@ -134,11 +140,10 @@ func (pl *nodeResourceEstimator) EstimateComponents(_ context.Context, snapshot
134140
return sets, framework.NewResult(framework.Success)
135141
}
136142

137-
// getNodeRestResource calculates the remaining available resources for each node in the cluster.
138-
// It clones each node and subtracts the already requested resources and existing pod count
139-
// to determine how much capacity is left for new workloads.
140-
// Returns a slice of NodeInfo with updated allocatable resources representing available capacity.
141-
func getNodeRestResource(snapshot *schedcache.Snapshot) ([]*schedulerframework.NodeInfo, error) {
143+
// getNodesAvailableResources retrieves and prepares the list of node information from the snapshot.
144+
// It clones each node's info and adjusts the allocatable resources by subtracting the requested resources.
145+
// So that the returned node infos reflect the actual available resources for scheduling.
146+
func getNodesAvailableResources(snapshot *schedcache.Snapshot) ([]*schedulerframework.NodeInfo, error) {
142147
allNodes, err := snapshot.NodeInfos().List()
143148
if err != nil {
144149
return nil, err
@@ -155,116 +160,91 @@ func getNodeRestResource(snapshot *schedcache.Snapshot) ([]*schedulerframework.N
155160
return rest, nil
156161
}
157162

158-
// canAssignOneComponentSets attempts to schedule one complete set of components across the available nodes.
163+
// scheduleComponentSet attempts to schedule one complete set of components across the available nodes.
159164
// It returns true if all components in the set can be successfully scheduled, false otherwise.
160165
// The function modifies the node resources as it assigns replicas to simulate actual scheduling.
161-
func canAssignOneComponentSets(ts *tasks, allNodes []*schedulerframework.NodeInfo) bool {
162-
for !ts.done() {
163-
i, t := ts.getTask()
164-
if i == -1 {
165-
// No more tasks to schedule, but done() returned false - this shouldn't happen
166+
func scheduleComponentSet(components []pb.Component, allNodes []*schedulerframework.NodeInfo) bool {
167+
for _, component := range components {
168+
if !scheduleComponent(component, allNodes) {
166169
return false
167170
}
171+
}
168172

169-
scheduled := false
170-
for _, node := range allNodes {
171-
if !matchNode(t, node) {
172-
continue
173-
}
174-
needResource := util.NewResource(t.ResourceRequest)
175-
needResource.AllowedPodNumber = 1
176-
if node.Allocatable.Allocatable(needResource) {
177-
// Assign one replica to this node.
178-
node.Allocatable.SubResource(needResource)
179-
ts.scheduleOne(i)
180-
scheduled = true
181-
break
182-
}
173+
return true
174+
}
175+
176+
// scheduleComponent attempts to schedule all replicas of a single component across the available nodes.
177+
// It iterates through nodes to find suitable ones and schedules as many replicas as possible on each node.
178+
// Returns true if all replicas of the component can be successfully scheduled, false otherwise.
179+
func scheduleComponent(component pb.Component, allNodes []*schedulerframework.NodeInfo) bool {
180+
t := newSchedulingTask(component)
181+
182+
for _, node := range allNodes {
183+
if !matchNode(t.nodeClaim, node) {
184+
continue
183185
}
184186

185-
if !scheduled {
186-
// No node can fit this task, cannot complete the component set
187-
return false
187+
for node.Allocatable.Allocatable(t.requiredResourcePerReplica) {
188+
// Assign one replica to this node.
189+
t.scheduleOnePod(node)
190+
if t.done() {
191+
// short path
192+
return true
193+
}
188194
}
189195
}
190196

191-
return ts.done()
197+
return t.done()
192198
}
193199

194-
// task represents a single component type with its scheduling requirements and remaining replicas.
195-
type task struct {
196-
// replicaRequirements defines the resource and scheduling constraints for each replica
197-
replicaRequirements pb.ReplicaRequirements
200+
// componentSchedulingTask represents a single component scheduling task with its requirements and state.
201+
type componentSchedulingTask struct {
202+
// nodeClaim represents the NodeAffinity, NodeSelector and Tolerations required by this component.
203+
nodeClaim *pb.NodeClaim
204+
// requiredResourcePerReplica represents the resources required by a single replica of this component.
205+
requiredResourcePerReplica *util.Resource
198206
// toBeScheduled tracks how many replicas of this component still need to be scheduled
199207
toBeScheduled int32
200208
}
201209

202-
// tasks manages a collection of component tasks for scheduling estimation.
203-
// It tracks the remaining replicas for each component type that need to be scheduled.
204-
type tasks struct {
205-
// items contains all component tasks to be scheduled
206-
items []task
207-
}
208-
209-
// newTasks creates a new task collection from the given components.
210-
// Each component is converted to a task with its replica requirements and count.
211-
func newTasks(components []pb.Component) *tasks {
212-
ts := make([]task, 0, len(components))
213-
for _, component := range components {
214-
ts = append(ts, task{
215-
replicaRequirements: component.ReplicaRequirements,
216-
toBeScheduled: component.Replicas,
217-
})
218-
}
219-
220-
return &tasks{
221-
items: ts,
222-
}
223-
}
224-
225-
// getTask returns the index and replica requirements of the first task that still needs to be scheduled.
226-
// It scans through all tasks to find one with remaining replicas to schedule.
227-
// Returns (-1, empty ReplicaRequirements) if no unfinished tasks are found.
228-
func (t *tasks) getTask() (int, pb.ReplicaRequirements) {
229-
for i := 0; i < len(t.items); i++ {
230-
if t.items[i].toBeScheduled > 0 {
231-
return i, t.items[i].replicaRequirements
232-
}
210+
// newSchedulingTask creates a new component scheduling task from the given component.
211+
// It initializes the task with the component's node claim, required resources per replica, and total replicas to be scheduled.
212+
func newSchedulingTask(component pb.Component) componentSchedulingTask {
213+
needResource := util.NewResource(component.ReplicaRequirements.ResourceRequest)
214+
needResource.AllowedPodNumber = 1
215+
return componentSchedulingTask{
216+
nodeClaim: component.ReplicaRequirements.NodeClaim,
217+
requiredResourcePerReplica: needResource,
218+
toBeScheduled: component.Replicas,
233219
}
234-
235-
return -1, pb.ReplicaRequirements{}
236220
}
237221

238-
// done returns true if all tasks have been completely scheduled (no replicas remaining).
239-
// This indicates that a complete component set has been successfully allocated.
240-
func (t *tasks) done() bool {
241-
for _, tk := range t.items {
242-
if tk.toBeScheduled > 0 {
243-
return false
244-
}
245-
}
246-
return true
222+
// done returns true if the task has been completely scheduled (no replicas remaining).
223+
// This indicates that a complete component has been successfully allocated.
224+
func (t *componentSchedulingTask) done() bool {
225+
return t.toBeScheduled == 0
247226
}
248227

249-
// scheduleOne decrements the replica count for the task at the specified index.
228+
// scheduleOnePod schedules one replica of this component on the specified node.
229+
// It decrements the remaining replica count and subtracts the required resources from the node.
250230
// This should be called when a replica has been successfully scheduled on a node.
251-
func (t *tasks) scheduleOne(index int) {
252-
if index < 0 || index >= len(t.items) {
253-
// Invalid index - defensive programming
231+
func (t *componentSchedulingTask) scheduleOnePod(node *schedulerframework.NodeInfo) {
232+
if t.toBeScheduled <= 0 {
233+
// No more replicas to schedule
254234
return
255235
}
256-
if t.items[index].toBeScheduled > 0 {
257-
t.items[index].toBeScheduled--
258-
}
236+
237+
node.Allocatable.SubResource(t.requiredResourcePerReplica)
238+
t.toBeScheduled--
259239
}
260240

261-
// matchNode checks whether the node matches the replicaRequirements' node affinity and tolerations.
262-
func matchNode(replicaRequirements pb.ReplicaRequirements, node *schedulerframework.NodeInfo) bool {
263-
affinity := nodeutil.GetRequiredNodeAffinity(replicaRequirements)
241+
// matchNode checks whether the node matches the scheduling constraints defined in the replica requirements.
242+
func matchNode(nodeClaim *pb.NodeClaim, node *schedulerframework.NodeInfo) bool {
243+
affinity := nodeutil.GetRequiredNodeAffinity(pb.ReplicaRequirements{NodeClaim: nodeClaim})
264244
var tolerations []corev1.Toleration
265245

266-
if replicaRequirements.NodeClaim != nil {
267-
tolerations = replicaRequirements.NodeClaim.Tolerations
246+
if nodeClaim != nil {
247+
tolerations = nodeClaim.Tolerations
268248
}
269249

270250
if !nodeutil.IsNodeAffinityMatched(node.Node(), affinity) || !nodeutil.IsTolerationMatched(node.Node(), tolerations) {

pkg/estimator/server/framework/plugins/noderesource/noderesource_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -336,8 +336,8 @@ func TestNodeResourceEstimator_EstimateComponents(t *testing.T) {
336336
}),
337337
},
338338
components: []pb.Component{},
339-
expected: 0,
340-
wantCode: framework.Error,
339+
expected: noNodeConstraint,
340+
wantCode: framework.Noopperation,
341341
},
342342
}
343343

@@ -455,7 +455,7 @@ func TestMatchNode(t *testing.T) {
455455

456456
for _, tt := range tests {
457457
t.Run(tt.name, func(t *testing.T) {
458-
result := matchNode(tt.replicaRequirements, tt.node)
458+
result := matchNode(tt.replicaRequirements.NodeClaim, tt.node)
459459
if result != tt.expected {
460460
t.Errorf("matchNode() = %v, expected %v", result, tt.expected)
461461
}

0 commit comments

Comments (0)