
Commit dec97c9

Fix MergeSplit issue that ignores the initial message size
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 643a35f commit dec97c9

File tree: 9 files changed (+154, -223 lines)

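The bug being fixed is one of item accounting: items already held by the request that MergeSplit is called on must count toward MaxSizeItems together with the incoming request. The new TestMergeSplitManySmallLogs below pins this down. As a minimal, standalone sketch of just that counting arithmetic (the mergeSplit helper and the plain-int model are illustrative, not the collector implementation; the numbers come from the new test), starting from a request that already holds 1 item and merging 1000 requests of 10 items under a 10000-item limit must end with two batches:

package main

import "fmt"

// mergeSplit is a hypothetical, count-only model: merge the incoming items
// into the current batch, then carve off full batches of at most maxSize
// items, keeping the remainder as the last (still open) batch.
func mergeSplit(current, incoming, maxSize int) []int {
    total := current + incoming
    if maxSize == 0 {
        return []int{total} // no limit: a single merged batch
    }
    var out []int
    for total > maxSize {
        out = append(out, maxSize)
        total -= maxSize
    }
    return append(out, total)
}

func main() {
    batches := []int{1} // the initial request already holds 1 item
    for j := 0; j < 1000; j++ {
        last := batches[len(batches)-1]
        batches = append(batches[:len(batches)-1], mergeSplit(last, 10, 10000)...)
    }
    // 1 + 1000*10 = 10001 items in total: counting the initial item yields
    // two batches; ignoring it would wrongly report a single full batch.
    fmt.Println(len(batches), batches) // 2 [10000 1]
}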

.chloggen/fix-traces-split.yaml

Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
+component: exporterhelper
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Fix MergeSplit issue that ignores the initial message size
+
+# One or more tracking issues or pull requests related to the change
+issues: [12257]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]

exporter/exporterhelper/logs_batch.go

Lines changed: 18 additions & 55 deletions
@@ -14,73 +14,36 @@ import (
 // MergeSplit splits and/or merges the provided logs request and the current request into one or more requests
 // conforming with the MaxSizeConfig.
 func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 Request) ([]Request, error) {
-    var req2 *logsRequest
     if r2 != nil {
-        var ok bool
-        req2, ok = r2.(*logsRequest)
+        req2, ok := r2.(*logsRequest)
         if !ok {
             return nil, errors.New("invalid input type")
         }
+        req2.mergeTo(req)
     }

+    // If no limit we can simply merge the new request into the current and return.
     if cfg.MaxSizeItems == 0 {
-        req.setCachedItemsCount(req.ItemsCount() + req2.ItemsCount())
-        req2.setCachedItemsCount(0)
-        req2.ld.ResourceLogs().MoveAndAppendTo(req.ld.ResourceLogs())
         return []Request{req}, nil
     }
+    return req.split(cfg)
+}

-    var (
-        res          []Request
-        destReq      *logsRequest
-        capacityLeft = cfg.MaxSizeItems
-    )
-    for _, srcReq := range []*logsRequest{req, req2} {
-        if srcReq == nil {
-            continue
-        }
-
-        srcCount := srcReq.ItemsCount()
-        if srcCount <= capacityLeft {
-            if destReq == nil {
-                destReq = srcReq
-            } else {
-                destReq.setCachedItemsCount(destReq.ItemsCount() + srcCount)
-                srcReq.setCachedItemsCount(0)
-                srcReq.ld.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs())
-            }
-            capacityLeft -= srcCount
-            continue
-        }
-
-        for {
-            extractedLogs := extractLogs(srcReq.ld, capacityLeft)
-            extractedCount := extractedLogs.LogRecordCount()
-            if extractedCount == 0 {
-                break
-            }
-
-            if destReq == nil {
-                destReq = &logsRequest{ld: extractedLogs, pusher: srcReq.pusher, cachedItemsCount: extractedCount}
-            } else {
-                extractedLogs.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs())
-                destReq.setCachedItemsCount(destReq.ItemsCount() + extractedCount)
-                srcReq.setCachedItemsCount(srcReq.ItemsCount() - extractedCount)
-            }
-
-            // Create new batch once capacity is reached.
-            capacityLeft -= extractedCount
-            if capacityLeft == 0 {
-                res = append(res, destReq)
-                destReq = nil
-                capacityLeft = cfg.MaxSizeItems
-            }
-        }
-    }
+func (req *logsRequest) mergeTo(dst *logsRequest) {
+    dst.setCachedItemsCount(dst.ItemsCount() + req.ItemsCount())
+    req.setCachedItemsCount(0)
+    req.ld.ResourceLogs().MoveAndAppendTo(dst.ld.ResourceLogs())
+}

-    if destReq != nil {
-        res = append(res, destReq)
+func (req *logsRequest) split(cfg exporterbatcher.MaxSizeConfig) ([]Request, error) {
+    var res []Request
+    for req.ItemsCount() > cfg.MaxSizeItems {
+        ld := extractLogs(req.ld, cfg.MaxSizeItems)
+        size := ld.LogRecordCount()
+        req.setCachedItemsCount(req.ItemsCount() - size)
+        res = append(res, &logsRequest{ld: ld, pusher: req.pusher, cachedItemsCount: size})
     }
+    res = append(res, req)
     return res, nil
 }
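After this change MergeSplit has two phases: mergeTo first folds the incoming request into the current one, and split then carves off batches of up to MaxSizeItems, returning them ahead of the remainder, which stays in the current request. A hedged sketch of how a caller can drive it, mirroring the loop used by the new tests below (appendToPending is an illustrative name, not part of the package; Request, MergeSplit, and exporterbatcher.MaxSizeConfig are the identifiers from this diff):

// Sketch only, written as if inside the exporterhelper package.
// pending must already hold at least one (possibly empty) request,
// just as the tests seed it before merging.
func appendToPending(ctx context.Context, cfg exporterbatcher.MaxSizeConfig, pending []Request, incoming Request) ([]Request, error) {
    res, err := pending[len(pending)-1].MergeSplit(ctx, cfg, incoming)
    if err != nil {
        return pending, err
    }
    // Full batches come first; the last element is the (possibly partial)
    // remainder and stays open to absorb later requests.
    return append(pending[:len(pending)-1], res...), nil
}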

exporter/exporterhelper/logs_batch_test.go

Lines changed: 16 additions & 1 deletion
@@ -153,9 +153,22 @@ func TestExtractLogs(t *testing.T) {
     }
 }

-func BenchmarkSplittingBasedOnItemCountManySmallLogs(b *testing.B) {
+func TestMergeSplitManySmallLogs(t *testing.T) {
     // All requests merge into a single batch.
     cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000}
+    merged := []Request{newLogsRequest(testdata.GenerateLogs(1), nil)}
+    for j := 0; j < 1000; j++ {
+        lr2 := newLogsRequest(testdata.GenerateLogs(10), nil)
+        res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2)
+        merged = append(merged[0:len(merged)-1], res...)
+    }
+    assert.Len(t, merged, 2)
+}
+
+func BenchmarkSplittingBasedOnItemCountManySmallLogs(b *testing.B) {
+    // All requests merge into a single batch.
+    cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10010}
+    b.ReportAllocs()
     for i := 0; i < b.N; i++ {
         merged := []Request{newLogsRequest(testdata.GenerateLogs(10), nil)}
         for j := 0; j < 1000; j++ {
@@ -170,6 +183,7 @@ func BenchmarkSplittingBasedOnItemCountManySmallLogs(b *testing.B) {
 func BenchmarkSplittingBasedOnItemCountManyLogsSlightlyAboveLimit(b *testing.B) {
     // Every incoming request results in a split.
     cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000}
+    b.ReportAllocs()
     for i := 0; i < b.N; i++ {
         merged := []Request{newLogsRequest(testdata.GenerateLogs(0), nil)}
         for j := 0; j < 10; j++ {
@@ -184,6 +198,7 @@ func BenchmarkSplittingBasedOnItemCountManyLogsSlightlyAboveLimit(b *testing.B)
 func BenchmarkSplittingBasedOnItemCountHugeLogs(b *testing.B) {
     // One request splits into many batches.
     cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000}
+    b.ReportAllocs()
     for i := 0; i < b.N; i++ {
         merged := []Request{newLogsRequest(testdata.GenerateLogs(0), nil)}
         lr2 := newLogsRequest(testdata.GenerateLogs(100000), nil)

exporter/exporterhelper/metrics_batch.go

Lines changed: 18 additions & 56 deletions
@@ -14,74 +14,36 @@ import (
 // MergeSplit splits and/or merges the provided metrics request and the current request into one or more requests
 // conforming with the MaxSizeConfig.
 func (req *metricsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 Request) ([]Request, error) {
-    var req2 *metricsRequest
     if r2 != nil {
-        var ok bool
-        req2, ok = r2.(*metricsRequest)
+        req2, ok := r2.(*metricsRequest)
         if !ok {
             return nil, errors.New("invalid input type")
         }
+        req2.mergeTo(req)
     }

+    // If no limit we can simply merge the new request into the current and return.
     if cfg.MaxSizeItems == 0 {
-        req.setCachedItemsCount(req.ItemsCount() + req2.ItemsCount())
-        req2.setCachedItemsCount(0)
-        req2.md.ResourceMetrics().MoveAndAppendTo(req.md.ResourceMetrics())
         return []Request{req}, nil
     }
+    return req.split(cfg)
+}

-    var (
-        res          []Request
-        destReq      *metricsRequest
-        capacityLeft = cfg.MaxSizeItems
-    )
-    for _, srcReq := range []*metricsRequest{req, req2} {
-        if srcReq == nil {
-            continue
-        }
-
-        srcCount := srcReq.ItemsCount()
-        if srcCount <= capacityLeft {
-            if destReq == nil {
-                destReq = srcReq
-            } else {
-                destReq.setCachedItemsCount(destReq.ItemsCount() + srcCount)
-                srcReq.setCachedItemsCount(0)
-                srcReq.md.ResourceMetrics().MoveAndAppendTo(destReq.md.ResourceMetrics())
-            }
-            capacityLeft -= srcCount
-            continue
-        }
-
-        for {
-            extractedMetrics := extractMetrics(srcReq.md, capacityLeft)
-            extractedCount := extractedMetrics.DataPointCount()
-            if extractedCount == 0 {
-                break
-            }
-
-            if destReq == nil {
-                destReq = &metricsRequest{md: extractedMetrics, pusher: srcReq.pusher, cachedItemsCount: extractedCount}
-            } else {
-                destReq.setCachedItemsCount(destReq.ItemsCount() + extractedCount)
-                srcReq.setCachedItemsCount(srcReq.ItemsCount() - extractedCount)
-                extractedMetrics.ResourceMetrics().MoveAndAppendTo(destReq.md.ResourceMetrics())
-            }
-
-            // Create new batch once capacity is reached.
-            capacityLeft -= extractedCount
-            if capacityLeft == 0 {
-                res = append(res, destReq)
-                destReq = nil
-                capacityLeft = cfg.MaxSizeItems
-            }
-        }
-    }
+func (req *metricsRequest) mergeTo(dst *metricsRequest) {
+    dst.setCachedItemsCount(dst.ItemsCount() + req.ItemsCount())
+    req.setCachedItemsCount(0)
+    req.md.ResourceMetrics().MoveAndAppendTo(dst.md.ResourceMetrics())
+}

-    if destReq != nil {
-        res = append(res, destReq)
+func (req *metricsRequest) split(cfg exporterbatcher.MaxSizeConfig) ([]Request, error) {
+    var res []Request
+    for req.ItemsCount() > cfg.MaxSizeItems {
+        md := extractMetrics(req.md, cfg.MaxSizeItems)
+        size := md.DataPointCount()
+        req.setCachedItemsCount(req.ItemsCount() - size)
+        res = append(res, &metricsRequest{md: md, pusher: req.pusher, cachedItemsCount: size})
     }
-
+    res = append(res, req)
     return res, nil
 }

exporter/exporterhelper/metrics_batch_test.go

Lines changed: 16 additions & 1 deletion
@@ -161,9 +161,22 @@ func TestExtractMetricsInvalidMetric(t *testing.T) {
     assert.Equal(t, 0, md.ResourceMetrics().Len())
 }

-func BenchmarkSplittingBasedOnItemCountManySmallMetrics(b *testing.B) {
+func TestMergeSplitManySmallMetrics(t *testing.T) {
     // All requests merge into a single batch.
     cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 20000}
+    merged := []Request{newMetricsRequest(testdata.GenerateMetrics(1), nil)}
+    for j := 0; j < 1000; j++ {
+        lr2 := newMetricsRequest(testdata.GenerateMetrics(10), nil)
+        res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2)
+        merged = append(merged[0:len(merged)-1], res...)
+    }
+    assert.Len(t, merged, 2)
+}
+
+func BenchmarkSplittingBasedOnItemCountManySmallMetrics(b *testing.B) {
+    // All requests merge into a single batch.
+    cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 20020}
+    b.ReportAllocs()
     for i := 0; i < b.N; i++ {
         merged := []Request{newMetricsRequest(testdata.GenerateMetrics(10), nil)}
         for j := 0; j < 1000; j++ {
@@ -178,6 +191,7 @@ func BenchmarkSplittingBasedOnItemCountManySmallMetrics(b *testing.B) {
 func BenchmarkSplittingBasedOnItemCountManyMetricsSlightlyAboveLimit(b *testing.B) {
     // Every incoming request results in a split.
     cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 20000}
+    b.ReportAllocs()
     for i := 0; i < b.N; i++ {
         merged := []Request{newMetricsRequest(testdata.GenerateMetrics(0), nil)}
         for j := 0; j < 10; j++ {
@@ -192,6 +206,7 @@ func BenchmarkSplittingBasedOnItemCountManyMetricsSlightlyAboveLimit(b *testing.
 func BenchmarkSplittingBasedOnItemCountHugeMetrics(b *testing.B) {
     // One request splits into many batches.
     cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 20000}
+    b.ReportAllocs()
     for i := 0; i < b.N; i++ {
         merged := []Request{newMetricsRequest(testdata.GenerateMetrics(0), nil)}
         lr2 := newMetricsRequest(testdata.GenerateMetrics(100000), nil)
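One detail worth noting: the metrics request counts items as data points (the new split uses md.DataPointCount()), not as metrics, which is presumably why these tests use limits of 20000/20020 where the logs tests use 10000/10010 for the same generator sizes. A small standalone sketch of that distinction, assuming only the pdata pmetric API (this is not code from the commit):

package main

import (
    "fmt"

    "go.opentelemetry.io/collector/pdata/pmetric"
)

func main() {
    md := pmetric.NewMetrics()
    sm := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty()
    for i := 0; i < 10; i++ {
        g := sm.Metrics().AppendEmpty().SetEmptyGauge()
        g.DataPoints().AppendEmpty() // two data points per metric
        g.DataPoints().AppendEmpty()
    }
    // MergeSplit's MaxSizeItems limit applies to the data point count.
    fmt.Println(md.MetricCount(), md.DataPointCount()) // 10 20
}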

exporter/exporterhelper/traces_batch.go

Lines changed: 18 additions & 55 deletions
@@ -14,73 +14,36 @@ import (
 // MergeSplit splits and/or merges the provided traces request and the current request into one or more requests
 // conforming with the MaxSizeConfig.
 func (req *tracesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 Request) ([]Request, error) {
-    var req2 *tracesRequest
     if r2 != nil {
-        var ok bool
-        req2, ok = r2.(*tracesRequest)
+        req2, ok := r2.(*tracesRequest)
         if !ok {
             return nil, errors.New("invalid input type")
         }
+        req2.mergeTo(req)
     }

+    // If no limit we can simply merge the new request into the current and return.
     if cfg.MaxSizeItems == 0 {
-        req.setCachedItemsCount(req.ItemsCount() + req2.ItemsCount())
-        req2.setCachedItemsCount(0)
-        req2.td.ResourceSpans().MoveAndAppendTo(req.td.ResourceSpans())
         return []Request{req}, nil
     }
+    return req.split(cfg)
+}

-    var (
-        res          []Request
-        destReq      *tracesRequest
-        capacityLeft = cfg.MaxSizeItems
-    )
-    for _, srcReq := range []*tracesRequest{req, req2} {
-        if srcReq == nil {
-            continue
-        }
-
-        srcCount := srcReq.ItemsCount()
-        if srcCount <= capacityLeft {
-            if destReq == nil {
-                destReq = srcReq
-            } else {
-                destReq.setCachedItemsCount(destReq.ItemsCount() + srcCount)
-                srcReq.setCachedItemsCount(0)
-                srcReq.td.ResourceSpans().MoveAndAppendTo(destReq.td.ResourceSpans())
-            }
-            capacityLeft -= srcCount
-            continue
-        }
-
-        for {
-            extractedTraces := extractTraces(srcReq.td, capacityLeft)
-            extractedCount := extractedTraces.SpanCount()
-            if extractedCount == 0 {
-                break
-            }
-
-            if destReq == nil {
-                destReq = &tracesRequest{td: extractedTraces, pusher: srcReq.pusher, cachedItemsCount: extractedCount}
-            } else {
-                destReq.setCachedItemsCount(destReq.ItemsCount() + extractedCount)
-                srcReq.setCachedItemsCount(srcReq.ItemsCount() - extractedCount)
-                extractedTraces.ResourceSpans().MoveAndAppendTo(destReq.td.ResourceSpans())
-            }
-
-            // Create new batch once capacity is reached.
-            capacityLeft -= extractedCount
-            if capacityLeft == 0 {
-                res = append(res, destReq)
-                destReq = nil
-                capacityLeft = cfg.MaxSizeItems
-            }
-        }
-    }
+func (req *tracesRequest) mergeTo(dst *tracesRequest) {
+    dst.setCachedItemsCount(dst.ItemsCount() + req.ItemsCount())
+    req.setCachedItemsCount(0)
+    req.td.ResourceSpans().MoveAndAppendTo(dst.td.ResourceSpans())
+}

-    if destReq != nil {
-        res = append(res, destReq)
+func (req *tracesRequest) split(cfg exporterbatcher.MaxSizeConfig) ([]Request, error) {
+    var res []Request
+    for req.ItemsCount() > cfg.MaxSizeItems {
+        td := extractTraces(req.td, cfg.MaxSizeItems)
+        size := td.SpanCount()
+        req.setCachedItemsCount(req.ItemsCount() - size)
+        res = append(res, &tracesRequest{td: td, pusher: req.pusher, cachedItemsCount: size})
     }
+    res = append(res, req)
     return res, nil
 }
