Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 44 additions & 10 deletions pkg/descheduler/core/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,26 +62,60 @@ func NewSchedulingResultHelper(binding *workv1alpha2.ResourceBinding) *Schedulin
func (h *SchedulingResultHelper) FillUnschedulableReplicas(unschedulableThreshold time.Duration) {
reference := &h.Spec.Resource
undesiredClusters, undesiredClusterNames := h.GetUndesiredClusters()
// clusterIndexToEstimatorPriority key refers to index of cluster slice,
// value refers to the EstimatorPriority of who gave its estimated result.
clusterIndexToEstimatorPriority := make(map[int]estimatorclient.EstimatorPriority)

// Set the boundary.
for i := range undesiredClusters {
undesiredClusters[i].Unschedulable = math.MaxInt32
}
// Get the minimum value of MaxAvailableReplicas in terms of all estimators.

// Get all replicaEstimators, which are stored in TreeMap.
estimators := estimatorclient.GetUnschedulableReplicaEstimators()
ctx := context.WithValue(context.TODO(), util.ContextKeyObject,
fmt.Sprintf("kind=%s, name=%s/%s", reference.Kind, reference.Namespace, reference.Name))
for _, estimator := range estimators {
res, err := estimator.GetUnschedulableReplicas(ctx, undesiredClusterNames, reference, unschedulableThreshold)
if err != nil {
klog.Errorf("Max cluster unschedulable replicas error: %v", err)
continue

// List all unschedulableReplicaEstimators in order of descending priority. The estimators are grouped with different
// priorities, e.g: [priority:20, {estimators:[es1, es3]}, {priority:10, estimators:[es2, es4]}, ...]
estimatorGroups := estimators.Values()

// Iterate the estimator groups in order of descending priority
for _, estimatorGroup := range estimatorGroups {
// if higher-priority estimators have formed a full result of member clusters, no longer to call lower-priority estimator.
if len(clusterIndexToEstimatorPriority) == len(undesiredClusterNames) {
break
}
for i := range res {
if res[i].Replicas == estimatorclient.UnauthenticReplica {
estimatorsWithSamePriority := estimatorGroup.(map[string]estimatorclient.UnschedulableReplicaEstimator)
// iterate through these estimators with the same priority.
for _, estimator := range estimatorsWithSamePriority {
res, err := estimator.GetUnschedulableReplicas(ctx, undesiredClusterNames, reference, unschedulableThreshold)
if err != nil {
klog.Errorf("Max cluster unschedulable replicas error: %v", err)
continue
}
if undesiredClusters[i].ClusterName == res[i].Name && undesiredClusters[i].Unschedulable > res[i].Replicas {
undesiredClusters[i].Unschedulable = res[i].Replicas
for i := range res {
// the result of this cluster estimated failed, ignore the corresponding result
if res[i].Replicas == estimatorclient.UnauthenticReplica {
continue
}
// the cluster name not match, ignore, which hardly ever happens
if res[i].Name != undesiredClusters[i].ClusterName {
klog.Errorf("unexpected cluster name in the result of estimator with %d priority, "+
"expected: %s, got: %s", estimator.Priority(), undesiredClusters[i].ClusterName, res[i].Name)
continue
}
// the result of this cluster has already been estimated by higher-priority estimator,
// ignore the corresponding result by this estimator
if priority, ok := clusterIndexToEstimatorPriority[i]; ok && estimator.Priority() < priority {
continue
}
// if multiple estimators are called, choose the minimum value of each estimated result,
// record the priority of result provider.
if res[i].Replicas < undesiredClusters[i].Unschedulable {
undesiredClusters[i].Unschedulable = res[i].Replicas
clusterIndexToEstimatorPriority[i] = estimator.Priority()
}
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/descheduler/descheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ func NewDescheduler(karmadaClient karmadaclientset.Interface, kubeClient kuberne
ReconcileFunc: desched.reconcileEstimatorConnection,
}
desched.schedulerEstimatorWorker = util.NewAsyncWorker(schedulerEstimatorWorkerOptions)
schedulerEstimator := estimatorclient.NewSchedulerEstimator(desched.schedulerEstimatorCache, opts.SchedulerEstimatorTimeout.Duration)
schedulerEstimator := estimatorclient.NewSchedulerEstimator(desched.schedulerEstimatorCache,
opts.SchedulerEstimatorTimeout.Duration, estimatorclient.Accurate)
estimatorclient.RegisterSchedulerEstimator(schedulerEstimator)
deschedulerWorkerOptions := util.Options{
Name: "descheduler",
Expand Down
2 changes: 1 addition & 1 deletion pkg/descheduler/descheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ func TestDescheduler_worker(t *testing.T) {
unschedulableThreshold: 5 * time.Minute,
eventRecorder: record.NewFakeRecorder(1024),
}
schedulerEstimator := estimatorclient.NewSchedulerEstimator(desched.schedulerEstimatorCache, 5*time.Second)
schedulerEstimator := estimatorclient.NewSchedulerEstimator(desched.schedulerEstimatorCache, 5*time.Second, estimatorclient.Accurate)
estimatorclient.RegisterSchedulerEstimator(schedulerEstimator)

for _, c := range tt.args.unschedulable {
Expand Down
21 changes: 14 additions & 7 deletions pkg/estimator/client/accurate.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,25 @@ import (

// RegisterSchedulerEstimator will register a SchedulerEstimator.
func RegisterSchedulerEstimator(se *SchedulerEstimator) {
replicaEstimators["scheduler-estimator"] = se
unschedulableReplicaEstimators["scheduler-estimator"] = se
registerReplicaEstimator("scheduler-estimator", se)
registerUnschedulableReplicaEstimator("scheduler-estimator", se)
}

type getClusterReplicasFunc func(ctx context.Context, cluster string) (int32, error)

// SchedulerEstimator is an estimator that calls karmada-scheduler-estimator for estimation.
type SchedulerEstimator struct {
cache *SchedulerEstimatorCache
timeout time.Duration
cache *SchedulerEstimatorCache
timeout time.Duration
priority EstimatorPriority
}

// NewSchedulerEstimator builds a new SchedulerEstimator.
func NewSchedulerEstimator(cache *SchedulerEstimatorCache, timeout time.Duration) *SchedulerEstimator {
func NewSchedulerEstimator(cache *SchedulerEstimatorCache, timeout time.Duration, priority EstimatorPriority) *SchedulerEstimator {
return &SchedulerEstimator{
cache: cache,
timeout: timeout,
cache: cache,
timeout: timeout,
priority: priority,
}
}

Expand All @@ -67,6 +69,11 @@ func (se *SchedulerEstimator) MaxAvailableReplicas(
})
}

// Priority provides the priority of this estimator.
func (se *SchedulerEstimator) Priority() EstimatorPriority {
return se.priority
}

// GetUnschedulableReplicas gets the unschedulable replicas which belong to a specified workload by calling karmada-scheduler-estimator.
func (se *SchedulerEstimator) GetUnschedulableReplicas(
parentCtx context.Context,
Expand Down
15 changes: 11 additions & 4 deletions pkg/estimator/client/general.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,17 @@ import (

// GeneralEstimator is the default replica estimator.
func init() {
replicaEstimators["general-estimator"] = NewGeneralEstimator()
registerReplicaEstimator("general-estimator", NewGeneralEstimator(General))
}

// GeneralEstimator is a normal estimator in terms of cluster ResourceSummary.
type GeneralEstimator struct{}
type GeneralEstimator struct {
priority EstimatorPriority
}

// NewGeneralEstimator builds a new GeneralEstimator.
func NewGeneralEstimator() *GeneralEstimator {
return &GeneralEstimator{}
func NewGeneralEstimator(priority EstimatorPriority) *GeneralEstimator {
return &GeneralEstimator{priority: priority}
}

// MaxAvailableReplicas estimates the maximum replicas that can be applied to the target cluster by cluster ResourceSummary.
Expand All @@ -53,6 +55,11 @@ func (ge *GeneralEstimator) MaxAvailableReplicas(_ context.Context, clusters []*
return availableTargetClusters, nil
}

// Priority provides the priority of this estimator.
func (ge *GeneralEstimator) Priority() EstimatorPriority {
return ge.priority
}

func (ge *GeneralEstimator) maxAvailableReplicas(cluster *clusterv1alpha1.Cluster, replicaRequirements *workv1alpha2.ReplicaRequirements) int32 {
resourceSummary := cluster.Status.ResourceSummary
if resourceSummary == nil {
Expand Down
67 changes: 63 additions & 4 deletions pkg/estimator/client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"time"

"github.com/emirpasic/gods/maps/treemap"

clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
)
Expand All @@ -30,26 +32,83 @@ import (
const UnauthenticReplica = -1

var (
replicaEstimators = map[string]ReplicaEstimator{}
unschedulableReplicaEstimators = map[string]UnschedulableReplicaEstimator{}
// replicaEstimators are organized in a TreeMap, sorted by descending priority.
// The key is of type EstimatorPriority, indicating the estimator's priority level.
// The value is a map with string keys and ReplicaEstimator values, grouping estimators by their respective priorities.
replicaEstimators = treemap.NewWith(estimatorPriorityComparator)

// unschedulableReplicaEstimators are organized in a TreeMap, sorted by descending priority.
// The key is of type EstimatorPriority, indicating the estimator's priority level.
// The value is a map with string keys and UnschedulableReplicaEstimator values, grouping estimators by their respective priorities.
unschedulableReplicaEstimators = treemap.NewWith(estimatorPriorityComparator)
)

// registerReplicaEstimator add a estimator to replicaEstimators
func registerReplicaEstimator(estimatorName string, estimator ReplicaEstimator) {
if val, ok := replicaEstimators.Get(estimator.Priority()); !ok {
replicaEstimators.Put(estimator.Priority(), map[string]ReplicaEstimator{estimatorName: estimator})
} else {
estimatorsWithSamePriority := val.(map[string]ReplicaEstimator)
estimatorsWithSamePriority[estimatorName] = estimator
}
}

// registerUnschedulableReplicaEstimator add a estimator to unschedulableReplicaEstimators
func registerUnschedulableReplicaEstimator(estimatorName string, estimator UnschedulableReplicaEstimator) {
if val, ok := unschedulableReplicaEstimators.Get(estimator.Priority()); !ok {
unschedulableReplicaEstimators.Put(estimator.Priority(), map[string]UnschedulableReplicaEstimator{estimatorName: estimator})
} else {
estimatorsWithSamePriority := val.(map[string]UnschedulableReplicaEstimator)
estimatorsWithSamePriority[estimatorName] = estimator
}
}

// ReplicaEstimator is an estimator which estimates the maximum replicas that can be applied to the target cluster.
type ReplicaEstimator interface {
MaxAvailableReplicas(ctx context.Context, clusters []*clusterv1alpha1.Cluster, replicaRequirements *workv1alpha2.ReplicaRequirements) ([]workv1alpha2.TargetCluster, error)
Priority() EstimatorPriority
}

// UnschedulableReplicaEstimator is an estimator which estimates the unschedulable replicas which belong to a specified workload.
type UnschedulableReplicaEstimator interface {
GetUnschedulableReplicas(ctx context.Context, clusters []string, reference *workv1alpha2.ObjectReference, unschedulableThreshold time.Duration) ([]workv1alpha2.TargetCluster, error)
Priority() EstimatorPriority
}

// GetReplicaEstimators returns all replica estimators.
func GetReplicaEstimators() map[string]ReplicaEstimator {
func GetReplicaEstimators() *treemap.Map {
return replicaEstimators
}

// GetUnschedulableReplicaEstimators returns all unschedulable replica estimators.
func GetUnschedulableReplicaEstimators() map[string]UnschedulableReplicaEstimator {
func GetUnschedulableReplicaEstimators() *treemap.Map {
return unschedulableReplicaEstimators
}

// EstimatorPriority the priority of estimator
// 1. If two estimators are of the same priority, call both and choose the minimum value of each estimated result.
// 2. If higher-priority estimators have formed a full result of member clusters, no longer to call lower-priority estimator.
// 3. If higher-priority estimators haven't given the result for certain member clusters, lower-priority estimator will
// continue to estimate for such clusters haven't got a result.
type EstimatorPriority int32

const (
// General general priority, e.g: ResourceModel
General EstimatorPriority = 10
// Accurate accurate priority, e.g: SchedulerEstimator
Accurate EstimatorPriority = 20
)

// estimatorPriorityComparator provides a basic comparison on EstimatorPriority.
func estimatorPriorityComparator(a, b interface{}) int {
aAsserted := a.(EstimatorPriority)
bAsserted := b.(EstimatorPriority)
switch {
case aAsserted > bAsserted:
return -1
case aAsserted < bAsserted:
return 1
default:
return 0
}
}
55 changes: 44 additions & 11 deletions pkg/scheduler/core/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ func getDefaultWeightPreference(clusters []*clusterv1alpha1.Cluster) *policyv1al
}

func calAvailableReplicas(clusters []*clusterv1alpha1.Cluster, spec *workv1alpha2.ResourceBindingSpec) []workv1alpha2.TargetCluster {
// availableTargetClusters stores the result of estimated replicas for each clusters
availableTargetClusters := make([]workv1alpha2.TargetCluster, len(clusters))
// clusterIndexToEstimatorPriority key refers to index of cluster slice,
// value refers to the EstimatorPriority of who gave its estimated result.
clusterIndexToEstimatorPriority := make(map[int]estimatorclient.EstimatorPriority)

// Set the boundary.
for i := range availableTargetClusters {
Expand All @@ -68,22 +72,51 @@ func calAvailableReplicas(clusters []*clusterv1alpha1.Cluster, spec *workv1alpha
return availableTargetClusters
}

// Get the minimum value of MaxAvailableReplicas in terms of all estimators.
estimators := estimatorclient.GetReplicaEstimators()
// Get all replicaEstimators, which are stored in TreeMap.
replicaEstimators := estimatorclient.GetReplicaEstimators()
ctx := context.WithValue(context.TODO(), util.ContextKeyObject,
fmt.Sprintf("kind=%s, name=%s/%s", spec.Resource.Kind, spec.Resource.Namespace, spec.Resource.Name))
for _, estimator := range estimators {
res, err := estimator.MaxAvailableReplicas(ctx, clusters, spec.ReplicaRequirements)
if err != nil {
klog.Errorf("Max cluster available replicas error: %v", err)
continue

// List all replicaEstimators in order of descending priority. The estimators are grouped with different priorities,
// e.g: [priority:20, {estimators:[es1, es3]}, {priority:10, estimators:[es2, es4]}, ...]
estimatorGroups := replicaEstimators.Values()

// Iterate the estimator groups in order of descending priority
for _, estimatorGroup := range estimatorGroups {
// if higher-priority estimators have formed a full result of member clusters, no longer to call lower-priority estimator.
if len(clusterIndexToEstimatorPriority) == len(clusters) {
break
}
for i := range res {
if res[i].Replicas == estimatorclient.UnauthenticReplica {
estimatorsWithSamePriority := estimatorGroup.(map[string]estimatorclient.ReplicaEstimator)
// iterate through these estimators with the same priority.
for _, estimator := range estimatorsWithSamePriority {
res, err := estimator.MaxAvailableReplicas(ctx, clusters, spec.ReplicaRequirements)
if err != nil {
klog.Errorf("Max cluster available replicas error: %v", err)
continue
}
if availableTargetClusters[i].Name == res[i].Name && availableTargetClusters[i].Replicas > res[i].Replicas {
availableTargetClusters[i].Replicas = res[i].Replicas
for i := range res {
// the result of this cluster estimated failed, ignore the corresponding result
if res[i].Replicas == estimatorclient.UnauthenticReplica {
continue
}
// the cluster name not match, ignore, which hardly ever happens
if res[i].Name != availableTargetClusters[i].Name {
klog.Errorf("unexpected cluster name in the result of estimator with %d priority, "+
"expected: %s, got: %s", estimator.Priority(), availableTargetClusters[i].Name, res[i].Name)
continue
}
// the result of this cluster has already been estimated by higher-priority estimator,
// ignore the corresponding result by this estimator
if priority, ok := clusterIndexToEstimatorPriority[i]; ok && estimator.Priority() < priority {
continue
}
// if multiple estimators are called, choose the minimum value of each estimated result,
// record the priority of result provider.
if res[i].Replicas < availableTargetClusters[i].Replicas {
availableTargetClusters[i].Replicas = res[i].Replicas
clusterIndexToEstimatorPriority[i] = estimator.Priority()
}
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,8 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse
ReconcileFunc: sched.reconcileEstimatorConnection,
}
sched.schedulerEstimatorWorker = util.NewAsyncWorker(schedulerEstimatorWorkerOptions)
schedulerEstimator := estimatorclient.NewSchedulerEstimator(sched.schedulerEstimatorCache, options.schedulerEstimatorTimeout.Duration)
schedulerEstimator := estimatorclient.NewSchedulerEstimator(sched.schedulerEstimatorCache,
options.schedulerEstimatorTimeout.Duration, estimatorclient.Accurate)
estimatorclient.RegisterSchedulerEstimator(schedulerEstimator)
}
sched.enableEmptyWorkloadPropagation = options.enableEmptyWorkloadPropagation
Expand Down
40 changes: 40 additions & 0 deletions vendor/github.com/emirpasic/gods/maps/maps.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading