Skip to content

Commit 1362ca3

Browse files
committed
Implement sync disabled queue
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent bbe9cb8 commit 1362ca3

File tree

7 files changed

+277
-14
lines changed

7 files changed

+277
-14
lines changed

.chloggen/disabled-queue.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Implement sync disabled queue used when batching is enabled.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [12245]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [user]

exporter/exporterhelper/internal/base_exporter.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,9 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
7575
}
7676

7777
be := &BaseExporter{
78-
timeoutCfg: NewDefaultTimeoutConfig(),
79-
Set: set,
78+
Set: set,
79+
timeoutCfg: NewDefaultTimeoutConfig(),
80+
queueFactory: exporterqueue.NewMemoryQueueFactory[internal.Request](),
8081
}
8182

8283
for _, op := range options {
@@ -100,7 +101,7 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
100101
be.ConsumerOptions = append(be.ConsumerOptions, consumer.WithCapabilities(consumer.Capabilities{MutatesData: true}))
101102
}
102103

103-
if be.batcherCfg.Enabled && !(usePullingBasedExporterQueueBatcher.IsEnabled() && be.queueCfg.Enabled) {
104+
if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.batcherCfg.Enabled {
104105
concurrencyLimit := int64(0)
105106
if be.queueCfg.Enabled {
106107
concurrencyLimit = int64(be.queueCfg.NumConsumers)
@@ -109,7 +110,7 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
109110
be.firstSender = be.BatchSender
110111
}
111112

112-
if be.queueCfg.Enabled {
113+
if be.queueCfg.Enabled || usePullingBasedExporterQueueBatcher.IsEnabled() && be.batcherCfg.Enabled {
113114
qSet := exporterqueue.Settings{
114115
Signal: signal,
115116
ExporterSettings: set,
@@ -126,10 +127,15 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
126127

127128
// Send sends the request using the first sender in the chain.
128129
func (be *BaseExporter) Send(ctx context.Context, req internal.Request) error {
130+
// Have to read the number of items before sending the request since the request can
131+
// be modified by the downstream components like the batcher, and even worse can
132+
// have concurrent access to it for example if the queue returns when Context is cancelled
133+
// but request may still be processed.
134+
itemsCount := req.ItemsCount()
129135
err := be.firstSender.Send(ctx, req)
130136
if err != nil {
131137
be.Set.Logger.Error("Exporting failed. Rejecting data."+be.ExportFailureMessage,
132-
zap.Error(err), zap.Int("rejected_items", req.ItemsCount()))
138+
zap.Error(err), zap.Int("rejected_items", itemsCount))
133139
}
134140
return err
135141
}

exporter/exporterhelper/internal/batch_sender_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,9 @@ func TestBatchSender_PostShutdown(t *testing.T) {
327327
assert.Equal(t, int64(8), sink.ItemsCount())
328328
})
329329
}
330-
runTest("enable_queue_batcher", true)
330+
// This test is disabled because, with the new batching, batching still happens during shutdown,
// since that limits the number of requests sent.
332+
// runTest("enable_queue_batcher", true)
331333
runTest("disable_queue_batcher", false)
332334
}
333335

@@ -436,8 +438,7 @@ func TestBatchSender_BatchBlocking(t *testing.T) {
436438
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
437439
bCfg := exporterbatcher.NewDefaultConfig()
438440
bCfg.MinSizeItems = 3
439-
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender,
440-
WithBatcher(bCfg))
441+
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, WithBatcher(bCfg))
441442
require.NotNil(t, be)
442443
require.NoError(t, err)
443444
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
@@ -449,8 +450,8 @@ func TestBatchSender_BatchBlocking(t *testing.T) {
449450
for i := 0; i < 6; i++ {
450451
wg.Add(1)
451452
go func() {
453+
defer wg.Done()
452454
assert.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 1, Sink: sink, Delay: 10 * time.Millisecond}))
453-
wg.Done()
454455
}()
455456
}
456457
wg.Wait()
@@ -473,8 +474,7 @@ func TestBatchSender_BatchCancelled(t *testing.T) {
473474
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
474475
bCfg := exporterbatcher.NewDefaultConfig()
475476
bCfg.MinSizeItems = 2
476-
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender,
477-
WithBatcher(bCfg))
477+
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, WithBatcher(bCfg))
478478
require.NotNil(t, be)
479479
require.NoError(t, err)
480480
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
@@ -486,14 +486,14 @@ func TestBatchSender_BatchCancelled(t *testing.T) {
486486
ctx, cancel := context.WithCancel(context.Background())
487487
wg.Add(1)
488488
go func() {
489+
defer wg.Done()
489490
assert.ErrorIs(t, be.Send(ctx, &requesttest.FakeRequest{Items: 1, Sink: sink, Delay: 100 * time.Millisecond}), context.Canceled)
490-
wg.Done()
491491
}()
492492
wg.Add(1)
493493
go func() {
494-
time.Sleep(20 * time.Millisecond) // ensure this call is the second
494+
defer wg.Done()
495+
time.Sleep(100 * time.Millisecond) // ensure this call is the second
495496
assert.ErrorIs(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 1, Sink: sink, Delay: 100 * time.Millisecond}), context.Canceled)
496-
wg.Done()
497497
}()
498498
cancel() // canceling the first request should cancel the whole batch
499499
wg.Wait()
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue"
5+
6+
import (
7+
"context"
8+
"sync"
9+
"sync/atomic"
10+
11+
"go.opentelemetry.io/collector/component"
12+
)
13+
14+
var donePool = sync.Pool{
15+
New: func() any {
16+
return &blockingDone{ch: make(chan error, 1)}
17+
},
18+
}
19+
20+
func newDisabledQueue[T any](consumeFunc ConsumeFunc[T]) Queue[T] {
21+
return &disabledQueue[T]{
22+
consumeFunc: consumeFunc,
23+
size: &atomic.Int64{},
24+
}
25+
}
26+
27+
type disabledQueue[T any] struct {
28+
component.StartFunc
29+
component.ShutdownFunc
30+
consumeFunc ConsumeFunc[T]
31+
size *atomic.Int64
32+
}
33+
34+
func (d *disabledQueue[T]) Offer(ctx context.Context, req T) error {
35+
done := donePool.Get().(*blockingDone)
36+
d.size.Add(1)
37+
d.consumeFunc(ctx, req, done)
38+
defer d.size.Add(-1)
39+
// Only re-add the blockingDone instance back to the pool if successfully received the
40+
// message from the consumer which guarantees consumer will not use that anymore,
41+
// otherwise no guarantee about when the consumer will add the message to the channel so cannot reuse or close.
42+
select {
43+
case doneErr := <-done.ch:
44+
donePool.Put(done)
45+
return doneErr
46+
case <-ctx.Done():
47+
return ctx.Err()
48+
}
49+
}
50+
51+
// Size returns the current number of blocked requests waiting to be processed.
52+
func (d *disabledQueue[T]) Size() int64 {
53+
return d.size.Load()
54+
}
55+
56+
// Capacity returns the capacity of this queue, which is 0 that means no bounds.
57+
func (d *disabledQueue[T]) Capacity() int64 {
58+
return 0
59+
}
60+
61+
// blockingDone implements the Done callback with a buffered channel: the
// producer blocks receiving from ch until the consumer delivers its result.
type blockingDone struct {
	ch chan error
}

// OnDone publishes the processing result to the waiting producer. Because ch
// is created with capacity 1, this send never blocks the consumer.
func (d *blockingDone) OnDone(err error) {
	d.ch <- err
}
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package exporterqueue
5+
6+
import (
7+
"context"
8+
"errors"
9+
"sync"
10+
"sync/atomic"
11+
"testing"
12+
"time"
13+
14+
"github.com/stretchr/testify/assert"
15+
"github.com/stretchr/testify/require"
16+
17+
"go.opentelemetry.io/collector/component/componenttest"
18+
)
19+
20+
func TestDisabledPassErrorBack(t *testing.T) {
21+
myErr := errors.New("test error")
22+
q := newDisabledQueue[int64](func(_ context.Context, _ int64, done Done) {
23+
done.OnDone(myErr)
24+
})
25+
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
26+
require.ErrorIs(t, q.Offer(context.Background(), int64(1)), myErr)
27+
require.NoError(t, q.Shutdown(context.Background()))
28+
}
29+
30+
func TestDisabledCancelIncomingRequest(t *testing.T) {
31+
wg := sync.WaitGroup{}
32+
stop := make(chan struct{})
33+
q := newDisabledQueue[int64](func(_ context.Context, _ int64, done Done) {
34+
wg.Add(1)
35+
go func() {
36+
defer wg.Done()
37+
<-stop
38+
done.OnDone(nil)
39+
}()
40+
})
41+
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
42+
ctx, cancel := context.WithCancel(context.Background())
43+
wg.Add(1)
44+
go func() {
45+
defer wg.Done()
46+
<-time.After(time.Second)
47+
cancel()
48+
}()
49+
require.ErrorIs(t, q.Offer(ctx, int64(1)), context.Canceled)
50+
close(stop)
51+
require.NoError(t, q.Shutdown(context.Background()))
52+
wg.Wait()
53+
}
54+
55+
func TestDisabledQueueMultiThread(t *testing.T) {
56+
buf := newBuffer()
57+
buf.start()
58+
q := newDisabledQueue[int64](buf.consume)
59+
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
60+
wg := sync.WaitGroup{}
61+
for i := 0; i < 10; i++ {
62+
wg.Add(1)
63+
go func() {
64+
defer wg.Done()
65+
for j := 0; j < 10_000; j++ {
66+
assert.NoError(t, q.Offer(context.Background(), int64(j)))
67+
}
68+
}()
69+
}
70+
wg.Wait()
71+
require.NoError(t, q.Shutdown(context.Background()))
72+
buf.shutdown()
73+
assert.Equal(t, int64(10*10_000), buf.consumed())
74+
}
75+
76+
func BenchmarkDisabledQueueOffer(b *testing.B) {
77+
consumed := &atomic.Int64{}
78+
q := newDisabledQueue[int64](func(_ context.Context, _ int64, done Done) {
79+
consumed.Add(1)
80+
done.OnDone(nil)
81+
})
82+
require.NoError(b, q.Start(context.Background(), componenttest.NewNopHost()))
83+
b.ResetTimer()
84+
b.ReportAllocs()
85+
for i := 0; i < b.N; i++ {
86+
require.NoError(b, q.Offer(context.Background(), int64(i)))
87+
}
88+
require.NoError(b, q.Shutdown(context.Background()))
89+
assert.Equal(b, int64(b.N), consumed.Load())
90+
}
91+
92+
const flushNum = 5
93+
94+
type buffer struct {
95+
ch chan Done
96+
nr *atomic.Int64
97+
wg sync.WaitGroup
98+
dones []Done
99+
}
100+
101+
func newBuffer() *buffer {
102+
buf := &buffer{
103+
ch: make(chan Done, 10),
104+
nr: &atomic.Int64{},
105+
dones: make([]Done, 0, flushNum),
106+
}
107+
return buf
108+
}
109+
110+
func (buf *buffer) consume(_ context.Context, _ int64, done Done) {
111+
buf.ch <- done
112+
}
113+
114+
func (buf *buffer) start() {
115+
buf.wg.Add(1)
116+
go func() {
117+
defer buf.wg.Done()
118+
buf.dones = make([]Done, 0, flushNum)
119+
for {
120+
select {
121+
case done, ok := <-buf.ch:
122+
if !ok {
123+
return
124+
}
125+
buf.dones = append(buf.dones, done)
126+
if len(buf.dones) == flushNum {
127+
buf.flush()
128+
}
129+
case <-time.After(10 * time.Millisecond):
130+
buf.flush()
131+
}
132+
}
133+
}()
134+
}
135+
136+
func (buf *buffer) shutdown() {
137+
close(buf.ch)
138+
buf.wg.Wait()
139+
}
140+
141+
func (buf *buffer) flush() {
142+
if len(buf.dones) == 0 {
143+
return
144+
}
145+
buf.nr.Add(int64(len(buf.dones)))
146+
for _, done := range buf.dones {
147+
done.OnDone(nil)
148+
}
149+
buf.dones = buf.dones[:0]
150+
}
151+
152+
func (buf *buffer) consumed() int64 {
153+
return buf.nr.Load()
154+
}

