Skip to content

Commit 9c3481b

Browse files
dmitryax and Carson Ip authored
[exporterhelper] Fix potential deadlock in the batch sender (#10315)
Concurrent handling of the flush timeouts can run into a deadlock when a batch is simultaneously sent by reaching the minimum size and flush timeout. The deadlock can happen on the following lines: - https://github.com/open-telemetry/opentelemetry-collector/blob/115bc8e28e009ca93565dc4deb4cf6608fa63622/exporter/exporterhelper/batch_sender.go#L131 - https://github.com/open-telemetry/opentelemetry-collector/blob/115bc8e28e009ca93565dc4deb4cf6608fa63622/exporter/exporterhelper/batch_sender.go#L87 Co-authored-by: Carson Ip <[email protected]>
1 parent 1e44a9c commit 9c3481b

File tree

3 files changed

+114
-18
lines changed

3 files changed

+114
-18
lines changed
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Fix potential deadlock in the batch sender
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [10315]
14+
15+
# Optional: The change log or logs in which this entry should be included.
16+
# e.g. '[user]' or '[user, api]'
17+
# Include 'user' if the change is relevant to end users.
18+
# Include 'api' if there is a change to a library API.
19+
change_logs: [user]

exporter/exporterhelper/batch_sender.go

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,9 @@ type batchSender struct {
3333
concurrencyLimit uint64
3434
activeRequests atomic.Uint64
3535

36-
resetTimerCh chan struct{}
37-
3836
mu sync.Mutex
3937
activeBatch *batch
38+
lastFlushed time.Time
4039

4140
logger *zap.Logger
4241

@@ -57,7 +56,6 @@ func newBatchSender(cfg exporterbatcher.Config, set exporter.CreateSettings,
5756
shutdownCh: nil,
5857
shutdownCompleteCh: make(chan struct{}),
5958
stopped: &atomic.Bool{},
60-
resetTimerCh: make(chan struct{}),
6159
}
6260
return bs
6361
}
@@ -85,16 +83,17 @@ func (bs *batchSender) Start(_ context.Context, _ component.Host) error {
8583
return
8684
case <-timer.C:
8785
bs.mu.Lock()
86+
nextFlush := bs.cfg.FlushTimeout
8887
if bs.activeBatch.request != nil {
89-
bs.exportActiveBatch()
88+
sinceLastFlush := time.Since(bs.lastFlushed)
89+
if sinceLastFlush >= bs.cfg.FlushTimeout {
90+
bs.exportActiveBatch()
91+
} else {
92+
nextFlush = bs.cfg.FlushTimeout - sinceLastFlush
93+
}
9094
}
9195
bs.mu.Unlock()
92-
timer.Reset(bs.cfg.FlushTimeout)
93-
case <-bs.resetTimerCh:
94-
if !timer.Stop() {
95-
<-timer.C
96-
}
97-
timer.Reset(bs.cfg.FlushTimeout)
96+
timer.Reset(nextFlush)
9897
}
9998
}
10099
}()
@@ -123,15 +122,10 @@ func (bs *batchSender) exportActiveBatch() {
123122
b.err = bs.nextSender.send(b.ctx, b.request)
124123
close(b.done)
125124
}(bs.activeBatch)
125+
bs.lastFlushed = time.Now()
126126
bs.activeBatch = newEmptyBatch()
127127
}
128128

