Skip to content

🐛 Bugfixes for priority queue #3060

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/priorityqueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/builder"
kubeconfig "sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/config"
Expand All @@ -48,7 +49,7 @@ func run() error {

// Setup a Manager
mgr, err := manager.New(kubeconfig.GetConfigOrDie(), manager.Options{
Controller: config.Controller{UsePriorityQueue: true},
Controller: config.Controller{UsePriorityQueue: ptr.To(true)},
})
if err != nil {
return fmt.Errorf("failed to set up controller-manager: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,5 @@ type Controller struct {
// priority queue.
//
// Note: This flag is disabled by default until a future version. It's currently in beta.
UsePriorityQueue bool
UsePriorityQueue *bool
}
5 changes: 3 additions & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/go-logr/logr"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"

"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
"sigs.k8s.io/controller-runtime/pkg/internal/controller"
Expand Down Expand Up @@ -190,7 +191,7 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt
}

if options.RateLimiter == nil {
if mgr.GetControllerOptions().UsePriorityQueue {
if ptr.Deref(mgr.GetControllerOptions().UsePriorityQueue, false) {
options.RateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[request](5*time.Millisecond, 1000*time.Second)
} else {
options.RateLimiter = workqueue.DefaultTypedControllerRateLimiter[request]()
Expand All @@ -199,7 +200,7 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt

if options.NewQueue == nil {
options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] {
if mgr.GetControllerOptions().UsePriorityQueue {
if ptr.Deref(mgr.GetControllerOptions().UsePriorityQueue, false) {
return priorityqueue.New(controllerName, func(o *priorityqueue.Opts[request]) {
o.RateLimiter = rateLimiter
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ var _ = Describe("controller.Controller", func() {

It("should configure a priority queue if UsePriorityQueue is set", func() {
m, err := manager.New(cfg, manager.Options{
Controller: config.Controller{UsePriorityQueue: true},
Controller: config.Controller{UsePriorityQueue: ptr.To(true)},
})
Expect(err).NotTo(HaveOccurred())

Expand Down
1 change: 1 addition & 0 deletions pkg/controller/priorityqueue/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type defaultQueueMetrics[T comparable] struct {
retries workqueue.CounterMetric
}

// add is called for ready items only
func (m *defaultQueueMetrics[T]) add(item T) {
if m == nil {
return
Expand Down
78 changes: 55 additions & 23 deletions pkg/controller/priorityqueue/priorityqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,10 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
}

pq := &priorityqueue[T]{
items: map[T]*item[T]{},
queue: btree.NewG(32, less[T]),
metrics: newQueueMetrics[T](opts.MetricProvider, name, clock.RealClock{}),
items: map[T]*item[T]{},
queue: btree.NewG(32, less[T]),
becameReady: sets.Set[T]{},
metrics: newQueueMetrics[T](opts.MetricProvider, name, clock.RealClock{}),
// itemOrWaiterAdded indicates that an item or
// waiter was added. It must be buffered, because
// if we currently process items we can't tell
Expand All @@ -83,16 +84,21 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {

type priorityqueue[T comparable] struct {
// lock has to be acquired for any access any of items, queue, addedCounter
// or metrics.
lock sync.Mutex
items map[T]*item[T]
queue bTree[*item[T]]
metrics queueMetrics[T]
// or becameReady
lock sync.Mutex
items map[T]*item[T]
queue bTree[*item[T]]

// addedCounter is a counter of elements added, we need it
// because unixNano is not guaranteed to be unique.
addedCounter uint64

// becameReady holds items that are in the queue, were added
// with non-zero after and became ready. We need it to call the
// metrics add exactly once for them.
becameReady sets.Set[T]
metrics queueMetrics[T]

itemOrWaiterAdded chan struct{}

rateLimiter workqueue.TypedRateLimiter[T]
Expand Down Expand Up @@ -142,7 +148,9 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
}
w.items[key] = item
w.queue.ReplaceOrInsert(item)
w.metrics.add(key)
if item.readyAt == nil {
w.metrics.add(key)
}
w.addedCounter++
continue
}
Expand Down Expand Up @@ -195,19 +203,25 @@ func (w *priorityqueue[T]) spin() {
w.lockedLock.Lock()
defer w.lockedLock.Unlock()

// manipulating the tree from within Ascend might lead to panics, so
// track what we want to delete and do it after we are done ascending.
var toDelete []*item[T]
w.queue.Ascend(func(item *item[T]) bool {
if w.waiters.Load() == 0 { // no waiters, return as we can not hand anything out anyways
return false
if item.readyAt != nil {
if readyAt := item.readyAt.Sub(w.now()); readyAt > 0 {
nextReady = w.tick(readyAt)
return false
}
if !w.becameReady.Has(item.key) {
w.metrics.add(item.key)
w.becameReady.Insert(item.key)
}
}

// No next element we can process
if item.readyAt != nil && item.readyAt.After(w.now()) {
readyAt := item.readyAt.Sub(w.now())
if readyAt <= 0 { // Toctou race with the above check
readyAt = 1
}
nextReady = w.tick(readyAt)
return false
if w.waiters.Load() == 0 {
// Have to keep iterating here to ensure we update metrics
// for further items that became ready and set nextReady.
return true
}

// Item is locked, we can not hand it out
Expand All @@ -219,11 +233,16 @@ func (w *priorityqueue[T]) spin() {
w.locked.Insert(item.key)
w.waiters.Add(-1)
delete(w.items, item.key)
w.queue.Delete(item)
toDelete = append(toDelete, item)
w.becameReady.Delete(item.key)
w.get <- *item

return true
})

for _, item := range toDelete {
w.queue.Delete(item)
}
}()
}
}
Expand Down Expand Up @@ -279,22 +298,36 @@ func (w *priorityqueue[T]) ShutDown() {
close(w.done)
}

// ShutDownWithDrain just calls ShutDown, as the draining
// functionality is not used by controller-runtime.
func (w *priorityqueue[T]) ShutDownWithDrain() {
w.ShutDown()
}

// Len returns the number of items that are ready to be
// picked up. It does not include items that are not yet
// ready.
func (w *priorityqueue[T]) Len() int {
w.lock.Lock()
defer w.lock.Unlock()

return w.queue.Len()
var result int
w.queue.Ascend(func(item *item[T]) bool {
if item.readyAt == nil || item.readyAt.Compare(w.now()) <= 0 {
result++
return true
}
return false
})

return result
}

func less[T comparable](a, b *item[T]) bool {
if a.readyAt == nil && b.readyAt != nil {
return true
}
if a.readyAt != nil && b.readyAt == nil {
if b.readyAt == nil && a.readyAt != nil {
return false
}
if a.readyAt != nil && b.readyAt != nil && !a.readyAt.Equal(*b.readyAt) {
Expand Down Expand Up @@ -329,5 +362,4 @@ type bTree[T any] interface {
ReplaceOrInsert(item T) (_ T, _ bool)
Delete(item T) (T, bool)
Ascend(iterator btree.ItemIteratorG[T])
Len() int
}
104 changes: 99 additions & 5 deletions pkg/controller/priorityqueue/priorityqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package priorityqueue

import (
"fmt"
"math/rand/v2"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -283,6 +284,101 @@ var _ = Describe("Controllerworkqueue", func() {
Expect(metrics.depth["test"]).To(Equal(0))
Expect(metrics.adds["test"]).To(Equal(2))
})

It("doesn't include non-ready items in Len()", func() {
q, metrics := newQueue()
defer q.ShutDown()

q.AddWithOpts(AddOpts{After: time.Minute}, "foo")
q.AddWithOpts(AddOpts{}, "baz")
q.AddWithOpts(AddOpts{After: time.Minute}, "bar")
q.AddWithOpts(AddOpts{}, "bal")

Expect(q.Len()).To(Equal(2))
Expect(metrics.depth).To(HaveLen(1))
Expect(metrics.depth["test"]).To(Equal(2))
})

It("items are included in Len() and the queueDepth metric once they are ready", func() {
q, metrics := newQueue()
defer q.ShutDown()

q.AddWithOpts(AddOpts{After: 500 * time.Millisecond}, "foo")
q.AddWithOpts(AddOpts{}, "baz")
q.AddWithOpts(AddOpts{After: 500 * time.Millisecond}, "bar")
q.AddWithOpts(AddOpts{}, "bal")

Expect(q.Len()).To(Equal(2))
metrics.mu.Lock()
Expect(metrics.depth["test"]).To(Equal(2))
metrics.mu.Unlock()
time.Sleep(time.Second)
Expect(q.Len()).To(Equal(4))
metrics.mu.Lock()
Expect(metrics.depth["test"]).To(Equal(4))
metrics.mu.Unlock()

// Drain queue
for range 4 {
item, _ := q.Get()
q.Done(item)
}
Expect(q.Len()).To(Equal(0))
metrics.mu.Lock()
Expect(metrics.depth["test"]).To(Equal(0))
metrics.mu.Unlock()

// Validate that doing it again still works to notice bugs with removing
// it from the queues becameReady tracking.
q.AddWithOpts(AddOpts{After: 500 * time.Millisecond}, "foo")
q.AddWithOpts(AddOpts{}, "baz")
q.AddWithOpts(AddOpts{After: 500 * time.Millisecond}, "bar")
q.AddWithOpts(AddOpts{}, "bal")

Expect(q.Len()).To(Equal(2))
metrics.mu.Lock()
Expect(metrics.depth["test"]).To(Equal(2))
metrics.mu.Unlock()
time.Sleep(time.Second)
Expect(q.Len()).To(Equal(4))
metrics.mu.Lock()
Expect(metrics.depth["test"]).To(Equal(4))
metrics.mu.Unlock()
})

It("returns many items", func() {
// This test ensures the queue is able to drain a large queue without panic'ing.
// In a previous version of the code we were calling queue.Delete within q.Ascend
// which led to a panic in queue.Ascend > iterate:
// "panic: runtime error: index out of range [0] with length 0"
q, _ := newQueue()
defer q.ShutDown()

for range 20 {
for i := range 1000 {
rn := rand.N(100) //nolint:gosec // We don't need cryptographically secure entropy here
if rn < 10 {
q.AddWithOpts(AddOpts{After: time.Duration(rn) * time.Millisecond}, fmt.Sprintf("foo%d", i))
} else {
q.AddWithOpts(AddOpts{Priority: rn}, fmt.Sprintf("foo%d", i))
}
}

wg := sync.WaitGroup{}
for range 100 { // The panic only occurred relatively frequently with a high number of go routines.
wg.Add(1)
go func() {
defer wg.Done()
for range 10 {
obj, _, _ := q.GetWithPriority()
q.Done(obj)
}
}()
}

wg.Wait()
}
})
})

func BenchmarkAddGetDone(b *testing.B) {
Expand Down Expand Up @@ -438,10 +534,6 @@ func TestFuzzPrioriorityQueue(t *testing.T) {
}

wg.Wait()

if expected := len(inQueue); expected != q.Len() {
t.Errorf("Expected queue length to be %d, was %d", expected, q.Len())
}
}

func newQueue() (PriorityQueue[string], *fakeMetricsProvider) {
Expand All @@ -453,6 +545,8 @@ func newQueue() (PriorityQueue[string], *fakeMetricsProvider) {
bTree: q.(*priorityqueue[string]).queue,
}

// validate that tick always gets a positive value as it will just return
// nil otherwise, which results in blocking forever.
upstreamTick := q.(*priorityqueue[string]).tick
q.(*priorityqueue[string]).tick = func(d time.Duration) <-chan time.Time {
if d <= 0 {
Expand All @@ -477,7 +571,7 @@ func (b *btreeInteractionValidator) ReplaceOrInsert(item *item[string]) (*item[s
}

func (b *btreeInteractionValidator) Delete(item *item[string]) (*item[string], bool) {
// There is node codepath that deletes an item that doesn't exist
// There is no codepath that deletes an item that doesn't exist
old, existed := b.bTree.Delete(item)
if !existed {
panic(fmt.Sprintf("Delete: item %v not found", item))
Expand Down
4 changes: 2 additions & 2 deletions pkg/handler/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ func (w workqueueWithCustomAddFunc[request]) Add(item request) {
}

// isObjectUnchanged checks if the object in a create event is unchanged, for example because
// we got it in our initial listwatch or because of a resync. The heuristic it uses is to check
// if the object is older than one minute.
// we got it in our initial listwatch. The heuristic it uses is to check if the object is older
// than one minute.
func isObjectUnchanged[object client.Object](e event.TypedCreateEvent[object]) bool {
return e.Object.GetCreationTimestamp().Time.Before(time.Now().Add(-time.Minute))
}
4 changes: 2 additions & 2 deletions pkg/handler/eventhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ var _ = Describe("Eventhandler", func() {
})

Describe("WithLowPriorityWhenUnchanged", func() {
It("should lower the priority of a create request for an object that was crated more than one minute in the past", func() {
It("should lower the priority of a create request for an object that was created more than one minute in the past", func() {
actualOpts := priorityqueue.AddOpts{}
var actualRequests []reconcile.Request
wq := &fakePriorityQueue{
Expand All @@ -797,7 +797,7 @@ var _ = Describe("Eventhandler", func() {
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
})

It("should not lower the priority of a create request for an object that was crated less than one minute in the past", func() {
It("should not lower the priority of a create request for an object that was created less than one minute in the past", func() {
actualOpts := priorityqueue.AddOpts{}
var actualRequests []reconcile.Request
wq := &fakePriorityQueue{
Expand Down
Loading