exporter/exporterqueue/queue.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@ type Factory[T any] func(context.Context, Settings, Config, ConsumeFunc[T]) Queu
8181
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
8282
func NewMemoryQueueFactory[T any]() Factory[T] {
8383
return func(_ context.Context, _ Settings, cfg Config, consume ConsumeFunc[T]) Queue[T] {
84+
if !cfg.Enabled {
85+
return newDisabledQueue(consume)
86+
}
8487
q := newBoundedMemoryQueue[T](memoryQueueSettings[T]{
8588
sizer: &requestSizer[T]{},
8689
capacity: int64(cfg.QueueSize),
@@ -109,6 +112,9 @@ func NewPersistentQueueFactory[T any](storageID *component.ID, factorySettings P
109112
return NewMemoryQueueFactory[T]()
110113
}
111114
return func(_ context.Context, set Settings, cfg Config, consume ConsumeFunc[T]) Queue[T] {
115+
if !cfg.Enabled {
116+
return newDisabledQueue(consume)
117+
}
112118
q := newPersistentQueue[T](persistentQueueSettings[T]{
113119
sizer: &requestSizer[T]{},
114120
capacity: int64(cfg.QueueSize),

exporter/internal/queue/default_batcher.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"
55

66
import (
77
"context"
8+
"runtime"
89
"sync"
910
"time"
1011

@@ -38,6 +39,10 @@ func newDefaultBatcher(batchCfg exporterbatcher.Config,
3839
exportFunc func(ctx context.Context, req internal.Request) error,
3940
maxWorkers int,
4041
) *defaultBatcher {
42+
// TODO: Determine how to allow configuring this when queue is disabled.
43+
if maxWorkers == 0 {
44+
maxWorkers = runtime.NumCPU()
45+
}
4146
workerPool := make(chan struct{}, maxWorkers)
4247
for i := 0; i < maxWorkers; i++ {
4348
workerPool <- struct{}{}

0 commit comments

Comments
 (0)