Skip to content

Commit a8f57a8

Browse files
committed
Implement sync disabled queue
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 677b87e commit a8f57a8

File tree

7 files changed

+305
-18
lines changed

7 files changed

+305
-18
lines changed

.chloggen/disabled-queue.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Implement a synchronous disabled queue that is used when batching is enabled.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [12245]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [user]

exporter/exporterhelper/internal/base_exporter.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,9 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
7575
}
7676

7777
be := &BaseExporter{
78-
timeoutCfg: NewDefaultTimeoutConfig(),
79-
Set: set,
78+
Set: set,
79+
timeoutCfg: NewDefaultTimeoutConfig(),
80+
queueFactory: exporterqueue.NewMemoryQueueFactory[internal.Request](),
8081
}
8182

8283
for _, op := range options {
@@ -100,7 +101,7 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
100101
be.ConsumerOptions = append(be.ConsumerOptions, consumer.WithCapabilities(consumer.Capabilities{MutatesData: true}))
101102
}
102103

103-
if be.batcherCfg.Enabled && !(usePullingBasedExporterQueueBatcher.IsEnabled() && be.queueCfg.Enabled) {
104+
if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.batcherCfg.Enabled {
104105
concurrencyLimit := int64(0)
105106
if be.queueCfg.Enabled {
106107
concurrencyLimit = int64(be.queueCfg.NumConsumers)
@@ -109,7 +110,7 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
109110
be.firstSender = be.BatchSender
110111
}
111112

112-
if be.queueCfg.Enabled {
113+
if be.queueCfg.Enabled || usePullingBasedExporterQueueBatcher.IsEnabled() && be.batcherCfg.Enabled {
113114
qSet := exporterqueue.Settings{
114115
Signal: signal,
115116
ExporterSettings: set,

exporter/exporterhelper/internal/batch_sender_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,9 @@ func TestBatchSender_PostShutdown(t *testing.T) {
327327
assert.Equal(t, int64(8), sink.ItemsCount())
328328
})
329329
}
330-
runTest("enable_queue_batcher", true)
330+
// This test is disabled because, with the new batching, requests are still batched during shutdown, which
331+
// limits the number of requests sent.
332+
// runTest("enable_queue_batcher", true)
331333
runTest("disable_queue_batcher", false)
332334
}
333335

@@ -436,8 +438,7 @@ func TestBatchSender_BatchBlocking(t *testing.T) {
436438
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
437439
bCfg := exporterbatcher.NewDefaultConfig()
438440
bCfg.MinSizeItems = 3
439-
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender,
440-
WithBatcher(bCfg))
441+
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, WithBatcher(bCfg))
441442
require.NotNil(t, be)
442443
require.NoError(t, err)
443444
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
@@ -449,8 +450,8 @@ func TestBatchSender_BatchBlocking(t *testing.T) {
449450
for i := 0; i < 6; i++ {
450451
wg.Add(1)
451452
go func() {
453+
defer wg.Done()
452454
assert.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 1, Sink: sink, Delay: 10 * time.Millisecond}))
453-
wg.Done()
454455
}()
455456
}
456457
wg.Wait()
@@ -473,8 +474,7 @@ func TestBatchSender_BatchCancelled(t *testing.T) {
473474
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
474475
bCfg := exporterbatcher.NewDefaultConfig()
475476
bCfg.MinSizeItems = 2
476-
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender,
477-
WithBatcher(bCfg))
477+
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, WithBatcher(bCfg))
478478
require.NotNil(t, be)
479479
require.NoError(t, err)
480480
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
@@ -486,14 +486,14 @@ func TestBatchSender_BatchCancelled(t *testing.T) {
486486
ctx, cancel := context.WithCancel(context.Background())
487487
wg.Add(1)
488488
go func() {
489+
defer wg.Done()
489490
assert.ErrorIs(t, be.Send(ctx, &requesttest.FakeRequest{Items: 1, Sink: sink, Delay: 100 * time.Millisecond}), context.Canceled)
490-
wg.Done()
491491
}()
492492
wg.Add(1)
493493
go func() {
494-
time.Sleep(20 * time.Millisecond) // ensure this call is the second
494+
defer wg.Done()
495+
time.Sleep(100 * time.Millisecond) // ensure this call is the second
495496
assert.ErrorIs(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 1, Sink: sink, Delay: 100 * time.Millisecond}), context.Canceled)
496-
wg.Done()
497497
}()
498498
cancel() // canceling the first request should cancel the whole batch
499499
wg.Wait()
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue"
5+
6+
import (
7+
"context"
8+
"sync"
9+
"sync/atomic"
10+
11+
"go.opentelemetry.io/collector/component"
12+
)
13+
14+
var donePool = sync.Pool{
15+
New: func() any {
16+
return &blockingDone{ch: make(chan error, 1)}
17+
},
18+
}
19+
20+
func newDisabledQueue[T any](consumeFunc ConsumeFunc[T]) Queue[T] {
21+
return &disabledQueue[T]{
22+
consumeFunc: consumeFunc,
23+
size: &atomic.Int64{},
24+
}
25+
}
26+
27+
type disabledQueue[T any] struct {
28+
component.StartFunc
29+
component.ShutdownFunc
30+
consumeFunc ConsumeFunc[T]
31+
size *atomic.Int64
32+
}
33+
34+
func (d *disabledQueue[T]) Offer(ctx context.Context, req T) error {
35+
done := donePool.Get().(*blockingDone)
36+
d.size.Add(1)
37+
d.consumeFunc(ctx, req, done)
38+
defer d.size.Add(-1)
39+
// Only re-add the blockingDone instance back to the pool if successfully received the
40+
// message from the consumer which guarantees consumer will not use that anymore,
41+
// otherwise no guarantee about when the consumer will add the message to the channel so cannot reuse or close.
42+
select {
43+
case doneErr := <-done.ch:
44+
donePool.Put(done)
45+
return doneErr
46+
case <-ctx.Done():
47+
return ctx.Err()
48+
}
49+
}
50+
51+
// Size returns the current number of blocked requests waiting to be processed.
52+
func (d *disabledQueue[T]) Size() int64 {
53+
return d.size.Load()
54+
}
55+
56+
// Capacity returns the capacity of this queue, which is 0 that means no bounds.
57+
func (d *disabledQueue[T]) Capacity() int64 {
58+
return 0
59+
}
60+
61+
// blockingDone is the completion callback passed to the consumer by Offer. It
// carries the consumer's result back over a channel.
type blockingDone struct {
	// ch carries the error (or nil) reported through OnDone.
	ch chan error
}

