Skip to content

Commit 334fe67

Browse files
authored
Added usage attributes (#418)
1 parent 855d71d commit 334fe67

File tree

11 files changed

+148
-95
lines changed

11 files changed

+148
-95
lines changed

cmd/fairshare-simulator/main.go

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -56,7 +56,7 @@ func (s *server) simulateHandler(w http.ResponseWriter, r *http.Request) {
5656
return
5757
}
5858

59-
queues := SimulateSetResourcesShare(req.TotalResource, req.Queues)
59+
queues := SimulateSetResourcesShare(req.TotalResource, 0, req.Queues)
6060

6161
resp := make(map[string]QueueFairShare)
6262
for id, qa := range queues {
@@ -92,12 +92,12 @@ func main() {
9292
}
9393
}
9494

95-
func SimulateSetResourcesShare(totalResource rs.ResourceQuantities, queueOverrides []rs.QueueOverrides) map[common_info.QueueID]*rs.QueueAttributes {
95+
func SimulateSetResourcesShare(totalResource rs.ResourceQuantities, kValue float64, queueOverrides []rs.QueueOverrides) map[common_info.QueueID]*rs.QueueAttributes {
9696
queues := make(map[common_info.QueueID]*rs.QueueAttributes)
9797
for _, qo := range queueOverrides {
9898
qa := qo.ToQueueAttributes()
9999
queues[qa.UID] = qa
100100
}
101-
resource_division.SetResourcesShare(totalResource, queues)
101+
resource_division.SetResourcesShare(totalResource, kValue, queues)
102102
return queues
103103
}

pkg/scheduler/api/queue_info/queue_info.go

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -35,6 +35,7 @@ type QueueInfo struct {
3535
ParentQueue common_info.QueueID
3636
ChildQueues []common_info.QueueID
3737
Resources QueueQuota
38+
ResourceUsage QueueUsage
3839
Priority int
3940
CreationTimestamp metav1.Time
4041
PreemptMinRuntime *metav1.Duration

pkg/scheduler/api/queue_info/quota_info.go

Lines changed: 2 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -27,13 +27,11 @@ type QueueUsage struct {
2727
}
2828

2929
type ClusterUsage struct {
30-
ClusterCapacity QueueUsage `json:"cluster"`
31-
Queues map[common_info.QueueID]*QueueUsage `json:"queues"`
30+
Queues map[common_info.QueueID]*QueueUsage `json:"queues"`
3231
}
3332

3433
func NewClusterUsage() *ClusterUsage {
3534
return &ClusterUsage{
36-
ClusterCapacity: QueueUsage{},
37-
Queues: make(map[common_info.QueueID]*QueueUsage),
35+
Queues: make(map[common_info.QueueID]*QueueUsage),
3836
}
3937
}

pkg/scheduler/cache/cluster_info/cluster_info_test.go

Lines changed: 0 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -173,11 +173,6 @@ func TestSnapshotUsage(t *testing.T) {
173173
{
174174
name: "BasicUsage",
175175
usage: &queue_info.ClusterUsage{
176-
ClusterCapacity: queue_info.QueueUsage{
177-
CPU: 10,
178-
Memory: 10,
179-
GPU: 10,
180-
},
181176
Queues: map[common_info.QueueID]*queue_info.QueueUsage{
182177
"queue-1": {
183178
CPU: 10,
@@ -188,11 +183,6 @@ func TestSnapshotUsage(t *testing.T) {
188183
},
189184
err: nil,
190185
expectedUsage: &queue_info.ClusterUsage{
191-
ClusterCapacity: queue_info.QueueUsage{
192-
CPU: 10,
193-
Memory: 10,
194-
GPU: 10,
195-
},
196186
Queues: map[common_info.QueueID]*queue_info.QueueUsage{
197187
"queue-1": {
198188
CPU: 10,
@@ -211,11 +201,6 @@ func TestSnapshotUsage(t *testing.T) {
211201
{
212202
name: "Error and usage",
213203
usage: &queue_info.ClusterUsage{
214-
ClusterCapacity: queue_info.QueueUsage{
215-
CPU: 11,
216-
Memory: 11,
217-
GPU: 11,
218-
},
219204
Queues: map[common_info.QueueID]*queue_info.QueueUsage{
220205
"queue-1": {
221206
CPU: 11,
@@ -235,7 +220,6 @@ func TestSnapshotUsage(t *testing.T) {
235220
return
236221
}
237222
assert.NotNil(t, actual)
238-
assert.Equal(t, expected.ClusterCapacity, actual.ClusterCapacity)
239223
assert.Equal(t, len(expected.Queues), len(actual.Queues))
240224
for queueID, expectedUsage := range expected.Queues {
241225
actualUsage, ok := actual.Queues[queueID]

pkg/scheduler/cache/usagedb/usagedb_test.go

Lines changed: 0 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -87,15 +87,13 @@ func TestGetResourceUsage(t *testing.T) {
8787
name: "fresh data available",
8888
setupLister: func(l *UsageLister) {
8989
usage := queue_info.NewClusterUsage()
90-
usage.ClusterCapacity.GPU = 10
9190
usage.Queues["queue1"] = &queue_info.QueueUsage{GPU: 5}
9291
now := time.Now()
9392
l.lastUsageData = usage
9493
l.lastUsageDataTime = &now
9594
},
9695
wantUsage: func() *queue_info.ClusterUsage {
9796
usage := queue_info.NewClusterUsage()
98-
usage.ClusterCapacity.GPU = 10
9997
usage.Queues["queue1"] = &queue_info.QueueUsage{GPU: 5}
10098
return usage
10199
}(),
@@ -104,14 +102,12 @@ func TestGetResourceUsage(t *testing.T) {
104102
name: "stale data",
105103
setupLister: func(l *UsageLister) {
106104
usage := queue_info.NewClusterUsage()
107-
usage.ClusterCapacity.GPU = 10
108105
staleTime := time.Now().Add(-10 * time.Minute)
109106
l.lastUsageData = usage
110107
l.lastUsageDataTime = &staleTime
111108
},
112109
wantUsage: func() *queue_info.ClusterUsage {
113110
usage := queue_info.NewClusterUsage()
114-
usage.ClusterCapacity.GPU = 10
115111
return usage
116112
}(),
117113
wantErr: true,

pkg/scheduler/metrics/metrics.go

Lines changed: 35 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -51,6 +51,9 @@ var (
5151
queueFairShareCPU *prometheus.GaugeVec
5252
queueFairShareMemory *prometheus.GaugeVec
5353
queueFairShareGPU *prometheus.GaugeVec
54+
queueCPUUsage *prometheus.GaugeVec
55+
queueMemoryUsage *prometheus.GaugeVec
56+
queueGPUUsage *prometheus.GaugeVec
5457
)
5558

5659
func init() {
@@ -168,6 +171,25 @@ func InitMetrics(namespace string) {
168171
Name: "queue_fair_share_gpu",
169172
Help: "GPU Fair share of queue, as a gauge. Values in GPU devices",
170173
}, []string{"queue_name"})
174+
175+
queueCPUUsage = promauto.NewGaugeVec(
176+
prometheus.GaugeOpts{
177+
Namespace: namespace,
178+
Name: "queue_cpu_usage_cores",
179+
Help: "CPU usage of queue, as a gauge. Value is proportional to cpu*hours usage with time decay applied",
180+
}, []string{"queue_name"})
181+
queueMemoryUsage = promauto.NewGaugeVec(
182+
prometheus.GaugeOpts{
183+
Namespace: namespace,
184+
Name: "queue_memory_usage_gb",
185+
Help: "Memory usage of queue, as a gauge. Value is proportional to memory*hours usage with time decay applied",
186+
}, []string{"queue_name"})
187+
queueGPUUsage = promauto.NewGaugeVec(
188+
prometheus.GaugeOpts{
189+
Namespace: namespace,
190+
Name: "queue_gpu_usage_devices",
191+
Help: "GPU usage of queue, as a gauge. Value is proportional to gpu*hours usage with time decay applied",
192+
}, []string{"queue_name"})
171193
}
172194

173195
// UpdateOpenSessionDuration updates latency for open session, including all plugins
@@ -242,6 +264,19 @@ func ResetQueueFairShare() {
242264
queueFairShareGPU.Reset()
243265
}
244266

267+
// UpdateQueueUsage updates usage of queue for a resource
268+
func UpdateQueueUsage(queueName string, cpu, memory, gpu float64) {
269+
queueCPUUsage.WithLabelValues(queueName).Set(cpu)
270+
queueMemoryUsage.WithLabelValues(queueName).Set(memory)
271+
queueGPUUsage.WithLabelValues(queueName).Set(gpu)
272+
}
273+
274+
func ResetQueueUsage() {
275+
queueCPUUsage.Reset()
276+
queueMemoryUsage.Reset()
277+
queueGPUUsage.Reset()
278+
}
279+
245280
// RegisterPreemptionAttempts records number of attempts for preemption
246281
func RegisterPreemptionAttempts() {
247282
preemptionAttempts.Inc()

pkg/scheduler/plugins/proportion/proportion.go

Lines changed: 11 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -220,6 +220,7 @@ func (pp *proportionPlugin) calculateResourcesProportion(ssn *framework.Session)
220220
log.InfraLogger.V(6).Infof("Calculating resource proportion")
221221

222222
pp.setTotalResources(ssn)
223+
223224
pp.createQueueAttributes(ssn)
224225
log.InfraLogger.V(3).Infof("Total allocatable resources are <%s>, number of nodes: <%d>, number of "+
225226
"queues: <%d>", pp.totalResource, len(ssn.Nodes), len(pp.queues))
@@ -322,6 +323,11 @@ func (pp *proportionPlugin) createQueueResourceAttrs(ssn *framework.Session) {
322323
overQuotaWeight = queue.Resources.GPU.OverQuotaWeight
323324
queueAttributes.SetQuotaResources(rs.GpuResource, deserved, limit, overQuotaWeight)
324325

326+
usage, found := ssn.ResourceUsage.Queues[queue.UID]
327+
if found {
328+
queueAttributes.SetResourceUsage(*usage)
329+
}
330+
325331
pp.queues[queue.UID] = queueAttributes
326332
log.InfraLogger.V(7).Infof("Added queue attributes for queue <%s>", queue.Name)
327333
}
@@ -380,22 +386,23 @@ func (pp *proportionPlugin) updateQueuesResourceUsageForPendingJob(queueId commo
380386

381387
func (pp *proportionPlugin) setFairShare() {
382388
topQueues := pp.getTopQueues()
389+
metrics.ResetQueueUsage()
383390
metrics.ResetQueueFairShare()
384-
pp.setFairShareForQueues(pp.totalResource, topQueues)
391+
pp.setFairShareForQueues(pp.totalResource, 1, topQueues)
385392
}
386393

387-
func (pp *proportionPlugin) setFairShareForQueues(totalResources rs.ResourceQuantities,
394+
func (pp *proportionPlugin) setFairShareForQueues(totalResources rs.ResourceQuantities, kValue float64,
388395
queues map[common_info.QueueID]*rs.QueueAttributes) {
389396

390397
if len(queues) == 0 {
391398
return
392399
}
393400

394-
resource_division.SetResourcesShare(totalResources, queues)
401+
resource_division.SetResourcesShare(totalResources, kValue, queues)
395402
for _, queue := range queues {
396403
childQueues := pp.getChildQueues(queue)
397404
resources := queue.GetFairShare()
398-
pp.setFairShareForQueues(resources, childQueues)
405+
pp.setFairShareForQueues(resources, kValue, childQueues)
399406
}
400407
}
401408

pkg/scheduler/plugins/proportion/resource_division/resource_division.go

Lines changed: 21 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -23,18 +23,18 @@ type remainingRequestedResource struct {
2323
remainingAmount float64
2424
}
2525

26-
func SetResourcesShare(totalResource rs.ResourceQuantities, queues map[common_info.QueueID]*rs.QueueAttributes) {
26+
func SetResourcesShare(totalResource rs.ResourceQuantities, kValue float64, queues map[common_info.QueueID]*rs.QueueAttributes) {
2727
for _, resource := range rs.AllResources {
28-
setResourceShare(totalResource[resource], resource, queues)
28+
setResourceShare(totalResource[resource], kValue, resource, queues)
2929
}
3030
reportDivisionResult(queues)
3131
}
3232

33-
func setResourceShare(totalAmount float64, resourceName rs.ResourceName, queues map[common_info.QueueID]*rs.QueueAttributes) float64 {
33+
func setResourceShare(totalAmount, kValue float64, resourceName rs.ResourceName, queues map[common_info.QueueID]*rs.QueueAttributes) float64 {
3434
log.InfraLogger.V(6).Infof("About to start calculating %v fairShare, totalAmount: <%v>", resourceName, totalAmount)
3535
remainingAmount := setDeservedResource(totalAmount, queues, resourceName)
3636
if remainingAmount > 0 {
37-
remainingAmount = divideOverQuotaResource(remainingAmount, queues, resourceName)
37+
remainingAmount = divideOverQuotaResource(remainingAmount, kValue, queues, resourceName)
3838
} else {
3939
remainingAmount = 0
4040
}
@@ -54,24 +54,34 @@ func reportDivisionResult(queues map[common_info.QueueID]*rs.QueueAttributes) {
5454
gpuResourceShare.FairShare,
5555
)
5656

57+
metrics.UpdateQueueUsage(
58+
queue.Name,
59+
cpuResourceShare.GetUsage(),
60+
memoryResourceShare.GetUsage(),
61+
gpuResourceShare.GetUsage(),
62+
)
63+
5764
log.InfraLogger.V(3).Infof("Resource division result for queue <%v>: "+
5865
"Queue Priority: <%d>, "+
59-
"GPU: deserved: <%v>, requested: <%v>, maxAllowed: <%v>, fairShare: <%v> "+
60-
"CPU (cores): deserved: <%v>, requested: <%v>, maxAllowed: <%v>, fairShare: <%v> "+
61-
"memory (GB): deserved: <%v>, requested: <%v>, maxAllowed: <%v>, fairShare: <%v> ",
66+
"GPU: deserved: <%v>, requested: <%v>, maxAllowed: <%v>, usage: <%v>, fairShare: <%v> "+
67+
"CPU (cores): deserved: <%v>, requested: <%v>, maxAllowed: <%v>, usage: <%v>, fairShare: <%v> "+
68+
"memory (GB): deserved: <%v>, requested: <%v>, maxAllowed: <%v>, usage: <%v>, fairShare: <%v> ",
6269
queue.Name,
6370
queue.Priority,
6471
resource_info.HumanizeResource(gpuResourceShare.Deserved, 1),
6572
resource_info.HumanizeResource(gpuResourceShare.GetRequestableShare(), 1),
6673
resource_info.HumanizeResource(gpuResourceShare.MaxAllowed, 1),
74+
resource_info.HumanizeResource(gpuResourceShare.GetUsage(), 1),
6775
resource_info.HumanizeResource(gpuResourceShare.FairShare, 1),
6876
resource_info.HumanizeResource(cpuResourceShare.Deserved, resource_info.MilliCPUToCores),
6977
resource_info.HumanizeResource(cpuResourceShare.GetRequestableShare(), resource_info.MilliCPUToCores),
7078
resource_info.HumanizeResource(cpuResourceShare.MaxAllowed, resource_info.MilliCPUToCores),
79+
resource_info.HumanizeResource(cpuResourceShare.GetUsage(), resource_info.MilliCPUToCores),
7180
resource_info.HumanizeResource(cpuResourceShare.FairShare, resource_info.MilliCPUToCores),
7281
resource_info.HumanizeResource(memoryResourceShare.Deserved, resource_info.MemoryToGB),
7382
resource_info.HumanizeResource(memoryResourceShare.GetRequestableShare(), resource_info.MemoryToGB),
7483
resource_info.HumanizeResource(memoryResourceShare.MaxAllowed, resource_info.MemoryToGB),
84+
resource_info.HumanizeResource(memoryResourceShare.GetUsage(), resource_info.MemoryToGB),
7585
resource_info.HumanizeResource(memoryResourceShare.FairShare, resource_info.MemoryToGB))
7686
}
7787
}
@@ -95,15 +105,15 @@ func setDeservedResource(
95105
return remainingAmount
96106
}
97107

98-
func divideOverQuotaResource(totalResourceAmount float64, queues map[common_info.QueueID]*rs.QueueAttributes,
108+
func divideOverQuotaResource(totalResourceAmount, kValue float64, queues map[common_info.QueueID]*rs.QueueAttributes,
99109
resourceName rs.ResourceName) (remainingAmount float64) {
100110
queuesByPriority, priorities := getQueuesByPriority(queues)
101111
remainingRequested := make(map[int]map[common_info.QueueID]*remainingRequestedResource)
102112
remainingAmount = totalResourceAmount
103113

104114
for _, priority := range priorities {
105115
var newRemainingRequested map[common_info.QueueID]*remainingRequestedResource
106-
remainingAmount, newRemainingRequested = divideUpToFairShare(remainingAmount, queuesByPriority[priority], resourceName)
116+
remainingAmount, newRemainingRequested = divideUpToFairShare(remainingAmount, kValue, queuesByPriority[priority], resourceName)
107117
if remainingRequested[priority] == nil {
108118
remainingRequested[priority] = make(map[common_info.QueueID]*remainingRequestedResource)
109119
}
@@ -148,9 +158,10 @@ func getQueuesByPriority(queues map[common_info.QueueID]*rs.QueueAttributes) (ma
148158
return queuesByPriority, priorities
149159
}
150160

151-
func divideUpToFairShare(totalResourceAmount float64, queues map[common_info.QueueID]*rs.QueueAttributes,
161+
func divideUpToFairShare(totalResourceAmount, kValue float64, queues map[common_info.QueueID]*rs.QueueAttributes,
152162
resourceName rs.ResourceName) (remainingAmount float64, remainingRequested map[common_info.QueueID]*remainingRequestedResource) {
153163
remainingRequested = map[common_info.QueueID]*remainingRequestedResource{}
164+
154165
for {
155166
shouldRunAnotherRound := false
156167
amountToGiveInCurrentRound := totalResourceAmount

0 commit comments

Comments
 (0)