diff --git a/.chloggen/fix-traces-split.yaml b/.chloggen/fix-traces-split.yaml new file mode 100644 index 00000000000..982000839cb --- /dev/null +++ b/.chloggen/fix-traces-split.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix MergeSplit issue that ignores the initial message size. + +# One or more tracking issues or pull requests related to the change +issues: [12257] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/exporterhelper/logs_batch.go b/exporter/exporterhelper/logs_batch.go index ff2eb323d60..3618182d466 100644 --- a/exporter/exporterhelper/logs_batch.go +++ b/exporter/exporterhelper/logs_batch.go @@ -14,73 +14,36 @@ import ( // MergeSplit splits and/or merges the provided logs request and the current request into one or more requests // conforming with the MaxSizeConfig. 
func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 Request) ([]Request, error) { - var req2 *logsRequest if r2 != nil { - var ok bool - req2, ok = r2.(*logsRequest) + req2, ok := r2.(*logsRequest) if !ok { return nil, errors.New("invalid input type") } + req2.mergeTo(req) } + // If no limit we can simply merge the new request into the current and return. if cfg.MaxSizeItems == 0 { - req.setCachedItemsCount(req.ItemsCount() + req2.ItemsCount()) - req2.setCachedItemsCount(0) - req2.ld.ResourceLogs().MoveAndAppendTo(req.ld.ResourceLogs()) return []Request{req}, nil } + return req.split(cfg) +} - var ( - res []Request - destReq *logsRequest - capacityLeft = cfg.MaxSizeItems - ) - for _, srcReq := range []*logsRequest{req, req2} { - if srcReq == nil { - continue - } - - srcCount := srcReq.ItemsCount() - if srcCount <= capacityLeft { - if destReq == nil { - destReq = srcReq - } else { - destReq.setCachedItemsCount(destReq.ItemsCount() + srcCount) - srcReq.setCachedItemsCount(0) - srcReq.ld.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs()) - } - capacityLeft -= srcCount - continue - } - - for { - extractedLogs := extractLogs(srcReq.ld, capacityLeft) - extractedCount := extractedLogs.LogRecordCount() - if extractedCount == 0 { - break - } - - if destReq == nil { - destReq = &logsRequest{ld: extractedLogs, pusher: srcReq.pusher, cachedItemsCount: extractedCount} - } else { - extractedLogs.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs()) - destReq.setCachedItemsCount(destReq.ItemsCount() + extractedCount) - srcReq.setCachedItemsCount(srcReq.ItemsCount() - extractedCount) - } - - // Create new batch once capacity is reached. 
- capacityLeft -= extractedCount - if capacityLeft == 0 { - res = append(res, destReq) - destReq = nil - capacityLeft = cfg.MaxSizeItems - } - } - } +func (req *logsRequest) mergeTo(dst *logsRequest) { + dst.setCachedItemsCount(dst.ItemsCount() + req.ItemsCount()) + req.setCachedItemsCount(0) + req.ld.ResourceLogs().MoveAndAppendTo(dst.ld.ResourceLogs()) +} - if destReq != nil { - res = append(res, destReq) +func (req *logsRequest) split(cfg exporterbatcher.MaxSizeConfig) ([]Request, error) { + var res []Request + for req.ItemsCount() > cfg.MaxSizeItems { + ld := extractLogs(req.ld, cfg.MaxSizeItems) + size := ld.LogRecordCount() + req.setCachedItemsCount(req.ItemsCount() - size) + res = append(res, &logsRequest{ld: ld, pusher: req.pusher, cachedItemsCount: size}) } + res = append(res, req) return res, nil } diff --git a/exporter/exporterhelper/logs_batch_test.go b/exporter/exporterhelper/logs_batch_test.go index d7b21292fd0..666b9748d19 100644 --- a/exporter/exporterhelper/logs_batch_test.go +++ b/exporter/exporterhelper/logs_batch_test.go @@ -153,9 +153,22 @@ func TestExtractLogs(t *testing.T) { } } -func BenchmarkSplittingBasedOnItemCountManySmallLogs(b *testing.B) { +func TestMergeSplitManySmallLogs(t *testing.T) { // All requests merge into a single batch. cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000} + merged := []Request{newLogsRequest(testdata.GenerateLogs(1), nil)} + for j := 0; j < 1000; j++ { + lr2 := newLogsRequest(testdata.GenerateLogs(10), nil) + res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2) + merged = append(merged[0:len(merged)-1], res...) + } + assert.Len(t, merged, 2) +} + +func BenchmarkSplittingBasedOnItemCountManySmallLogs(b *testing.B) { + // All requests merge into a single batch. 
+ cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10010} + b.ReportAllocs() for i := 0; i < b.N; i++ { merged := []Request{newLogsRequest(testdata.GenerateLogs(10), nil)} for j := 0; j < 1000; j++ { @@ -170,6 +183,7 @@ func BenchmarkSplittingBasedOnItemCountManySmallLogs(b *testing.B) { func BenchmarkSplittingBasedOnItemCountManyLogsSlightlyAboveLimit(b *testing.B) { // Every incoming request results in a split. cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000} + b.ReportAllocs() for i := 0; i < b.N; i++ { merged := []Request{newLogsRequest(testdata.GenerateLogs(0), nil)} for j := 0; j < 10; j++ { @@ -184,6 +198,7 @@ func BenchmarkSplittingBasedOnItemCountManyLogsSlightlyAboveLimit(b *testing.B) func BenchmarkSplittingBasedOnItemCountHugeLogs(b *testing.B) { // One request splits into many batches. cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000} + b.ReportAllocs() for i := 0; i < b.N; i++ { merged := []Request{newLogsRequest(testdata.GenerateLogs(0), nil)} lr2 := newLogsRequest(testdata.GenerateLogs(100000), nil) diff --git a/exporter/exporterhelper/metrics_batch.go b/exporter/exporterhelper/metrics_batch.go index 57e00f58b4d..390517c6943 100644 --- a/exporter/exporterhelper/metrics_batch.go +++ b/exporter/exporterhelper/metrics_batch.go @@ -14,74 +14,36 @@ import ( // MergeSplit splits and/or merges the provided metrics request and the current request into one or more requests // conforming with the MaxSizeConfig. func (req *metricsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 Request) ([]Request, error) { - var req2 *metricsRequest if r2 != nil { - var ok bool - req2, ok = r2.(*metricsRequest) + req2, ok := r2.(*metricsRequest) if !ok { return nil, errors.New("invalid input type") } + req2.mergeTo(req) } + // If no limit we can simply merge the new request into the current and return. 
if cfg.MaxSizeItems == 0 { - req.setCachedItemsCount(req.ItemsCount() + req2.ItemsCount()) - req2.setCachedItemsCount(0) - req2.md.ResourceMetrics().MoveAndAppendTo(req.md.ResourceMetrics()) return []Request{req}, nil } + return req.split(cfg) +} - var ( - res []Request - destReq *metricsRequest - capacityLeft = cfg.MaxSizeItems - ) - for _, srcReq := range []*metricsRequest{req, req2} { - if srcReq == nil { - continue - } - - srcCount := srcReq.ItemsCount() - if srcCount <= capacityLeft { - if destReq == nil { - destReq = srcReq - } else { - destReq.setCachedItemsCount(destReq.ItemsCount() + srcCount) - srcReq.setCachedItemsCount(0) - srcReq.md.ResourceMetrics().MoveAndAppendTo(destReq.md.ResourceMetrics()) - } - capacityLeft -= srcCount - continue - } - - for { - extractedMetrics := extractMetrics(srcReq.md, capacityLeft) - extractedCount := extractedMetrics.DataPointCount() - if extractedCount == 0 { - break - } - - if destReq == nil { - destReq = &metricsRequest{md: extractedMetrics, pusher: srcReq.pusher, cachedItemsCount: extractedCount} - } else { - destReq.setCachedItemsCount(destReq.ItemsCount() + extractedCount) - srcReq.setCachedItemsCount(srcReq.ItemsCount() - extractedCount) - extractedMetrics.ResourceMetrics().MoveAndAppendTo(destReq.md.ResourceMetrics()) - } - - // Create new batch once capacity is reached. 
- capacityLeft -= extractedCount - if capacityLeft == 0 { - res = append(res, destReq) - destReq = nil - capacityLeft = cfg.MaxSizeItems - } - } - } +func (req *metricsRequest) mergeTo(dst *metricsRequest) { + dst.setCachedItemsCount(dst.ItemsCount() + req.ItemsCount()) + req.setCachedItemsCount(0) + req.md.ResourceMetrics().MoveAndAppendTo(dst.md.ResourceMetrics()) +} - if destReq != nil { - res = append(res, destReq) +func (req *metricsRequest) split(cfg exporterbatcher.MaxSizeConfig) ([]Request, error) { + var res []Request + for req.ItemsCount() > cfg.MaxSizeItems { + md := extractMetrics(req.md, cfg.MaxSizeItems) + size := md.DataPointCount() + req.setCachedItemsCount(req.ItemsCount() - size) + res = append(res, &metricsRequest{md: md, pusher: req.pusher, cachedItemsCount: size}) } - + res = append(res, req) return res, nil } diff --git a/exporter/exporterhelper/metrics_batch_test.go b/exporter/exporterhelper/metrics_batch_test.go index 090b504c3c7..5969e0f75c8 100644 --- a/exporter/exporterhelper/metrics_batch_test.go +++ b/exporter/exporterhelper/metrics_batch_test.go @@ -161,9 +161,22 @@ func TestExtractMetricsInvalidMetric(t *testing.T) { assert.Equal(t, 0, md.ResourceMetrics().Len()) } -func BenchmarkSplittingBasedOnItemCountManySmallMetrics(b *testing.B) { +func TestMergeSplitManySmallMetrics(t *testing.T) { // All requests merge into a single batch. cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 20000} + merged := []Request{newMetricsRequest(testdata.GenerateMetrics(1), nil)} + for j := 0; j < 1000; j++ { + lr2 := newMetricsRequest(testdata.GenerateMetrics(10), nil) + res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2) + merged = append(merged[0:len(merged)-1], res...) + } + assert.Len(t, merged, 2) +} + +func BenchmarkSplittingBasedOnItemCountManySmallMetrics(b *testing.B) { + // All requests merge into a single batch. 
+ cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 20020} + b.ReportAllocs() for i := 0; i < b.N; i++ { merged := []Request{newMetricsRequest(testdata.GenerateMetrics(10), nil)} for j := 0; j < 1000; j++ { @@ -178,6 +191,7 @@ func BenchmarkSplittingBasedOnItemCountManySmallMetrics(b *testing.B) { func BenchmarkSplittingBasedOnItemCountManyMetricsSlightlyAboveLimit(b *testing.B) { // Every incoming request results in a split. cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 20000} + b.ReportAllocs() for i := 0; i < b.N; i++ { merged := []Request{newMetricsRequest(testdata.GenerateMetrics(0), nil)} for j := 0; j < 10; j++ { @@ -192,6 +206,7 @@ func BenchmarkSplittingBasedOnItemCountManyMetricsSlightlyAboveLimit(b *testing. func BenchmarkSplittingBasedOnItemCountHugeMetrics(b *testing.B) { // One request splits into many batches. cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 20000} + b.ReportAllocs() for i := 0; i < b.N; i++ { merged := []Request{newMetricsRequest(testdata.GenerateMetrics(0), nil)} lr2 := newMetricsRequest(testdata.GenerateMetrics(100000), nil) diff --git a/exporter/exporterhelper/traces_batch.go b/exporter/exporterhelper/traces_batch.go index 5e8f7230e68..91b936ec611 100644 --- a/exporter/exporterhelper/traces_batch.go +++ b/exporter/exporterhelper/traces_batch.go @@ -14,73 +14,36 @@ import ( // MergeSplit splits and/or merges the provided traces request and the current request into one or more requests // conforming with the MaxSizeConfig. func (req *tracesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 Request) ([]Request, error) { - var req2 *tracesRequest if r2 != nil { - var ok bool - req2, ok = r2.(*tracesRequest) + req2, ok := r2.(*tracesRequest) if !ok { return nil, errors.New("invalid input type") } + req2.mergeTo(req) } + // If no limit we can simply merge the new request into the current and return. 
if cfg.MaxSizeItems == 0 { - req.setCachedItemsCount(req.ItemsCount() + req2.ItemsCount()) - req2.setCachedItemsCount(0) - req2.td.ResourceSpans().MoveAndAppendTo(req.td.ResourceSpans()) return []Request{req}, nil } + return req.split(cfg) +} - var ( - res []Request - destReq *tracesRequest - capacityLeft = cfg.MaxSizeItems - ) - for _, srcReq := range []*tracesRequest{req, req2} { - if srcReq == nil { - continue - } - - srcCount := srcReq.ItemsCount() - if srcCount <= capacityLeft { - if destReq == nil { - destReq = srcReq - } else { - destReq.setCachedItemsCount(destReq.ItemsCount() + srcCount) - srcReq.setCachedItemsCount(0) - srcReq.td.ResourceSpans().MoveAndAppendTo(destReq.td.ResourceSpans()) - } - capacityLeft -= srcCount - continue - } - - for { - extractedTraces := extractTraces(srcReq.td, capacityLeft) - extractedCount := extractedTraces.SpanCount() - if extractedCount == 0 { - break - } - - if destReq == nil { - destReq = &tracesRequest{td: extractedTraces, pusher: srcReq.pusher, cachedItemsCount: extractedCount} - } else { - destReq.setCachedItemsCount(destReq.ItemsCount() + extractedCount) - srcReq.setCachedItemsCount(srcReq.ItemsCount() - extractedCount) - extractedTraces.ResourceSpans().MoveAndAppendTo(destReq.td.ResourceSpans()) - } - - // Create new batch once capacity is reached. 
- capacityLeft -= extractedCount - if capacityLeft == 0 { - res = append(res, destReq) - destReq = nil - capacityLeft = cfg.MaxSizeItems - } - } - } +func (req *tracesRequest) mergeTo(dst *tracesRequest) { + dst.setCachedItemsCount(dst.ItemsCount() + req.ItemsCount()) + req.setCachedItemsCount(0) + req.td.ResourceSpans().MoveAndAppendTo(dst.td.ResourceSpans()) +} - if destReq != nil { - res = append(res, destReq) +func (req *tracesRequest) split(cfg exporterbatcher.MaxSizeConfig) ([]Request, error) { + var res []Request + for req.ItemsCount() > cfg.MaxSizeItems { + td := extractTraces(req.td, cfg.MaxSizeItems) + size := td.SpanCount() + req.setCachedItemsCount(req.ItemsCount() - size) + res = append(res, &tracesRequest{td: td, pusher: req.pusher, cachedItemsCount: size}) } + res = append(res, req) return res, nil } diff --git a/exporter/exporterhelper/traces_batch_test.go b/exporter/exporterhelper/traces_batch_test.go index 122b0bb5e60..7444273260e 100644 --- a/exporter/exporterhelper/traces_batch_test.go +++ b/exporter/exporterhelper/traces_batch_test.go @@ -160,9 +160,20 @@ func TestExtractTraces(t *testing.T) { } } +func TestMergeSplitManySmallTraces(t *testing.T) { + cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000} + merged := []Request{newTracesRequest(testdata.GenerateTraces(1), nil)} + for j := 0; j < 1000; j++ { + lr2 := newTracesRequest(testdata.GenerateTraces(10), nil) + res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2) + merged = append(merged[0:len(merged)-1], res...) + } + assert.Len(t, merged, 2) +} + func BenchmarkSplittingBasedOnItemCountManySmallTraces(b *testing.B) { // All requests merge into a single batch. 
- cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000} + cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10010} b.ReportAllocs() for i := 0; i < b.N; i++ { merged := []Request{newTracesRequest(testdata.GenerateTraces(10), nil)} @@ -193,6 +204,7 @@ func BenchmarkSplittingBasedOnItemCountManyTracesSlightlyAboveLimit(b *testing.B func BenchmarkSplittingBasedOnItemCountHugeTraces(b *testing.B) { // One request splits into many batches. cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000} + b.ReportAllocs() for i := 0; i < b.N; i++ { merged := []Request{newTracesRequest(testdata.GenerateTraces(0), nil)} lr2 := newTracesRequest(testdata.GenerateTraces(100000), nil) diff --git a/exporter/exporterhelper/xexporterhelper/profiles_batch.go b/exporter/exporterhelper/xexporterhelper/profiles_batch.go index da7d251a7e5..6efd95d82c3 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_batch.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_batch.go @@ -14,72 +14,36 @@ import ( // MergeSplit splits and/or merges the profiles into multiple requests based on the MaxSizeConfig. func (req *profilesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 exporterhelper.Request) ([]exporterhelper.Request, error) { - var req2 *profilesRequest if r2 != nil { - var ok bool - req2, ok = r2.(*profilesRequest) + req2, ok := r2.(*profilesRequest) if !ok { return nil, errors.New("invalid input type") } + req2.mergeTo(req) } + // If no limit we can simply merge the new request into the current and return. 
if cfg.MaxSizeItems == 0 { - req.setCachedItemsCount(req.ItemsCount() + req2.ItemsCount()) - req2.setCachedItemsCount(0) - req2.pd.ResourceProfiles().MoveAndAppendTo(req.pd.ResourceProfiles()) return []exporterhelper.Request{req}, nil } + return req.split(cfg) +} - var ( - res []exporterhelper.Request - destReq *profilesRequest - capacityLeft = cfg.MaxSizeItems - ) - for _, srcReq := range []*profilesRequest{req, req2} { - if srcReq == nil { - continue - } - - srcCount := srcReq.pd.SampleCount() - if srcCount <= capacityLeft { - if destReq == nil { - destReq = srcReq - } else { - destReq.setCachedItemsCount(destReq.ItemsCount() + srcCount) - srcReq.setCachedItemsCount(0) - srcReq.pd.ResourceProfiles().MoveAndAppendTo(destReq.pd.ResourceProfiles()) - } - capacityLeft -= srcCount - continue - } - - for { - extractedProfiles := extractProfiles(srcReq.pd, capacityLeft) - extractedCount := extractedProfiles.SampleCount() - if extractedCount == 0 { - break - } - - capacityLeft -= extractedProfiles.SampleCount() - if destReq == nil { - destReq = newProfilesRequest(extractedProfiles, srcReq.pusher).(*profilesRequest) - } else { - destReq.setCachedItemsCount(destReq.ItemsCount() + extractedCount) - srcReq.setCachedItemsCount(srcReq.ItemsCount() - extractedCount) - extractedProfiles.ResourceProfiles().MoveAndAppendTo(destReq.pd.ResourceProfiles()) - } - // Create new batch once capacity is reached. 
- if capacityLeft == 0 { - res = append(res, destReq) - destReq = nil - capacityLeft = cfg.MaxSizeItems - } - } - } +func (req *profilesRequest) mergeTo(dst *profilesRequest) { + dst.setCachedItemsCount(dst.ItemsCount() + req.ItemsCount()) + req.setCachedItemsCount(0) + req.pd.ResourceProfiles().MoveAndAppendTo(dst.pd.ResourceProfiles()) +} - if destReq != nil { - res = append(res, destReq) +func (req *profilesRequest) split(cfg exporterbatcher.MaxSizeConfig) ([]exporterhelper.Request, error) { + var res []exporterhelper.Request + for req.ItemsCount() > cfg.MaxSizeItems { + pd := extractProfiles(req.pd, cfg.MaxSizeItems) + size := pd.SampleCount() + req.setCachedItemsCount(req.ItemsCount() - size) + res = append(res, &profilesRequest{pd: pd, pusher: req.pusher, cachedItemsCount: size}) } + res = append(res, req) return res, nil } diff --git a/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go b/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go index 2b044fac14c..cca1949ec89 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go @@ -133,3 +133,15 @@ func TestExtractProfiles(t *testing.T) { assert.Equal(t, 10-i, ld.SampleCount()) } } + +func TestMergeSplitManySmallProfiles(t *testing.T) { + // All requests merge into a single batch. + cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000} + merged := []exporterhelper.Request{newProfilesRequest(testdata.GenerateProfiles(1), nil)} + for j := 0; j < 1000; j++ { + lr2 := newProfilesRequest(testdata.GenerateProfiles(10), nil) + res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2) + merged = append(merged[0:len(merged)-1], res...) + } + assert.Len(t, merged, 2) +}