From b5ddb6530284ae4055a2df0b434cea3679b57669 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Mon, 14 Apr 2025 21:29:09 -0700 Subject: [PATCH] Reject too large elements to the queue Without this change, the requests without deadline will block forever until the space is available which will never be. Signed-off-by: Bogdan Drutu --- .chloggen/fix-too-large-size.yaml | 25 +++++++++++++++++++ .../internal/queuebatch/memory_queue.go | 10 +++++++- .../internal/queuebatch/memory_queue_test.go | 8 +++++- 3 files changed, 41 insertions(+), 2 deletions(-) create mode 100644 .chloggen/fix-too-large-size.yaml diff --git a/.chloggen/fix-too-large-size.yaml b/.chloggen/fix-too-large-size.yaml new file mode 100644 index 00000000000..2198e40df38 --- /dev/null +++ b/.chloggen/fix-too-large-size.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Reject elements larger than the queue capacity + +# One or more tracking issues or pull requests related to the change +issues: [12847] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/exporterhelper/internal/queuebatch/memory_queue.go b/exporter/exporterhelper/internal/queuebatch/memory_queue.go index 37a812889b8..1914d447eac 100644 --- a/exporter/exporterhelper/internal/queuebatch/memory_queue.go +++ b/exporter/exporterhelper/internal/queuebatch/memory_queue.go @@ -20,7 +20,10 @@ var blockingDonePool = sync.Pool{ }, } -var errInvalidSize = errors.New("invalid element size") +var ( + errInvalidSize = errors.New("invalid element size") + errSizeTooLarge = errors.New("element size too large") +) // memoryQueueSettings defines internal parameters for boundedMemoryQueue creation. type memoryQueueSettings[T any] struct { @@ -74,6 +77,11 @@ func (mq *memoryQueue[T]) Offer(ctx context.Context, el T) error { return errInvalidSize } + // If element larger than the capacity, will never been able to add it. + if elSize > mq.cap { + return errSizeTooLarge + } + done, err := mq.add(ctx, el, elSize) if err != nil { return err diff --git a/exporter/exporterhelper/internal/queuebatch/memory_queue_test.go b/exporter/exporterhelper/internal/queuebatch/memory_queue_test.go index dca2cb6f089..9687f12c31f 100644 --- a/exporter/exporterhelper/internal/queuebatch/memory_queue_test.go +++ b/exporter/exporterhelper/internal/queuebatch/memory_queue_test.go @@ -97,6 +97,11 @@ func TestMemoryQueueOfferInvalidSize(t *testing.T) { require.ErrorIs(t, q.Offer(context.Background(), -1), errInvalidSize) } +func TestMemoryQueueRejectOverCapacityElements(t *testing.T) { + q := newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 7, blockOnOverflow: true}) + require.ErrorIs(t, q.Offer(context.Background(), 8), errSizeTooLarge) +} + func TestMemoryQueueOfferZeroSize(t *testing.T) { q := newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 1}) require.NoError(t, q.Offer(context.Background(), 0)) @@ -106,7 +111,8 @@ func TestMemoryQueueOfferZeroSize(t *testing.T) { } func TestMemoryQueueZeroCapacity(t *testing.T) { - q := newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 0}) + q := newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 1}) + require.NoError(t, q.Offer(context.Background(), 1)) require.ErrorIs(t, q.Offer(context.Background(), 1), ErrQueueIsFull) require.NoError(t, q.Shutdown(context.Background())) }