Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 22 additions & 24 deletions exporter/exporterhelper/internal/queuebatch/default_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ import (
)

type batch struct {
ctx context.Context
req request.Request
done multiDone
ctx context.Context
req request.Request
done multiDone
created time.Time
}

// defaultBatcher continuously batch incoming requests and flushes asynchronously if minimum size limit is met or on timeout.
Expand All @@ -30,7 +31,7 @@ type defaultBatcher struct {
stopWG sync.WaitGroup
currentBatchMu sync.Mutex
currentBatch *batch
timer *time.Timer
ticker *time.Ticker
shutdownCh chan struct{}
}

Expand All @@ -52,12 +53,6 @@ func newDefaultBatcher(batchCfg BatchConfig, consumeFunc sender.SendFunc[request
}
}

func (qb *defaultBatcher) resetTimer() {
if qb.batchCfg.FlushTimeout > 0 {
qb.timer.Reset(qb.batchCfg.FlushTimeout)
}
}

func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done Done) {
qb.currentBatchMu.Lock()

Expand All @@ -81,11 +76,11 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
// Do not flush the last item and add it to the current batch.
reqList = reqList[:len(reqList)-1]
qb.currentBatch = &batch{
ctx: ctx,
req: lastReq,
done: multiDone{done},
ctx: ctx,
req: lastReq,
done: multiDone{done},
created: time.Now(),
}
qb.resetTimer()
}

qb.currentBatchMu.Unlock()
Expand Down Expand Up @@ -136,11 +131,11 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
// Do not flush the last item and add it to the current batch.
reqList = reqList[:len(reqList)-1]
qb.currentBatch = &batch{
ctx: ctx,
req: lastReq,
done: multiDone{done},
ctx: ctx,
req: lastReq,
done: multiDone{done},
created: time.Now(),
}
qb.resetTimer()
}
}

Expand All @@ -162,8 +157,8 @@ func (qb *defaultBatcher) startTimeBasedFlushingGoroutine() {
select {
case <-qb.shutdownCh:
return
case <-qb.timer.C:
qb.flushCurrentBatchIfNecessary()
case <-qb.ticker.C:
qb.flushCurrentBatchIfNecessary(false)
}
}
}()
Expand All @@ -172,27 +167,30 @@ func (qb *defaultBatcher) startTimeBasedFlushingGoroutine() {
// Start starts the goroutine that reads from the queue and flushes asynchronously.
func (qb *defaultBatcher) Start(_ context.Context, _ component.Host) error {
if qb.batchCfg.FlushTimeout > 0 {
qb.timer = time.NewTimer(qb.batchCfg.FlushTimeout)
qb.ticker = time.NewTicker(qb.batchCfg.FlushTimeout)
qb.startTimeBasedFlushingGoroutine()
}

return nil
}

// flushCurrentBatchIfNecessary sends out the current request batch if it is not nil
func (qb *defaultBatcher) flushCurrentBatchIfNecessary() {
func (qb *defaultBatcher) flushCurrentBatchIfNecessary(forceFlush bool) {
qb.currentBatchMu.Lock()
if qb.currentBatch == nil {
qb.currentBatchMu.Unlock()
return
}
if !forceFlush && time.Since(qb.currentBatch.created) < qb.batchCfg.FlushTimeout {
qb.currentBatchMu.Unlock()
return
}
batchToFlush := qb.currentBatch
qb.currentBatch = nil
qb.currentBatchMu.Unlock()

// flush() blocks until successfully started a goroutine for flushing.
qb.flush(batchToFlush.ctx, batchToFlush.req, batchToFlush.done)
qb.resetTimer()
}

// flush starts a goroutine that calls consumeFunc. It blocks until a worker is available if necessary.
Expand All @@ -214,7 +212,7 @@ func (qb *defaultBatcher) flush(ctx context.Context, req request.Request, done D
func (qb *defaultBatcher) Shutdown(_ context.Context) error {
close(qb.shutdownCh)
// Make sure execute one last flush if necessary.
qb.flushCurrentBatchIfNecessary()
qb.flushCurrentBatchIfNecessary(true)
qb.stopWG.Wait()
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,8 +591,8 @@ func TestQueueBatchTimerFlush(t *testing.T) {
assert.LessOrEqual(t, 1, sink.RequestsCount())
assert.Equal(t, 8, sink.ItemsCount())

// Confirm that it is flushed after 100ms (using 60+50=110 here to be safe)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to replace this by assert.Eventually?

time.Sleep(50 * time.Millisecond)
// Confirm that it is flushed after 100ms (using 100+50=150 here to be safe)
time.Sleep(100 * time.Millisecond)
assert.LessOrEqual(t, 2, sink.RequestsCount())
assert.Equal(t, 12, sink.ItemsCount())
require.NoError(t, qb.Shutdown(context.Background()))
Expand Down
Loading