Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,8 @@ type AutoscalingOptions struct {
DynamicResourceAllocationEnabled bool
// ClusterSnapshotParallelism is the maximum parallelism of cluster snapshot creation.
ClusterSnapshotParallelism int
// PredicateParallelism is the number of goroutines to use for running scheduler predicates.
PredicateParallelism int
// CheckCapacityProcessorInstance is the name of the processor instance.
// Only ProvisioningRequests that define this name in their parameters with the key "processorInstance" will be processed by this CA instance.
// It only refers to check capacity ProvisioningRequests, but if not empty, best-effort atomic ProvisioningRequests processing is disabled in this instance.
Expand Down
6 changes: 6 additions & 0 deletions cluster-autoscaler/config/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ var (
forceDeleteFailedNodes = flag.Bool("force-delete-failed-nodes", false, "Whether to enable force deletion of failed nodes, regardless of the min size of the node group the belong to.")
enableDynamicResourceAllocation = flag.Bool("enable-dynamic-resource-allocation", false, "Whether logic for handling DRA (Dynamic Resource Allocation) objects is enabled.")
clusterSnapshotParallelism = flag.Int("cluster-snapshot-parallelism", 16, "Maximum parallelism of cluster snapshot creation.")
predicateParallelism = flag.Int("predicate-parallelism", 4, "Maximum parallelism of scheduler predicate checking.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we fail if someone passes in 0 (and/or should we enforce an upper boundary?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea - added validation for lower bound. For upper bound I don't see the need to add an artificial limit, so not checking it.

checkCapacityProcessorInstance = flag.String("check-capacity-processor-instance", "", "Name of the processor instance. Only ProvisioningRequests that define this name in their parameters with the key \"processorInstance\" will be processed by this CA instance. It only refers to check capacity ProvisioningRequests, but if not empty, best-effort atomic ProvisioningRequests processing is disabled in this instance. Not recommended: Until CA 1.35, ProvisioningRequests with this name as prefix in their class will be also processed.")
nodeDeletionCandidateTTL = flag.Duration("node-deletion-candidate-ttl", time.Duration(0), "Maximum time a node can be marked as removable before the marking becomes stale. This sets the TTL of Cluster-Autoscaler's state if the Cluste-Autoscaler deployment becomes inactive")
capacitybufferControllerEnabled = flag.Bool("capacity-buffer-controller-enabled", false, "Whether to enable the default controller for capacity buffers or not")
Expand Down Expand Up @@ -286,6 +287,10 @@ func createAutoscalingOptions() config.AutoscalingOptions {
}
}

if *predicateParallelism < 1 {
klog.Fatalf("Invalid value for --predicate-parallelism flag: %d", *predicateParallelism)
}

return config.AutoscalingOptions{
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
ScaleDownUtilizationThreshold: *scaleDownUtilizationThreshold,
Expand Down Expand Up @@ -400,6 +405,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
ForceDeleteFailedNodes: *forceDeleteFailedNodes,
DynamicResourceAllocationEnabled: *enableDynamicResourceAllocation,
ClusterSnapshotParallelism: *clusterSnapshotParallelism,
PredicateParallelism: *predicateParallelism,
CheckCapacityProcessorInstance: *checkCapacityProcessorInstance,
MaxInactivityTime: *maxInactivityTimeFlag,
MaxFailingTime: *maxFailingTimeFlag,
Expand Down
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func initializeDefaultOptions(opts *coreoptions.AutoscalerOptions, informerFacto
opts.FrameworkHandle = fwHandle
}
if opts.ClusterSnapshot == nil {
opts.ClusterSnapshot = predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), opts.FrameworkHandle, opts.DynamicResourceAllocationEnabled)
opts.ClusterSnapshot = predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), opts.FrameworkHandle, opts.DynamicResourceAllocationEnabled, opts.PredicateParallelism)
}
if opts.RemainingPdbTracker == nil {
opts.RemainingPdbTracker = pdb.NewBasicRemainingPdbTracker()
Expand Down
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/scaledown/actuation/actuator.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ func (a *Actuator) taintNode(node *apiv1.Node) error {
}

func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterSnapshot, error) {
snapshot := predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), a.autoscalingCtx.FrameworkHandle, a.autoscalingCtx.DynamicResourceAllocationEnabled)
snapshot := predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), a.autoscalingCtx.FrameworkHandle, a.autoscalingCtx.DynamicResourceAllocationEnabled, a.autoscalingCtx.PredicateParallelism)
pods, err := a.autoscalingCtx.AllPodLister().List()
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func buildAutoscaler(ctx context.Context, debuggingSnapshotter debuggingsnapshot
opts := coreoptions.AutoscalerOptions{
AutoscalingOptions: autoscalingOptions,
FrameworkHandle: fwHandle,
ClusterSnapshot: predicate.NewPredicateSnapshot(snapshotStore, fwHandle, autoscalingOptions.DynamicResourceAllocationEnabled),
ClusterSnapshot: predicate.NewPredicateSnapshot(snapshotStore, fwHandle, autoscalingOptions.DynamicResourceAllocationEnabled, autoscalingOptions.PredicateParallelism),
KubeClient: kubeClient,
InformerFactory: informerFactory,
DebuggingSnapshotter: debuggingSnapshotter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,27 @@ import (
"context"
"fmt"
"strings"
"sync"

"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/client-go/util/workqueue"

apiv1 "k8s.io/api/core/v1"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// SchedulerPluginRunner can be used to run various phases of scheduler plugins through the scheduler framework.
type SchedulerPluginRunner struct {
fwHandle *framework.Handle
snapshot clustersnapshot.ClusterSnapshot
lastIndex int
fwHandle *framework.Handle
snapshot clustersnapshot.ClusterSnapshot
lastIndex int
parallelism int
}

// NewSchedulerPluginRunner builds a SchedulerPluginRunner.
func NewSchedulerPluginRunner(fwHandle *framework.Handle, snapshot clustersnapshot.ClusterSnapshot) *SchedulerPluginRunner {
return &SchedulerPluginRunner{fwHandle: fwHandle, snapshot: snapshot}
func NewSchedulerPluginRunner(fwHandle *framework.Handle, snapshot clustersnapshot.ClusterSnapshot, parallelism int) *SchedulerPluginRunner {
return &SchedulerPluginRunner{fwHandle: fwHandle, snapshot: snapshot, parallelism: parallelism}
}

// RunFiltersUntilPassingNode runs the scheduler framework PreFilter phase once, and then keeps running the Filter phase for all nodes in the cluster that match the provided
Expand All @@ -64,38 +67,63 @@ func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeM
return nil, nil, clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
}

for i := range nodeInfosList {
// Determine which NodeInfo to check next.
nodeInfo := nodeInfosList[(p.lastIndex+i)%len(nodeInfosList)]
var (
foundNode *apiv1.Node
foundCycleState *schedulerframework.CycleState
foundIndex int
mu sync.Mutex
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

checkNode := func(i int) {
nodeIndex := (p.lastIndex + i) % len(nodeInfosList)
nodeInfo := nodeInfosList[nodeIndex]

// Plugins can filter some Nodes out during the PreFilter phase, if they're sure the Nodes won't work for the Pod at that stage.
// Filters are only run for Nodes that haven't been filtered out during the PreFilter phase. Match that behavior here - skip such Nodes.
if !preFilterResult.AllNodes() && !preFilterResult.NodeNames.Has(nodeInfo.Node().Name) {
continue
return
}

// Nodes with the Unschedulable bit set will be rejected by one of the plugins during the Filter phase below. We can check that quickly here
// and short-circuit to avoid running the expensive Filter phase at all in this case.
if nodeInfo.Node().Spec.Unschedulable {
continue
return
}

// Check if the NodeInfo matches the provided filtering condition. This should be less expensive than running the Filter phase below, so
// check this first.
if !nodeMatches(nodeInfo) {
continue
return
}

// Run the Filter phase of the framework. Plugins retrieve the state they saved during PreFilter from CycleState, and answer whether the
// given Pod can be scheduled on the given Node.
filterStatus := p.fwHandle.Framework.RunFilterPlugins(context.TODO(), state, pod, nodeInfo.ToScheduler())
clonedState := state.Clone()
filterStatus := p.fwHandle.Framework.RunFilterPlugins(context.TODO(), clonedState, pod, nodeInfo.ToScheduler())
if filterStatus.IsSuccess() {
// Filter passed for all plugins, so this pod can be scheduled on this Node.
p.lastIndex = (p.lastIndex + i + 1) % len(nodeInfosList)
return nodeInfo.Node(), state, nil
mu.Lock()
defer mu.Unlock()
if foundNode == nil {
foundNode = nodeInfo.Node()
foundCycleState = clonedState.(*schedulerframework.CycleState)
foundIndex = nodeIndex
cancel()
}
}
// Filter didn't pass for some plugin, so this Node won't work - move on to the next one.
}

workqueue.ParallelizeUntil(ctx, p.parallelism, len(nodeInfosList), checkNode)

if foundNode != nil {
p.lastIndex = (foundIndex + 1) % len(nodeInfosList)
return foundNode, foundCycleState, nil
}

return nil, nil, clustersnapshot.NewNoNodesPassingPredicatesFoundError(pod)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package predicate

import (
"fmt"
"os"
"path/filepath"
"testing"
Expand Down Expand Up @@ -363,6 +364,58 @@ func newTestPluginRunnerAndSnapshot(schedConfig *config.KubeSchedulerConfigurati
if err != nil {
return nil, nil, err
}
snapshot := NewPredicateSnapshot(store.NewBasicSnapshotStore(), fwHandle, true)
return NewSchedulerPluginRunner(fwHandle, snapshot), snapshot, nil
snapshot := NewPredicateSnapshot(store.NewBasicSnapshotStore(), fwHandle, true, 1)
return NewSchedulerPluginRunner(fwHandle, snapshot, 1), snapshot, nil
}

func BenchmarkRunFiltersUntilPassingNode(b *testing.B) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to add something like go test -run=^$ -bench=. ./... to a make target so we can start getting visibility into benchmark tests?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make benchmark? Yeah, I think that makes sense. Will follow up with a PR.

pod := BuildTestPod("p", 100, 1000)
nodes := make([]*apiv1.Node, 0, 5001)
podsOnNodes := make(map[string][]*apiv1.Pod)

for i := 0; i < 5000; i++ {
nodeName := fmt.Sprintf("n-%d", i)
node := BuildTestNode(nodeName, 10, 1000)
nodes = append(nodes, node)
// Add 10 small pods to each node
pods := make([]*apiv1.Pod, 0, 10)
for j := 0; j < 10; j++ {
pods = append(pods, BuildTestPod(fmt.Sprintf("p-%d-%d", i, j), 1, 1))
}
podsOnNodes[nodeName] = pods
}
// Last node is the only one that can fit the pod.
lastNodeName := fmt.Sprintf("n-%d", len(nodes))
lastNode := BuildTestNode(lastNodeName, 1000, 1000)
nodes = append(nodes, lastNode)

pluginRunner, snapshot, err := newTestPluginRunnerAndSnapshot(nil)
assert.NoError(b, err)

for _, node := range nodes {
err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node, podsOnNodes[node.Name]...))
assert.NoError(b, err)
}

testCases := []struct {
parallelism int
}{
{parallelism: 1},
{parallelism: 2},
{parallelism: 4},
{parallelism: 8},
{parallelism: 16},
}

for _, tc := range testCases {
b.Run(fmt.Sprintf("parallelism-%d", tc.parallelism), func(b *testing.B) {
pluginRunner.parallelism = tc.parallelism
b.ResetTimer()
for i := 0; i < b.N; i++ {
pluginRunner.lastIndex = 0 // Reset state for each run
_, _, err := pluginRunner.RunFiltersUntilPassingNode(pod, func(info *framework.NodeInfo) bool { return true })
assert.NoError(b, err)
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type PredicateSnapshot struct {
}

// NewPredicateSnapshot builds a PredicateSnapshot.
func NewPredicateSnapshot(snapshotStore clustersnapshot.ClusterSnapshotStore, fwHandle *framework.Handle, draEnabled bool) *PredicateSnapshot {
func NewPredicateSnapshot(snapshotStore clustersnapshot.ClusterSnapshotStore, fwHandle *framework.Handle, draEnabled bool, parallelism int) *PredicateSnapshot {
snapshot := &PredicateSnapshot{
ClusterSnapshotStore: snapshotStore,
draEnabled: draEnabled,
Expand All @@ -46,7 +46,7 @@ func NewPredicateSnapshot(snapshotStore clustersnapshot.ClusterSnapshotStore, fw
// which operate on *framework.NodeInfo. The only object that allows obtaining *framework.NodeInfos is PredicateSnapshot, so we have an ugly circular
// dependency between PluginRunner and PredicateSnapshot.
// TODO: Refactor PluginRunner so that it doesn't depend on PredicateSnapshot (e.g. move retrieving NodeInfos out of PluginRunner, to PredicateSnapshot).
snapshot.pluginRunner = NewSchedulerPluginRunner(fwHandle, snapshot)
snapshot.pluginRunner = NewSchedulerPluginRunner(fwHandle, snapshot, parallelism)
return snapshot
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ var snapshots = map[string]func() (clustersnapshot.ClusterSnapshot, error){
if err != nil {
return nil, err
}
return NewPredicateSnapshot(store.NewBasicSnapshotStore(), fwHandle, true), nil
return NewPredicateSnapshot(store.NewBasicSnapshotStore(), fwHandle, true, 1), nil
},
"delta": func() (clustersnapshot.ClusterSnapshot, error) {
fwHandle, err := framework.NewTestFrameworkHandle()
if err != nil {
return nil, err
}
return NewPredicateSnapshot(store.NewDeltaSnapshotStore(16), fwHandle, true), nil
return NewPredicateSnapshot(store.NewDeltaSnapshotStore(16), fwHandle, true, 1), nil
},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,5 @@ func NewCustomTestSnapshotAndHandle(snapshotStore clustersnapshot.ClusterSnapsho
if err != nil {
return nil, nil, err
}
return predicate.NewPredicateSnapshot(snapshotStore, testFwHandle, true), testFwHandle, nil
return predicate.NewPredicateSnapshot(snapshotStore, testFwHandle, true, 1), testFwHandle, nil
}
Loading