Skip to content

Commit c0d5e20

Browse files
authored
Use more peek and Fix for the implementation of popNextJob instead of pop+push (#152)
1 parent 63f592d commit c0d5e20

File tree

3 files changed

+244
-21
lines changed

3 files changed

+244
-21
lines changed

pkg/scheduler/actions/utils/job_order_by_queue.go

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,12 @@ func (jobsOrder *JobsOrderByQueues) PopNextJob() *podgroup_info.PodGroupInfo {
6060
return nil
6161
}
6262

63-
department := jobsOrder.popNextDepartment()
63+
department := jobsOrder.getNextDepartment()
6464
if department == nil {
6565
return nil
6666
}
6767

68-
queue := jobsOrder.popNextQueue(department)
68+
queue := jobsOrder.getNextQueue(department)
6969
if queue == nil {
7070
return nil
7171
}
@@ -78,8 +78,8 @@ func (jobsOrder *JobsOrderByQueues) PopNextJob() *podgroup_info.PodGroupInfo {
7878
jobsOrder.queuePopsMap[queue.UID] = append(jobsOrder.queuePopsMap[queue.UID], job)
7979
}
8080

81-
jobsOrder.updateQueuePriorityQueue(queue, department)
82-
jobsOrder.updateDepartmentPriorityQueue(department)
81+
jobsOrder.handleJobPopOutOfQueue(queue, department)
82+
jobsOrder.handleJobPopOutOfDepartment(department)
8383

8484
log.InfraLogger.V(7).Infof("Popped job: %v", job.Name)
8585
return job
@@ -107,51 +107,52 @@ func (jobsOrder *JobsOrderByQueues) PushJob(job *podgroup_info.PodGroupInfo) {
107107
department.Name)
108108
}
109109

