Skip to content

Commit f8b3adf

Browse files
authored
Change workqueue to prefer lowest timestamp. (#1160)
Currently we choose the highest timestamp, which inhibits our ability to force trying a key earlier. If we have a low priority key queued with a long delay, and we want to try it with high priority now, the result is a high priority with the long delay timestamp. In #1159 I want to be able to periodically resync N keys by loading the key space and queuing all of the keys in the future, but this may be hours in the future. When a key actually changes, we want to be able to process it immediately with high priority, but the current semantics mean that if there is an outstanding resync that some keys will get starved until the resync queued keys are processed. ``` ok github.com/chainguard-dev/terraform-infra-common/pkg/workqueue/gcs 316.861s ``` Signed-off-by: Matt Moore <[email protected]>
1 parent 81985b5 commit f8b3adf

File tree

3 files changed

+31
-97
lines changed

3 files changed

+31
-97
lines changed

pkg/workqueue/conformance/conformance.go

Lines changed: 16 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -361,21 +361,8 @@ func TestSemantics(t *testing.T, ctor func(int) workqueue.Interface) {
361361
t.Fatalf("Start failed: %v", err)
362362
}
363363

364-
// Queue the in-progress key with no priority.
365-
if err := wq.Queue(ctx, owned.Name(), workqueue.Options{
366-
// No priority
367-
}); err != nil {
368-
t.Fatalf("Queue failed: %v", err)
369-
}
370-
371-
// Check that the key is queued and in-progress, and the queued key
372-
// is behind the other key.
373-
_, _ = checkQueue(t, wq, ExpectedState{
374-
WorkInProgress: []string{"foo"},
375-
Queued: []string{"bar", "foo"},
376-
})
377-
378364
// Requeue the in-progress high-priority key.
365+
// This will add backoff NotBefore.
379366
if err := owned.Requeue(ctx); err != nil {
380367
t.Fatalf("Requeue failed: %v", err)
381368
}
@@ -422,30 +409,7 @@ func TestSemantics(t *testing.T, ctor func(int) workqueue.Interface) {
422409
t.Fatalf("Queue failed: %v", err)
423410
}
424411

425-
// The queue should appear empty because the later NotBefore won.
426-
_, _ = checkQueue(t, wq, ExpectedState{})
427-
428-
// Queue the same key again with a NotBefore that's twice as long.
429-
if err := wq.Queue(ctx, "foo", workqueue.Options{
430-
NotBefore: time.Now().UTC().Add(2 * delay),
431-
}); err != nil {
432-
t.Fatalf("Queue failed: %v", err)
433-
}
434-
435-
// The queue should appear empty
436-
_, _ = checkQueue(t, wq, ExpectedState{})
437-
438-
// Sleep for the NotBefore delay.
439-
time.Sleep(delay)
440-
441-
// The queue should STILL appear empty because the doubled delay
442-
// should have won.
443-
_, _ = checkQueue(t, wq, ExpectedState{})
444-
445-
// Sleep for the NotBefore delay one last time.
446-
time.Sleep(delay)
447-
448-
// The queue should now have the key.
412+
// The queue should still have the key because the earlier NotBefore won.
449413
_, _ = checkQueue(t, wq, ExpectedState{
450414
Queued: []string{"foo"},
451415
})
@@ -487,53 +451,33 @@ func TestSemantics(t *testing.T, ctor func(int) workqueue.Interface) {
487451
})
488452

