Skip to content

Change the memory queue implementation to not pre-allocate capacity objects. #12070

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .chloggen/sized-queue.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Change the memory queue implementation to not pre-allocate capacity objects.

# One or more tracking issues or pull requests related to the change
issues: [12070]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: This change improves memory usage of the collector under low utilization and is a prerequisite for supporting other size limitations (number of items, bytes).

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
35 changes: 5 additions & 30 deletions exporter/exporterqueue/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
// the producer are dropped.
type boundedMemoryQueue[T any] struct {
component.StartFunc
*sizedChannel[memQueueEl[T]]
*sizedQueue[T]
}

// memoryQueueSettings defines internal parameters for boundedMemoryQueue creation.
Expand All @@ -29,40 +29,15 @@ type memoryQueueSettings[T any] struct {
// callback for dropped items (e.g. useful to emit metrics).
func newBoundedMemoryQueue[T any](set memoryQueueSettings[T]) Queue[T] {
return &boundedMemoryQueue[T]{
sizedChannel: newSizedChannel[memQueueEl[T]](set.capacity, memQueueElSizer[T]{sizer: set.sizer}),
sizedQueue: newSizedQueue[T](set.capacity, set.sizer),
}
}

// Offer is used by the producer to submit new item to the queue. Calling this method on a stopped queue will panic.
func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
return q.sizedChannel.push(memQueueEl[T]{ctx: ctx, req: req})
}

func (q *boundedMemoryQueue[T]) Read(_ context.Context) (uint64, context.Context, T, bool) {
item, ok := q.sizedChannel.pop()
return 0, item.ctx, item.req, ok
ctx, req, ok := q.sizedQueue.pop()
return 0, ctx, req, ok
}

// OnProcessingFinished should be called to remove the item of the given index from the queue once processing is finished.
// For in memory queue, this function is noop.
func (q *boundedMemoryQueue[T]) OnProcessingFinished(uint64, error) {
}

// Shutdown closes the queue channel to initiate draining of the queue.
func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error {
q.sizedChannel.shutdown()
return nil
}

type memQueueEl[T any] struct {
req T
ctx context.Context
}

type memQueueElSizer[T any] struct {
sizer sizer[T]
}

func (mqes memQueueElSizer[T]) Sizeof(el memQueueEl[T]) int64 {
return mqes.sizer.Sizeof(el.req)
}
func (q *boundedMemoryQueue[T]) OnProcessingFinished(uint64, error) {}
71 changes: 0 additions & 71 deletions exporter/exporterqueue/sized_channel.go

This file was deleted.

51 changes: 0 additions & 51 deletions exporter/exporterqueue/sized_channel_test.go

This file was deleted.

138 changes: 138 additions & 0 deletions exporter/exporterqueue/sized_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue"

import (
"context"
"errors"
"sync"
)

// errInvalidSize is returned by Offer when the sizer reports a negative size.
var errInvalidSize = errors.New("invalid element size")

// node is a single element of linkedQueue, carrying the stored request
// together with its producer context and its accounted size.
type node[T any] struct {
	ctx  context.Context
	data T
	size int64
	next *node[T]
}

// linkedQueue is a minimal singly linked FIFO. It allocates one node per
// element on demand, so no capacity has to be reserved up front.
type linkedQueue[T any] struct {
	head *node[T]
	tail *node[T]
}

// push appends an element at the tail of the queue.
func (l *linkedQueue[T]) push(ctx context.Context, data T, size int64) {
	newTail := &node[T]{ctx: ctx, data: data, size: size}
	if l.tail != nil {
		l.tail.next = newTail
		l.tail = newTail
		return
	}
	// Empty queue: the new node is both the first and the last element.
	l.head, l.tail = newTail, newTail
}

