Skip to content

Commit 27714cb

Browse files
committed
Change queue to embed the consumers
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent eb4f640 commit 27714cb

15 files changed

+367
-383
lines changed

.chloggen/consumer-queue.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: breaking
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Change queue to embed the async consumers.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [12242]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [api]

exporter/exporterhelper/internal/obs_queue_test.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,7 @@ import (
2424
)
2525

2626
type fakeQueue[T any] struct {
27-
component.StartFunc
28-
component.ShutdownFunc
27+
exporterqueue.Queue[T]
2928
offerErr error
3029
size int64
3130
capacity int64
@@ -39,10 +38,6 @@ func (fq *fakeQueue[T]) Capacity() int64 {
3938
return fq.capacity
4039
}
4140

42-
func (fq *fakeQueue[T]) Read(context.Context) (context.Context, T, exporterqueue.DoneCallback, bool) {
43-
panic("implement me")
44-
}
45-
4641
func (fq *fakeQueue[T]) Offer(context.Context, T) error {
4742
return fq.offerErr
4843
}

exporter/exporterhelper/internal/queue_sender.go

Lines changed: 31 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,8 @@ func (qCfg *QueueConfig) Validate() error {
6666
}
6767

6868
type QueueSender struct {
69-
queue exporterqueue.Queue[internal.Request]
70-
batcher queue.Batcher
71-
consumers *queue.Consumers[internal.Request]
69+
queue exporterqueue.Queue[internal.Request]
70+
batcher component.Component
7271
}
7372

7473
func NewQueueSender(
@@ -79,15 +78,6 @@ func NewQueueSender(
7978
exportFailureMessage string,
8079
next Sender[internal.Request],
8180
) (*QueueSender, error) {
82-
q, err := newObsQueue(qSet, qf(context.Background(), qSet, qCfg))
83-
if err != nil {
84-
return nil, err
85-
}
86-
87-
qs := &QueueSender{
88-
queue: q,
89-
}
90-
9181
exportFunc := func(ctx context.Context, req internal.Request) error {
9282
err := next.Send(ctx, req)
9383
if err != nil {
@@ -96,12 +86,30 @@ func NewQueueSender(
9686
}
9787
return err
9888
}
99-
if usePullingBasedExporterQueueBatcher.IsEnabled() {
100-
qs.batcher, _ = queue.NewBatcher(bCfg, q, exportFunc, qCfg.NumConsumers)
101-
} else {
102-
qs.consumers = queue.NewQueueConsumers[internal.Request](q, qCfg.NumConsumers, exportFunc)
89+
if !usePullingBasedExporterQueueBatcher.IsEnabled() {
90+
q, err := newObsQueue(qSet, qf(context.Background(), qSet, qCfg, func(ctx context.Context, req internal.Request, done exporterqueue.DoneCallback) {
91+
done(exportFunc(ctx, req))
92+
}))
93+
if err != nil {
94+
return nil, err
95+
}
96+
return &QueueSender{queue: q}, nil
10397
}
104-
return qs, nil
98+
99+
b, err := queue.NewBatcher(bCfg, exportFunc, qCfg.NumConsumers)
100+
if err != nil {
101+
return nil, err
102+
}
103+
// TODO: https://github.com/open-telemetry/opentelemetry-collector/issues/12244
104+
if bCfg.Enabled {
105+
qCfg.NumConsumers = 1
106+
}
107+
q, err := newObsQueue(qSet, qf(context.Background(), qSet, qCfg, b.Consume))
108+
if err != nil {
109+
return nil, err
110+
}
111+
112+
return &QueueSender{queue: q, batcher: b}, nil
105113
}
106114

107115
// Start is invoked during service startup.
@@ -113,25 +121,22 @@ func (qs *QueueSender) Start(ctx context.Context, host component.Host) error {
113121
if usePullingBasedExporterQueueBatcher.IsEnabled() {
114122
return qs.batcher.Start(ctx, host)
115123
}
116-
return qs.consumers.Start(ctx, host)
124+
125+
return nil
117126
}
118127

119128
// Shutdown is invoked during service shutdown.
120129
func (qs *QueueSender) Shutdown(ctx context.Context) error {
121-
// Stop the queue and consumers, this will drain the queue and will call the retry (which is stopped) that will only
130+
// Stop the queue and batcher, this will drain the queue and will call the retry (which is stopped) that will only
122131
// try once every request.
123-
124-
if err := qs.queue.Shutdown(ctx); err != nil {
125-
return err
126-
}
132+
err := qs.queue.Shutdown(ctx)
127133
if usePullingBasedExporterQueueBatcher.IsEnabled() {
128-
return qs.batcher.Shutdown(ctx)
134+
return errors.Join(err, qs.batcher.Shutdown(ctx))
129135
}
130-
err := qs.consumers.Shutdown(ctx)
131136
return err
132137
}
133138

134-
// send implements the requestSender interface. It puts the request in the queue.
139+
// Send implements the requestSender interface. It puts the request in the queue.
135140
func (qs *QueueSender) Send(ctx context.Context, req internal.Request) error {
136141
// Prevent cancellation and deadline to propagate to the context stored in the queue.
137142
// The grpc/http based receivers will cancel the request context after this function returns.

exporter/exporterqueue/bounded_memory_queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type memoryQueueSettings[T any] struct {
3030

3131
// newBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional
3232
// callback for dropped items (e.g. useful to emit metrics).
33-
func newBoundedMemoryQueue[T any](set memoryQueueSettings[T]) Queue[T] {
33+
func newBoundedMemoryQueue[T any](set memoryQueueSettings[T]) readableQueue[T] {
3434
return &boundedMemoryQueue[T]{
3535
sizedQueue: newSizedQueue[T](set.capacity, set.sizer, set.blocking),
3636
}

exporter/exporterqueue/bounded_memory_queue_test.go

Lines changed: 16 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,11 @@ func TestBoundedQueue(t *testing.T) {
7474
func TestShutdownWhileNotEmpty(t *testing.T) {
7575
q := newBoundedMemoryQueue[string](memoryQueueSettings[string]{sizer: &requestSizer[string]{}, capacity: 1000})
7676

77-
assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
77+
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
7878
for i := 0; i < 10; i++ {
7979
require.NoError(t, q.Offer(context.Background(), strconv.FormatInt(int64(i), 10)))
8080
}
81-
assert.NoError(t, q.Shutdown(context.Background()))
81+
require.NoError(t, q.Shutdown(context.Background()))
8282

8383
assert.Equal(t, int64(10), q.Size())
8484
numConsumed := 0
@@ -115,16 +115,15 @@ func TestQueueUsage(t *testing.T) {
115115
t.Run(tt.name, func(t *testing.T) {
116116
q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: tt.sizer, capacity: int64(100)})
117117
consumed := &atomic.Int64{}
118-
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
119-
ac := newAsyncConsumer(q, 1, func(context.Context, uint64) error {
118+
ac := newConsumerQueue(q, 1, func(_ context.Context, _ uint64, done DoneCallback) {
120119
consumed.Add(1)
121-
return nil
120+
done(nil)
122121
})
122+
require.NoError(t, ac.Start(context.Background(), componenttest.NewNopHost()))
123123
for j := 0; j < 10; j++ {
124124
require.NoError(t, q.Offer(context.Background(), uint64(10)))
125125
}
126-
assert.NoError(t, q.Shutdown(context.Background()))
127-
assert.NoError(t, ac.Shutdown(context.Background()))
126+
require.NoError(t, ac.Shutdown(context.Background()))
128127
assert.Equal(t, int64(10), consumed.Load())
129128
})
130129
}
@@ -148,11 +147,11 @@ func TestBlockingQueueUsage(t *testing.T) {
148147
t.Run(tt.name, func(t *testing.T) {
149148
q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: tt.sizer, capacity: int64(100), blocking: true})
150149
consumed := &atomic.Int64{}
151-
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
152-
ac := newAsyncConsumer(q, 10, func(context.Context, uint64) error {
150+
ac := newConsumerQueue(q, 10, func(_ context.Context, _ uint64, done DoneCallback) {
153151
consumed.Add(1)
154-
return nil
152+
done(nil)
155153
})
154+
require.NoError(t, ac.Start(context.Background(), componenttest.NewNopHost()))
156155
wg := &sync.WaitGroup{}
157156
for i := 0; i < 10; i++ {
158157
wg.Add(1)
@@ -164,25 +163,21 @@ func TestBlockingQueueUsage(t *testing.T) {
164163
}()
165164
}
166165
wg.Wait()
167-
assert.NoError(t, q.Shutdown(context.Background()))
168-
assert.NoError(t, ac.Shutdown(context.Background()))
166+
require.NoError(t, ac.Shutdown(context.Background()))
169167
assert.Equal(t, int64(1_000_000), consumed.Load())
170168
})
171169
}
172170
}
173171

174172
func TestZeroSizeNoConsumers(t *testing.T) {
175173
q := newBoundedMemoryQueue[string](memoryQueueSettings[string]{sizer: &requestSizer[string]{}, capacity: 0})
176-
177174
err := q.Start(context.Background(), componenttest.NewNopHost())
178175
require.NoError(t, err)
179-
180176
require.ErrorIs(t, q.Offer(context.Background(), "a"), ErrQueueIsFull) // in process
181-
182177
assert.NoError(t, q.Shutdown(context.Background()))
183178
}
184179

185-
func consume[T any](q Queue[T], consumeFunc func(context.Context, T) error) bool {
180+
func consume[T any](q readableQueue[T], consumeFunc func(context.Context, T) error) bool {
186181
ctx, req, done, ok := q.Read(context.Background())
187182
if !ok {
188183
return false
@@ -191,35 +186,6 @@ func consume[T any](q Queue[T], consumeFunc func(context.Context, T) error) bool
191186
return true
192187
}
193188

194-
type asyncConsumer struct {
195-
stopWG sync.WaitGroup
196-
}
197-
198-
func newAsyncConsumer[T any](q Queue[T], numConsumers int, consumeFunc func(context.Context, T) error) *asyncConsumer {
199-
ac := &asyncConsumer{}
200-
201-
ac.stopWG.Add(numConsumers)
202-
for i := 0; i < numConsumers; i++ {
203-
go func() {
204-
defer ac.stopWG.Done()
205-
for {
206-
ctx, req, done, ok := q.Read(context.Background())
207-
if !ok {
208-
return
209-
}
210-
done(consumeFunc(ctx, req))
211-
}
212-
}()
213-
}
214-
return ac
215-
}
216-
217-
// Shutdown ensures that queue and all consumers are stopped.
218-
func (qc *asyncConsumer) Shutdown(_ context.Context) error {
219-
qc.stopWG.Wait()
220-
return nil
221-
}
222-
223189
func BenchmarkOffer(b *testing.B) {
224190
tests := []struct {
225191
name string
@@ -236,20 +202,20 @@ func BenchmarkOffer(b *testing.B) {
236202
}
237203
for _, tt := range tests {
238204
b.Run(tt.name, func(b *testing.B) {
239-
q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: &requestSizer[uint64]{}, capacity: int64(10 * b.N)})
205+
q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: tt.sizer, capacity: int64(10 * b.N)})
240206
consumed := &atomic.Int64{}
241207
require.NoError(b, q.Start(context.Background(), componenttest.NewNopHost()))
242-
ac := newAsyncConsumer(q, 1, func(context.Context, uint64) error {
208+
ac := newConsumerQueue(q, 1, func(_ context.Context, _ uint64, done DoneCallback) {
243209
consumed.Add(1)
244-
return nil
210+
done(nil)
245211
})
212+
require.NoError(b, ac.Start(context.Background(), componenttest.NewNopHost()))
246213
b.ResetTimer()
247214
b.ReportAllocs()
248215
for j := 0; j < b.N; j++ {
249216
require.NoError(b, q.Offer(context.Background(), uint64(10)))
250217
}
251-
assert.NoError(b, q.Shutdown(context.Background()))
252-
assert.NoError(b, ac.Shutdown(context.Background()))
218+
require.NoError(b, ac.Shutdown(context.Background()))
253219
assert.Equal(b, int64(b.N), consumed.Load())
254220
})
255221
}

exporter/exporterqueue/consumers.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue"
5+
6+
import (
7+
"context"
8+
"sync"
9+
10+
"go.opentelemetry.io/collector/component"
11+
)
12+
13+
type consumerQueue[T any] struct {
14+
readableQueue[T]
15+
numConsumers int
16+
consumeFunc ConsumeFunc[T]
17+
stopWG sync.WaitGroup
18+
}
19+
20+
func newConsumerQueue[T any](q readableQueue[T], numConsumers int, consumeFunc ConsumeFunc[T]) *consumerQueue[T] {
21+
return &consumerQueue[T]{
22+
readableQueue: q,
23+
numConsumers: numConsumers,
24+
consumeFunc: consumeFunc,
25+
}
26+
}
27+
28+
// Start ensures that queue and all consumers are started.
29+
func (qc *consumerQueue[T]) Start(ctx context.Context, host component.Host) error {
30+
if err := qc.readableQueue.Start(ctx, host); err != nil {
31+
return err
32+
}
33+
var startWG sync.WaitGroup
34+
for i := 0; i < qc.numConsumers; i++ {
35+
qc.stopWG.Add(1)
36+
startWG.Add(1)
37+
go func() {
38+
startWG.Done()
39+
defer qc.stopWG.Done()
40+
for {
41+
ctx, req, done, ok := qc.readableQueue.Read(context.Background())
42+
if !ok {
43+
return
44+
}
45+
qc.consumeFunc(ctx, req, done)
46+
}
47+
}()
48+
}
49+
startWG.Wait()
50+
51+
return nil
52+
}
53+
54+
// Shutdown ensures that queue and all consumers are stopped.
55+
func (qc *consumerQueue[T]) Shutdown(ctx context.Context) error {
56+
err := qc.readableQueue.Shutdown(ctx)
57+
qc.stopWG.Wait()
58+
return err
59+
}

exporter/exporterqueue/persistent_queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ type persistentQueue[T any] struct {
9292
}
9393

9494
// newPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage
95-
func newPersistentQueue[T any](set persistentQueueSettings[T]) Queue[T] {
95+
func newPersistentQueue[T any](set persistentQueueSettings[T]) readableQueue[T] {
9696
_, isRequestSized := set.sizer.(*requestSizer[T])
9797
pq := &persistentQueue[T]{
9898
set: set,

0 commit comments

Comments
 (0)