Commit b7ad21c

[chore] Decouple single vs multi partition batcher
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 56433bc commit b7ad21c

File tree: 6 files changed, +104 -101 lines changed

exporter/exporterhelper/internal/queuebatch/batcher.go

Lines changed: 22 additions & 0 deletions
@@ -7,10 +7,32 @@ import (
 	"context"
 
 	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
+	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
 )
 
 // Batcher is in charge of reading items from the queue and send them out asynchronously.
 type Batcher[T any] interface {
 	component.Component
 	Consume(context.Context, T, Done)
 }
+
+type batcherSettings[T any] struct {
+	sizerType   request.SizerType
+	sizer       request.Sizer[T]
+	partitioner Partitioner[T]
+	next        sender.SendFunc[T]
+	maxWorkers  int
+}
+
+func NewBatcher(cfg *BatchConfig, set batcherSettings[request.Request]) Batcher[request.Request] {
+	if cfg == nil {
+		return newDisabledBatcher[request.Request](set.next)
+	}
+
+	if set.partitioner == nil {
+		return newPartitionBatcher(*cfg, set.sizerType, set.sizer, newWorkerPool(set.maxWorkers), set.next)
+	}
+
+	return newMultiBatcher(*cfg, set.sizerType, set.sizer, newWorkerPool(set.maxWorkers), set.partitioner, set.next)
+}
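
For context on what this factory does: a nil BatchConfig disables batching entirely, a nil partitioner yields one batcher for all requests, and otherwise a batcher is created per partition. The sketch below mirrors only that dispatch shape; all types and names here (config, settings, the three batcher structs) are hypothetical stand-ins, not the collector's internal API.

```go
package main

import "fmt"

// Simplified stand-ins for the collector's internal types.
type config struct{ minSize int }

type settings struct {
	partitionKeyFunc func(item string) string // nil means "no partitioning"
	maxWorkers       int
}

type batcher interface{ consume(item string) }

type disabledBatcher struct{}

func (disabledBatcher) consume(item string) { fmt.Println("pass-through:", item) }

type singlePartitionBatcher struct{ cfg config }

func (singlePartitionBatcher) consume(item string) { fmt.Println("single partition:", item) }

type multiPartitionBatcher struct {
	cfg    config
	keyFor func(string) string
}

func (m multiPartitionBatcher) consume(item string) {
	fmt.Printf("partition %q: %s\n", m.keyFor(item), item)
}

// newBatcher mirrors the three-way dispatch: nil config -> disabled,
// no partitioner -> single partition, otherwise one batcher per partition key.
func newBatcher(cfg *config, set settings) batcher {
	if cfg == nil {
		return disabledBatcher{}
	}
	if set.partitionKeyFunc == nil {
		return singlePartitionBatcher{cfg: *cfg}
	}
	return multiPartitionBatcher{cfg: *cfg, keyFor: set.partitionKeyFunc}
}

func main() {
	b := newBatcher(&config{minSize: 10}, settings{
		partitionKeyFunc: func(item string) string { return item[:1] },
		maxWorkers:       2,
	})
	b.consume("alpha")
}
```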

exporter/exporterhelper/internal/queuebatch/multi_batcher.go

Lines changed: 23 additions & 42 deletions
@@ -11,13 +11,7 @@ import (
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
 )
 
-type batcherSettings[T any] struct {
-	sizerType   request.SizerType
-	sizer       request.Sizer[T]
-	partitioner Partitioner[T]
-	next        sender.SendFunc[T]
-	maxWorkers  int
-}
+var _ Batcher[request.Request] = (*multiBatcher)(nil)
 
 type multiBatcher struct {
 	cfg         BatchConfig
@@ -26,73 +20,60 @@ type multiBatcher struct {
 	sizer       request.Sizer[request.Request]
 	partitioner Partitioner[request.Request]
 	consumeFunc sender.SendFunc[request.Request]
-
-	singleShard *shardBatcher
 	shards      sync.Map
 }
 
-var _ Batcher[request.Request] = (*multiBatcher)(nil)
-
-func newMultiBatcher(bCfg BatchConfig, bSet batcherSettings[request.Request]) *multiBatcher {
-	mb := &multiBatcher{
+func newMultiBatcher(
+	bCfg BatchConfig,
+	sizerType request.SizerType,
+	sizer request.Sizer[request.Request],
+	wp *workerPool,
+	partitioner Partitioner[request.Request],
+	next sender.SendFunc[request.Request],
+) *multiBatcher {
+	return &multiBatcher{
 		cfg:         bCfg,
-		wp:          newWorkerPool(bSet.maxWorkers),
-		sizerType:   bSet.sizerType,
-		sizer:       bSet.sizer,
-		partitioner: bSet.partitioner,
-		consumeFunc: bSet.next,
-	}
-
-	if bSet.partitioner == nil {
-		mb.singleShard = newShard(mb.cfg, mb.sizerType, mb.sizer, mb.wp, mb.consumeFunc)
+		wp:          wp,
+		sizerType:   sizerType,
+		sizer:       sizer,
+		partitioner: partitioner,
+		consumeFunc: next,
 	}
-	return mb
 }
 
-func (mb *multiBatcher) getShard(ctx context.Context, req request.Request) *shardBatcher {
-	if mb.singleShard != nil {
-		return mb.singleShard
-	}
-
+func (mb *multiBatcher) getPartition(ctx context.Context, req request.Request) *partitionBatcher {
 	key := mb.partitioner.GetKey(ctx, req)
-	// Fast path, shard already created.
 	s, found := mb.shards.Load(key)
+	// Fast path, shard already created.
 	if found {
-		return s.(*shardBatcher)
+		return s.(*partitionBatcher)
 	}
-	newS := newShard(mb.cfg, mb.sizerType, mb.sizer, mb.wp, mb.consumeFunc)
+	newS := newPartitionBatcher(mb.cfg, mb.sizerType, mb.sizer, mb.wp, mb.consumeFunc)
 	_ = newS.Start(ctx, nil)
 	s, loaded := mb.shards.LoadOrStore(key, newS)
 	// If not loaded, there was a race condition in adding the new shard. Shutdown the newly created shard.
 	if loaded {
 		_ = newS.Shutdown(ctx)
 	}
-	return s.(*shardBatcher)
+	return s.(*partitionBatcher)
 }
 
-func (mb *multiBatcher) Start(ctx context.Context, host component.Host) error {
-	if mb.singleShard != nil {
-		return mb.singleShard.Start(ctx, host)
-	}
+func (mb *multiBatcher) Start(context.Context, component.Host) error {
 	return nil
 }
 
 func (mb *multiBatcher) Consume(ctx context.Context, req request.Request, done Done) {
-	shard := mb.getShard(ctx, req)
+	shard := mb.getPartition(ctx, req)
 	shard.Consume(ctx, req, done)
 }
 
 func (mb *multiBatcher) Shutdown(ctx context.Context) error {
-	if mb.singleShard != nil {
-		return mb.singleShard.Shutdown(ctx)
-	}
-
 	var wg sync.WaitGroup
 	mb.shards.Range(func(_ any, shard any) bool {
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
-			_ = shard.(*shardBatcher).Shutdown(ctx)
+			_ = shard.(*partitionBatcher).Shutdown(ctx)
 		}()
 		return true
 	})
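
The renamed getPartition lazily creates one partitionBatcher per key and resolves concurrent creation with sync.Map.LoadOrStore, shutting down the instance that loses the race. A minimal sketch of that pattern, with a hypothetical worker type standing in for partitionBatcher and a registry standing in for multiBatcher:

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a hypothetical per-partition resource that needs explicit
// start/stop, standing in for partitionBatcher.
type worker struct{ key string }

func (w *worker) start()    { fmt.Println("started worker for", w.key) }
func (w *worker) shutdown() { fmt.Println("shut down duplicate worker for", w.key) }

type registry struct{ workers sync.Map }

// getWorker mirrors getPartition: fast-path Load, then LoadOrStore, and if
// another goroutine stored its worker first, the freshly created candidate
// is shut down so only the stored instance keeps running.
func (r *registry) getWorker(key string) *worker {
	if w, ok := r.workers.Load(key); ok {
		return w.(*worker)
	}
	candidate := &worker{key: key}
	candidate.start()
	actual, loaded := r.workers.LoadOrStore(key, candidate)
	if loaded {
		// Lost the race: another goroutine registered a worker for this key.
		candidate.shutdown()
	}
	return actual.(*worker)
}

func main() {
	r := &registry{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = r.getWorker("traces")
		}()
	}
	wg.Wait()
}
```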

exporter/exporterhelper/internal/queuebatch/multi_batcher_test.go

Lines changed: 14 additions & 14 deletions
@@ -25,15 +25,15 @@ func TestMultiBatcher_NoTimeout(t *testing.T) {
 
 	type partitionKey struct{}
 
-	ba := newMultiBatcher(cfg, batcherSettings[request.Request]{
-		sizerType: request.SizerTypeItems,
-		sizer:     request.NewItemsSizer(),
-		partitioner: NewPartitioner(func(ctx context.Context, _ request.Request) string {
+	ba := newMultiBatcher(cfg,
+		request.SizerTypeItems,
+		request.NewItemsSizer(),
+		newWorkerPool(1),
+		NewPartitioner(func(ctx context.Context, _ request.Request) string {
 			return ctx.Value(partitionKey{}).(string)
 		}),
-		next:       sink.Export,
-		maxWorkers: 1,
-	})
+		sink.Export,
+	)
 
 	require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
 	t.Cleanup(func() {
@@ -76,15 +76,15 @@ func TestMultiBatcher_Timeout(t *testing.T) {
 
 	type partitionKey struct{}
 
-	ba := newMultiBatcher(cfg, batcherSettings[request.Request]{
-		sizerType: request.SizerTypeItems,
-		sizer:     request.NewItemsSizer(),
-		partitioner: NewPartitioner(func(ctx context.Context, _ request.Request) string {
+	ba := newMultiBatcher(cfg,
+		request.SizerTypeItems,
+		request.NewItemsSizer(),
+		newWorkerPool(1),
+		NewPartitioner(func(ctx context.Context, _ request.Request) string {
 			return ctx.Value(partitionKey{}).(string)
 		}),
-		next:       sink.Export,
-		maxWorkers: 1,
-	})
+		sink.Export,
+	)
 
 	require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
 	t.Cleanup(func() {
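
The updated tests pass the partitioner positionally and derive the partition key from a context value. A small sketch of that context-keyed pattern, assuming a simplified keyFunc in place of the collector's Partitioner:

```go
package main

import (
	"context"
	"fmt"
)

// partitionKey is an unexported context key type, matching the pattern the
// tests use; keyFunc is a hypothetical stand-in for the Partitioner's GetKey.
type partitionKey struct{}

type keyFunc func(ctx context.Context) string

func main() {
	getKey := keyFunc(func(ctx context.Context) string {
		return ctx.Value(partitionKey{}).(string)
	})

	// Callers attach the partition key to the request's context.
	ctx := context.WithValue(context.Background(), partitionKey{}, "tenant-a")
	fmt.Println(getKey(ctx)) // tenant-a
}
```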

exporter/exporterhelper/internal/queuebatch/shard_batcher.go renamed to exporter/exporterhelper/internal/queuebatch/partition_batcher.go

Lines changed: 18 additions & 10 deletions
@@ -15,14 +15,16 @@ import (
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
 )
 
+var _ Batcher[request.Request] = (*partitionBatcher)(nil)
+
 type batch struct {
 	ctx  context.Context
 	req  request.Request
 	done multiDone
 }
 
-// shardBatcher continuously batch incoming requests and flushes asynchronously if minimum size limit is met or on timeout.
-type shardBatcher struct {
+// partitionBatcher continuously batch incoming requests and flushes asynchronously if minimum size limit is met or on timeout.
+type partitionBatcher struct {
 	cfg       BatchConfig
 	wp        *workerPool
 	sizerType request.SizerType
@@ -35,8 +37,14 @@ type shardBatcher struct {
 	shutdownCh chan struct{}
 }
 
-func newShard(cfg BatchConfig, sizerType request.SizerType, sizer request.Sizer[request.Request], wp *workerPool, next sender.SendFunc[request.Request]) *shardBatcher {
-	return &shardBatcher{
+func newPartitionBatcher(
+	cfg BatchConfig,
+	sizerType request.SizerType,
+	sizer request.Sizer[request.Request],
+	wp *workerPool,
+	next sender.SendFunc[request.Request],
+) *partitionBatcher {
+	return &partitionBatcher{
 		cfg:       cfg,
 		wp:        wp,
 		sizerType: sizerType,
@@ -46,13 +54,13 @@ func newShard(cfg BatchConfig, sizerType request.SizerType, sizer request.Sizer[
 	}
 }
 
-func (qb *shardBatcher) resetTimer() {
+func (qb *partitionBatcher) resetTimer() {
 	if qb.cfg.FlushTimeout > 0 {
 		qb.timer.Reset(qb.cfg.FlushTimeout)
 	}
 }
 
-func (qb *shardBatcher) Consume(ctx context.Context, req request.Request, done Done) {
+func (qb *partitionBatcher) Consume(ctx context.Context, req request.Request, done Done) {
 	qb.currentBatchMu.Lock()
 
 	if qb.currentBatch == nil {
@@ -149,7 +157,7 @@ func (qb *shardBatcher) Consume(ctx context.Context, req request.Request, done D
 }
 
 // Start starts the goroutine that reads from the queue and flushes asynchronously.
-func (qb *shardBatcher) Start(context.Context, component.Host) error {
+func (qb *partitionBatcher) Start(context.Context, component.Host) error {
 	if qb.cfg.FlushTimeout <= 0 {
 		return nil
 	}
@@ -170,7 +178,7 @@ func (qb *shardBatcher) Start(context.Context, component.Host) error {
 }
 
 // Shutdown ensures that queue and all Batcher are stopped.
-func (qb *shardBatcher) Shutdown(context.Context) error {
+func (qb *partitionBatcher) Shutdown(context.Context) error {
 	close(qb.shutdownCh)
 	// Make sure execute one last flush if necessary.
 	qb.flushCurrentBatchIfNecessary()
@@ -179,7 +187,7 @@ func (qb *shardBatcher) Shutdown(context.Context) error {
 }
 
 // flushCurrentBatchIfNecessary sends out the current request batch if it is not nil
-func (qb *shardBatcher) flushCurrentBatchIfNecessary() {
+func (qb *partitionBatcher) flushCurrentBatchIfNecessary() {
 	qb.currentBatchMu.Lock()
 	if qb.currentBatch == nil {
 		qb.currentBatchMu.Unlock()
@@ -195,7 +203,7 @@ func (qb *shardBatcher) flushCurrentBatchIfNecessary() {
 }
 
 // flush starts a goroutine that calls consumeFunc. It blocks until a worker is available if necessary.
-func (qb *shardBatcher) flush(ctx context.Context, req request.Request, done Done) {
+func (qb *partitionBatcher) flush(ctx context.Context, req request.Request, done Done) {
 	qb.stopWG.Add(1)
 	qb.wp.execute(func() {
 		defer qb.stopWG.Done()
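
Beyond the rename, partitionBatcher keeps the same flush discipline: a mutex-guarded current batch, a flush when the minimum size is reached, and a timer/shutdown-driven flushCurrentBatchIfNecessary. A simplified, hypothetical accumulator illustrating that shape (not the collector's implementation):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// accumulator is a minimal sketch of flush-on-size plus flush-on-timeout:
// a mutex-guarded current batch, a size check on every add, and a
// flushIfNecessary hook invoked on timeout and on shutdown.
type accumulator struct {
	mu      sync.Mutex
	current []string
	minSize int
	flush   func(batch []string)
}

func (a *accumulator) add(item string) {
	a.mu.Lock()
	a.current = append(a.current, item)
	if len(a.current) < a.minSize {
		a.mu.Unlock()
		return
	}
	// Minimum size reached: detach the batch under the lock, flush outside it.
	batch := a.current
	a.current = nil
	a.mu.Unlock()
	a.flush(batch)
}

// flushIfNecessary is called when the flush timer fires or on shutdown.
func (a *accumulator) flushIfNecessary() {
	a.mu.Lock()
	if len(a.current) == 0 {
		a.mu.Unlock()
		return
	}
	batch := a.current
	a.current = nil
	a.mu.Unlock()
	a.flush(batch)
}

func main() {
	a := &accumulator{minSize: 3, flush: func(b []string) { fmt.Println("flushed", b) }}
	a.add("a")
	a.add("b")
	// Simulate the flush timeout firing before the minimum size is reached.
	time.AfterFunc(10*time.Millisecond, a.flushIfNecessary)
	time.Sleep(20 * time.Millisecond)
}
```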

exporter/exporterhelper/internal/queuebatch/shard_batcher_test.go renamed to exporter/exporterhelper/internal/queuebatch/partition_batcher_test.go

Lines changed: 12 additions & 12 deletions
@@ -19,7 +19,7 @@ import (
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
 )
 
-func TestShardBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T) {
+func TestPartitionBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T) {
 	tests := []struct {
 		name      string
 		sizerType request.SizerType
@@ -59,7 +59,7 @@ func TestShardBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T) {
 			}
 
 			sink := requesttest.NewSink()
-			ba := newShard(cfg, tt.sizerType, tt.sizer, newWorkerPool(tt.maxWorkers), sink.Export)
+			ba := newPartitionBatcher(cfg, tt.sizerType, tt.sizer, newWorkerPool(tt.maxWorkers), sink.Export)
 			require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
 			t.Cleanup(func() {
 				require.NoError(t, ba.Shutdown(context.Background()))
@@ -84,7 +84,7 @@ func TestShardBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T) {
 	}
 }
 
-func TestShardBatcher_NoSplit_TimeoutDisabled(t *testing.T) {
+func TestPartitionBatcher_NoSplit_TimeoutDisabled(t *testing.T) {
 	tests := []struct {
 		name      string
 		sizerType request.SizerType
@@ -124,7 +124,7 @@ func TestShardBatcher_NoSplit_TimeoutDisabled(t *testing.T) {
 			}
 
 			sink := requesttest.NewSink()
-			ba := newShard(cfg, tt.sizerType, tt.sizer, newWorkerPool(tt.maxWorkers), sink.Export)
+			ba := newPartitionBatcher(cfg, tt.sizerType, tt.sizer, newWorkerPool(tt.maxWorkers), sink.Export)
 			require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
 
 			done := newFakeDone()
@@ -160,7 +160,7 @@ func TestShardBatcher_NoSplit_TimeoutDisabled(t *testing.T) {
 	}
 }
 
-func TestShardBatcher_NoSplit_WithTimeout(t *testing.T) {
+func TestPartitionBatcher_NoSplit_WithTimeout(t *testing.T) {
 	if runtime.GOOS == "windows" {
 		t.Skip("Skipping test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/11869")
 	}
@@ -204,7 +204,7 @@ func TestShardBatcher_NoSplit_WithTimeout(t *testing.T) {
 			}
 
 			sink := requesttest.NewSink()
-			ba := newShard(cfg, tt.sizerType, tt.sizer, newWorkerPool(tt.maxWorkers), sink.Export)
+			ba := newPartitionBatcher(cfg, tt.sizerType, tt.sizer, newWorkerPool(tt.maxWorkers), sink.Export)
 			require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
 			t.Cleanup(func() {
 				require.NoError(t, ba.Shutdown(context.Background()))
@@ -230,7 +230,7 @@ func TestShardBatcher_NoSplit_WithTimeout(t *testing.T) {
 	}
 }
 
-func TestShardBatcher_Split_TimeoutDisabled(t *testing.T) {
+func TestPartitionBatcher_Split_TimeoutDisabled(t *testing.T) {
 	if runtime.GOOS == "windows" {
 		t.Skip("Skipping test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/11847")
 	}
@@ -275,7 +275,7 @@ func TestShardBatcher_Split_TimeoutDisabled(t *testing.T) {
 			}
 
 			sink := requesttest.NewSink()
-			ba := newShard(cfg, tt.sizerType, tt.sizer, newWorkerPool(tt.maxWorkers), sink.Export)
+			ba := newPartitionBatcher(cfg, tt.sizerType, tt.sizer, newWorkerPool(tt.maxWorkers), sink.Export)
 			require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
 
 			done := newFakeDone()
@@ -315,14 +315,14 @@ func TestShardBatcher_Split_TimeoutDisabled(t *testing.T) {
 	}
 }
 
-func TestShardBatcher_Shutdown(t *testing.T) {
+func TestPartitionBatcher_Shutdown(t *testing.T) {
 	cfg := BatchConfig{
 		FlushTimeout: 100 * time.Second,
 		MinSize:      10,
 	}
 
 	sink := requesttest.NewSink()
-	ba := newShard(cfg, request.SizerTypeItems, request.NewItemsSizer(), newWorkerPool(2), sink.Export)
+	ba := newPartitionBatcher(cfg, request.SizerTypeItems, request.NewItemsSizer(), newWorkerPool(2), sink.Export)
 	require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
 
 	done := newFakeDone()
@@ -342,15 +342,15 @@ func TestShardBatcher_Shutdown(t *testing.T) {
 	assert.EqualValues(t, 2, done.success.Load())
 }
 
-func TestShardBatcher_MergeError(t *testing.T) {
+func TestPartitionBatcher_MergeError(t *testing.T) {
 	cfg := BatchConfig{
 		FlushTimeout: 200 * time.Second,
 		MinSize:      5,
 		MaxSize:      7,
 	}
 
 	sink := requesttest.NewSink()
-	ba := newShard(cfg, request.SizerTypeItems, request.NewItemsSizer(), newWorkerPool(2), sink.Export)
+	ba := newPartitionBatcher(cfg, request.SizerTypeItems, request.NewItemsSizer(), newWorkerPool(2), sink.Export)
 	require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
 	t.Cleanup(func() {
 		require.NoError(t, ba.Shutdown(context.Background()))
