Skip to content

Commit 16d9ebd

Browse files
committed
Prevent the exporter from being stuck when telemetry data is bigger than batch.max_size
Signed-off-by: Israel Blancas <[email protected]>
1 parent c9aaed8 commit 16d9ebd

File tree

13 files changed

+427
-44
lines changed

13 files changed

+427
-44
lines changed

.chloggen/12893.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: "Prevents the exporter from being stuck when telemetry data is bigger than batch.max_size"
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [12893]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: []
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package partialsuccess // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/partialsuccess"
5+
6+
import "fmt"
7+
8+
// Compile-time assertion that *PartialSuccessError satisfies the error
// interface.
var _ error = &PartialSuccessError{}

// PartialSuccessError reports that part of a request could not be
// processed. In this change it is detected via errors.As after
// MergeSplit calls in shardBatcher, so the failure can be logged
// instead of leaving the exporter stuck.
type PartialSuccessError struct {
	// FailureCount is the number of items that failed.
	FailureCount int
	// Reason is a human-readable description of why they failed.
	Reason string
}

// Error implements the error interface, combining the failure count and
// reason into a single message.
func (e *PartialSuccessError) Error() string {
	return fmt.Sprintf("partial success: %d failed: %s", e.FailureCount, e.Reason)
}

exporter/exporterhelper/internal/queuebatch/multi_batcher.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"context"
77
"sync"
88

9+
"go.uber.org/zap"
10+
911
"go.opentelemetry.io/collector/component"
1012
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1113
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
@@ -17,6 +19,7 @@ type batcherSettings[T any] struct {
1719
partitioner Partitioner[T]
1820
next sender.SendFunc[T]
1921
maxWorkers int
22+
logger *zap.Logger
2023
}
2124

2225
type multiBatcher struct {
@@ -26,6 +29,7 @@ type multiBatcher struct {
2629
sizer request.Sizer[request.Request]
2730
partitioner Partitioner[request.Request]
2831
consumeFunc sender.SendFunc[request.Request]
32+
logger *zap.Logger
2933

3034
singleShard *shardBatcher
3135
shards sync.Map
@@ -41,17 +45,19 @@ func newMultiBatcher(bCfg BatchConfig, bSet batcherSettings[request.Request]) *m
4145
workerPool <- struct{}{}
4246
}
4347
}
48+
4449
mb := &multiBatcher{
4550
cfg: bCfg,
4651
wp: workerPool,
4752
sizerType: bSet.sizerType,
4853
sizer: bSet.sizer,
4954
partitioner: bSet.partitioner,
5055
consumeFunc: bSet.next,
56+
logger: bSet.logger,
5157
}
5258

5359
if bSet.partitioner == nil {
54-
mb.singleShard = newShard(mb.cfg, mb.sizerType, mb.sizer, mb.wp, mb.consumeFunc)
60+
mb.singleShard = newShard(mb.cfg, mb.sizerType, mb.sizer, mb.wp, mb.consumeFunc, mb.logger.With(zap.String("shard", "single")))
5561
}
5662
return mb
5763
}
@@ -67,7 +73,7 @@ func (mb *multiBatcher) getShard(ctx context.Context, req request.Request) *shar
6773
if found {
6874
return s.(*shardBatcher)
6975
}
70-
newS := newShard(mb.cfg, mb.sizerType, mb.sizer, mb.wp, mb.consumeFunc)
76+
newS := newShard(mb.cfg, mb.sizerType, mb.sizer, mb.wp, mb.consumeFunc, mb.logger.With(zap.String("shard", key)))
7177
newS.start(ctx, nil)
7278
s, loaded := mb.shards.LoadOrStore(key, newS)
7379
// If not loaded, there was a race condition in adding the new shard. Shutdown the newly created shard.

exporter/exporterhelper/internal/queuebatch/multi_batcher_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/stretchr/testify/assert"
1212
"github.com/stretchr/testify/require"
13+
"go.uber.org/zap"
1314

1415
"go.opentelemetry.io/collector/component/componenttest"
1516
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
@@ -33,6 +34,7 @@ func TestMultiBatcher_NoTimeout(t *testing.T) {
3334
}),
3435
next: sink.Export,
3536
maxWorkers: 1,
37+
logger: zap.NewNop(),
3638
})
3739

3840
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
@@ -84,6 +86,7 @@ func TestMultiBatcher_Timeout(t *testing.T) {
8486
}),
8587
next: sink.Export,
8688
maxWorkers: 1,
89+
logger: zap.NewNop(),
8790
})
8891

8992
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))

exporter/exporterhelper/internal/queuebatch/queue_batch.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"errors"
99
"fmt"
1010