// pop removes and returns the element at the head of the queue.
// It must only be called when the queue is known to be non-empty;
// calling it on an empty queue dereferences a nil head.
func (l *linkedQueue[T]) pop() (context.Context, T, int64) {
	front := l.head
	l.head = front.next
	if l.head == nil {
		// The queue is now empty; drop the dangling tail reference.
		l.tail = nil
	}
	front.next = nil // unlink the removed node so it can be collected independently
	return front.ctx, front.data, front.size
}

// sizedQueue is a FIFO of sized elements bounded by the total size of the
// queued elements rather than their count. Elements are accepted until the
// sum of their sizes would exceed the configured capacity.
// Storage is a mutex/cond-guarded linked list, so memory is allocated per
// element on demand instead of being pre-allocated for the full capacity.
type sizedQueue[T any] struct {
	sizer sizer[T] // computes each element's size contribution
	cap int64 // maximum allowed total size of queued elements; immutable after construction

	mu sync.Mutex // guards all fields below
	hasElements *sync.Cond // signaled on push and broadcast on Shutdown
	items *linkedQueue[T] // FIFO storage, one node per queued element
	size int64 // current total size of queued elements
	stopped bool // set by Shutdown; pop drains remaining items, then reports false
}

// newSizedQueue returns an empty queue bounded by capacity. Each element's
// contribution towards the capacity is computed by the provided sizer.
func newSizedQueue[T any](capacity int64, sizer sizer[T]) *sizedQueue[T] {
	q := &sizedQueue[T]{
		sizer: sizer,
		cap:   capacity,
		items: &linkedQueue[T]{},
	}
	// The condition variable shares the queue's mutex so state checks and
	// waits are atomic with respect to push/pop.
	q.hasElements = sync.NewCond(&q.mu)
	return q
}

// Offer inserts el into the queue, charging its size against the remaining
// capacity. A zero-sized element is accepted as an immediate no-op, a
// negative size yields errInvalidSize, and ErrQueueIsFull is returned when
// the element does not fit.
func (sq *sizedQueue[T]) Offer(ctx context.Context, el T) error {
	elSize := sq.sizer.Sizeof(el)
	switch {
	case elSize < 0:
		return errInvalidSize
	case elSize == 0:
		// Nothing to store; report success without queueing.
		return nil
	}

	sq.mu.Lock()
	defer sq.mu.Unlock()

	if sq.size+elSize > sq.cap {
		return ErrQueueIsFull
	}

	sq.items.push(ctx, el, elSize)
	sq.size += elSize
	// Wake a single blocked consumer, if there is one.
	sq.hasElements.Signal()
	return nil
}

// pop removes and returns the head element of the queue.
// It blocks until an element is available or the queue has been shut down.
// The boolean result is true when an element was consumed and false once
// the queue is both stopped and fully drained.
func (sq *sizedQueue[T]) pop() (context.Context, T, bool) {
	sq.mu.Lock()
	defer sq.mu.Unlock()

	// Wait while the queue is empty. Remaining elements are still served
	// after Shutdown; only an empty, stopped queue terminates the consumer.
	for sq.size == 0 {
		if sq.stopped {
			var zero T
			return context.Background(), zero, false
		}
		sq.hasElements.Wait()
	}

	ctx, el, elSize := sq.items.pop()
	sq.size -= elSize
	return ctx, el, true
}

// Shutdown marks the queue as stopped and wakes every blocked consumer so
// the remaining elements can be drained. It never returns an error.
func (sq *sizedQueue[T]) Shutdown(context.Context) error {
	sq.mu.Lock()
	sq.stopped = true
	sq.hasElements.Broadcast()
	sq.mu.Unlock()
	return nil
}

// Size returns the combined size of all elements currently in the queue.
func (sq *sizedQueue[T]) Size() int {
	sq.mu.Lock()
	currentSize := sq.size
	sq.mu.Unlock()
	return int(currentSize)
}

// Capacity reports the maximum total size the queue accepts. The value is
// fixed at construction time, so no locking is required to read it.
func (sq *sizedQueue[T]) Capacity() int {
	return int(sq.cap)
}
Loading
Loading