Commit 81d149c

Avoid the exporter getting stuck when telemetry data is bigger than batch.max_size
Signed-off-by: Israel Blancas <[email protected]>
1 parent 2e61528 commit 81d149c

11 files changed: +282 -47 lines changed

.chloggen/12893.yaml

Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
+component: exporterhelper
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: "Prevents the exporter from being stuck when telemetry data is bigger than batch.max_size"
+
+# One or more tracking issues or pull requests related to the change
+issues: [12893]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []
exporter/exporterhelper/internal/partialsuccess (new file)

Lines changed: 17 additions & 0 deletions
@@ -0,0 +1,17 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package partialsuccess // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/partialsuccess"
+
+import "fmt"
+
+var _ error = &PartialSuccessError{}
+
+type PartialSuccessError struct {
+	FailureCount int
+	Reason       string
+}
+
+func (e *PartialSuccessError) Error() string {
+	return fmt.Sprintf("partial success: %d failed: %s", e.FailureCount, e.Reason)
+}
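For reference, the new error type reports how many items could not be split and why. Below is a minimal sketch of how it renders and how callers can detect it with errors.As; it is illustrative only, since the package is internal to the Collector module the snippet assumes it lives inside the module, e.g. as an external test of the package:

package partialsuccess_test

import (
	"errors"
	"fmt"

	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/partialsuccess"
)

func Example() {
	// Hypothetical values; in exporterhelper they come from a failed split.
	var err error = &partialsuccess.PartialSuccessError{
		FailureCount: 3,
		Reason:       "request is larger than max_size",
	}

	fmt.Println(err)

	// errors.As is how the default batcher (below) tells this case apart from
	// fatal merge/split errors and downgrades it to a warning.
	var pse *partialsuccess.PartialSuccessError
	fmt.Println(errors.As(err, &pse), pse.FailureCount)

	// Output:
	// partial success: 3 failed: request is larger than max_size
	// true 3
}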

exporter/exporterhelper/internal/queuebatch/default_batcher.go

Lines changed: 35 additions & 5 deletions
@@ -5,12 +5,15 @@ package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhel
 
 import (
 	"context"
+	"errors"
 	"sync"
 	"time"
 
 	"go.uber.org/multierr"
+	"go.uber.org/zap"
 
 	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/partialsuccess"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
 )
@@ -40,9 +43,10 @@ type defaultBatcher struct {
 	currentBatch *batch
 	timer        *time.Timer
 	shutdownCh   chan struct{}
+	logger       *zap.Logger
 }
 
-func newDefaultBatcher(bCfg BatchConfig, bSet batcherSettings[request.Request]) *defaultBatcher {
+func newDefaultBatcher(bCfg BatchConfig, bSet batcherSettings[request.Request], set Settings[request.Request]) *defaultBatcher {
 	// TODO: Determine what is the right behavior for this in combination with async queue.
 	var workerPool chan struct{}
 	if bSet.maxWorkers != 0 {
@@ -59,6 +63,7 @@ func newDefaultBatcher(bCfg BatchConfig, bSet batcherSettings[request.Request])
 		consumeFunc: bSet.next,
 		stopWG:      sync.WaitGroup{},
 		shutdownCh:  make(chan struct{}, 1),
+		logger:      set.Telemetry.Logger,
 	}
 }
 
@@ -73,8 +78,21 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
 
 	if qb.currentBatch == nil {
 		reqList, mergeSplitErr := req.MergeSplit(ctx, int(qb.cfg.MaxSize), qb.sizerType, nil)
-		if mergeSplitErr != nil || len(reqList) == 0 {
-			done.OnDone(mergeSplitErr)
+		if mergeSplitErr != nil {
+			var partialSuccessErr *partialsuccess.PartialSuccessError
+			if !errors.As(mergeSplitErr, &partialSuccessErr) {
+				done.OnDone(mergeSplitErr)
+				qb.currentBatchMu.Unlock()
+				return
+			}
+			qb.logger.Warn(
+				"failed to split request",
+				zap.Int("failure_count", partialSuccessErr.FailureCount),
+				zap.String("reason", partialSuccessErr.Reason),
+			)
+		}
+
+		if len(reqList) == 0 {
 			qb.currentBatchMu.Unlock()
 			return
 		}
@@ -108,8 +126,20 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
 
 	reqList, mergeSplitErr := qb.currentBatch.req.MergeSplit(ctx, int(qb.cfg.MaxSize), qb.sizerType, req)
 	// If failed to merge signal all Done callbacks from current batch as well as the current request and reset the current batch.
-	if mergeSplitErr != nil || len(reqList) == 0 {
-		done.OnDone(mergeSplitErr)
+	if mergeSplitErr != nil {
+		var partialSuccessErr *partialsuccess.PartialSuccessError
+		if !errors.As(mergeSplitErr, &partialSuccessErr) {
+			done.OnDone(mergeSplitErr)
+			qb.currentBatchMu.Unlock()
+			return
+		}
+		qb.logger.Warn(
+			"failed to split request",
+			zap.Int("failure_count", partialSuccessErr.FailureCount),
+			zap.String("reason", partialSuccessErr.Reason),
+		)
+	}
+	if len(reqList) == 0 {
		qb.currentBatchMu.Unlock()
		return
	}
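The behavioral change above reduces to one classification: a MergeSplit error that is not a PartialSuccessError still fails the request via done.OnDone, while a partial-success error is only logged and the batcher continues with whatever splits did succeed, so a single oversized item can no longer wedge the batch. A minimal restatement of that branch as a standalone helper (the helper name is illustrative and does not exist in the Collector code base):

package queuebatch

import (
	"errors"

	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/partialsuccess"
)

// failsWholeRequest reports whether a MergeSplit error should be propagated to
// the request's Done callback (failing all of its data) or merely logged as a
// warning. This sketches the branch added to Consume above.
func failsWholeRequest(err error) bool {
	var pse *partialsuccess.PartialSuccessError
	// A PartialSuccessError means the splittable portion was extracted and only
	// the oversized remainder failed, so the batch should keep going.
	return err != nil && !errors.As(err, &pse)
}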

exporter/exporterhelper/internal/queuebatch/default_batcher_test.go

Lines changed: 34 additions & 1 deletion
@@ -13,7 +13,9 @@ import (
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"go.uber.org/zap"
 
+	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/component/componenttest"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
@@ -64,6 +66,11 @@ func TestDefaultBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T) {
 				sizer:      tt.sizer,
 				next:       sink.Export,
 				maxWorkers: tt.maxWorkers,
+			}, Settings[request.Request]{
+				ID: exporterID,
+				Telemetry: component.TelemetrySettings{
+					Logger: zap.NewNop(),
+				},
 			})
 			require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
 			t.Cleanup(func() {
@@ -134,7 +141,13 @@ func TestDefaultBatcher_NoSplit_TimeoutDisabled(t *testing.T) {
 				sizer:      tt.sizer,
 				next:       sink.Export,
 				maxWorkers: tt.maxWorkers,
-			})
+			},
+				Settings[request.Request]{
+					ID: exporterID,
+					Telemetry: component.TelemetrySettings{
+						Logger: zap.NewNop(),
+					},
+				})
 			require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
 
 			done := newFakeDone()
@@ -219,6 +232,11 @@ func TestDefaultBatcher_NoSplit_WithTimeout(t *testing.T) {
 				sizer:      tt.sizer,
 				next:       sink.Export,
 				maxWorkers: tt.maxWorkers,
+			}, Settings[request.Request]{
+				ID: exporterID,
+				Telemetry: component.TelemetrySettings{
+					Logger: zap.NewNop(),
+				},
 			})
 			require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
 			t.Cleanup(func() {
@@ -295,6 +313,11 @@ func TestDefaultBatcher_Split_TimeoutDisabled(t *testing.T) {
 				sizer:      tt.sizer,
 				next:       sink.Export,
 				maxWorkers: tt.maxWorkers,
+			}, Settings[request.Request]{
+				ID: exporterID,
+				Telemetry: component.TelemetrySettings{
+					Logger: zap.NewNop(),
+				},
 			})
 			require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
 
@@ -347,6 +370,11 @@ func TestDefaultBatcher_Shutdown(t *testing.T) {
 		sizer:      request.NewItemsSizer(),
 		next:       sink.Export,
 		maxWorkers: 2,
+	}, Settings[request.Request]{
+		ID: exporterID,
+		Telemetry: component.TelemetrySettings{
+			Logger: zap.NewNop(),
+		},
 	})
 	require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
 
@@ -380,6 +408,11 @@ func TestDefaultBatcher_MergeError(t *testing.T) {
 		sizer:      request.NewItemsSizer(),
 		next:       sink.Export,
 		maxWorkers: 2,
+	}, Settings[request.Request]{
+		ID: exporterID,
+		Telemetry: component.TelemetrySettings{
+			Logger: zap.NewNop(),
+		},
 	})
 
 	require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))

exporter/exporterhelper/internal/queuebatch/queue_batch.go

Lines changed: 2 additions & 2 deletions
@@ -70,14 +70,14 @@ func newQueueBatch(
 			sizer:      request.NewItemsSizer(),
 			next:       next,
 			maxWorkers: cfg.NumConsumers,
-		})
+		}, set)
 	} else {
 		b = newDefaultBatcher(*cfg.Batch, batcherSettings[request.Request]{
 			sizerType:  cfg.Sizer,
 			sizer:      sizer,
 			next:       next,
 			maxWorkers: cfg.NumConsumers,
-		})
+		}, set)
 	}
 	// Keep the number of queue consumers to 1 if batching is enabled until we support sharding as described in
 	// https://github.com/open-telemetry/opentelemetry-collector/issues/12473

exporter/exporterhelper/logs_batch.go

Lines changed: 28 additions & 7 deletions
@@ -6,7 +6,9 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporte
 import (
 	"context"
 	"errors"
+	"strconv"
 
+	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/partialsuccess"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
 	"go.opentelemetry.io/collector/pdata/plog"
 )
@@ -36,7 +38,7 @@ func (req *logsRequest) MergeSplit(_ context.Context, maxSize int, szt RequestSi
 		return []Request{req}, nil
 	}
 
-	return req.split(maxSize, sz), nil
+	return req.split(maxSize, sz)
 }
 
 func (req *logsRequest) mergeTo(dst *logsRequest, sz sizer.LogsSizer) {
@@ -47,15 +49,34 @@
 	req.ld.ResourceLogs().MoveAndAppendTo(dst.ld.ResourceLogs())
 }
 
-func (req *logsRequest) split(maxSize int, sz sizer.LogsSizer) []Request {
+func (req *logsRequest) split(maxSize int, sz sizer.LogsSizer) ([]Request, error) {
 	var res []Request
-	for req.size(sz) > maxSize {
-		ld, rmSize := extractLogs(req.ld, maxSize, sz)
-		req.setCachedSize(req.size(sz) - rmSize)
-		res = append(res, newLogsRequest(ld))
+	var ld plog.Logs
+	rmSize := -1
+
+	previousSize := req.size(sz)
+
+	for req.size(sz) > maxSize && rmSize != 0 {
+		ld, rmSize = extractLogs(req.ld, maxSize, sz)
+		if ld.LogRecordCount() > 0 {
+			req.setCachedSize(req.size(sz) - rmSize)
+			res = append(res, newLogsRequest(ld))
+		}
 	}
+
+	if req.size(sz) == previousSize && req.size(sz) > maxSize {
+		// This means the logs request cannot be split any further and it is
+		// not possible to fit it into the max size.
+		err := &partialsuccess.PartialSuccessError{
+			FailureCount: req.ld.LogRecordCount(),
+			Reason: "failed to split logs request: size is greater than max size. size: " +
+				strconv.Itoa(req.size(sz)) + ", max_size: " + strconv.Itoa(maxSize),
+		}
+		return res, err
+	}
+
 	res = append(res, req)
-	return res
+	return res, nil
 }
 
 // extractLogs extracts logs from the input logs and returns a new logs with the specified number of log records.
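To see why the extra rmSize != 0 and previousSize checks matter: extractLogs only removes whole log records, so a single record whose encoded size exceeds maxSize removes nothing, and the old "for req.size(sz) > maxSize" loop could never make progress. Below is a simplified, self-contained model of that loop using plain ints instead of plog.Logs; the names are illustrative, not the Collector's API:

package main

import "fmt"

// extract mimics extractLogs: it removes whole records (sizes) from items up
// to maxSize and reports how much it removed. An oversized record removes 0.
func extract(items []int, maxSize int) (taken, rest []int, rmSize int) {
	for len(items) > 0 && rmSize+items[0] <= maxSize {
		rmSize += items[0]
		taken = append(taken, items[0])
		items = items[1:]
	}
	return taken, items, rmSize
}

// split mirrors the shape of the new logsRequest.split: the rmSize != 0 guard
// stops the loop when no progress is possible, and the previousSize check
// turns the former infinite loop into a reported failure count.
func split(items []int, maxSize int) (batches [][]int, failed int) {
	total := 0
	for _, v := range items {
		total += v
	}
	previous := total
	rmSize := -1
	for total > maxSize && rmSize != 0 {
		var taken []int
		taken, items, rmSize = extract(items, maxSize)
		if len(taken) > 0 {
			total -= rmSize
			batches = append(batches, taken)
		}
	}
	if total == previous && total > maxSize {
		return batches, len(items) // nothing could be split off: report everything as failed
	}
	return append(batches, items), 0 // remainder becomes the last request
}

func main() {
	// One 100-"byte" record against max_size=10: previously a hang, now a reported failure.
	fmt.Println(split([]int{100}, 10)) // [] 1
	// A splittable request still batches normally.
	fmt.Println(split([]int{4, 3, 2}, 10)) // [[4 3 2]] 0
}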

exporter/exporterhelper/logs_batch_test.go

Lines changed: 28 additions & 6 deletions
@@ -10,6 +10,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
+	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/partialsuccess"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
 	"go.opentelemetry.io/collector/pdata/plog"
 	"go.opentelemetry.io/collector/pdata/testdata"
@@ -128,12 +129,13 @@ func TestMergeSplitLogs(t *testing.T) {
 
 func TestMergeSplitLogsBasedOnByteSize(t *testing.T) {
 	tests := []struct {
-		name     string
-		szt      RequestSizerType
-		maxSize  int
-		lr1      Request
-		lr2      Request
-		expected []Request
+		name               string
+		szt                RequestSizerType
+		maxSize            int
+		lr1                Request
+		lr2                Request
+		expected           []Request
+		expectPartialError bool
 	}{
 		{
 			name: "both_requests_empty",
@@ -219,10 +221,30 @@ func TestMergeSplitLogsBasedOnByteSize(t *testing.T) {
 			}()),
 			},
 		},
+		{
+			name:    "unsplittable_large_log",
+			szt:     RequestSizerTypeBytes,
+			maxSize: 10,
+			lr1: newLogsRequest(func() plog.Logs {
+				ld := testdata.GenerateLogs(1)
+				ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr(string(make([]byte, 100)))
+				return ld
+			}()),
+			lr2:                nil,
+			expected:           []Request{},
+			expectPartialError: true,
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			res, err := tt.lr1.MergeSplit(context.Background(), tt.maxSize, tt.szt, tt.lr2)
+			if tt.expectPartialError {
+				require.Error(t, err)
+				var partialErr *partialsuccess.PartialSuccessError
+				require.ErrorAs(t, err, &partialErr)
+				assert.Contains(t, err.Error(), "failed to split logs request: size is greater than max size")
+				return
+			}
 			require.NoError(t, err)
 			assert.Len(t, res, len(tt.expected))
 			for i := range res {