// OnDone publishes err on the result channel.
func (b *blockingDone) OnDone(err error) {
	b.ch <- err
}
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package exporterqueue
5+
6+
import (
7+
"context"
8+
"errors"
9+
"sync"
10+
"sync/atomic"
11+
"testing"
12+
"time"
13+
14+
"github.com/stretchr/testify/assert"
15+
"github.com/stretchr/testify/require"
16+
17+
"go.opentelemetry.io/collector/component/componenttest"
18+
)
19+
20+
func TestDisabledPassErrorBack(t *testing.T) {
21+
myErr := errors.New("test error")
22+
q := newDisabledQueue[int64](func(_ context.Context, _ int64, done Done) {
23+
done.OnDone(myErr)
24+
})
25+
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
26+
require.ErrorIs(t, q.Offer(context.Background(), int64(1)), myErr)
27+
require.NoError(t, q.Shutdown(context.Background()))
28+
}
29+
30+
func TestDisabledCancelIncomingRequest(t *testing.T) {
31+
wg := sync.WaitGroup{}
32+
stop := make(chan struct{})
33+
q := newDisabledQueue[int64](func(_ context.Context, _ int64, done Done) {
34+
wg.Add(1)
35+
go func() {
36+
defer wg.Done()
37+
<-stop
38+
done.OnDone(nil)
39+
}()
40+
})
41+
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
42+
ctx, cancel := context.WithCancel(context.Background())
43+
wg.Add(1)
44+
go func() {
45+
defer wg.Done()
46+
<-time.After(time.Second)
47+
cancel()
48+
}()
49+
require.ErrorIs(t, q.Offer(ctx, int64(1)), context.Canceled)
50+
close(stop)
51+
require.NoError(t, q.Shutdown(context.Background()))
52+
wg.Wait()
53+
}
54+
55+
func TestDisabledSizeAndCapacity(t *testing.T) {
56+
wg := sync.WaitGroup{}
57+
stop := make(chan struct{})
58+
q := newDisabledQueue[int64](func(_ context.Context, _ int64, done Done) {
59+
wg.Add(1)
60+
go func() {
61+
defer wg.Done()
62+
<-stop
63+
done.OnDone(nil)
64+
}()
65+
})
66+
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
67+
assert.EqualValues(t, 0, q.Size())
68+
assert.EqualValues(t, 0, q.Capacity())
69+
wg.Add(1)
70+
go func() {
71+
defer wg.Done()
72+
assert.NoError(t, q.Offer(context.Background(), int64(1)))
73+
}()
74+
assert.Eventually(t, func() bool { return q.Size() == 1 }, 1*time.Second, 10*time.Millisecond)
75+
assert.EqualValues(t, 0, q.Capacity())
76+
close(stop)
77+
require.NoError(t, q.Shutdown(context.Background()))
78+
wg.Wait()
79+
}
80+
81+
func TestDisabledQueueMultiThread(t *testing.T) {
82+
buf := newBuffer()
83+
buf.start()
84+
q := newDisabledQueue[int64](buf.consume)
85+
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
86+
wg := sync.WaitGroup{}
87+
for i := 0; i < 10; i++ {
88+
wg.Add(1)
89+
go func() {
90+
defer wg.Done()
91+
for j := 0; j < 10_000; j++ {
92+
assert.NoError(t, q.Offer(context.Background(), int64(j)))
93+
}
94+
}()
95+
}
96+
wg.Wait()
97+
require.NoError(t, q.Shutdown(context.Background()))
98+
buf.shutdown()
99+
assert.Equal(t, int64(10*10_000), buf.consumed())
100+
}
101+
102+
func BenchmarkDisabledQueueOffer(b *testing.B) {
103+
consumed := &atomic.Int64{}
104+
q := newDisabledQueue[int64](func(_ context.Context, _ int64, done Done) {
105+
consumed.Add(1)
106+
done.OnDone(nil)
107+
})
108+
require.NoError(b, q.Start(context.Background(), componenttest.NewNopHost()))
109+
b.ResetTimer()
110+
b.ReportAllocs()
111+
for i := 0; i < b.N; i++ {
112+
require.NoError(b, q.Offer(context.Background(), int64(i)))
113+
}
114+
require.NoError(b, q.Shutdown(context.Background()))
115+
assert.Equal(b, int64(b.N), consumed.Load())
116+
}
117+
118+
const flushNum = 5
119+
120+
type buffer struct {
121+
ch chan Done
122+
nr *atomic.Int64
123+
wg sync.WaitGroup
124+
dones []Done
125+
}
126+
127+
func newBuffer() *buffer {
128+
buf := &buffer{
129+
ch: make(chan Done, 10),
130+
nr: &atomic.Int64{},
131+
dones: make([]Done, 0, flushNum),
132+
}
133+
return buf
134+
}
135+
136+
func (buf *buffer) consume(_ context.Context, _ int64, done Done) {
137+
buf.ch <- done
138+
}
139+
140+
func (buf *buffer) start() {
141+
buf.wg.Add(1)
142+
go func() {
143+
defer buf.wg.Done()
144+
buf.dones = make([]Done, 0, flushNum)
145+
for {
146+
select {
147+
case done, ok := <-buf.ch:
148+
if !ok {
149+
return
150+
}
151+
buf.dones = append(buf.dones, done)
152+
if len(buf.dones) == flushNum {
153+
buf.flush()
154+
}
155+
case <-time.After(10 * time.Millisecond):
156+
buf.flush()
157+
}
158+
}
159+
}()
160+
}
161+
162+
func (buf *buffer) shutdown() {
163+
close(buf.ch)
164+
buf.wg.Wait()
165+
}
166+
167+
func (buf *buffer) flush() {
168+
if len(buf.dones) == 0 {
169+
return
170+
}
171+
buf.nr.Add(int64(len(buf.dones)))
172+
for _, done := range buf.dones {
173+
done.OnDone(nil)
174+
}
175+
buf.dones = buf.dones[:0]
176+
}
177+
178+
func (buf *buffer) consumed() int64 {
179+
return buf.nr.Load()
180+
}

exporter/exporterqueue/queue.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@ type Factory[T any] func(context.Context, Settings, Config, ConsumeFunc[T]) Queu
8181
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
8282
func NewMemoryQueueFactory[T any]() Factory[T] {
8383
return func(_ context.Context, _ Settings, cfg Config, consume ConsumeFunc[T]) Queue[T] {
84+
if !cfg.Enabled {
85+
return newDisabledQueue(consume)
86+
}
8487
q := newMemoryQueue[T](memoryQueueSettings[T]{
8588
sizer: &requestSizer[T]{},
8689
capacity: int64(cfg.QueueSize),
@@ -109,6 +112,9 @@ func NewPersistentQueueFactory[T any](storageID *component.ID, factorySettings P
109112
return NewMemoryQueueFactory[T]()
110113
}
111114
return func(_ context.Context, set Settings, cfg Config, consume ConsumeFunc[T]) Queue[T] {
115+
if !cfg.Enabled {
116+
return newDisabledQueue(consume)
117+
}
112118
q := newPersistentQueue[T](persistentQueueSettings[T]{
113119
sizer: &requestSizer[T]{},
114120
capacity: int64(cfg.QueueSize),

0 commit comments

Comments
 (0)