11+
"go.uber.org/zap"
12+
1113
"go.opentelemetry.io/collector/component"
1214
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1315
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
@@ -72,6 +74,7 @@ func newQueueBatch(
7274
partitioner: set.Partitioner,
7375
next: next,
7476
maxWorkers: cfg.NumConsumers,
77+
logger: set.Telemetry.Logger.With(zap.String("component", "multi_batcher")),
7578
})
7679
} else {
7780
b = newMultiBatcher(*cfg.Batch, batcherSettings[request.Request]{
@@ -80,6 +83,7 @@ func newQueueBatch(
8083
partitioner: set.Partitioner,
8184
next: next,
8285
maxWorkers: cfg.NumConsumers,
86+
logger: set.Telemetry.Logger.With(zap.String("component", "multi_batcher")),
8387
})
8488
}
8589
// Keep the number of queue consumers to 1 if batching is enabled until we support sharding as described in

exporter/exporterhelper/internal/queuebatch/shard_batcher.go

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,15 @@ package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhel
55

66
import (
77
"context"
8+
"errors"
89
"sync"
910
"time"
1011

1112
"go.uber.org/multierr"
13+
"go.uber.org/zap"
1214

1315
"go.opentelemetry.io/collector/component"
16+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/partialsuccess"
1417
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1518
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
1619
)
@@ -33,16 +36,18 @@ type shardBatcher struct {
3336
currentBatch *batch
3437
timer *time.Timer
3538
shutdownCh chan struct{}
39+
logger *zap.Logger
3640
}
3741

38-
func newShard(cfg BatchConfig, sizerType request.SizerType, sizer request.Sizer[request.Request], workerPool chan struct{}, next sender.SendFunc[request.Request]) *shardBatcher {
42+
func newShard(cfg BatchConfig, sizerType request.SizerType, sizer request.Sizer[request.Request], workerPool chan struct{}, next sender.SendFunc[request.Request], logger *zap.Logger) *shardBatcher {
3943
return &shardBatcher{
4044
cfg: cfg,
4145
workerPool: workerPool,
4246
sizerType: sizerType,
4347
sizer: sizer,
4448
consumeFunc: next,
4549
shutdownCh: make(chan struct{}, 1),
50+
logger: logger,
4651
}
4752
}
4853

@@ -57,7 +62,24 @@ func (qb *shardBatcher) Consume(ctx context.Context, req request.Request, done D
5762

5863
if qb.currentBatch == nil {
5964
reqList, mergeSplitErr := req.MergeSplit(ctx, int(qb.cfg.MaxSize), qb.sizerType, nil)
60-
if mergeSplitErr != nil || len(reqList) == 0 {
65+
if mergeSplitErr != nil {
66+
var partialSuccessErr *partialsuccess.PartialSuccessError
67+
if !errors.As(mergeSplitErr, &partialSuccessErr) {
68+
done.OnDone(mergeSplitErr)
69+
qb.currentBatchMu.Unlock()
70+
return
71+
}
72+
qb.logger.Warn(
73+
"failed to split request",
74+
zap.Int("failure_count", partialSuccessErr.FailureCount),
75+
zap.String("reason", partialSuccessErr.Reason),
76+
)
77+
done.OnDone(mergeSplitErr)
78+
qb.currentBatchMu.Unlock()
79+
return
80+
}
81+
82+
if len(reqList) == 0 {
6183
done.OnDone(mergeSplitErr)
6284
qb.currentBatchMu.Unlock()
6385
return
@@ -92,7 +114,24 @@ func (qb *shardBatcher) Consume(ctx context.Context, req request.Request, done D
92114

93115
reqList, mergeSplitErr := qb.currentBatch.req.MergeSplit(ctx, int(qb.cfg.MaxSize), qb.sizerType, req)
94116
// If failed to merge signal all Done callbacks from the current batch as well as the current request and reset the current batch.
95-
if mergeSplitErr != nil || len(reqList) == 0 {
117+
if mergeSplitErr != nil {
118+
var partialSuccessErr *partialsuccess.PartialSuccessError
119+
if !errors.As(mergeSplitErr, &partialSuccessErr) {
120+
done.OnDone(mergeSplitErr)
121+
qb.currentBatchMu.Unlock()
122+
return
123+
}
124+
qb.logger.Warn(
125+
"failed to split request",
126+
zap.Int("failure_count", partialSuccessErr.FailureCount),
127+
zap.String("reason", partialSuccessErr.Reason),
128+
)
129+
done.OnDone(mergeSplitErr)
130+
qb.currentBatchMu.Unlock()
131+
return
132+
}
133+
134+
if len(reqList) == 0 {
96135
done.OnDone(mergeSplitErr)
97136
qb.currentBatchMu.Unlock()
98137
return

0 commit comments

Comments
 (0)