Skip to content

[chore] Deprecate the interface exporter.Request.Merge() #12012

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions exporter/exporterhelper/internal/batch_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,12 @@ func (bs *BatchSender) sendMergeBatch(ctx context.Context, req internal.Request)
bs.mu.Lock()

if bs.activeBatch.request != nil {
var err error
req, err = bs.activeBatch.request.Merge(ctx, req)
res, err := bs.activeBatch.request.MergeSplit(ctx, bs.cfg.MaxSizeConfig, req)
if err != nil {
bs.mu.Unlock()
return err
}
req = res[0]
}

bs.activeRequests.Add(1)
Expand Down
36 changes: 13 additions & 23 deletions exporter/exporterhelper/internal/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,35 +56,25 @@ func (r *fakeRequest) ItemsCount() int {
return r.items
}

func (r *fakeRequest) Merge(_ context.Context,
r2 internal.Request,
) (internal.Request, error) {
if r == nil {
return r2, nil
}
fr2 := r2.(*fakeRequest)
if fr2.mergeErr != nil {
return nil, fr2.mergeErr
}
return &fakeRequest{
items: r.items + fr2.items,
sink: r.sink,
exportErr: fr2.exportErr,
delay: r.delay + fr2.delay,
}, nil
}

func (r *fakeRequest) MergeSplit(ctx context.Context, cfg exporterbatcher.MaxSizeConfig,
r2 internal.Request,
) ([]internal.Request, error) {
func (r *fakeRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 internal.Request) ([]internal.Request, error) {
if r.mergeErr != nil {
return nil, r.mergeErr
}

maxItems := cfg.MaxSizeItems
if maxItems == 0 {
r, err := r.Merge(ctx, r2)
return []internal.Request{r}, err
fr2 := r2.(*fakeRequest)
if fr2.mergeErr != nil {
return nil, fr2.mergeErr
}
return []internal.Request{
&fakeRequest{
items: r.items + fr2.items,
sink: r.sink,
exportErr: fr2.exportErr,
delay: r.delay + fr2.delay,
},
}, nil
}

var fr2 *fakeRequest
Expand Down
8 changes: 0 additions & 8 deletions exporter/exporterhelper/internal/retry_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,10 +418,6 @@ func (mer *mockErrorRequest) ItemsCount() int {
return 7
}

func (mer *mockErrorRequest) Merge(context.Context, internal.Request) (internal.Request, error) {
return nil, nil
}

func (mer *mockErrorRequest) MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, internal.Request) ([]internal.Request, error) {
return nil, nil
}
Expand Down Expand Up @@ -468,10 +464,6 @@ func (m *mockRequest) ItemsCount() int {
return m.cnt
}

func (m *mockRequest) Merge(context.Context, internal.Request) (internal.Request, error) {
return nil, nil
}

func (m *mockRequest) MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, internal.Request) ([]internal.Request, error) {
return nil, nil
}
Expand Down
15 changes: 5 additions & 10 deletions exporter/exporterhelper/logs_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,6 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
)

// Merge merges the provided logs request into the current request and returns the merged request.
func (req *logsRequest) Merge(_ context.Context, r2 Request) (Request, error) {
lr2, ok2 := r2.(*logsRequest)
if !ok2 {
return nil, errors.New("invalid input type")
}
lr2.ld.ResourceLogs().MoveAndAppendTo(req.ld.ResourceLogs())
return req, nil
}

// MergeSplit splits and/or merges the provided logs request and the current request into one or more requests
// conforming with the MaxSizeConfig.
func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 Request) ([]Request, error) {
Expand All @@ -33,6 +23,11 @@ func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSiz
}
}

if cfg.MaxSizeItems == 0 {
req2.ld.ResourceLogs().MoveAndAppendTo(req.ld.ResourceLogs())
return []Request{req}, nil
}

var (
res []Request
destReq *logsRequest
Expand Down
6 changes: 3 additions & 3 deletions exporter/exporterhelper/logs_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ import (
func TestMergeLogs(t *testing.T) {
lr1 := &logsRequest{ld: testdata.GenerateLogs(2)}
lr2 := &logsRequest{ld: testdata.GenerateLogs(3)}
res, err := lr1.Merge(context.Background(), lr2)
res, err := lr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, lr2)
require.NoError(t, err)
assert.Equal(t, 5, res.(*logsRequest).ld.LogRecordCount())
require.Equal(t, 5, res[0].(*logsRequest).ld.LogRecordCount())
}

func TestMergeLogsInvalidInput(t *testing.T) {
lr1 := &tracesRequest{td: testdata.GenerateTraces(2)}
lr2 := &logsRequest{ld: testdata.GenerateLogs(3)}
_, err := lr1.Merge(context.Background(), lr2)
_, err := lr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, lr2)
require.Error(t, err)
}

Expand Down
15 changes: 5 additions & 10 deletions exporter/exporterhelper/metrics_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,6 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"
)

