
Commit 7b878cf

Change queue to embed the consumers (#12242)
This PR will facilitate:

1. The implementation of the disabled queue, which can become a simple call into the "batcher" that sets up the done callback and blocks until it is called.
2. The implementation of the disabled batcher, which becomes trivial and no longer suffers from the limitation that only one reader of the queue is set up. Even though requests were sent asynchronously by multiple goroutines, there was still only one reader, which may have been a bottleneck.

In a separate PR we will try to get rid of readableQueue. The initial version of this PR had "read" as a private func on Queue, but the "unused" linter complained that `boundedMemoryQueue.read` was unused. Technically this is a breaking change, but Queue is an experimental API for which we allow changes.

Signed-off-by: Bogdan Drutu <[email protected]>
1 parent eb4f640 commit 7b878cf
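
To make the new consumption model concrete, here is a minimal, hedged sketch of the pattern this commit moves to. It is a toy stand-in, not the collector's exporterqueue API: only the ConsumeFunc/DoneCallback names and the embed-the-consumers idea come from the diffs below; the channel-backed queue and all signatures are simplifications.

package main

import (
    "context"
    "fmt"
    "sync"
)

// DoneCallback and ConsumeFunc mirror the names introduced by this commit;
// everything else in this file is a simplified stand-in, not the collector API.
type DoneCallback func(err error)

type ConsumeFunc[T any] func(ctx context.Context, req T, done DoneCallback)

// consumerQueue sketches the pattern: the queue owns its consumer goroutines
// instead of exposing a Read method for callers to pull from.
type consumerQueue[T any] struct {
    items   chan T
    consume ConsumeFunc[T]
    stopWG  sync.WaitGroup
}

func newConsumerQueue[T any](capacity, numConsumers int, consume ConsumeFunc[T]) *consumerQueue[T] {
    cq := &consumerQueue[T]{items: make(chan T, capacity), consume: consume}
    for i := 0; i < numConsumers; i++ {
        cq.stopWG.Add(1)
        go func() {
            defer cq.stopWG.Done()
            for req := range cq.items { // loop ends when the channel is closed on Shutdown
                cq.consume(context.Background(), req, func(error) {})
            }
        }()
    }
    return cq
}

func (cq *consumerQueue[T]) Offer(req T) { cq.items <- req }

// Shutdown drains the queue: consumers finish the buffered items, then exit.
func (cq *consumerQueue[T]) Shutdown() {
    close(cq.items)
    cq.stopWG.Wait()
}

func main() {
    q := newConsumerQueue[int](10, 2, func(_ context.Context, req int, done DoneCallback) {
        fmt.Println("consumed", req)
        done(nil)
    })
    for i := 0; i < 5; i++ {
        q.Offer(i)
    }
    // Shutdown blocks until the buffered items are consumed and all consumers exit.
    q.Shutdown()
}

With this shape, the "disabled batcher" described above is just a ConsumeFunc that exports immediately, and a "disabled queue" can invoke the consumer synchronously and block until done is called.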

15 files changed: +367 -383 lines

.chloggen/consumer-queue.yaml

Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: breaking
+
+# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
+component: exporterhelper
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Change queue to embed the async consumers.
+
+# One or more tracking issues or pull requests related to the change
+issues: [12242]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [api]

exporter/exporterhelper/internal/obs_queue_test.go

Lines changed: 1 addition & 6 deletions
@@ -24,8 +24,7 @@ import (
 )
 
 type fakeQueue[T any] struct {
-    component.StartFunc
-    component.ShutdownFunc
+    exporterqueue.Queue[T]
     offerErr error
     size     int64
     capacity int64
@@ -39,10 +38,6 @@ func (fq *fakeQueue[T]) Capacity() int64 {
     return fq.capacity
 }
 
-func (fq *fakeQueue[T]) Read(context.Context) (context.Context, T, exporterqueue.DoneCallback, bool) {
-    panic("implement me")
-}
-
 func (fq *fakeQueue[T]) Offer(context.Context, T) error {
     return fq.offerErr
 }

exporter/exporterhelper/internal/queue_sender.go

Lines changed: 31 additions & 26 deletions
@@ -66,9 +66,8 @@ func (qCfg *QueueConfig) Validate() error {
 }
 
 type QueueSender struct {
-    queue     exporterqueue.Queue[internal.Request]
-    batcher   queue.Batcher
-    consumers *queue.Consumers[internal.Request]
+    queue   exporterqueue.Queue[internal.Request]
+    batcher component.Component
 }
 
 func NewQueueSender(
@@ -79,15 +78,6 @@ func NewQueueSender(
     exportFailureMessage string,
     next Sender[internal.Request],
 ) (*QueueSender, error) {
-    q, err := newObsQueue(qSet, qf(context.Background(), qSet, qCfg))
-    if err != nil {
-        return nil, err
-    }
-
-    qs := &QueueSender{
-        queue: q,
-    }
-
     exportFunc := func(ctx context.Context, req internal.Request) error {
         err := next.Send(ctx, req)
         if err != nil {
@@ -96,12 +86,30 @@ func NewQueueSender(
         }
         return err
     }
-    if usePullingBasedExporterQueueBatcher.IsEnabled() {
-        qs.batcher, _ = queue.NewBatcher(bCfg, q, exportFunc, qCfg.NumConsumers)
-    } else {
-        qs.consumers = queue.NewQueueConsumers[internal.Request](q, qCfg.NumConsumers, exportFunc)
+    if !usePullingBasedExporterQueueBatcher.IsEnabled() {
+        q, err := newObsQueue(qSet, qf(context.Background(), qSet, qCfg, func(ctx context.Context, req internal.Request, done exporterqueue.DoneCallback) {
+            done(exportFunc(ctx, req))
+        }))
+        if err != nil {
+            return nil, err
+        }
+        return &QueueSender{queue: q}, nil
     }
-    return qs, nil
+
+    b, err := queue.NewBatcher(bCfg, exportFunc, qCfg.NumConsumers)
+    if err != nil {
+        return nil, err
+    }
+    // TODO: https://github.com/open-telemetry/opentelemetry-collector/issues/12244
+    if bCfg.Enabled {
+        qCfg.NumConsumers = 1
+    }
+    q, err := newObsQueue(qSet, qf(context.Background(), qSet, qCfg, b.Consume))
+    if err != nil {
+        return nil, err
+    }
+
+    return &QueueSender{queue: q, batcher: b}, nil
 }
 
 // Start is invoked during service startup.
@@ -113,25 +121,22 @@ func (qs *QueueSender) Start(ctx context.Context, host component.Host) error {
     if usePullingBasedExporterQueueBatcher.IsEnabled() {
         return qs.batcher.Start(ctx, host)
     }
-    return qs.consumers.Start(ctx, host)
+
+    return nil
 }
 
 // Shutdown is invoked during service shutdown.
 func (qs *QueueSender) Shutdown(ctx context.Context) error {
-    // Stop the queue and consumers, this will drain the queue and will call the retry (which is stopped) that will only
+    // Stop the queue and batcher, this will drain the queue and will call the retry (which is stopped) that will only
     // try once every request.
-
-    if err := qs.queue.Shutdown(ctx); err != nil {
-        return err
-    }
+    err := qs.queue.Shutdown(ctx)
     if usePullingBasedExporterQueueBatcher.IsEnabled() {
-        return qs.batcher.Shutdown(ctx)
+        return errors.Join(err, qs.batcher.Shutdown(ctx))
    }
-    err := qs.consumers.Shutdown(ctx)
     return err
 }
 
-// send implements the requestSender interface. It puts the request in the queue.
+// Send implements the requestSender interface. It puts the request in the queue.
 func (qs *QueueSender) Send(ctx context.Context, req internal.Request) error {
     // Prevent cancellation and deadline to propagate to the context stored in the queue.
     // The grpc/http based receivers will cancel the request context after this function returns.
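
For readers skimming the diff, this is a hedged sketch of the resulting wiring in NewQueueSender. Every type name here (request, queue, queueFactory, batcher) is an invented stand-in; only the two code paths and the b.Consume plumbing follow the change above.

package sketch

import "context"

// DoneCallback and ConsumeFunc mirror the names used in this commit; the
// remaining types are illustrative stand-ins, not the exporterhelper types.
type DoneCallback func(err error)

type ConsumeFunc func(ctx context.Context, req request, done DoneCallback)

type request struct{ items int }

type queue interface {
    Offer(ctx context.Context, req request) error
}

// queueFactory stands in for qf(ctx, qSet, qCfg, consume): the consume
// callback is now an argument of queue construction itself.
type queueFactory func(consume ConsumeFunc) (queue, error)

type batcher struct{}

// Consume matches ConsumeFunc, so the batcher can be plugged in directly as
// the queue's consumer (the qf(..., b.Consume) line in the diff above).
func (b *batcher) Consume(ctx context.Context, req request, done DoneCallback) {
    // ...accumulate req into a batch and call done once the batch is exported...
    done(nil)
}

func newQueueSender(qf queueFactory, export func(context.Context, request) error, useBatcher bool) (queue, error) {
    if !useBatcher {
        // Feature gate off: each embedded consumer exports directly and
        // reports the result through the done callback.
        return qf(func(ctx context.Context, req request, done DoneCallback) {
            done(export(ctx, req))
        })
    }
    // Feature gate on: the batcher is the queue's consumer.
    b := &batcher{}
    return qf(b.Consume)
}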

exporter/exporterqueue/bounded_memory_queue.go

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@ type memoryQueueSettings[T any] struct {
 
 // newBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional
 // callback for dropped items (e.g. useful to emit metrics).
-func newBoundedMemoryQueue[T any](set memoryQueueSettings[T]) Queue[T] {
+func newBoundedMemoryQueue[T any](set memoryQueueSettings[T]) readableQueue[T] {
     return &boundedMemoryQueue[T]{
         sizedQueue: newSizedQueue[T](set.capacity, set.sizer, set.blocking),
     }

exporter/exporterqueue/bounded_memory_queue_test.go

Lines changed: 16 additions & 50 deletions
@@ -74,11 +74,11 @@ func TestBoundedQueue(t *testing.T) {
 func TestShutdownWhileNotEmpty(t *testing.T) {
     q := newBoundedMemoryQueue[string](memoryQueueSettings[string]{sizer: &requestSizer[string]{}, capacity: 1000})
 
-    assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
+    require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
     for i := 0; i < 10; i++ {
         require.NoError(t, q.Offer(context.Background(), strconv.FormatInt(int64(i), 10)))
     }
-    assert.NoError(t, q.Shutdown(context.Background()))
+    require.NoError(t, q.Shutdown(context.Background()))
 
     assert.Equal(t, int64(10), q.Size())
     numConsumed := 0
@@ -115,16 +115,15 @@ func TestQueueUsage(t *testing.T) {
         t.Run(tt.name, func(t *testing.T) {
             q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: tt.sizer, capacity: int64(100)})
             consumed := &atomic.Int64{}
-            require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
-            ac := newAsyncConsumer(q, 1, func(context.Context, uint64) error {
+            ac := newConsumerQueue(q, 1, func(_ context.Context, _ uint64, done DoneCallback) {
                 consumed.Add(1)
-                return nil
+                done(nil)
             })
+            require.NoError(t, ac.Start(context.Background(), componenttest.NewNopHost()))
             for j := 0; j < 10; j++ {
                 require.NoError(t, q.Offer(context.Background(), uint64(10)))
             }
-            assert.NoError(t, q.Shutdown(context.Background()))
-            assert.NoError(t, ac.Shutdown(context.Background()))
+            require.NoError(t, ac.Shutdown(context.Background()))
             assert.Equal(t, int64(10), consumed.Load())
         })
     }
@@ -148,11 +147,11 @@ func TestBlockingQueueUsage(t *testing.T) {
         t.Run(tt.name, func(t *testing.T) {
             q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: tt.sizer, capacity: int64(100), blocking: true})
             consumed := &atomic.Int64{}
-            require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
-            ac := newAsyncConsumer(q, 10, func(context.Context, uint64) error {
+            ac := newConsumerQueue(q, 10, func(_ context.Context, _ uint64, done DoneCallback) {
                 consumed.Add(1)
-                return nil
+                done(nil)
             })
+            require.NoError(t, ac.Start(context.Background(), componenttest.NewNopHost()))
             wg := &sync.WaitGroup{}
             for i := 0; i < 10; i++ {
                 wg.Add(1)
@@ -164,25 +163,21 @@ func TestBlockingQueueUsage(t *testing.T) {
                 }()
             }
             wg.Wait()
-            assert.NoError(t, q.Shutdown(context.Background()))
-            assert.NoError(t, ac.Shutdown(context.Background()))
+            require.NoError(t, ac.Shutdown(context.Background()))
             assert.Equal(t, int64(1_000_000), consumed.Load())
         })
     }
 }
 
 func TestZeroSizeNoConsumers(t *testing.T) {
     q := newBoundedMemoryQueue[string](memoryQueueSettings[string]{sizer: &requestSizer[string]{}, capacity: 0})
-
     err := q.Start(context.Background(), componenttest.NewNopHost())
     require.NoError(t, err)
-
     require.ErrorIs(t, q.Offer(context.Background(), "a"), ErrQueueIsFull) // in process
-
     assert.NoError(t, q.Shutdown(context.Background()))
 }
 
-func consume[T any](q Queue[T], consumeFunc func(context.Context, T) error) bool {
+func consume[T any](q readableQueue[T], consumeFunc func(context.Context, T) error) bool {
     ctx, req, done, ok := q.Read(context.Background())
     if !ok {
         return false
@@ -191,35 +186,6 @@ func consume[T any](q Queue[T], consumeFunc func(context.Context, T) error) bool
     return true
 }
 
-type asyncConsumer struct {
-    stopWG sync.WaitGroup
-}
-
-func newAsyncConsumer[T any](q Queue[T], numConsumers int, consumeFunc func(context.Context, T) error) *asyncConsumer {
-    ac := &asyncConsumer{}
-
-    ac.stopWG.Add(numConsumers)
-    for i := 0; i < numConsumers; i++ {
-        go func() {
-            defer ac.stopWG.Done()
-            for {
-                ctx, req, done, ok := q.Read(context.Background())
-                if !ok {
-                    return
-                }
-                done(consumeFunc(ctx, req))
-            }
-        }()
-    }
-    return ac
-}
-
-// Shutdown ensures that queue and all consumers are stopped.
-func (qc *asyncConsumer) Shutdown(_ context.Context) error {
-    qc.stopWG.Wait()
-    return nil
-}
-
 func BenchmarkOffer(b *testing.B) {
     tests := []struct {
         name string
@@ -236,20 +202,20 @@ func BenchmarkOffer(b *testing.B) {
     }
     for _, tt := range tests {
         b.Run(tt.name, func(b *testing.B) {
-            q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: &requestSizer[uint64]{}, capacity: int64(10 * b.N)})
+            q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: tt.sizer, capacity: int64(10 * b.N)})
             consumed := &atomic.Int64{}
             require.NoError(b, q.Start(context.Background(), componenttest.NewNopHost()))
-            ac := newAsyncConsumer(q, 1, func(context.Context, uint64) error {
+            ac := newConsumerQueue(q, 1, func(_ context.Context, _ uint64, done DoneCallback) {
                 consumed.Add(1)
-                return nil
+                done(nil)
             })
+            require.NoError(b, ac.Start(context.Background(), componenttest.NewNopHost()))
             b.ResetTimer()
            b.ReportAllocs()
             for j := 0; j < b.N; j++ {
                 require.NoError(b, q.Offer(context.Background(), uint64(10)))
             }
-            assert.NoError(b, q.Shutdown(context.Background()))
-            assert.NoError(b, ac.Shutdown(context.Background()))
+            require.NoError(b, ac.Shutdown(context.Background()))
             assert.Equal(b, int64(b.N), consumed.Load())
         })
     }

exporter/exporterqueue/consumers.go

Lines changed: 59 additions & 0 deletions
@@ -0,0 +1,59 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue"
+
+import (
+    "context"
+    "sync"
+
+    "go.opentelemetry.io/collector/component"
+)
+
+type consumerQueue[T any] struct {
+    readableQueue[T]
+    numConsumers int
+    consumeFunc  ConsumeFunc[T]
+    stopWG       sync.WaitGroup
+}
+
+func newConsumerQueue[T any](q readableQueue[T], numConsumers int, consumeFunc ConsumeFunc[T]) *consumerQueue[T] {
+    return &consumerQueue[T]{
+        readableQueue: q,
+        numConsumers:  numConsumers,
+        consumeFunc:   consumeFunc,
+    }
+}
+
+// Start ensures that queue and all consumers are started.
+func (qc *consumerQueue[T]) Start(ctx context.Context, host component.Host) error {
+    if err := qc.readableQueue.Start(ctx, host); err != nil {
+        return err
+    }
+    var startWG sync.WaitGroup
+    for i := 0; i < qc.numConsumers; i++ {
+        qc.stopWG.Add(1)
+        startWG.Add(1)
+        go func() {
+            startWG.Done()
+            defer qc.stopWG.Done()
+            for {
+                ctx, req, done, ok := qc.readableQueue.Read(context.Background())
+                if !ok {
+                    return
+                }
+                qc.consumeFunc(ctx, req, done)
+            }
+        }()
+    }
+    startWG.Wait()
+
+    return nil
+}
+
+// Shutdown ensures that queue and all consumers are stopped.
+func (qc *consumerQueue[T]) Shutdown(ctx context.Context) error {
+    err := qc.readableQueue.Shutdown(ctx)
+    qc.stopWG.Wait()
+    return err
+}
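
A hypothetical test-style usage sketch of the new consumerQueue, modeled on the updated tests earlier in this commit: the test name and the processed channel are invented, while the constructor, the Start/Shutdown semantics, and the helper names all come from this diff.

package exporterqueue

import (
    "context"
    "testing"

    "github.com/stretchr/testify/require"

    "go.opentelemetry.io/collector/component/componenttest"
)

// TestConsumerQueueUsageSketch is a hypothetical example; it would have to
// live in the exporterqueue package because the helpers are unexported.
func TestConsumerQueueUsageSketch(t *testing.T) {
    q := newBoundedMemoryQueue[string](memoryQueueSettings[string]{sizer: &requestSizer[string]{}, capacity: 100})
    processed := make(chan string, 1)
    cq := newConsumerQueue(q, 4, func(_ context.Context, req string, done DoneCallback) {
        processed <- req
        done(nil) // report the processing result back to the queue
    })
    // Start launches the wrapped queue plus the consumer goroutines.
    require.NoError(t, cq.Start(context.Background(), componenttest.NewNopHost()))
    require.NoError(t, q.Offer(context.Background(), "item"))
    // Shutdown stops the wrapped queue first (Read returns ok=false once drained),
    // then waits for all consumers to exit.
    require.NoError(t, cq.Shutdown(context.Background()))
    require.Equal(t, "item", <-processed)
}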

exporter/exporterqueue/persistent_queue.go

Lines changed: 1 addition & 1 deletion
@@ -92,7 +92,7 @@ type persistentQueue[T any] struct {
 }
 
 // newPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage
-func newPersistentQueue[T any](set persistentQueueSettings[T]) Queue[T] {
+func newPersistentQueue[T any](set persistentQueueSettings[T]) readableQueue[T] {
     _, isRequestSized := set.sizer.(*requestSizer[T])
     pq := &persistentQueue[T]{
         set: set,
