Skip to content

Commit 31d30b6

Browse files
committed
fix priority queue ordering when item priority changes
Signed-off-by: zach593 <[email protected]>
1 parent adb6465 commit 31d30b6

File tree

2 files changed

+30
-1
lines changed

2 files changed

+30
-1
lines changed

pkg/controller/priorityqueue/priorityqueue.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,13 @@ type AddOpts struct {
3030
// internally de-duplicates all items that are added to
3131
// it. It will use the max of the passed priorities and the
3232
// min of possible durations.
33+
//
34+
// When an item that is already enqueued at a lower priority
35+
// is re-enqueued with a higher priority, it will be placed at
36+
// the end among items of the new priority, in order to
37+
// preserve FIFO semantics within each priority level.
38+
// The effective duration (i.e. the ready time) is still
39+
// computed as the minimum across all enqueues.
3340
type PriorityQueue[T comparable] interface {
3441
workqueue.TypedRateLimitingInterface[T]
3542
AddWithOpts(o AddOpts, Items ...T)
@@ -161,12 +168,12 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
161168
Priority: ptr.Deref(o.Priority, 0),
162169
ReadyAt: readyAt,
163170
}
171+
w.addedCounter++
164172
w.items[key] = item
165173
w.queue.ReplaceOrInsert(item)
166174
if item.ReadyAt == nil {
167175
w.metrics.add(key, item.Priority)
168176
}
169-
w.addedCounter++
170177
continue
171178
}
172179

@@ -179,6 +186,8 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
179186
w.metrics.updateDepthWithPriorityMetric(item.Priority, newPriority)
180187
}
181188
item.Priority = newPriority
189+
item.AddedCounter = w.addedCounter
190+
w.addedCounter++
182191
}
183192

184193
if item.ReadyAt != nil && (readyAt == nil || readyAt.Before(*item.ReadyAt)) {

pkg/controller/priorityqueue/priorityqueue_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,26 @@ var _ = Describe("Controllerworkqueue", func() {
320320
Expect(depth).To(Equal(0))
321321
}
322322
})
323+
324+
It("follows FIFO order in the new priority queue when item priority changes", func() {
325+
q, _ := newQueue()
326+
defer q.ShutDown()
327+
328+
q.AddWithOpts(AddOpts{Priority: ptr.To(0)}, "foo")
329+
q.AddWithOpts(AddOpts{Priority: ptr.To(1)}, "bar")
330+
q.AddWithOpts(AddOpts{Priority: ptr.To(1)}, "foo")
331+
Expect(q.Len()).To(Equal(2))
332+
333+
item, priority, _ := q.GetWithPriority()
334+
Expect(item).To(Equal("bar"))
335+
Expect(priority).To(Equal(1))
336+
Expect(q.Len()).To(Equal(1))
337+
338+
item, priority, _ = q.GetWithPriority()
339+
Expect(item).To(Equal("foo"))
340+
Expect(priority).To(Equal(1))
341+
Expect(q.Len()).To(Equal(0))
342+
})
323343
})
324344

325345
func BenchmarkAddGetDone(b *testing.B) {

0 commit comments

Comments
 (0)