// Merge merges the provided metrics request into the current request and returns the merged request.
func (req *metricsRequest) Merge(_ context.Context, r2 Request) (Request, error) {
mr2, ok2 := r2.(*metricsRequest)
if !ok2 {
return nil, errors.New("invalid input type")
}
mr2.md.ResourceMetrics().MoveAndAppendTo(req.md.ResourceMetrics())
return req, nil
}

// MergeSplit splits and/or merges the provided metrics request and the current request into one or more requests
// conforming with the MaxSizeConfig.
func (req *metricsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 Request) ([]Request, error) {
Expand All @@ -33,6 +23,11 @@ func (req *metricsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.Max
}
}

if cfg.MaxSizeItems == 0 {
req2.md.ResourceMetrics().MoveAndAppendTo(req.md.ResourceMetrics())
return []Request{req}, nil
}

var (
res []Request
destReq *metricsRequest
Expand Down
6 changes: 3 additions & 3 deletions exporter/exporterhelper/metrics_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ import (
func TestMergeMetrics(t *testing.T) {
mr1 := &metricsRequest{md: testdata.GenerateMetrics(2)}
mr2 := &metricsRequest{md: testdata.GenerateMetrics(3)}
res, err := mr1.Merge(context.Background(), mr2)
res, err := mr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, mr2)
require.NoError(t, err)
assert.Equal(t, 5, res.(*metricsRequest).md.MetricCount())
assert.Equal(t, 5, res[0].(*metricsRequest).md.MetricCount())
}

func TestMergeMetricsInvalidInput(t *testing.T) {
mr1 := &tracesRequest{td: testdata.GenerateTraces(2)}
mr2 := &metricsRequest{md: testdata.GenerateMetrics(3)}
_, err := mr1.Merge(context.Background(), mr2)
_, err := mr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, mr2)
require.Error(t, err)
}

Expand Down
15 changes: 5 additions & 10 deletions exporter/exporterhelper/traces_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,6 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
)

// Merge merges the provided traces request into the current request and returns the merged request.
func (req *tracesRequest) Merge(_ context.Context, r2 Request) (Request, error) {
tr2, ok2 := r2.(*tracesRequest)
if !ok2 {
return nil, errors.New("invalid input type")
}
tr2.td.ResourceSpans().MoveAndAppendTo(req.td.ResourceSpans())
return req, nil
}

// MergeSplit splits and/or merges the provided traces request and the current request into one or more requests
// conforming with the MaxSizeConfig.
func (req *tracesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 Request) ([]Request, error) {
Expand All @@ -33,6 +23,11 @@ func (req *tracesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxS
}
}

if cfg.MaxSizeItems == 0 {
req2.td.ResourceSpans().MoveAndAppendTo(req.td.ResourceSpans())
return []Request{req}, nil
}

var (
res []Request
destReq *tracesRequest
Expand Down
6 changes: 3 additions & 3 deletions exporter/exporterhelper/traces_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ import (
func TestMergeTraces(t *testing.T) {
tr1 := &tracesRequest{td: testdata.GenerateTraces(2)}
tr2 := &tracesRequest{td: testdata.GenerateTraces(3)}
res, err := tr1.Merge(context.Background(), tr2)
res, err := tr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, tr2)
require.NoError(t, err)
assert.Equal(t, 5, res.(*tracesRequest).td.SpanCount())
assert.Equal(t, 5, res[0].(*tracesRequest).td.SpanCount())
}

func TestMergeTracesInvalidInput(t *testing.T) {
tr1 := &logsRequest{ld: testdata.GenerateLogs(2)}
tr2 := &tracesRequest{td: testdata.GenerateTraces(3)}
_, err := tr1.Merge(context.Background(), tr2)
_, err := tr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, tr2)
require.Error(t, err)
}

Expand Down
32 changes: 16 additions & 16 deletions exporter/exporterhelper/xexporterhelper/profiles_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,31 @@ import (
"go.opentelemetry.io/collector/pdata/pprofile"
)

// Merge merges two profiles requests into one.
func (req *profilesRequest) Merge(_ context.Context, r2 exporterhelper.Request) (exporterhelper.Request, error) {
tr2, ok2 := r2.(*profilesRequest)
if !ok2 {
return nil, errors.New("invalid input type")
}
tr2.pd.ResourceProfiles().MoveAndAppendTo(req.pd.ResourceProfiles())
return req, nil
}

