diff --git a/exporter/exporterhelper/internal/batch_sender.go b/exporter/exporterhelper/internal/batch_sender.go index 21eed2c91d8..cb7e1ed6116 100644 --- a/exporter/exporterhelper/internal/batch_sender.go +++ b/exporter/exporterhelper/internal/batch_sender.go @@ -202,12 +202,12 @@ func (bs *BatchSender) sendMergeBatch(ctx context.Context, req internal.Request) bs.mu.Lock() if bs.activeBatch.request != nil { - var err error - req, err = bs.activeBatch.request.Merge(ctx, req) + res, err := bs.activeBatch.request.MergeSplit(ctx, bs.cfg.MaxSizeConfig, req) if err != nil { bs.mu.Unlock() return err } + req = res[0] } bs.activeRequests.Add(1) diff --git a/exporter/exporterhelper/internal/request.go b/exporter/exporterhelper/internal/request.go index 85bbf3311b1..daf06830fec 100644 --- a/exporter/exporterhelper/internal/request.go +++ b/exporter/exporterhelper/internal/request.go @@ -56,35 +56,25 @@ func (r *fakeRequest) ItemsCount() int { return r.items } -func (r *fakeRequest) Merge(_ context.Context, - r2 internal.Request, -) (internal.Request, error) { - if r == nil { - return r2, nil - } - fr2 := r2.(*fakeRequest) - if fr2.mergeErr != nil { - return nil, fr2.mergeErr - } - return &fakeRequest{ - items: r.items + fr2.items, - sink: r.sink, - exportErr: fr2.exportErr, - delay: r.delay + fr2.delay, - }, nil -} - -func (r *fakeRequest) MergeSplit(ctx context.Context, cfg exporterbatcher.MaxSizeConfig, - r2 internal.Request, -) ([]internal.Request, error) { +func (r *fakeRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 internal.Request) ([]internal.Request, error) { if r.mergeErr != nil { return nil, r.mergeErr } maxItems := cfg.MaxSizeItems if maxItems == 0 { - r, err := r.Merge(ctx, r2) - return []internal.Request{r}, err + fr2 := r2.(*fakeRequest) + if fr2.mergeErr != nil { + return nil, fr2.mergeErr + } + return []internal.Request{ + &fakeRequest{ + items: r.items + fr2.items, + sink: r.sink, + exportErr: fr2.exportErr, + delay: r.delay + fr2.delay, + }, + }, nil } var fr2 *fakeRequest diff --git a/exporter/exporterhelper/internal/retry_sender_test.go b/exporter/exporterhelper/internal/retry_sender_test.go index b990d399f28..525a043ac92 100644 --- a/exporter/exporterhelper/internal/retry_sender_test.go +++ b/exporter/exporterhelper/internal/retry_sender_test.go @@ -418,10 +418,6 @@ func (mer *mockErrorRequest) ItemsCount() int { return 7 } -func (mer *mockErrorRequest) Merge(context.Context, internal.Request) (internal.Request, error) { - return nil, nil -} - func (mer *mockErrorRequest) MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, internal.Request) ([]internal.Request, error) { return nil, nil } @@ -468,10 +464,6 @@ func (m *mockRequest) ItemsCount() int { return m.cnt } -func (m *mockRequest) Merge(context.Context, internal.Request) (internal.Request, error) { - return nil, nil -} - func (m *mockRequest) MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, internal.Request) ([]internal.Request, error) { return nil, nil } diff --git a/exporter/exporterhelper/logs_batch.go b/exporter/exporterhelper/logs_batch.go index 202bd13f53f..9d5c7077ca6 100644 --- a/exporter/exporterhelper/logs_batch.go +++ b/exporter/exporterhelper/logs_batch.go @@ -11,16 +11,6 @@ import ( "go.opentelemetry.io/collector/pdata/plog" ) -// Merge merges the provided logs request into the current request and returns the merged request. -func (req *logsRequest) Merge(_ context.Context, r2 Request) (Request, error) { - lr2, ok2 := r2.(*logsRequest) - if !ok2 { - return nil, errors.New("invalid input type") - } - lr2.ld.ResourceLogs().MoveAndAppendTo(req.ld.ResourceLogs()) - return req, nil -} - // MergeSplit splits and/or merges the provided logs request and the current request into one or more requests // conforming with the MaxSizeConfig. func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 Request) ([]Request, error) { @@ -33,6 +23,11 @@ func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSiz } } + if cfg.MaxSizeItems == 0 { + req2.ld.ResourceLogs().MoveAndAppendTo(req.ld.ResourceLogs()) + return []Request{req}, nil + } + var ( res []Request destReq *logsRequest diff --git a/exporter/exporterhelper/logs_batch_test.go b/exporter/exporterhelper/logs_batch_test.go index 5780f027123..d05d87764ee 100644 --- a/exporter/exporterhelper/logs_batch_test.go +++ b/exporter/exporterhelper/logs_batch_test.go @@ -19,15 +19,15 @@ import ( func TestMergeLogs(t *testing.T) { lr1 := &logsRequest{ld: testdata.GenerateLogs(2)} lr2 := &logsRequest{ld: testdata.GenerateLogs(3)} - res, err := lr1.Merge(context.Background(), lr2) + res, err := lr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, lr2) require.NoError(t, err) - assert.Equal(t, 5, res.(*logsRequest).ld.LogRecordCount()) + require.Equal(t, 5, res[0].(*logsRequest).ld.LogRecordCount()) } func TestMergeLogsInvalidInput(t *testing.T) { lr1 := &tracesRequest{td: testdata.GenerateTraces(2)} lr2 := &logsRequest{ld: testdata.GenerateLogs(3)} - _, err := lr1.Merge(context.Background(), lr2) + _, err := lr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, lr2) require.Error(t, err) } diff --git a/exporter/exporterhelper/metrics_batch.go b/exporter/exporterhelper/metrics_batch.go index 66251d8c6ac..66466492932 100644 --- a/exporter/exporterhelper/metrics_batch.go +++ b/exporter/exporterhelper/metrics_batch.go @@ -11,16 +11,6 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" ) -// Merge merges the provided metrics request into the current request and returns the merged request. -func (req *metricsRequest) Merge(_ context.Context, r2 Request) (Request, error) { - mr2, ok2 := r2.(*metricsRequest) - if !ok2 { - return nil, errors.New("invalid input type") - } - mr2.md.ResourceMetrics().MoveAndAppendTo(req.md.ResourceMetrics()) - return req, nil -} - // MergeSplit splits and/or merges the provided metrics request and the current request into one or more requests // conforming with the MaxSizeConfig. func (req *metricsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 Request) ([]Request, error) { @@ -33,6 +23,11 @@ func (req *metricsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.Max } } + if cfg.MaxSizeItems == 0 { + req2.md.ResourceMetrics().MoveAndAppendTo(req.md.ResourceMetrics()) + return []Request{req}, nil + } + var ( res []Request destReq *metricsRequest diff --git a/exporter/exporterhelper/metrics_batch_test.go b/exporter/exporterhelper/metrics_batch_test.go index 879a9460f79..d6f583a6844 100644 --- a/exporter/exporterhelper/metrics_batch_test.go +++ b/exporter/exporterhelper/metrics_batch_test.go @@ -18,15 +18,15 @@ import ( func TestMergeMetrics(t *testing.T) { mr1 := &metricsRequest{md: testdata.GenerateMetrics(2)} mr2 := &metricsRequest{md: testdata.GenerateMetrics(3)} - res, err := mr1.Merge(context.Background(), mr2) + res, err := mr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, mr2) require.NoError(t, err) - assert.Equal(t, 5, res.(*metricsRequest).md.MetricCount()) + assert.Equal(t, 5, res[0].(*metricsRequest).md.MetricCount()) } func TestMergeMetricsInvalidInput(t *testing.T) { mr1 := &tracesRequest{td: testdata.GenerateTraces(2)} mr2 := &metricsRequest{md: testdata.GenerateMetrics(3)} - _, err := mr1.Merge(context.Background(), mr2) + _, err := mr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, mr2) require.Error(t, err) } diff --git a/exporter/exporterhelper/traces_batch.go b/exporter/exporterhelper/traces_batch.go index 7a176dd5483..63535567b6a 100644 --- a/exporter/exporterhelper/traces_batch.go +++ b/exporter/exporterhelper/traces_batch.go @@ -11,16 +11,6 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" ) -// Merge merges the provided traces request into the current request and returns the merged request. -func (req *tracesRequest) Merge(_ context.Context, r2 Request) (Request, error) { - tr2, ok2 := r2.(*tracesRequest) - if !ok2 { - return nil, errors.New("invalid input type") - } - tr2.td.ResourceSpans().MoveAndAppendTo(req.td.ResourceSpans()) - return req, nil -} - // MergeSplit splits and/or merges the provided traces request and the current request into one or more requests // conforming with the MaxSizeConfig. func (req *tracesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 Request) ([]Request, error) { @@ -33,6 +23,11 @@ func (req *tracesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxS } } + if cfg.MaxSizeItems == 0 { + req2.td.ResourceSpans().MoveAndAppendTo(req.td.ResourceSpans()) + return []Request{req}, nil + } + var ( res []Request destReq *tracesRequest diff --git a/exporter/exporterhelper/traces_batch_test.go b/exporter/exporterhelper/traces_batch_test.go index 445001fb499..2d84f254ed9 100644 --- a/exporter/exporterhelper/traces_batch_test.go +++ b/exporter/exporterhelper/traces_batch_test.go @@ -18,15 +18,15 @@ import ( func TestMergeTraces(t *testing.T) { tr1 := &tracesRequest{td: testdata.GenerateTraces(2)} tr2 := &tracesRequest{td: testdata.GenerateTraces(3)} - res, err := tr1.Merge(context.Background(), tr2) + res, err := tr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, tr2) require.NoError(t, err) - assert.Equal(t, 5, res.(*tracesRequest).td.SpanCount()) + assert.Equal(t, 5, res[0].(*tracesRequest).td.SpanCount()) } func TestMergeTracesInvalidInput(t *testing.T) { tr1 := &logsRequest{ld: testdata.GenerateLogs(2)} tr2 := &tracesRequest{td: testdata.GenerateTraces(3)} - _, err := tr1.Merge(context.Background(), tr2) + _, err := tr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, tr2) require.Error(t, err) } diff --git a/exporter/exporterhelper/xexporterhelper/profiles_batch.go b/exporter/exporterhelper/xexporterhelper/profiles_batch.go index ae7295e791b..8fd337878be 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_batch.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_batch.go @@ -12,31 +12,31 @@ import ( "go.opentelemetry.io/collector/pdata/pprofile" ) -// Merge merges two profiles requests into one. -func (req *profilesRequest) Merge(_ context.Context, r2 exporterhelper.Request) (exporterhelper.Request, error) { - tr2, ok2 := r2.(*profilesRequest) - if !ok2 { - return nil, errors.New("invalid input type") - } - tr2.pd.ResourceProfiles().MoveAndAppendTo(req.pd.ResourceProfiles()) - return req, nil -} - // MergeSplit splits and/or merges the profiles into multiple requests based on the MaxSizeConfig. func (req *profilesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 exporterhelper.Request) ([]exporterhelper.Request, error) { + var req2 *profilesRequest + if r2 != nil { + var ok bool + req2, ok = r2.(*profilesRequest) + if !ok { + return nil, errors.New("invalid input type") + } + } + + if cfg.MaxSizeItems == 0 { + req2.pd.ResourceProfiles().MoveAndAppendTo(req.pd.ResourceProfiles()) + return []exporterhelper.Request{req}, nil + } + var ( res []exporterhelper.Request destReq *profilesRequest capacityLeft = cfg.MaxSizeItems ) - for _, r := range []exporterhelper.Request{req, r2} { - if r == nil { + for _, srcReq := range []*profilesRequest{req, req2} { + if srcReq == nil { continue } - srcReq, ok := r.(*profilesRequest) - if !ok { - return nil, errors.New("invalid input type") - } if srcReq.pd.SampleCount() <= capacityLeft { if destReq == nil { destReq = srcReq diff --git a/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go b/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go index 76ec1df164b..2981d11830b 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go @@ -21,16 +21,17 @@ import ( func TestMergeProfiles(t *testing.T) { pr1 := &profilesRequest{pd: testdata.GenerateProfiles(2)} pr2 := &profilesRequest{pd: testdata.GenerateProfiles(3)} - res, err := pr1.Merge(context.Background(), pr2) + res, err := pr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, pr2) require.NoError(t, err) - fmt.Fprintf(os.Stdout, "%#v\n", res.(*profilesRequest).pd) - assert.Equal(t, 5, res.(*profilesRequest).pd.SampleCount()) + assert.Len(t, res, 1) + fmt.Fprintf(os.Stdout, "%#v\n", res[0].(*profilesRequest).pd) + assert.Equal(t, 5, res[0].(*profilesRequest).pd.SampleCount()) } func TestMergeProfilesInvalidInput(t *testing.T) { pr1 := &dummyRequest{} pr2 := &profilesRequest{pd: testdata.GenerateProfiles(3)} - _, err := pr2.Merge(context.Background(), pr1) + _, err := pr2.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, pr1) assert.Error(t, err) } @@ -154,10 +155,6 @@ func (req *dummyRequest) ItemsCount() int { return 1 } -func (req *dummyRequest) Merge(_ context.Context, _ exporterhelper.Request) (exporterhelper.Request, error) { - return nil, nil -} - func (req *dummyRequest) MergeSplit(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ exporterhelper.Request) ( []exporterhelper.Request, error, ) { diff --git a/exporter/internal/queue/default_batcher.go b/exporter/internal/queue/default_batcher.go index 3023fa4df46..7e4e9273ed9 100644 --- a/exporter/internal/queue/default_batcher.go +++ b/exporter/internal/queue/default_batcher.go @@ -89,14 +89,15 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() { idxList: []uint64{idx}, } } else { - mergedReq, mergeErr := qb.currentBatch.req.Merge(qb.currentBatch.ctx, req) + // TODO: consolidate implementation for the cases where MaxSizeConfig is specified and the case where it is not specified + mergedReq, mergeErr := qb.currentBatch.req.MergeSplit(qb.currentBatch.ctx, qb.batchCfg.MaxSizeConfig, req) if mergeErr != nil { qb.queue.OnProcessingFinished(idx, mergeErr) qb.currentBatchMu.Unlock() continue } qb.currentBatch = &batch{ - req: mergedReq, + req: mergedReq[0], ctx: qb.currentBatch.ctx, idxList: append(qb.currentBatch.idxList, idx), } diff --git a/exporter/internal/queue/fake_request_test.go b/exporter/internal/queue/fake_request_test.go index 65ee3a289a3..b062cdd0d68 100644 --- a/exporter/internal/queue/fake_request_test.go +++ b/exporter/internal/queue/fake_request_test.go @@ -53,32 +53,25 @@ func (r *fakeRequest) ItemsCount() int { return r.items } -func (r *fakeRequest) Merge(_ context.Context, - r2 internal.Request, -) (internal.Request, error) { - fr2 := r2.(*fakeRequest) - if fr2.mergeErr != nil { - return nil, fr2.mergeErr - } - return &fakeRequest{ - items: r.items + fr2.items, - sink: r.sink, - exportErr: fr2.exportErr, - delay: r.delay + fr2.delay, - }, nil -} - -func (r *fakeRequest) MergeSplit(ctx context.Context, cfg exporterbatcher.MaxSizeConfig, - r2 internal.Request, -) ([]internal.Request, error) { +func (r *fakeRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 internal.Request) ([]internal.Request, error) { if r.mergeErr != nil { return nil, r.mergeErr } maxItems := cfg.MaxSizeItems if maxItems == 0 { - r, err := r.Merge(ctx, r2) - return []internal.Request{r}, err + fr2 := r2.(*fakeRequest) + if fr2.mergeErr != nil { + return nil, fr2.mergeErr + } + return []internal.Request{ + &fakeRequest{ + items: r.items + fr2.items, + sink: r.sink, + exportErr: fr2.exportErr, + delay: r.delay + fr2.delay, + }, + }, nil } var fr2 *fakeRequest diff --git a/exporter/internal/request.go b/exporter/internal/request.go index bd24d982de6..88a914b9e36 100644 --- a/exporter/internal/request.go +++ b/exporter/internal/request.go @@ -19,14 +19,9 @@ type Request interface { // sent. For example, for OTLP exporter, this value represents the number of spans, // metric data points or log records. ItemsCount() int - // Merge is a function that merges this request with another one into a single request. - // Do not mutate the requests passed to the function if error can be returned after mutation or if the exporter is - // marked as not mutable. - // Experimental: This API is at the early stage of development and may change without backward compatibility - // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. - Merge(context.Context, Request) (Request, error) // MergeSplit is a function that merge and/or splits this request with another one into multiple requests based on the // configured limit provided in MaxSizeConfig. + // MergeSplit does not split if all fields in MaxSizeConfig are not initialized (zero). // All the returned requests MUST have a number of items that does not exceed the maximum number of items. // Size of the last returned request MUST be less or equal than the size of any other returned request. // The original request MUST not be mutated if error is returned after mutation or if the exporter is