110-
func (jobsOrder *JobsOrderByQueues) updateDepartmentPriorityQueue(department *queue_info.QueueInfo) {
110+
func (jobsOrder *JobsOrderByQueues) handleJobPopOutOfDepartment(department *queue_info.QueueInfo) {
111111
if jobsOrder.departmentIdToDepartmentMetadata[department.UID].queuesPriorityQueue.Len() == 0 {
112+
jobsOrder.activeDepartments.Pop()
112113
delete(jobsOrder.departmentIdToDepartmentMetadata, department.UID)
113114
return
114115
}
115116

116-
jobsOrder.activeDepartments.Push(department)
117117
jobsOrder.departmentIdToDepartmentMetadata[department.UID].shouldUpdateQueueShare = true
118-
log.InfraLogger.V(7).Infof("Pushed department: %v", department.Name)
119118
}
120119

121-
func (jobsOrder *JobsOrderByQueues) updateQueuePriorityQueue(queue, department *queue_info.QueueInfo) {
120+
func (jobsOrder *JobsOrderByQueues) handleJobPopOutOfQueue(queue, department *queue_info.QueueInfo) {
122121
if jobsOrder.queueIdToQueueMetadata[queue.UID].jobsInQueue.Len() == 0 {
122+
jobsOrder.departmentIdToDepartmentMetadata[department.UID].queuesPriorityQueue.Pop()
123123
delete(jobsOrder.queueIdToQueueMetadata, queue.UID)
124124
return
125125
}
126126

127-
jobsOrder.departmentIdToDepartmentMetadata[department.UID].queuesPriorityQueue.Push(queue)
128127
jobsOrder.queueIdToQueueMetadata[queue.UID].shouldUpdateQueueShare = true
129-
log.InfraLogger.V(7).Infof("Pushed queue: %v", queue.Name)
130128
}
131129

132-
func (jobsOrder *JobsOrderByQueues) popNextQueue(department *queue_info.QueueInfo) *queue_info.QueueInfo {
133-
queue := jobsOrder.departmentIdToDepartmentMetadata[department.UID].queuesPriorityQueue.Pop().(*queue_info.QueueInfo)
130+
func (jobsOrder *JobsOrderByQueues) getNextQueue(department *queue_info.QueueInfo) *queue_info.QueueInfo {
131+
queue := jobsOrder.departmentIdToDepartmentMetadata[department.UID].queuesPriorityQueue.Peek().(*queue_info.QueueInfo)
134132
if jobsOrder.queueIdToQueueMetadata[queue.UID].shouldUpdateQueueShare {
135-
jobsOrder.departmentIdToDepartmentMetadata[department.UID].queuesPriorityQueue.Push(queue)
136-
jobsOrder.queueIdToQueueMetadata[queue.UID].shouldUpdateQueueShare = false
137-
queue = jobsOrder.departmentIdToDepartmentMetadata[department.UID].queuesPriorityQueue.Pop().(*queue_info.QueueInfo)
133+
jobsOrder.updateTopQueueShare(queue, department)
134+
return jobsOrder.getNextQueue(department)
138135
}
139136

140137
if jobsOrder.queueIdToQueueMetadata[queue.UID].jobsInQueue.Len() == 0 {
141138
log.InfraLogger.V(7).Warnf("Queue: <%v> is active, yet no jobs in queue", queue.Name)
142139
return nil
143140
}
144141

145-
log.InfraLogger.V(7).Infof("Popped queue: %v", queue.Name)
142+
log.InfraLogger.V(7).Infof("Get queue: %v", queue.Name)
146143
return queue
147144
}
148145

149-
func (jobsOrder *JobsOrderByQueues) popNextDepartment() *queue_info.QueueInfo {
150-
department := jobsOrder.activeDepartments.Pop().(*queue_info.QueueInfo)
146+
func (jobsOrder *JobsOrderByQueues) updateTopQueueShare(topQueue *queue_info.QueueInfo, department *queue_info.QueueInfo) {
147+
jobsOrder.departmentIdToDepartmentMetadata[department.UID].queuesPriorityQueue.Fix(0)
148+
jobsOrder.queueIdToQueueMetadata[topQueue.UID].shouldUpdateQueueShare = false
149+
}
150+
151+
func (jobsOrder *JobsOrderByQueues) getNextDepartment() *queue_info.QueueInfo {
152+
department := jobsOrder.activeDepartments.Peek().(*queue_info.QueueInfo)
151153
if jobsOrder.departmentIdToDepartmentMetadata[department.UID].shouldUpdateQueueShare {
152-
jobsOrder.activeDepartments.Push(department)
153-
jobsOrder.departmentIdToDepartmentMetadata[department.UID].shouldUpdateQueueShare = false
154-
department = jobsOrder.activeDepartments.Pop().(*queue_info.QueueInfo)
154+
jobsOrder.updateTopDepartmentShare(department)
155+
return jobsOrder.getNextDepartment()
155156
}
156157
if jobsOrder.departmentIdToDepartmentMetadata[department.UID].queuesPriorityQueue.Empty() {
157158
log.InfraLogger.V(7).Warnf("Department: <%v> is active, yet no queues in department", department.Name)
@@ -162,6 +163,11 @@ func (jobsOrder *JobsOrderByQueues) popNextDepartment() *queue_info.QueueInfo {
162163
return department
163164
}
164165

166+
func (jobsOrder *JobsOrderByQueues) updateTopDepartmentShare(topDepartment *queue_info.QueueInfo) {
167+
jobsOrder.activeDepartments.Fix(0)
168+
jobsOrder.departmentIdToDepartmentMetadata[topDepartment.UID].shouldUpdateQueueShare = false
169+
}
170+
165171
// addJobToQueue adds `job` to the jobs queue, creating that job's queue in the jobs order if needed
166172
func (jobsOrder *JobsOrderByQueues) addJobToQueue(job *podgroup_info.PodGroupInfo, reverseOrder bool) {
167173
if _, found := jobsOrder.queueIdToQueueMetadata[job.Queue]; !found {

pkg/scheduler/scheduler_util/priority_queue.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,18 @@ func (q *PriorityQueue) Pop() interface{} {
4646
return heap.Pop(&q.queue)
4747
}
4848

49+
func (q *PriorityQueue) Peek() interface{} {
50+
if q.Len() == 0 {
51+
return nil
52+
}
53+
54+
return q.queue.Peek()
55+
}
56+
57+
func (q *PriorityQueue) Fix(index int) {
58+
heap.Fix(&q.queue, index)
59+
}
60+
4961
func (q *PriorityQueue) Empty() bool {
5062
return q.queue.Len() == 0
5163
}
@@ -80,3 +92,11 @@ func (pq *priorityQueue) Pop() interface{} {
8092
(*pq).items = old[0 : n-1]
8193
return item
8294
}
95+
96+
func (pq *priorityQueue) Peek() interface{} {
97+
if pq.Len() == 0 {
98+
return nil
99+
}
100+
101+
return pq.items[0]
102+
}

pkg/scheduler/scheduler_util/priority_queue_test.go

Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,3 +106,200 @@ func TestPriorityQueue_PushAndPop(t *testing.T) {
106106
})
107107
}
108108
}
109+
110+
func TestPriorityQueue_Peek(t *testing.T) {
111+
type fields struct {
112+
lessFn common_info.LessFn
113+
maxQueueSize int
114+
}
115+
type args struct {
116+
previouslyPushed []interface{}
117+
}
118+
type want struct {
119+
mostPrioritized interface{}
120+
queueLength int
121+
}
122+
tests := []struct {
123+
name string
124+
fields fields
125+
args args
126+
want want
127+
}{
128+
{
129+
name: "basic peek",
130+
fields: fields{
131+
lessFn: func(l interface{}, r interface{}) bool {
132+
lv := l.(int)
133+
rv := r.(int)
134+
return lv < rv
135+
},
136+
maxQueueSize: QueueCapacityInfinite,
137+
},
138+
args: args{
139+
previouslyPushed: []interface{}{2, 3},
140+
},
141+
want: want{
142+
mostPrioritized: 2,
143+
queueLength: 2,
144+
},
145+
},
146+
{
147+
name: "no items",
148+
fields: fields{
149+
lessFn: func(l interface{}, r interface{}) bool {
150+
lv := l.(int)
151+
rv := r.(int)
152+
return lv < rv
153+
},
154+
maxQueueSize: QueueCapacityInfinite,
155+
},
156+
args: args{
157+
previouslyPushed: []interface{}{},
158+
},
159+
want: want{
160+
mostPrioritized: nil,
161+
queueLength: 0,
162+
},
163+
},
164+
}
165+
for _, tt := range tests {
166+
t.Run(tt.name, func(t *testing.T) {
167+
q := NewPriorityQueue(tt.fields.lessFn, tt.fields.maxQueueSize)
168+
for _, queueFill := range tt.args.previouslyPushed {
169+
q.Push(queueFill)
170+
}
171+
172+
if q.Len() != tt.want.queueLength {
173+
t.Errorf("before peek, queue length got %v, want %v", q.Len(), tt.want.queueLength)
174+
}
175+
176+
mostPrioritized := q.Peek()
177+
if !reflect.DeepEqual(mostPrioritized, tt.want.mostPrioritized) {
178+
t.Errorf("got %v, want %v", mostPrioritized, tt.want.mostPrioritized)
179+
}
180+
181+
if q.Len() != tt.want.queueLength {
182+
t.Errorf("after peek, queue length got %v, want %v", q.Len(), tt.want.queueLength)
183+
}
184+
})
185+
}
186+
}
187+
188+
func TestPriorityQueue_Fix(t *testing.T) {
189+
type item struct {
190+
Value int
191+
}
192+
type fields struct {
193+
lessFn common_info.LessFn
194+
maxQueueSize int
195+
}
196+
type args struct {
197+
previouslyPushed []*item
198+
valueToFix int
199+
}
200+
type want struct {
201+
mostPrioritized item
202+
}
203+
tests := []struct {
204+
name string
205+
fields fields
206+
args args
207+
want want
208+
}{
209+
{
210+
name: "basic fix",
211+
fields: fields{
212+
lessFn: func(l interface{}, r interface{}) bool {
213+
lv := l.(*item)
214+
rv := r.(*item)
215+
return lv.Value < rv.Value
216+
},
217+
maxQueueSize: QueueCapacityInfinite,
218+
},
219+
args: args{
220+
previouslyPushed: []*item{{Value: 2}, {Value: 3}},
221+
valueToFix: 4,
222+
},
223+
want: want{
224+
mostPrioritized: item{Value: 3},
225+
},
226+
},
227+
}
228+
for _, tt := range tests {
229+
t.Run(tt.name, func(t *testing.T) {
230+
q := NewPriorityQueue(tt.fields.lessFn, tt.fields.maxQueueSize)
231+
for _, queueFill := range tt.args.previouslyPushed {
232+
q.Push(queueFill)
233+
}
234+
235+
firstMostPrioritized := q.Peek().(*item)
236+
firstMostPrioritized.Value = tt.args.valueToFix
237+
q.Fix(0)
238+
239+
mostPrioritized := q.Peek().(*item)
240+
if !reflect.DeepEqual(*mostPrioritized, tt.want.mostPrioritized) {
241+
t.Errorf("got %v, want %v", *mostPrioritized, tt.want.mostPrioritized)
242+
}
243+
})
244+
}
245+
}
246+
247+
func TestPriorityQueue_EmptyAndLen(t *testing.T) {
248+
type expected struct {
249+
empty bool
250+
len int
251+
}
252+
tests := []struct {
253+
name string
254+
setup func(*PriorityQueue)
255+
expected expected
256+
}{
257+
{
258+
name: "Empty queue",
259+
setup: func(q *PriorityQueue) {},
260+
expected: expected{
261+
empty: true,
262+
len: 0,
263+
},
264+
},
265+
{
266+
name: "Queue with one item",
267+
setup: func(q *PriorityQueue) {
268+
q.Push("test")
269+
},
270+
expected: expected{
271+
empty: false,
272+
len: 1,
273+
},
274+
},
275+
{
276+
name: "Queue with multiple items",
277+
setup: func(q *PriorityQueue) {
278+
q.Push("test1")
279+
q.Push("test2")
280+
},
281+
expected: expected{
282+
empty: false,
283+
len: 2,
284+
},
285+
},
286+
}
287+
288+
for _, tt := range tests {
289+
t.Run(tt.name, func(t *testing.T) {
290+
lessFn := func(a, b interface{}) bool {
291+
return a.(string) < b.(string)
292+
}
293+
q := NewPriorityQueue(lessFn, QueueCapacityInfinite)
294+
295+
tt.setup(q)
296+
297+
if got := q.Empty(); got != tt.expected.empty {
298+
t.Errorf("PriorityQueue.Empty() = %v, want %v", got, tt.expected.empty)
299+
}
300+
if got := q.Len(); got != tt.expected.len {
301+
t.Errorf("PriorityQueue.Len() = %v, want %v", got, tt.expected.len)
302+
}
303+
})
304+
}
305+
}

0 commit comments

Comments
 (0)