129-
func (bs *batchSender) resetTimer() {
130-
if !bs.stopped.Load() {
131-
bs.resetTimerCh <- struct{}{}
132-
}
133-
}
134-
135129
// isActiveBatchReady returns true if the active batch is ready to be exported.
136130
// The batch is ready if it has reached the minimum size or the concurrency limit is reached.
137131
// Caller must hold the lock.
@@ -168,7 +162,6 @@ func (bs *batchSender) sendMergeSplitBatch(ctx context.Context, req Request) err
168162
batch := bs.activeBatch
169163
if bs.isActiveBatchReady() || len(reqs) > 1 {
170164
bs.exportActiveBatch()
171-
bs.resetTimer()
172165
}
173166
bs.mu.Unlock()
174167
<-batch.done
@@ -208,7 +201,6 @@ func (bs *batchSender) sendMergeBatch(ctx context.Context, req Request) error {
208201
batch := bs.activeBatch
209202
if bs.isActiveBatchReady() {
210203
bs.exportActiveBatch()
211-
bs.resetTimer()
212204
}
213205
bs.mu.Unlock()
214206
<-batch.done

exporter/exporterhelper/batch_sender_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,91 @@ func TestBatchSenderWithTimeout(t *testing.T) {
535535
assert.EqualValues(t, 12, sink.itemsCount.Load())
536536
}
537537

538+
func TestBatchSenderTimerResetNoConflict(t *testing.T) {
539+
delayBatchMergeFunc := func(_ context.Context, r1 Request, r2 Request) (Request, error) {
540+
time.Sleep(30 * time.Millisecond)
541+
if r1 == nil {
542+
return r2, nil
543+
}
544+
fr1 := r1.(*fakeRequest)
545+
fr2 := r2.(*fakeRequest)
546+
if fr2.mergeErr != nil {
547+
return nil, fr2.mergeErr
548+
}
549+
return &fakeRequest{
550+
items: fr1.items + fr2.items,
551+
sink: fr1.sink,
552+
exportErr: fr2.exportErr,
553+
delay: fr1.delay + fr2.delay,
554+
}, nil
555+
}
556+
bCfg := exporterbatcher.NewDefaultConfig()
557+
bCfg.MinSizeItems = 8
558+
bCfg.FlushTimeout = 50 * time.Millisecond
559+
be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender,
560+
WithBatcher(bCfg, WithRequestBatchFuncs(delayBatchMergeFunc, fakeBatchMergeSplitFunc)))
561+
require.NoError(t, err)
562+
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
563+
sink := newFakeRequestSink()
564+
565+
// Send 2 concurrent requests that should be merged in one batch in the same interval as the flush timer
566+
go func() {
567+
require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink}))
568+
}()
569+
time.Sleep(30 * time.Millisecond)
570+
go func() {
571+
require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink}))
572+
}()
573+
574+
// The batch should be sent either with the flush interval or by reaching the minimum items size with no conflict
575+
assert.EventuallyWithT(t, func(c *assert.CollectT) {
576+
assert.LessOrEqual(c, uint64(1), sink.requestsCount.Load())
577+
assert.EqualValues(c, 8, sink.itemsCount.Load())
578+
}, 200*time.Millisecond, 10*time.Millisecond)
579+
580+
require.NoError(t, be.Shutdown(context.Background()))
581+
}
582+
583+
func TestBatchSenderTimerFlush(t *testing.T) {
584+
bCfg := exporterbatcher.NewDefaultConfig()
585+
bCfg.MinSizeItems = 8
586+
bCfg.FlushTimeout = 100 * time.Millisecond
587+
be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender,
588+
WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)))
589+
require.NoError(t, err)
590+
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
591+
sink := newFakeRequestSink()
592+
time.Sleep(50 * time.Millisecond)
593+
594+
// Send 2 concurrent requests that should be merged in one batch and sent immediately
595+
go func() {
596+
require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink}))
597+
}()
598+
go func() {
599+
require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink}))
600+
}()
601+
assert.EventuallyWithT(t, func(c *assert.CollectT) {
602+
assert.LessOrEqual(c, uint64(1), sink.requestsCount.Load())
603+
assert.EqualValues(c, 8, sink.itemsCount.Load())
604+
}, 30*time.Millisecond, 5*time.Millisecond)
605+
606+
// Send another request that should be flushed after 100ms instead of 50ms since last flush
607+
go func() {
608+
require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink}))
609+
}()
610+
611+
// Confirm that it is not flushed in 50ms
612+
time.Sleep(60 * time.Millisecond)
613+
assert.LessOrEqual(t, uint64(1), sink.requestsCount.Load())
614+
assert.EqualValues(t, 8, sink.itemsCount.Load())
615+
616+
// Confirm that it is flushed after 100ms (using 60+50=110 here to be safe)
617+
time.Sleep(50 * time.Millisecond)
618+
assert.LessOrEqual(t, uint64(2), sink.requestsCount.Load())
619+
assert.EqualValues(t, 12, sink.itemsCount.Load())
620+
require.NoError(t, be.Shutdown(context.Background()))
621+
}
622+
538623
func queueBatchExporter(t *testing.T, batchOption Option) *baseExporter {
539624
be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, batchOption,
540625
WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[Request]()))

0 commit comments

Comments (0)