diff --git a/pkg/scheduler/plugins/proportion/resource_division/resource_division.go b/pkg/scheduler/plugins/proportion/resource_division/resource_division.go index 4309399bf..6daaac898 100644 --- a/pkg/scheduler/plugins/proportion/resource_division/resource_division.go +++ b/pkg/scheduler/plugins/proportion/resource_division/resource_division.go @@ -165,11 +165,41 @@ func divideUpToFairShare(totalResourceAmount, kValue float64, queues map[common_ for { shouldRunAnotherRound := false amountToGiveInCurrentRound := totalResourceAmount - totalWeights := getTotalWeightsForUnsatisfied(queues, resourceName) + totalWeights, totalUsages := getTotalWeightsForUnsatisfied(queues, resourceName) if totalWeights == 0 { break } + if totalUsages == 0 { + totalUsages = 1 + } + + portions := make(map[common_info.QueueID]float64) + totalPortions := 0.0 + for _, queue := range queues { + share := queue.ResourceShare(resourceName) + if share.Request <= share.FairShare { + // queue is satisfied, no need to give it more resources + continue + } + + // Normalize queue over quota weight + nWeight := share.OverQuotaWeight / totalWeights + + // We assume that usage is normalized to usage/clusterCapacity + nUsage := share.GetUsage() + + // Floor portion to 0 if it's negative + portion := math.Max(0, nWeight+kValue*(nWeight-nUsage)) + + portions[queue.UID] = portion + totalPortions += portion + } + + if totalPortions == 0 { + break + } + for _, queue := range queues { requested := getRemainingRequested(queue, resourceName) if requested == 0 { @@ -190,7 +220,12 @@ func divideUpToFairShare(totalResourceAmount, kValue float64, queues map[common_ log.InfraLogger.V(6).Infof("calculating %v resource fair share for %v: deserved: %v, "+ "remaining requested: %v, fairShare: %v", resourceName, queue.Name, resourceShare.Deserved, requested, resourceShare.FairShare) - fairShare := amountToGiveInCurrentRound * (overQuotaWeight / totalWeights) + + // normalize portion + portion := portions[queue.UID] + nPortion := portion / totalPortions + + fairShare := amountToGiveInCurrentRound * nPortion resourceToGive := getResourceToGiveInCurrentRound(fairShare, requested, queue, remainingRequested) if resourceToGive == 0 { continue @@ -255,14 +290,24 @@ func getResourceToGiveInCurrentRound(fairShare float64, requested float64, queue return resourcesToGive } -func getTotalWeightsForUnsatisfied(queues map[common_info.QueueID]*rs.QueueAttributes, resourceName rs.ResourceName) (totalOverQuotaWeights float64) { +func getTotalWeightsForUnsatisfied(queues map[common_info.QueueID]*rs.QueueAttributes, resourceName rs.ResourceName) (totalOverQuotaWeights, totalUsages float64) { for _, queue := range queues { remainingRequested := getRemainingRequested(queue, resourceName) - if remainingRequested > 0 { - totalOverQuotaWeights += queue.ResourceShare(resourceName).OverQuotaWeight + if remainingRequested <= 0 { + continue } + totalOverQuotaWeights += queue.ResourceShare(resourceName).OverQuotaWeight + + queueUsage := queue.ResourceShare(resourceName).GetUsage() + if queueUsage < 0 { + log.InfraLogger.V(1).Warnf("queue <%v> has negative usage score of <%v> for resource <%v>, expected non-negative", + queue.Name, queueUsage, resourceName) + continue + } + + totalUsages += queueUsage } - return totalOverQuotaWeights + return totalOverQuotaWeights, totalUsages } func getRemainingRequested(queue *rs.QueueAttributes, resourceName rs.ResourceName) float64 {