diff --git a/.chloggen/ensure-queue-size-update.yaml b/.chloggen/ensure-queue-size-update.yaml
new file mode 100644
index 00000000000..39941dd3c8f
--- /dev/null
+++ b/.chloggen/ensure-queue-size-update.yaml
@@ -0,0 +1,25 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
+component: exporterhelper
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Update the queue size after the element is done being exported
+
+# One or more tracking issues or pull requests related to the change
+issues: [12399]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: After this change, the active queue size will include elements that are in the process of being exported.
+
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []
diff --git a/exporter/exporterhelper/internal/batch_sender_test.go b/exporter/exporterhelper/internal/batch_sender_test.go
index d38c1f91f6d..f1a2ce6d36b 100644
--- a/exporter/exporterhelper/internal/batch_sender_test.go
+++ b/exporter/exporterhelper/internal/batch_sender_test.go
@@ -90,7 +90,7 @@ func TestBatchSender_Merge(t *testing.T) {
 			assert.Equal(t, int64(1), sink.RequestsCount())
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == 2 && sink.ItemsCount() == 15
-			}, 100*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)
 		})
 	}
 	for _, tt := range tests {
@@ -160,12 +160,12 @@ func TestBatchSender_BatchExportError(t *testing.T) {
 			errReq := &requesttest.FakeRequest{Items: 20, ExportErr: errors.New("transient error"), Sink: sink}
 			require.NoError(t, be.Send(context.Background(), errReq))
 
-			// the batch should be dropped since the queue doesn't have requeuing enabled.
+			// the batch should be dropped since the queue doesn't have re-queuing enabled.
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == tt.expectedRequests &&
 					sink.ItemsCount() == tt.expectedItems &&
 					be.queue.Size() == 0
-			}, 100*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)
 
 			require.NoError(t, be.Shutdown(context.Background()))
 		})
@@ -194,13 +194,13 @@ func TestBatchSender_MergeOrSplit(t *testing.T) {
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 8, Sink: sink}))
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == 1 && sink.ItemsCount() == 8
-			}, 500*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)
 
 			// big request should be broken down into two requests, both are sent right away.
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 17, Sink: sink}))
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == 3 && sink.ItemsCount() == 25
-			}, 500*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)
 
 			// request that cannot be split should be dropped.
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{
@@ -212,7 +212,7 @@ func TestBatchSender_MergeOrSplit(t *testing.T) {
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 13, Sink: sink}))
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == 5 && sink.ItemsCount() == 38
-			}, 500*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)
 			require.NoError(t, be.Shutdown(context.Background()))
 		})
 	}
@@ -370,20 +370,20 @@ func TestBatchSender_ConcurrencyLimitReached(t *testing.T) {
 
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == 1 && sink.ItemsCount() == 4
-			}, 100*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)
 
 			// the 3rd request should be flushed by itself due to flush interval
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink}))
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == 2 && sink.ItemsCount() == 6
-			}, 100*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)
 
 			// the 4th and 5th request should be flushed in the same batched request by max concurrency limit.
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink}))
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink}))
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == 3 && sink.ItemsCount() == 10
-			}, 100*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)
 
 			// do it a few more times to ensure it produces the correct batch size regardless of goroutine scheduling.
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 5, Sink: sink}))
@@ -392,7 +392,7 @@ func TestBatchSender_ConcurrencyLimitReached(t *testing.T) {
 				// in case of MaxSizeItems=10, wait for the leftover request to send
 				assert.Eventually(t, func() bool {
 					return sink.RequestsCount() == 5 && sink.ItemsCount() == 21
-				}, 50*time.Millisecond, 10*time.Millisecond)
+				}, 1*time.Second, 10*time.Millisecond)
 			}
 
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 4, Sink: sink}))
@@ -400,7 +400,7 @@
 			require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 20, Sink: sink}))
 			assert.Eventually(t, func() bool {
 				return sink.RequestsCount() == tt.expectedRequests && sink.ItemsCount() == tt.expectedItems
-			}, 100*time.Millisecond, 10*time.Millisecond)
+			}, 1*time.Second, 10*time.Millisecond)
 		})
 	}
 }
@@ -648,7 +648,7 @@ func TestBatchSenderTimerResetNoConflict(t *testing.T) {
 	assert.EventuallyWithT(t, func(c *assert.CollectT) {
 		assert.LessOrEqual(c, int64(1), sink.RequestsCount())
 		assert.EqualValues(c, 8, sink.ItemsCount())
-	}, 500*time.Millisecond, 10*time.Millisecond)
+	}, 1*time.Second, 10*time.Millisecond)
 
 	require.NoError(t, be.Shutdown(context.Background()))
 }
diff --git a/exporter/exporterqueue/async_queue_test.go b/exporter/exporterqueue/async_queue_test.go
index 89d8bf48d9a..9de8a57c258 100644
--- a/exporter/exporterqueue/async_queue_test.go
+++ b/exporter/exporterqueue/async_queue_test.go
@@ -70,7 +70,7 @@ func TestAsyncMemoryQueueBlockingCancelled(t *testing.T) {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		for j := 0; j < 11; j++ {
+		for j := 0; j < 10; j++ {
 			assert.NoError(t, ac.Offer(ctx, 1))
 		}
 		assert.ErrorIs(t, ac.Offer(ctx, 3), context.Canceled)
diff --git a/exporter/exporterqueue/disabled_queue.go b/exporter/exporterqueue/disabled_queue.go
index 0033ed3014c..fd1ff192cc5 100644
--- a/exporter/exporterqueue/disabled_queue.go
+++ b/exporter/exporterqueue/disabled_queue.go
@@ -19,6 +19,7 @@ var donePool = sync.Pool{
 
 func newDisabledQueue[T any](consumeFunc ConsumeFunc[T]) Queue[T] {
 	return &disabledQueue[T]{
+		sizer:       &requestSizer[T]{},
 		consumeFunc: consumeFunc,
 		size:        &atomic.Int64{},
 	}
@@ -28,14 +29,18 @@ type disabledQueue[T any] struct {
 	component.StartFunc
 	component.ShutdownFunc
 	consumeFunc ConsumeFunc[T]
+	sizer       sizer[T]
 	size        *atomic.Int64
 }
 
 func (d *disabledQueue[T]) Offer(ctx context.Context, req T) error {
+	elSize := d.sizer.Sizeof(req)
+	d.size.Add(elSize)
+
 	done := donePool.Get().(*blockingDone)
-	d.size.Add(1)
+	done.queue = d
+	done.elSize = elSize
 	d.consumeFunc(ctx, req, done)
-	defer d.size.Add(-1)
 	// Only re-add the blockingDone instance back to the pool if successfully received the
 	// message from the consumer which guarantees consumer will not use that anymore,
 	// otherwise no guarantee about when the consumer will add the message to the channel so cannot reuse or close.
@@ -48,6 +53,10 @@ func (d *disabledQueue[T]) Offer(ctx context.Context, req T) error {
 	}
 }
 
+func (d *disabledQueue[T]) onDone(elSize int64) {
+	d.size.Add(-elSize)
+}
+
 // Size returns the current number of blocked requests waiting to be processed.
 func (d *disabledQueue[T]) Size() int64 {
 	return d.size.Load()
@@ -59,9 +68,14 @@ func (d *disabledQueue[T]) Capacity() int64 {
 }
 
 type blockingDone struct {
-	ch chan error
+	queue interface {
+		onDone(int64)
+	}
+	elSize int64
+	ch     chan error
 }
 
 func (d *blockingDone) OnDone(err error) {
+	d.queue.onDone(d.elSize)
 	d.ch <- err
 }
diff --git a/exporter/exporterqueue/memory_queue.go b/exporter/exporterqueue/memory_queue.go
index ff421c5b78d..2eb7b8d4982 100644
--- a/exporter/exporterqueue/memory_queue.go
+++ b/exporter/exporterqueue/memory_queue.go
@@ -11,6 +11,12 @@ import (
 	"go.opentelemetry.io/collector/component"
 )
 
+var sizeDonePool = sync.Pool{
+	New: func() any {
+		return &sizeDone{}
+	},
+}
+
 var errInvalidSize = errors.New("invalid element size")
 
 // memoryQueueSettings defines internal parameters for boundedMemoryQueue creation.
@@ -91,11 +97,11 @@ func (sq *memoryQueue[T]) Read(context.Context) (context.Context, T, Done, bool)
 	defer sq.mu.Unlock()
 
 	for {
-		if sq.size > 0 {
+		if sq.items.hasElements() {
 			elCtx, el, elSize := sq.items.pop()
-			sq.size -= elSize
-			sq.hasMoreSpace.Signal()
-			return elCtx, el, noopDoneInst, true
+			sd := sizeDonePool.Get().(*sizeDone)
+			sd.reset(elSize, sq)
+			return elCtx, el, sd, true
 		}
 
 		if sq.stopped {
@@ -109,6 +115,13 @@ func (sq *memoryQueue[T]) Read(context.Context) (context.Context, T, Done, bool)
 	}
 }
 
+func (sq *memoryQueue[T]) onDone(elSize int64) {
+	sq.mu.Lock()
+	defer sq.mu.Unlock()
+	sq.size -= elSize
+	sq.hasMoreSpace.Signal()
+}
+
 // Shutdown closes the queue channel to initiate draining of the queue.
 func (sq *memoryQueue[T]) Shutdown(context.Context) error {
 	sq.mu.Lock()
@@ -142,6 +155,7 @@ type linkedQueue[T any] struct {
 
 func (l *linkedQueue[T]) push(ctx context.Context, data T, size int64) {
 	n := &node[T]{ctx: ctx, data: data, size: size}
+	// If tail is nil, the list is empty, so update both head and tail to point to the same element.
 	if l.tail == nil {
 		l.head = n
 		l.tail = n
@@ -151,9 +165,14 @@ func (l *linkedQueue[T]) push(ctx context.Context, data T, size int64) {
 	l.tail = n
 }
 
+func (l *linkedQueue[T]) hasElements() bool {
+	return l.head != nil
+}
+
 func (l *linkedQueue[T]) pop() (context.Context, T, int64) {
 	n := l.head
 	l.head = n.next
+	// If the popped element was the last one, update tail as well.
 	if l.head == nil {
 		l.tail = nil
 	}
@@ -161,8 +180,19 @@ func (l *linkedQueue[T]) pop() (context.Context, T, int64) {
 	return n.ctx, n.data, n.size
 }
 
-type noopDone struct{}
+type sizeDone struct {
+	size  int64
+	queue interface {
+		onDone(int64)
+	}
+}
 
-func (*noopDone) OnDone(error) {}
+func (sd *sizeDone) reset(size int64, queue interface{ onDone(int64) }) {
+	sd.size = size
+	sd.queue = queue
+}
 
-var noopDoneInst = &noopDone{}
+func (sd *sizeDone) OnDone(error) {
+	defer sizeDonePool.Put(sd)
+	sd.queue.onDone(sd.size)
+}
diff --git a/exporter/exporterqueue/persistent_queue.go b/exporter/exporterqueue/persistent_queue.go
index 45f8fb493bb..44d5b69b24b 100644
--- a/exporter/exporterqueue/persistent_queue.go
+++ b/exporter/exporterqueue/persistent_queue.go
@@ -38,6 +38,12 @@ var (
 	errWrongExtensionType = errors.New("requested extension is not a storage extension")
 )
 
+var indexDonePool = sync.Pool{
+	New: func() any {
+		return &indexDone{}
+	},
+}
+
 type persistentQueueSettings[T any] struct {
 	sizer    sizer[T]
 	capacity int64
@@ -292,16 +298,9 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don
 			pq.hasMoreSpace.Signal()
 		}
 
 		if consumed {
-			pq.queueSize -= pq.set.sizer.Sizeof(req)
-			// The size might be not in sync with the queue in case it's restored from the disk
-			// because we don't flush the current queue size on the disk on every read/write.
-			// In that case we need to make sure it doesn't go below 0.
-			if pq.queueSize < 0 {
-				pq.queueSize = 0
-			}
-			pq.hasMoreSpace.Signal()
-
-			return context.Background(), req, indexDone[T]{index: index, pq: pq}, true
+			id := indexDonePool.Get().(*indexDone)
+			id.reset(index, pq.set.sizer.Sizeof(req), pq)
+			return context.Background(), req, id, true
 		}
 	}
@@ -348,7 +347,7 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool)
 }
 
 // onDone should be called to remove the item of the given index from the queue once processing is finished.
-func (pq *persistentQueue[T]) onDone(index uint64, consumeErr error) {
+func (pq *persistentQueue[T]) onDone(index uint64, elSize int64, consumeErr error) {
 	// Delete the item from the persistent storage after it was processed.
 	pq.mu.Lock()
 	// Always unref client even if the consumer is shutdown because we always ref it for every valid request.
@@ -359,6 +358,15 @@ func (pq *persistentQueue[T]) onDone(index uint64, consumeErr error) {
 		pq.mu.Unlock()
 	}()
 
+	pq.queueSize -= elSize
+	// The size might be not in sync with the queue in case it's restored from the disk
+	// because we don't flush the current queue size on the disk on every read/write.
+	// In that case we need to make sure it doesn't go below 0.
+	if pq.queueSize < 0 {
+		pq.queueSize = 0
+	}
+	pq.hasMoreSpace.Signal()
+
 	if experr.IsShutdownErr(consumeErr) {
 		// The queue is shutting down, don't mark the item as dispatched, so it's picked up again after restart.
 		// TODO: Handle partially delivered requests by updating their values in the storage.
@@ -555,11 +563,20 @@ func bytesToItemIndexArray(buf []byte) ([]uint64, error) {
 	return val, nil
 }
 
-type indexDone[T any] struct {
+type indexDone struct {
 	index uint64
-	pq    *persistentQueue[T]
+	size  int64
+	queue interface {
+		onDone(uint64, int64, error)
+	}
+}
+
+func (id *indexDone) reset(index uint64, size int64, queue interface{ onDone(uint64, int64, error) }) {
+	id.index = index
+	id.size = size
+	id.queue = queue
 }
 
-func (id indexDone[T]) OnDone(err error) {
-	id.pq.onDone(id.index, err)
+func (id *indexDone) OnDone(err error) {
+	id.queue.onDone(id.index, id.size, err)
 }
diff --git a/exporter/exporterqueue/persistent_queue_test.go b/exporter/exporterqueue/persistent_queue_test.go
index 66080a0e328..b7c57f6fb74 100644
--- a/exporter/exporterqueue/persistent_queue_test.go
+++ b/exporter/exporterqueue/persistent_queue_test.go
@@ -605,8 +605,7 @@ func TestPersistentQueue_CorruptedData(t *testing.T) {
 
 	// Put some items, make sure they are loaded and shutdown the storage...
 	for i := 0; i < 3; i++ {
-		err := ps.Offer(context.Background(), uint64(50))
-		require.NoError(t, err)
+		require.NoError(t, ps.Offer(context.Background(), uint64(50)))
 	}
 	assert.Equal(t, int64(3), ps.Size())
 	require.True(t, consume(ps, func(context.Context, uint64) error {
@@ -652,8 +651,7 @@ func TestPersistentQueue_CurrentlyProcessedItems(t *testing.T) {
 	ps := createTestPersistentQueueWithRequestsCapacity(t, ext, 1000)
 
 	for i := 0; i < 5; i++ {
-		err := ps.Offer(context.Background(), req)
-		require.NoError(t, err)
+		require.NoError(t, ps.Offer(context.Background(), req))
 	}
 
 	requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{})
@@ -712,21 +710,20 @@ func TestPersistentQueueStartWithNonDispatched(t *testing.T) {
 	// Put in items up to capacity
 	for i := 0; i < 5; i++ {
-		err := ps.Offer(context.Background(), req)
-		require.NoError(t, err)
+		require.NoError(t, ps.Offer(context.Background(), req))
 	}
+	require.Equal(t, int64(5), ps.Size())
 
 	require.True(t, consume(ps, func(context.Context, uint64) error {
-		// put one more item in
-		require.NoError(t, ps.Offer(context.Background(), req))
+		// Check that the size still includes the element being consumed.
 		require.Equal(t, int64(5), ps.Size())
 		return experr.NewShutdownErr(nil)
 	}))
 
 	require.NoError(t, ps.Shutdown(context.Background()))
 
 	// Reload with extra capacity to make sure we re-enqueue in-progress items.
-	newPs := createTestPersistentQueueWithRequestsCapacity(t, ext, 6)
-	require.Equal(t, int64(6), newPs.Size())
+	newPs := createTestPersistentQueueWithRequestsCapacity(t, ext, 5)
+	require.Equal(t, int64(5), newPs.Size())
 }
 
 func TestPersistentQueueStartWithNonDispatchedConcurrent(t *testing.T) {
@@ -1004,9 +1001,7 @@ func TestPersistentQueue_ItemDispatchingFinish_ErrorHandling(t *testing.T) {
 			ps := createTestPersistentQueueWithClient(client)
 			client.Reset()
 
-			err := ps.itemDispatchingFinish(context.Background(), 0)
-
-			require.ErrorIs(t, err, tt.expectedError)
+			require.ErrorIs(t, ps.itemDispatchingFinish(context.Background(), 0), tt.expectedError)
 		})
 	}
 }
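For readers less familiar with the exporter queue internals, the following standalone sketch illustrates the accounting pattern this patch adopts across the disabled, memory, and persistent queues: the size is added in `Offer` but only subtracted in the element's done callback, so an element that is still being exported keeps counting toward the active size. The `miniQueue` type and its methods are invented for illustration only and are not part of the collector code.

```go
package main

import (
	"fmt"
	"sync"
)

// miniQueue is a toy illustration (not collector code) of the pattern used in
// this change: Offer adds the element size up front, and the size is only
// subtracted when the consumer calls the returned done func, so in-flight
// elements still count toward Size().
type miniQueue struct {
	mu    sync.Mutex
	items []int64 // element sizes
	size  int64
}

func (q *miniQueue) Offer(elSize int64) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, elSize)
	q.size += elSize
}

// Read pops an element and returns a done callback. The size is NOT reduced
// here; it is reduced only when done is invoked after the export finishes.
func (q *miniQueue) Read() (int64, func(), bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return 0, nil, false
	}
	elSize := q.items[0]
	q.items = q.items[1:]
	done := func() {
		q.mu.Lock()
		defer q.mu.Unlock()
		q.size -= elSize
	}
	return elSize, done, true
}

func (q *miniQueue) Size() int64 {
	q.mu.Lock()
	defer q.mu.Unlock()
	return q.size
}

func main() {
	q := &miniQueue{}
	q.Offer(5)

	_, done, _ := q.Read()
	fmt.Println(q.Size()) // 5: the element is in flight and still counted
	done()
	fmt.Println(q.Size()) // 0: released only after the export completed
}
```

This is also why the test timeouts above were raised and why assertions such as `be.queue.Size() == 0` now only hold after the export callback completes rather than immediately after the element is read from the queue.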