// MergeSplit splits and/or merges the profiles into multiple requests based on the MaxSizeConfig.
func (req *profilesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 exporterhelper.Request) ([]exporterhelper.Request, error) {
var req2 *profilesRequest
if r2 != nil {
var ok bool
req2, ok = r2.(*profilesRequest)
if !ok {
return nil, errors.New("invalid input type")
}
}

if cfg.MaxSizeItems == 0 {
req2.pd.ResourceProfiles().MoveAndAppendTo(req.pd.ResourceProfiles())
return []exporterhelper.Request{req}, nil
}

var (
res []exporterhelper.Request
destReq *profilesRequest
capacityLeft = cfg.MaxSizeItems
)
for _, r := range []exporterhelper.Request{req, r2} {
if r == nil {
for _, srcReq := range []*profilesRequest{req, req2} {
if srcReq == nil {
continue
}
srcReq, ok := r.(*profilesRequest)
if !ok {
return nil, errors.New("invalid input type")
}
if srcReq.pd.SampleCount() <= capacityLeft {
if destReq == nil {
destReq = srcReq
Expand Down
13 changes: 5 additions & 8 deletions exporter/exporterhelper/xexporterhelper/profiles_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,17 @@ import (
func TestMergeProfiles(t *testing.T) {
pr1 := &profilesRequest{pd: testdata.GenerateProfiles(2)}
pr2 := &profilesRequest{pd: testdata.GenerateProfiles(3)}
res, err := pr1.Merge(context.Background(), pr2)
res, err := pr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, pr2)
require.NoError(t, err)
fmt.Fprintf(os.Stdout, "%#v\n", res.(*profilesRequest).pd)
assert.Equal(t, 5, res.(*profilesRequest).pd.SampleCount())
assert.Len(t, res, 1)
fmt.Fprintf(os.Stdout, "%#v\n", res[0].(*profilesRequest).pd)
assert.Equal(t, 5, res[0].(*profilesRequest).pd.SampleCount())
}

func TestMergeProfilesInvalidInput(t *testing.T) {
pr1 := &dummyRequest{}
pr2 := &profilesRequest{pd: testdata.GenerateProfiles(3)}
_, err := pr2.Merge(context.Background(), pr1)
_, err := pr2.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, pr1)
assert.Error(t, err)
}

Expand Down Expand Up @@ -154,10 +155,6 @@ func (req *dummyRequest) ItemsCount() int {
return 1
}

func (req *dummyRequest) Merge(_ context.Context, _ exporterhelper.Request) (exporterhelper.Request, error) {
return nil, nil
}

func (req *dummyRequest) MergeSplit(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ exporterhelper.Request) (
[]exporterhelper.Request, error,
) {
Expand Down
5 changes: 3 additions & 2 deletions exporter/internal/queue/default_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,15 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
idxList: []uint64{idx},
}
} else {
mergedReq, mergeErr := qb.currentBatch.req.Merge(qb.currentBatch.ctx, req)
// TODO: consolidate implementation for the cases where MaxSizeConfig is specified and the case where it is not specified
mergedReq, mergeErr := qb.currentBatch.req.MergeSplit(qb.currentBatch.ctx, qb.batchCfg.MaxSizeConfig, req)
if mergeErr != nil {
qb.queue.OnProcessingFinished(idx, mergeErr)
qb.currentBatchMu.Unlock()
continue
}
qb.currentBatch = &batch{
req: mergedReq,
req: mergedReq[0],
ctx: qb.currentBatch.ctx,
idxList: append(qb.currentBatch.idxList, idx),
}
Expand Down
33 changes: 13 additions & 20 deletions exporter/internal/queue/fake_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,32 +53,25 @@ func (r *fakeRequest) ItemsCount() int {
return r.items
}

func (r *fakeRequest) Merge(_ context.Context,
r2 internal.Request,
) (internal.Request, error) {
fr2 := r2.(*fakeRequest)
if fr2.mergeErr != nil {
return nil, fr2.mergeErr
}
return &fakeRequest{
items: r.items + fr2.items,
sink: r.sink,
exportErr: fr2.exportErr,
delay: r.delay + fr2.delay,
}, nil
}

func (r *fakeRequest) MergeSplit(ctx context.Context, cfg exporterbatcher.MaxSizeConfig,
r2 internal.Request,
) ([]internal.Request, error) {
func (r *fakeRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 internal.Request) ([]internal.Request, error) {
if r.mergeErr != nil {
return nil, r.mergeErr
}

maxItems := cfg.MaxSizeItems
if maxItems == 0 {
r, err := r.Merge(ctx, r2)
return []internal.Request{r}, err
fr2 := r2.(*fakeRequest)
if fr2.mergeErr != nil {
return nil, fr2.mergeErr
}
return []internal.Request{
&fakeRequest{
items: r.items + fr2.items,
sink: r.sink,
exportErr: fr2.exportErr,
delay: r.delay + fr2.delay,
},
}, nil
}

var fr2 *fakeRequest
Expand Down
7 changes: 1 addition & 6 deletions exporter/internal/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,9 @@ type Request interface {
// sent. For example, for OTLP exporter, this value represents the number of spans,
// metric data points or log records.
ItemsCount() int
// Merge is a function that merges this request with another one into a single request.
// Do not mutate the requests passed to the function if error can be returned after mutation or if the exporter is
// marked as not mutable.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
Merge(context.Context, Request) (Request, error)
// MergeSplit is a function that merge and/or splits this request with another one into multiple requests based on the
// configured limit provided in MaxSizeConfig.
// MergeSplit does not split if all fields in MaxSizeConfig are not initialized (zero).
// All the returned requests MUST have a number of items that does not exceed the maximum number of items.
// Size of the last returned request MUST be less or equal than the size of any other returned request.
// The original request MUST not be mutated if error is returned after mutation or if the exporter is
Expand Down
Loading