Skip to content

Commit 466ce29

Browse files
committed
Change the memory queue implementation to not pre-allocate capacity objects.
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 2a6150d commit 466ce29

File tree

6 files changed

+251
-152
lines changed

6 files changed

+251
-152
lines changed

.chloggen/sized-queue.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Change the memory queue implementation to not pre-allocate capacity objects.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [12070]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: This change improves memory usage of the collector under low utilization and is a prerequisite for supporting different other size limitations (number of items, bytes).
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [user]

exporter/exporterqueue/bounded_memory_queue.go

Lines changed: 5 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
// the producer are dropped.
1717
type boundedMemoryQueue[T any] struct {
1818
component.StartFunc
19-
*sizedChannel[memQueueEl[T]]
19+
*sizedQueue[T]
2020
}
2121

2222
// memoryQueueSettings defines internal parameters for boundedMemoryQueue creation.
@@ -29,40 +29,15 @@ type memoryQueueSettings[T any] struct {
2929
// callback for dropped items (e.g. useful to emit metrics).
3030
func newBoundedMemoryQueue[T any](set memoryQueueSettings[T]) Queue[T] {
3131
return &boundedMemoryQueue[T]{
32-
sizedChannel: newSizedChannel[memQueueEl[T]](set.capacity, memQueueElSizer[T]{sizer: set.sizer}),
32+
sizedQueue: newSizedQueue[T](set.capacity, set.sizer),
3333
}
3434
}
3535

36-
// Offer is used by the producer to submit new item to the queue. Calling this method on a stopped queue will panic.
37-
func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
38-
return q.sizedChannel.push(memQueueEl[T]{ctx: ctx, req: req})
39-
}
40-
4136
func (q *boundedMemoryQueue[T]) Read(_ context.Context) (uint64, context.Context, T, bool) {
42-
item, ok := q.sizedChannel.pop()
43-
return 0, item.ctx, item.req, ok
37+
ctx, req, ok := q.sizedQueue.pop()
38+
return 0, ctx, req, ok
4439
}
4540

4641
// OnProcessingFinished should be called to remove the item of the given index from the queue once processing is finished.
4742
// For in memory queue, this function is noop.
48-
func (q *boundedMemoryQueue[T]) OnProcessingFinished(uint64, error) {
49-
}
50-
51-
// Shutdown closes the queue channel to initiate draining of the queue.
52-
func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error {
53-
q.sizedChannel.shutdown()
54-
return nil
55-
}
56-
57-
type memQueueEl[T any] struct {
58-
req T
59-
ctx context.Context
60-
}
61-
62-
type memQueueElSizer[T any] struct {
63-
sizer sizer[T]
64-
}
65-
66-
func (mqes memQueueElSizer[T]) Sizeof(el memQueueEl[T]) int64 {
67-
return mqes.sizer.Sizeof(el.req)
68-
}
43+
func (q *boundedMemoryQueue[T]) OnProcessingFinished(uint64, error) {}

exporter/exporterqueue/sized_channel.go

Lines changed: 0 additions & 71 deletions
This file was deleted.

exporter/exporterqueue/sized_channel_test.go

Lines changed: 0 additions & 51 deletions
This file was deleted.

exporter/exporterqueue/sized_queue.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue"
5+
6+
import (
7+
"context"
8+
"errors"
9+
"sync"
10+
)
11+
12+
var errInvalidSize = errors.New("invalid element size")
13+
14+
type node[T any] struct {
15+
ctx context.Context
16+
data T
17+
size int64
18+
next *node[T]
19+
}
20+
21+
type linkedQueue[T any] struct {
22+
head *node[T]
23+
tail *node[T]
24+
}
25+
26+
func (l *linkedQueue[T]) push(ctx context.Context, data T, size int64) {
27+
n := &node[T]{ctx: ctx, data: data, size: size}
28+
if l.tail == nil {
29+
l.head = n
30+
l.tail = n
31+
return
32+
}
33+
l.tail.next = n
34+
l.tail = n
35+
}
36+
37+
func (l *linkedQueue[T]) pop() (context.Context, T, int64) {
38+
n := l.head
39+
l.head = n.next
40+
if l.head == nil {
41+
l.tail = nil
42+
}
43+
n.next = nil
44+
return n.ctx, n.data, n.size
45+
}
46+
47+
// sizedQueue is a channel wrapper for sized elements with a capacity set to a total size of all the elements.
48+
// The channel will accept elements until the total size of the elements reaches the capacity.
49+
type sizedQueue[T any] struct {
50+
sizer sizer[T]
51+
cap int64
52+
53+
mu sync.Mutex
54+
hasElements *sync.Cond
55+
items *linkedQueue[T]
56+
size int64
57+
stopped bool
58+
}
59+
60+
// newSizedQueue creates a sized elements channel. Each element is assigned a size by the provided sizer.
61+
// capacity is the capacity of the queue.
62+
func newSizedQueue[T any](capacity int64, sizer sizer[T]) *sizedQueue[T] {
63+
sq := &sizedQueue[T]{
64+
sizer: sizer,
65+
cap: capacity,
66+
items: &linkedQueue[T]{},
67+
}
68+
sq.hasElements = sync.NewCond(&sq.mu)
69+
return sq
70+
}
71+
72+
// Offer puts the element into the queue with the given sized if there is enough capacity.
73+
// Returns an error if the queue is full.
74+
func (sq *sizedQueue[T]) Offer(ctx context.Context, el T) error {
75+
elSize := sq.sizer.Sizeof(el)
76+
if elSize == 0 {
77+
return nil
78+
}
79+
80+
if elSize <= 0 {
81+
return errInvalidSize
82+
}
83+
84+
sq.mu.Lock()
85+
defer sq.mu.Unlock()
86+
87+
if sq.size+elSize > sq.cap {
88+
return ErrQueueIsFull
89+
}
90+
91+
sq.size += elSize
92+
sq.items.push(ctx, el, elSize)
93+
// Signal one consumer if any.
94+
sq.hasElements.Signal()
95+
return nil
96+
}
97+
98+
// pop removes the element from the queue and returns it.
99+
// The call blocks until there is an item available or the queue is stopped.
100+
// The function returns true when an item is consumed or false if the queue is stopped and emptied.
101+
func (sq *sizedQueue[T]) pop() (context.Context, T, bool) {
102+
sq.mu.Lock()
103+
defer sq.mu.Unlock()
104+
105+
for {
106+
if sq.size > 0 {
107+
ctx, el, elSize := sq.items.pop()
108+
sq.size -= elSize
109+
return ctx, el, true
110+
}
111+
112+
if sq.stopped {
113+
var el T
114+
return context.Background(), el, false
115+
}
116+
117+
sq.hasElements.Wait()
118+
}
119+
}
120+
121+
// Shutdown closes the queue channel to initiate draining of the queue.
122+
func (sq *sizedQueue[T]) Shutdown(context.Context) error {
123+
sq.mu.Lock()
124+
defer sq.mu.Unlock()
125+
sq.stopped = true
126+
sq.hasElements.Broadcast()
127+
return nil
128+
}
129+
130+
func (sq *sizedQueue[T]) Size() int {
131+
sq.mu.Lock()
132+
defer sq.mu.Unlock()
133+
return int(sq.size)
134+
}
135+
136+
func (sq *sizedQueue[T]) Capacity() int {
137+
return int(sq.cap)
138+
}

0 commit comments

Comments
 (0)