Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,41 @@ func divideUpToFairShare(totalResourceAmount, kValue float64, queues map[common_
for {
shouldRunAnotherRound := false
amountToGiveInCurrentRound := totalResourceAmount
totalWeights := getTotalWeightsForUnsatisfied(queues, resourceName)
totalWeights, totalUsages := getTotalWeightsForUnsatisfied(queues, resourceName)
if totalWeights == 0 {
break
}

if totalUsages == 0 {
totalUsages = 1
}

portions := make(map[common_info.QueueID]float64)
totalPortions := 0.0
for _, queue := range queues {
share := queue.ResourceShare(resourceName)
if share.Request <= share.FairShare {
// queue is satisfied, no need to give it more resources
continue
}

// Normalize queue over quota weight
nWeight := share.OverQuotaWeight / totalWeights

// We assume that usage is normalized to usage/clusterCapacity
nUsage := share.GetUsage()

// Floor portion to 0 if it's negative
portion := math.Max(0, nWeight+kValue*(nWeight-nUsage))

portions[queue.UID] = portion
totalPortions += portion
}

if totalPortions == 0 {
break
}

for _, queue := range queues {
requested := getRemainingRequested(queue, resourceName)
if requested == 0 {
Expand All @@ -190,7 +220,12 @@ func divideUpToFairShare(totalResourceAmount, kValue float64, queues map[common_
log.InfraLogger.V(6).Infof("calculating %v resource fair share for %v: deserved: %v, "+
"remaining requested: %v, fairShare: %v",
resourceName, queue.Name, resourceShare.Deserved, requested, resourceShare.FairShare)
fairShare := amountToGiveInCurrentRound * (overQuotaWeight / totalWeights)

// normalize portion
portion := portions[queue.UID]
nPortion := portion / totalPortions

fairShare := amountToGiveInCurrentRound * nPortion
resourceToGive := getResourceToGiveInCurrentRound(fairShare, requested, queue, remainingRequested)
if resourceToGive == 0 {
continue
Expand Down Expand Up @@ -255,14 +290,24 @@ func getResourceToGiveInCurrentRound(fairShare float64, requested float64, queue
return resourcesToGive
}

func getTotalWeightsForUnsatisfied(queues map[common_info.QueueID]*rs.QueueAttributes, resourceName rs.ResourceName) (totalOverQuotaWeights float64) {
func getTotalWeightsForUnsatisfied(queues map[common_info.QueueID]*rs.QueueAttributes, resourceName rs.ResourceName) (totalOverQuotaWeights, totalUsages float64) {
for _, queue := range queues {
remainingRequested := getRemainingRequested(queue, resourceName)
if remainingRequested > 0 {
totalOverQuotaWeights += queue.ResourceShare(resourceName).OverQuotaWeight
if remainingRequested <= 0 {
continue
}
totalOverQuotaWeights += queue.ResourceShare(resourceName).OverQuotaWeight

queueUsage := queue.ResourceShare(resourceName).GetUsage()
if queueUsage < 0 {
log.InfraLogger.V(1).Warnf("queue <%v> has negative usage score of <%v> for resource <%v>, expected non-negative",
queue.Name, queueUsage, resourceName)
continue
}

totalUsages += queueUsage
}
return totalOverQuotaWeights
return totalOverQuotaWeights, totalUsages
}

func getRemainingRequested(queue *rs.QueueAttributes, resourceName rs.ResourceName) float64 {
Expand Down
Loading