
Commit 450a4ce

[chore] Add ability to configure sizer for default batcher (#12744)
This functionality is not yet exposed to users; it will be in follow-up PRs.

Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 1439116 commit 450a4ce
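At a glance, the change threads a configurable sizer into the default batcher through a new batcherSettings struct instead of hard-wiring item counts. A minimal sketch of the new internal wiring, mirroring the call sites added in queue_batch.go and the tests below (the config values and the next send function are placeholders, not taken from the diff):

```go
// Sketch only: this mirrors how the batcher is now constructed internally.
// Only the items sizer is wired up in this commit, but any
// request.Sizer[request.Request] could be injected the same way.
cfg := BatchConfig{
	FlushTimeout: 200 * time.Millisecond,
	MinSize:      1000,  // MinSize/MaxSize are now int64
	MaxSize:      10000,
}
ba := newDefaultBatcher(cfg, batcherSettings[request.Request]{
	sizerType:  request.SizerTypeItems,  // how MergeSplit partitions requests
	sizer:      request.NewItemsSizer(), // how batches are measured against MinSize
	next:       next,                    // placeholder downstream sender.SendFunc[request.Request]
	maxWorkers: 1,
})
```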

File tree: 5 files changed (+74, -27 lines)

exporter/exporterhelper/internal/queue_sender.go

Lines changed: 2 additions & 2 deletions
@@ -203,9 +203,9 @@ type SizeConfig struct {
 	Sizer request.SizerType `mapstructure:"sizer"`

 	// MinSize defines the configuration for the minimum size of a batch.
-	MinSize int `mapstructure:"min_size"`
+	MinSize int64 `mapstructure:"min_size"`
 	// MaxSize defines the configuration for the maximum size of a batch.
-	MaxSize int `mapstructure:"max_size"`
+	MaxSize int64 `mapstructure:"max_size"`
 }

 func (c *BatcherConfig) Validate() error {

exporter/exporterhelper/internal/queuebatch/config.go

Lines changed: 2 additions & 2 deletions
@@ -79,10 +79,10 @@ type BatchConfig struct {
 	FlushTimeout time.Duration `mapstructure:"flush_timeout"`

 	// MinSize defines the configuration for the minimum size of a batch.
-	MinSize int `mapstructure:"min_size"`
+	MinSize int64 `mapstructure:"min_size"`

 	// MaxSize defines the configuration for the maximum size of a batch.
-	MaxSize int `mapstructure:"max_size"`
+	MaxSize int64 `mapstructure:"max_size"`
 }

 func (cfg *BatchConfig) Validate() error {

exporter/exporterhelper/internal/queuebatch/default_batcher.go

Lines changed: 27 additions & 16 deletions
@@ -21,10 +21,19 @@ type batch struct {
 	done multiDone
 }

+type batcherSettings[K any] struct {
+	sizerType  request.SizerType
+	sizer      request.Sizer[K]
+	next       sender.SendFunc[K]
+	maxWorkers int
+}
+
 // defaultBatcher continuously batch incoming requests and flushes asynchronously if minimum size limit is met or on timeout.
 type defaultBatcher struct {
-	batchCfg    BatchConfig
+	cfg         BatchConfig
 	workerPool  chan struct{}
+	sizerType   request.SizerType
+	sizer       request.Sizer[request.Request]
 	consumeFunc sender.SendFunc[request.Request]
 	stopWG      sync.WaitGroup
 	currentBatchMu sync.Mutex
@@ -33,35 +42,37 @@ type defaultBatcher struct {
 	shutdownCh chan struct{}
 }

-func newDefaultBatcher(batchCfg BatchConfig, consumeFunc sender.SendFunc[request.Request], maxWorkers int) *defaultBatcher {
+func newDefaultBatcher(bCfg BatchConfig, bSet batcherSettings[request.Request]) *defaultBatcher {
 	// TODO: Determine what is the right behavior for this in combination with async queue.
 	var workerPool chan struct{}
-	if maxWorkers != 0 {
-		workerPool = make(chan struct{}, maxWorkers)
-		for i := 0; i < maxWorkers; i++ {
+	if bSet.maxWorkers != 0 {
+		workerPool = make(chan struct{}, bSet.maxWorkers)
+		for i := 0; i < bSet.maxWorkers; i++ {
 			workerPool <- struct{}{}
 		}
 	}
 	return &defaultBatcher{
-		batchCfg:    batchCfg,
+		cfg:         bCfg,
 		workerPool:  workerPool,
-		consumeFunc: consumeFunc,
+		sizerType:   bSet.sizerType,
+		sizer:       bSet.sizer,
+		consumeFunc: bSet.next,
 		stopWG:      sync.WaitGroup{},
 		shutdownCh:  make(chan struct{}, 1),
 	}
 }

 func (qb *defaultBatcher) resetTimer() {
-	if qb.batchCfg.FlushTimeout > 0 {
-		qb.timer.Reset(qb.batchCfg.FlushTimeout)
+	if qb.cfg.FlushTimeout > 0 {
+		qb.timer.Reset(qb.cfg.FlushTimeout)
 	}
 }

 func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done Done) {
 	qb.currentBatchMu.Lock()

 	if qb.currentBatch == nil {
-		reqList, mergeSplitErr := req.MergeSplit(ctx, qb.batchCfg.MaxSize, request.SizerTypeItems, nil)
+		reqList, mergeSplitErr := req.MergeSplit(ctx, int(qb.cfg.MaxSize), qb.sizerType, nil)
 		if mergeSplitErr != nil || len(reqList) == 0 {
 			done.OnDone(mergeSplitErr)
 			qb.currentBatchMu.Unlock()
@@ -76,7 +87,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
 	// We have at least one result in the reqList. Last in the list may not have enough data to be flushed.
 	// Find if it has at least MinSize, and if it does then move that as the current batch.
 	lastReq := reqList[len(reqList)-1]
-	if lastReq.ItemsCount() < qb.batchCfg.MinSize {
+	if qb.sizer.Sizeof(lastReq) < qb.cfg.MinSize {
 		// Do not flush the last item and add it to the current batch.
 		reqList = reqList[:len(reqList)-1]
 		qb.currentBatch = &batch{
@@ -95,7 +106,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
 		return
 	}

-	reqList, mergeSplitErr := qb.currentBatch.req.MergeSplit(ctx, qb.batchCfg.MaxSize, request.SizerTypeItems, req)
+	reqList, mergeSplitErr := qb.currentBatch.req.MergeSplit(ctx, int(qb.cfg.MaxSize), qb.sizerType, req)
 	// If failed to merge signal all Done callbacks from current batch as well as the current request and reset the current batch.
 	if mergeSplitErr != nil || len(reqList) == 0 {
 		done.OnDone(mergeSplitErr)
@@ -121,7 +132,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
 	// cannot unlock and re-lock because we are not done processing all the responses.
 	var firstBatch *batch
 	// Need to check the currentBatch if more than 1 result returned or if 1 result return but larger than MinSize.
-	if len(reqList) > 1 || qb.currentBatch.req.ItemsCount() >= qb.batchCfg.MinSize {
+	if len(reqList) > 1 || qb.sizer.Sizeof(qb.currentBatch.req) >= qb.cfg.MinSize {
 		firstBatch = qb.currentBatch
 		qb.currentBatch = nil
 	}
@@ -131,7 +142,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
 	// If we still have results to process, then we need to check if the last result has enough data to flush, or we add it to the currentBatch.
 	if len(reqList) > 0 {
 		lastReq := reqList[len(reqList)-1]
-		if lastReq.ItemsCount() < qb.batchCfg.MinSize {
+		if qb.sizer.Sizeof(lastReq) < qb.cfg.MinSize {
 			// Do not flush the last item and add it to the current batch.
 			reqList = reqList[:len(reqList)-1]
 			qb.currentBatch = &batch{
@@ -170,8 +181,8 @@ func (qb *defaultBatcher) startTimeBasedFlushingGoroutine() {

 // Start starts the goroutine that reads from the queue and flushes asynchronously.
 func (qb *defaultBatcher) Start(_ context.Context, _ component.Host) error {
-	if qb.batchCfg.FlushTimeout > 0 {
-		qb.timer = time.NewTimer(qb.batchCfg.FlushTimeout)
+	if qb.cfg.FlushTimeout > 0 {
+		qb.timer = time.NewTimer(qb.cfg.FlushTimeout)
 		qb.startTimeBasedFlushingGoroutine()
 	}
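The batcher now measures requests through the injected sizer (qb.sizer.Sizeof) and passes the configured qb.sizerType to MergeSplit, instead of calling ItemsCount() and hard-coding request.SizerTypeItems. The definition of request.Sizer is not shown in this diff; judging from the calls above it is roughly the interface sketched below, and the bytes-based implementation is purely hypothetical, included only to illustrate what a non-items sizer could look like:

```go
// Approximation of the interface implied by qb.sizer.Sizeof(req); the real
// definition lives in the internal request package and may differ in detail.
type Sizer[K any] interface {
	Sizeof(req K) int64
}

// Hypothetical bytes-based sizer (not part of this commit).
type bytesSizer struct{}

func (bytesSizer) Sizeof(req request.Request) int64 {
	// Assumption: a request can estimate its serialized size; estimateBytes
	// is a placeholder helper, not an existing API.
	return estimateBytes(req)
}
```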

exporter/exporterhelper/internal/queuebatch/default_batcher_test.go

Lines changed: 37 additions & 6 deletions
@@ -15,6 +15,7 @@ import (
 	"github.com/stretchr/testify/require"

 	"go.opentelemetry.io/collector/component/componenttest"
+	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
 )

@@ -40,7 +41,12 @@ func TestDefaultBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T) {
 			}

 			sink := requesttest.NewSink()
-			ba := newDefaultBatcher(cfg, sink.Export, tt.maxWorkers)
+			ba := newDefaultBatcher(cfg, batcherSettings[request.Request]{
+				sizerType:  request.SizerTypeItems,
+				sizer:      request.NewItemsSizer(),
+				next:       sink.Export,
+				maxWorkers: tt.maxWorkers,
+			})
 			require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
 			t.Cleanup(func() {
 				require.NoError(t, ba.Shutdown(context.Background()))
@@ -87,7 +93,12 @@ func TestDefaultBatcher_NoSplit_TimeoutDisabled(t *testing.T) {
 			}

 			sink := requesttest.NewSink()
-			ba := newDefaultBatcher(cfg, sink.Export, tt.maxWorkers)
+			ba := newDefaultBatcher(cfg, batcherSettings[request.Request]{
+				sizerType:  request.SizerTypeItems,
+				sizer:      request.NewItemsSizer(),
+				next:       sink.Export,
+				maxWorkers: tt.maxWorkers,
+			})
 			require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))

 			done := newFakeDone()
@@ -149,7 +160,12 @@ func TestDefaultBatcher_NoSplit_WithTimeout(t *testing.T) {
 			}

 			sink := requesttest.NewSink()
-			ba := newDefaultBatcher(cfg, sink.Export, tt.maxWorkers)
+			ba := newDefaultBatcher(cfg, batcherSettings[request.Request]{
+				sizerType:  request.SizerTypeItems,
+				sizer:      request.NewItemsSizer(),
+				next:       sink.Export,
+				maxWorkers: tt.maxWorkers,
+			})
 			require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
 			t.Cleanup(func() {
 				require.NoError(t, ba.Shutdown(context.Background()))
@@ -202,7 +218,12 @@ func TestDefaultBatcher_Split_TimeoutDisabled(t *testing.T) {
 			}

 			sink := requesttest.NewSink()
-			ba := newDefaultBatcher(cfg, sink.Export, tt.maxWorkers)
+			ba := newDefaultBatcher(cfg, batcherSettings[request.Request]{
+				sizerType:  request.SizerTypeItems,
+				sizer:      request.NewItemsSizer(),
+				next:       sink.Export,
+				maxWorkers: tt.maxWorkers,
+			})
 			require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))

 			done := newFakeDone()
@@ -249,7 +270,12 @@ func TestDefaultBatcher_Shutdown(t *testing.T) {
 	}

 	sink := requesttest.NewSink()
-	ba := newDefaultBatcher(cfg, sink.Export, 2)
+	ba := newDefaultBatcher(cfg, batcherSettings[request.Request]{
+		sizerType:  request.SizerTypeItems,
+		sizer:      request.NewItemsSizer(),
+		next:       sink.Export,
+		maxWorkers: 2,
+	})
 	require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))

 	done := newFakeDone()
@@ -277,7 +303,12 @@ func TestDefaultBatcher_MergeError(t *testing.T) {
 	}

 	sink := requesttest.NewSink()
-	ba := newDefaultBatcher(cfg, sink.Export, 2)
+	ba := newDefaultBatcher(cfg, batcherSettings[request.Request]{
+		sizerType:  request.SizerTypeItems,
+		sizer:      request.NewItemsSizer(),
+		next:       sink.Export,
+		maxWorkers: 2,
+	})

 	require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
 	t.Cleanup(func() {

exporter/exporterhelper/internal/queuebatch/queue_batch.go

Lines changed: 6 additions & 1 deletion
@@ -40,7 +40,12 @@ func NewQueueBatch(
 	default:
 		// TODO: https://github.com/open-telemetry/opentelemetry-collector/issues/12244
 		cfg.NumConsumers = 1
-		b = newDefaultBatcher(*cfg.Batch, next, cfg.NumConsumers)
+		b = newDefaultBatcher(*cfg.Batch, batcherSettings[request.Request]{
+			sizerType:  request.SizerTypeItems,
+			sizer:      request.NewItemsSizer(),
+			next:       next,
+			maxWorkers: cfg.NumConsumers,
+		})
 	}

 	var q Queue[request.Request]
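Note that NewQueueBatch still hard-codes the items sizer at this call site, which matches the commit message: the knob exists internally but is not yet driven by user configuration. A speculative sketch of what the follow-up wiring might look like once a sizer type is threaded through the config (the cfg.Sizer field shown is an assumption, not something this commit adds):

```go
// Speculative follow-up (not in this commit): select the sizer from a
// configured request.SizerType instead of hard-coding items.
var sizer request.Sizer[request.Request]
switch cfg.Sizer { // assumed field carrying the configured request.SizerType
case request.SizerTypeItems:
	sizer = request.NewItemsSizer()
default:
	// Reject or fall back for sizer types that are not supported yet.
	sizer = request.NewItemsSizer()
}
b = newDefaultBatcher(*cfg.Batch, batcherSettings[request.Request]{
	sizerType:  cfg.Sizer,
	sizer:      sizer,
	next:       next,
	maxWorkers: cfg.NumConsumers,
})
```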