489453
ct.scenario("requeue doesn't reset not before", func(ctx context.Context, t *testing.T, wq workqueue.Interface) {
490-
// Queue a key without NotBefore set.
454+
// Test that lowest (earliest) NotBefore wins when merging.
455+
// Queue a key with a long NotBefore delay.
456+
longDelay := 10 * workqueue.BackoffPeriod
491457
if err := wq.Queue(ctx, "foo", workqueue.Options{
492-
// No NotBefore.
458+
NotBefore: time.Now().UTC().Add(longDelay),
493459
}); err != nil {
494460
t.Fatalf("Queue failed: %v", err)
495461
}
496462

497-
// The queue should have the key.
498-
_, qd := checkQueue(t, wq, ExpectedState{
499-
Queued: []string{"foo"},
500-
})
501-
502-
// Start processing the first key.
503-
owned, err := qd[0].Start(ctx)
504-
if err != nil {
505-
t.Fatalf("Start failed: %v", err)
506-
}
507-
508-
// The key should now be in progress
509-
_, _ = checkQueue(t, wq, ExpectedState{
510-
WorkInProgress: []string{"foo"},
511-
})
463+
// The queue should be empty (long delay not passed).
464+
_, _ = checkQueue(t, wq, ExpectedState{})
512465

513-
// Queue the key again with a short NotBefore delay.
514-
if err := wq.Queue(ctx, owned.Name(), workqueue.Options{
515-
NotBefore: time.Now().UTC().Add(delay),
466+
// Queue the same key again with a shorter NotBefore delay.
467+
shortDelay := workqueue.BackoffPeriod
468+
if err := wq.Queue(ctx, "foo", workqueue.Options{
469+
NotBefore: time.Now().UTC().Add(shortDelay),
516470
}); err != nil {
517471
t.Fatalf("Queue failed: %v", err)
518472
}
519473

520-
// The queue should still have the key just in-progress.
521-
_, _ = checkQueue(t, wq, ExpectedState{
522-
WorkInProgress: []string{"foo"},
523-
})
524-
525-
// Requeue the key.
526-
if err := owned.Requeue(ctx); err != nil {
527-
t.Fatalf("Requeue failed: %v", err)
528-
}
529-
530-
// The requeue should not reset NotBefore.
474+
// The queue should still be empty (short delay not passed yet).
531475
_, _ = checkQueue(t, wq, ExpectedState{})
532476

533-
// Sleep for the NotBefore delay.
534-
time.Sleep(delay)
477+
// Sleep for the short delay.
478+
time.Sleep(shortDelay)
535479

536-
// Now the key should show as queued.
480+
// Now the key should show as queued (short delay won over long delay).
537481
_, _ = checkQueue(t, wq, ExpectedState{
538482
Queued: []string{"foo"},
539483
})

pkg/workqueue/gcs/gcs.go

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -117,23 +117,6 @@ func (w *wq) Queue(ctx context.Context, key string, opts workqueue.Options) erro
117117
}
118118

119119
func updateMetadata(ctx context.Context, client ClientInterface, key string, metadata map[string]string) error {
120-
switch {
121-
case metadata[priorityMetadataKey] != noPriority:
122-
// If the priority was set, then attempt to merge it with the queued
123-
// key.
124-
break
125-
126-
case metadata[notBeforeMetadataKey] != noNotBefore:
127-
// If the not before was set, then attempt to merge it with the queued
128-
// key. This is largely for Queue operations, as the NotBefore is
129-
// cleared when we start processing the key.
130-
break
131-
132-
default:
133-
// No options, so don't bother fetching the queued object.
134-
return nil
135-
}
136-
137120
attrs, err := client.Object(fmt.Sprintf("%s%s", queuedPrefix, key)).Attrs(ctx)
138121
if err != nil {
139122
return fmt.Errorf("Attrs() = %w", err)
@@ -143,12 +126,14 @@ func updateMetadata(ctx context.Context, client ClientInterface, key string, met
143126
attrs.Metadata = make(map[string]string, 2)
144127
}
145128
update := false
129+
// Always choose the highest priority.
146130
if p, ok := attrs.Metadata[priorityMetadataKey]; !ok || p < metadata[priorityMetadataKey] {
147131
clog.InfoContextf(ctx, "Updating %s priority from %q to %q", key, p, metadata[priorityMetadataKey])
148132
attrs.Metadata[priorityMetadataKey] = metadata[priorityMetadataKey]
149133
update = true
150134
}
151-
if ts, ok := attrs.Metadata[notBeforeMetadataKey]; ok && ts < metadata[notBeforeMetadataKey] {
135+
// Always choose the lowest not-before.
136+
if ts, ok := attrs.Metadata[notBeforeMetadataKey]; ok && ts > metadata[notBeforeMetadataKey] {
152137
clog.InfoContextf(ctx, "Updating %s not-before from %q to %q", key, ts, metadata[notBeforeMetadataKey])
153138
attrs.Metadata[notBeforeMetadataKey] = metadata[notBeforeMetadataKey]
154139
update = true

pkg/workqueue/inmem/inmem.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,16 +49,21 @@ var _ workqueue.Interface = (*wq)(nil)
4949
func (w *wq) Queue(_ context.Context, key string, opts workqueue.Options) error {
5050
w.rw.Lock()
5151
defer w.rw.Unlock()
52-
if qi, ok := w.queue[key]; !ok {
52+
qi, ok := w.queue[key]
53+
if !ok {
5354
w.queue[key] = queueItem{
5455
Options: opts,
5556
queued: time.Now().UTC(),
5657
}
57-
} else if qi.Priority < opts.Priority {
58-
// Raise the priority of the queued item.
58+
return nil
59+
}
60+
// Always choose the highest priority.
61+
if qi.Priority < opts.Priority {
5962
qi.Priority = opts.Priority
6063
w.queue[key] = qi
61-
} else if qi.NotBefore.Before(opts.NotBefore) {
64+
}
65+
// Always choose the lowest not-before.
66+
if qi.NotBefore.After(opts.NotBefore) {
6267
// Update the NotBefore time.
6368
qi.NotBefore = opts.NotBefore
6469
w.queue[key] = qi
@@ -223,12 +228,12 @@ func (o *inProgressKey) RequeueWithOptions(_ context.Context, opts workqueue.Opt
223228
queued: time.Now().UTC(),
224229
}
225230
} else {
231+
// Always choose the highest priority.
226232
if qi.Priority < opts.Priority {
227-
// Raise the priority of the queued item.
228233
qi.Priority = opts.Priority
229234
}
230-
if opts.NotBefore.After(qi.NotBefore) {
231-
// Update the NotBefore time if the new one is later.
235+
// Always choose the lowest not-before.
236+
if qi.NotBefore.After(opts.NotBefore) {
232237
qi.NotBefore = opts.NotBefore
233238
}
234239
o.wq.queue[o.key] = qi

0 commit comments

Comments
 (0)