Commit e9ae31f

Multi batch support
1 parent ebd2a9d commit e9ae31f

5 files changed: +174 -49 lines changed

exporter/exporterhelper/internal/base_exporter.go

Lines changed: 10 additions & 0 deletions

@@ -50,6 +50,7 @@ type BaseExporter struct {
 	queueBatchSettings QueueBatchSettings[request.Request]
 	queueCfg           QueueConfig
 	batcherCfg         exporterbatcher.Config
+	keyFunc            queuebatch.KeyFunc
 }

 func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher sender.SendFunc[request.Request], options ...Option) (*BaseExporter, error) {
@@ -244,6 +245,15 @@ func WithBatcher(cfg exporterbatcher.Config) Option {
 	}
 }

+// WithMultiBatcher is the same as WithBatcher, except that it allows key-based batching.
+func WithMultiBatcher(cfg exporterbatcher.Config, keyFunc queuebatch.KeyFunc) Option {
+	return func(o *BaseExporter) error {
+		o.batcherCfg = cfg
+		o.keyFunc = keyFunc
+		return nil
+	}
+}
+
 // WithQueueBatchSettings is used to set the QueueBatchSettings for the new request based exporter helper.
 // It must be provided as the first option when creating a new exporter helper.
 func WithQueueBatchSettings(set QueueBatchSettings[request.Request]) Option {
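
The option only records the key function on BaseExporter; the batcher changes below show where it takes effect. As a rough sketch (not part of this commit) of how the stored keyFunc would be threaded into batcher construction, where batchCfg, nextSender, and maxWorkers are assumed surrounding values:

// Hypothetical wiring (an assumption, not shown in this diff): a nil keyFunc
// keeps the old single-batch behavior, a non-nil one enables keyed batching.
batcher := newDefaultBatcherWithKeyFunc(batchCfg, o.keyFunc, nextSender.Send, maxWorkers)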
Lines changed: 90 additions & 0 deletions

@@ -0,0 +1,90 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
+)
+
+type batch struct {
+	ctx     context.Context
+	req     request.Request
+	done    multiDone
+	created time.Time
+}
+
+type batchEntry struct {
+	*batch
+	mu sync.Mutex
+}
+
+func newBatchEntry() *batchEntry {
+	return &batchEntry{}
+}
+
+type batchManager interface {
+	getBatch(ctx context.Context, req request.Request) *batchEntry
+	forEachBatch(func(*batchEntry))
+}
+
+func newBatchManager(keyFunc KeyFunc) batchManager {
+	if keyFunc == nil {
+		return &singleBatchManager{
+			batch: newBatchEntry(),
+		}
+	}
+	return &multiBatchManager{
+		batchMap: make(map[string]*batchEntry),
+		keyFunc:  keyFunc,
+	}
+}
+
+type singleBatchManager struct {
+	batch *batchEntry
+}
+
+func (bm *singleBatchManager) getBatch(_ context.Context, _ request.Request) *batchEntry {
+	return bm.batch
+}
+
+func (bm *singleBatchManager) forEachBatch(callback func(*batchEntry)) {
+	callback(bm.batch)
+}
+
+type multiBatchManager struct {
+	batchMap map[string]*batchEntry
+	mu       sync.RWMutex
+	keyFunc  KeyFunc
+}
+
+func (bm *multiBatchManager) forEachBatch(callback func(*batchEntry)) {
+	bm.mu.RLock()
+	for _, batchEntry := range bm.batchMap {
+		callback(batchEntry)
+	}
+	bm.mu.RUnlock()
+}
+
+func (bm *multiBatchManager) getBatch(ctx context.Context, req request.Request) *batchEntry {
+	key := bm.keyFunc(ctx, req)
+
+	bm.mu.RLock()
+	batchEntry, ok := bm.batchMap[key]
+	bm.mu.RUnlock()
+	if ok {
+		return batchEntry
+	}
+	bm.mu.Lock()
+	// Re-check under the write lock: another goroutine may have created the
+	// entry for this key between RUnlock and Lock; without this check the
+	// earlier entry (and any batch pending in it) would be overwritten.
+	if batchEntry, ok = bm.batchMap[key]; !ok {
+		batchEntry = newBatchEntry()
+		bm.batchMap[key] = batchEntry
+	}
+	bm.mu.Unlock()
+	return batchEntry
+}
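
The dispatch above is small enough to verify in isolation. A sketch of an in-package test (not part of this commit) that exercises both managers; the tenantCtxKey context key is a test-only assumption:

package queuebatch

import (
	"context"
	"testing"

	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
)

type tenantCtxKey struct{}

// TestBatchManagerDispatch is hypothetical: a nil KeyFunc yields one shared
// entry, a non-nil KeyFunc yields one stable entry per distinct key.
func TestBatchManagerDispatch(t *testing.T) {
	ctx := context.Background()

	single := newBatchManager(nil)
	if single.getBatch(ctx, nil) != single.getBatch(ctx, nil) {
		t.Fatal("expected one shared batch entry for a nil KeyFunc")
	}

	multi := newBatchManager(func(ctx context.Context, _ request.Request) string {
		tenant, _ := ctx.Value(tenantCtxKey{}).(string)
		return tenant
	})
	ctxA := context.WithValue(ctx, tenantCtxKey{}, "tenant-a")
	ctxB := context.WithValue(ctx, tenantCtxKey{}, "tenant-b")
	if multi.getBatch(ctxA, nil) == multi.getBatch(ctxB, nil) {
		t.Fatal("expected distinct batch entries per key")
	}
	if multi.getBatch(ctxA, nil) != multi.getBatch(ctxA, nil) {
		t.Fatal("expected a stable entry for the same key")
	}
}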

exporter/exporterhelper/internal/queuebatch/default_batcher.go

Lines changed: 58 additions & 49 deletions

@@ -16,26 +16,31 @@ import (
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
 )

-type batch struct {
-	ctx     context.Context
-	req     request.Request
-	done    multiDone
-	created time.Time
-}
-
 // defaultBatcher continuously batches incoming requests and flushes asynchronously if the minimum size limit is met or on timeout.
 type defaultBatcher struct {
-	batchCfg       BatchConfig
-	workerPool     chan struct{}
-	consumeFunc    sender.SendFunc[request.Request]
-	stopWG         sync.WaitGroup
-	currentBatchMu sync.Mutex
-	currentBatch   *batch
-	ticker         *time.Ticker
-	shutdownCh     chan struct{}
+	batchCfg     BatchConfig
+	workerPool   chan struct{}
+	consumeFunc  sender.SendFunc[request.Request]
+	batchManager batchManager
+	stopWG       sync.WaitGroup
+	ticker       *time.Ticker
+	shutdownCh   chan struct{}
+}
+
+func newDefaultBatcher(
+	batchCfg BatchConfig,
+	consumeFunc sender.SendFunc[request.Request],
+	maxWorkers int,
+) *defaultBatcher {
+	return newDefaultBatcherWithKeyFunc(batchCfg, nil, consumeFunc, maxWorkers)
 }

-func newDefaultBatcher(batchCfg BatchConfig, consumeFunc sender.SendFunc[request.Request], maxWorkers int) *defaultBatcher {
+func newDefaultBatcherWithKeyFunc(
+	batchCfg BatchConfig,
+	keyFunc KeyFunc,
+	consumeFunc sender.SendFunc[request.Request],
+	maxWorkers int,
+) *defaultBatcher {
 	// TODO: Determine what is the right behavior for this in combination with async queue.
 	var workerPool chan struct{}
 	if maxWorkers != 0 {
@@ -45,22 +50,24 @@ func newDefaultBatcher(batchCfg BatchConfig, consumeFunc sender.SendFunc[request
 		}
 	}
 	return &defaultBatcher{
-		batchCfg:    batchCfg,
-		workerPool:  workerPool,
-		consumeFunc: consumeFunc,
-		stopWG:      sync.WaitGroup{},
-		shutdownCh:  make(chan struct{}, 1),
+		batchCfg:     batchCfg,
+		workerPool:   workerPool,
+		consumeFunc:  consumeFunc,
+		batchManager: newBatchManager(keyFunc),
+		stopWG:       sync.WaitGroup{},
+		shutdownCh:   make(chan struct{}, 1),
 	}
 }

 func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done Done) {
-	qb.currentBatchMu.Lock()
+	batchEntry := qb.batchManager.getBatch(ctx, req)
+	batchEntry.mu.Lock()

-	if qb.currentBatch == nil {
+	if batchEntry.batch == nil {
 		reqList, mergeSplitErr := req.MergeSplit(ctx, qb.batchCfg.MaxSize, exporterbatcher.SizerTypeItems, nil)
 		if mergeSplitErr != nil || len(reqList) == 0 {
 			done.OnDone(mergeSplitErr)
-			qb.currentBatchMu.Unlock()
+			batchEntry.mu.Unlock()
 			return
 		}

@@ -75,27 +82,27 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
 		if lastReq.ItemsCount() < qb.batchCfg.MinSize {
 			// Do not flush the last item and add it to the current batch.
 			reqList = reqList[:len(reqList)-1]
-			qb.currentBatch = &batch{
+			batchEntry.batch = &batch{
 				ctx:     ctx,
 				req:     lastReq,
 				done:    multiDone{done},
 				created: time.Now(),
 			}
 		}

-		qb.currentBatchMu.Unlock()
+		batchEntry.mu.Unlock()
 		for i := 0; i < len(reqList); i++ {
 			qb.flush(ctx, reqList[i], done)
 		}

 		return
 	}

-	reqList, mergeSplitErr := qb.currentBatch.req.MergeSplit(ctx, qb.batchCfg.MaxSize, exporterbatcher.SizerTypeItems, req)
+	reqList, mergeSplitErr := batchEntry.req.MergeSplit(ctx, qb.batchCfg.MaxSize, exporterbatcher.SizerTypeItems, req)
 	// If failed to merge signal all Done callbacks from current batch as well as the current request and reset the current batch.
 	if mergeSplitErr != nil || len(reqList) == 0 {
 		done.OnDone(mergeSplitErr)
-		qb.currentBatchMu.Unlock()
+		batchEntry.mu.Unlock()
 		return
 	}

@@ -111,15 +118,15 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done

 	// Logic on how to deal with the current batch:
 	// TODO: Deal with merging Context.
-	qb.currentBatch.req = reqList[0]
-	qb.currentBatch.done = append(qb.currentBatch.done, done)
+	batchEntry.req = reqList[0]
+	batchEntry.done = append(batchEntry.done, done)
 	// Save the "currentBatch" if we need to flush it, because we want to execute flush without holding the lock, and
 	// cannot unlock and re-lock because we are not done processing all the responses.
 	var firstBatch *batch
 	// Need to check the currentBatch if more than 1 result returned or if 1 result return but larger than MinSize.
-	if len(reqList) > 1 || qb.currentBatch.req.ItemsCount() >= qb.batchCfg.MinSize {
-		firstBatch = qb.currentBatch
-		qb.currentBatch = nil
+	if len(reqList) > 1 || batchEntry.req.ItemsCount() >= qb.batchCfg.MinSize {
+		firstBatch = batchEntry.batch
+		batchEntry.batch = nil
 	}
 	// At this moment we dealt with the first result which is either in the currentBatch or in the `firstBatch` we will flush.
 	reqList = reqList[1:]
@@ -130,7 +137,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
 		if lastReq.ItemsCount() < qb.batchCfg.MinSize {
 			// Do not flush the last item and add it to the current batch.
 			reqList = reqList[:len(reqList)-1]
-			qb.currentBatch = &batch{
+			batchEntry.batch = &batch{
 				ctx:     ctx,
 				req:     lastReq,
 				done:    multiDone{done},
@@ -139,7 +146,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
 		}
 	}

-	qb.currentBatchMu.Unlock()
+	batchEntry.mu.Unlock()
 	if firstBatch != nil {
 		qb.flush(firstBatch.ctx, firstBatch.req, firstBatch.done)
 	}
@@ -176,21 +183,23 @@ func (qb *defaultBatcher) Start(_ context.Context, _ component.Host) error {

 // flushCurrentBatchIfNecessary sends out the current request batch if it is not nil
 func (qb *defaultBatcher) flushCurrentBatchIfNecessary(forceFlush bool) {
-	qb.currentBatchMu.Lock()
-	if qb.currentBatch == nil {
-		qb.currentBatchMu.Unlock()
-		return
-	}
-	if !forceFlush && time.Since(qb.currentBatch.created) < qb.batchCfg.FlushTimeout {
-		qb.currentBatchMu.Unlock()
-		return
-	}
-	batchToFlush := qb.currentBatch
-	qb.currentBatch = nil
-	qb.currentBatchMu.Unlock()
+	qb.batchManager.forEachBatch(func(batchEntry *batchEntry) {
+		batchEntry.mu.Lock()
+		if batchEntry.batch == nil {
+			batchEntry.mu.Unlock()
+			return
+		}
+		if !forceFlush && time.Since(batchEntry.created) < qb.batchCfg.FlushTimeout {
+			batchEntry.mu.Unlock()
+			return
+		}
+		batchToFlush := batchEntry.batch
+		batchEntry.batch = nil
+		batchEntry.mu.Unlock()

-	// flush() blocks until successfully started a goroutine for flushing.
-	qb.flush(batchToFlush.ctx, batchToFlush.req, batchToFlush.done)
+		// flush() blocks until successfully started a goroutine for flushing.
+		qb.flush(batchToFlush.ctx, batchToFlush.req, batchToFlush.done)
+	})
 }

 // flush starts a goroutine that calls consumeFunc. It blocks until a worker is available if necessary.
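
Putting the pieces together, here is a minimal in-package sketch (not part of this commit) of constructing and feeding a keyed batcher. The BatchConfig fields and constructor signature come from the diff above; noopDone assumes Done is the single-method OnDone(error) interface, and req/exportFunc are stand-in fixtures:

type exampleTenantKey struct{}

type noopDone struct{}

func (noopDone) OnDone(error) {}

func exampleKeyedBatcher(req request.Request, exportFunc sender.SendFunc[request.Request]) {
	qb := newDefaultBatcherWithKeyFunc(
		BatchConfig{MinSize: 100, MaxSize: 1000, FlushTimeout: 200 * time.Millisecond},
		func(ctx context.Context, _ request.Request) string {
			tenant, _ := ctx.Value(exampleTenantKey{}).(string)
			return tenant
		},
		exportFunc,
		4, // maxWorkers: at most four concurrent flush goroutines
	)
	_ = qb.Start(context.Background(), nil)

	// Requests carrying different keys accumulate in independent batches and
	// flush separately once MinSize is reached or FlushTimeout elapses.
	ctx := context.WithValue(context.Background(), exampleTenantKey{}, "tenant-a")
	qb.Consume(ctx, req, noopDone{})
}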
Lines changed: 12 additions & 0 deletions

@@ -0,0 +1,12 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
+
+import (
+	"context"
+
+	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
+)
+
+type KeyFunc func(ctx context.Context, req request.Request) string
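
A KeyFunc sees both the request context and the request itself, so keys can be derived from either. An illustrative in-module implementation (not part of this commit) that partitions batches by a client-metadata value; the "tenant" metadata key is an assumption:

package queuebatch

import (
	"context"

	"go.opentelemetry.io/collector/client"
	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
)

// tenantKeyFunc groups requests by the "tenant" metadata value propagated on
// the context; requests without it all share the empty-string batch.
func tenantKeyFunc(ctx context.Context, _ request.Request) string {
	if values := client.FromContext(ctx).Metadata.Get("tenant"); len(values) > 0 {
		return values[0]
	}
	return ""
}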

exporter/exporterhelper/queue_batch.go

Lines changed: 4 additions & 0 deletions

@@ -33,6 +33,10 @@ func WithBatcher(cfg exporterbatcher.Config) Option {
 	return internal.WithBatcher(cfg)
 }

+func WithMultiBatcher(cfg exporterbatcher.Config, keyFunc queuebatch.KeyFunc) Option {
+	return internal.WithMultiBatcher(cfg, keyFunc)
+}
+
 // QueueBatchConfig defines configuration for queueing and batching for the exporter.
 type QueueBatchConfig = internal.QueueConfig
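
At the public API level, enabling keyed batching would look roughly like this; a sketch assuming the request-based NewTraces helper, with set, cfg, pushTraces, batcherCfg, and tenantKeyFunc (from the example above) provided by the caller. Note that queuebatch.KeyFunc lives in an internal package, so as of this commit the option is only usable from inside the module:

// Hypothetical caller-side usage (assumption, not part of this diff).
exp, err := exporterhelper.NewTraces(ctx, set, cfg, pushTraces,
	exporterhelper.WithMultiBatcher(batcherCfg, tenantKeyFunc),
)
if err != nil {
	return err
}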
