Skip to content

Commit fbca1ee

Browse files
authored
[chore] Deprecate the interface exporter.Request.Merge() (#12012)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description This PR removes the interface `exporter.Request.Merge()` because `Merge()` is just a special case of `MergeSplit()`. With just one interface, we no longer need an if-else loop in the batcher based on MaxConfigSize. `MergeSplit()` should be able to handle both the case where MaxConfigSize is specified or the case where it is not <!-- Issue number if applicable --> #### Link to tracking issue Fixes # <!--Describe what testing was performed and which tests were added.--> #### Testing All the existing cases should still pass. <!--Describe the documentation added.--> #### Documentation <!--Please delete paragraphs that you did not use before submitting.-->
1 parent 3e87d0d commit fbca1ee

14 files changed

+77
-124
lines changed

exporter/exporterhelper/internal/batch_sender.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,12 +202,12 @@ func (bs *BatchSender) sendMergeBatch(ctx context.Context, req internal.Request)
202202
bs.mu.Lock()
203203

204204
if bs.activeBatch.request != nil {
205-
var err error
206-
req, err = bs.activeBatch.request.Merge(ctx, req)
205+
res, err := bs.activeBatch.request.MergeSplit(ctx, bs.cfg.MaxSizeConfig, req)
207206
if err != nil {
208207
bs.mu.Unlock()
209208
return err
210209
}
210+
req = res[0]
211211
}
212212

213213
bs.activeRequests.Add(1)

exporter/exporterhelper/internal/request.go

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -56,35 +56,25 @@ func (r *fakeRequest) ItemsCount() int {
5656
return r.items
5757
}
5858

59-
func (r *fakeRequest) Merge(_ context.Context,
60-
r2 internal.Request,
61-
) (internal.Request, error) {
62-
if r == nil {
63-
return r2, nil
64-
}
65-
fr2 := r2.(*fakeRequest)
66-
if fr2.mergeErr != nil {
67-
return nil, fr2.mergeErr
68-
}
69-
return &fakeRequest{
70-
items: r.items + fr2.items,
71-
sink: r.sink,
72-
exportErr: fr2.exportErr,
73-
delay: r.delay + fr2.delay,
74-
}, nil
75-
}
76-
77-
func (r *fakeRequest) MergeSplit(ctx context.Context, cfg exporterbatcher.MaxSizeConfig,
78-
r2 internal.Request,
79-
) ([]internal.Request, error) {
59+
func (r *fakeRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 internal.Request) ([]internal.Request, error) {
8060
if r.mergeErr != nil {
8161
return nil, r.mergeErr
8262
}
8363

8464
maxItems := cfg.MaxSizeItems
8565
if maxItems == 0 {
86-
r, err := r.Merge(ctx, r2)
87-
return []internal.Request{r}, err
66+
fr2 := r2.(*fakeRequest)
67+
if fr2.mergeErr != nil {
68+
return nil, fr2.mergeErr
69+
}
70+
return []internal.Request{
71+
&fakeRequest{
72+
items: r.items + fr2.items,
73+
sink: r.sink,
74+
exportErr: fr2.exportErr,
75+
delay: r.delay + fr2.delay,
76+
},
77+
}, nil
8878
}
8979

9080
var fr2 *fakeRequest

exporter/exporterhelper/internal/retry_sender_test.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -418,10 +418,6 @@ func (mer *mockErrorRequest) ItemsCount() int {
418418
return 7
419419
}
420420

421-
func (mer *mockErrorRequest) Merge(context.Context, internal.Request) (internal.Request, error) {
422-
return nil, nil
423-
}
424-
425421
func (mer *mockErrorRequest) MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, internal.Request) ([]internal.Request, error) {
426422
return nil, nil
427423
}
@@ -468,10 +464,6 @@ func (m *mockRequest) ItemsCount() int {
468464
return m.cnt
469465
}
470466

471-
func (m *mockRequest) Merge(context.Context, internal.Request) (internal.Request, error) {
472-
return nil, nil
473-
}
474-
475467
func (m *mockRequest) MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, internal.Request) ([]internal.Request, error) {
476468
return nil, nil
477469
}

exporter/exporterhelper/logs_batch.go

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,6 @@ import (
1111
"go.opentelemetry.io/collector/pdata/plog"
1212
)
1313

14-
// Merge merges the provided logs request into the current request and returns the merged request.
15-
func (req *logsRequest) Merge(_ context.Context, r2 Request) (Request, error) {
16-
lr2, ok2 := r2.(*logsRequest)
17-
if !ok2 {
18-
return nil, errors.New("invalid input type")
19-
}
20-
lr2.ld.ResourceLogs().MoveAndAppendTo(req.ld.ResourceLogs())
21-
return req, nil
22-
}
23-
2414
// MergeSplit splits and/or merges the provided logs request and the current request into one or more requests
2515
// conforming with the MaxSizeConfig.
2616
func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 Request) ([]Request, error) {
@@ -33,6 +23,11 @@ func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSiz
3323
}
3424
}
3525

26+
if cfg.MaxSizeItems == 0 {
27+
req2.ld.ResourceLogs().MoveAndAppendTo(req.ld.ResourceLogs())
28+
return []Request{req}, nil
29+
}
30+
3631
var (
3732
res []Request
3833
destReq *logsRequest

exporter/exporterhelper/logs_batch_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,15 @@ import (
1919
func TestMergeLogs(t *testing.T) {
2020
lr1 := &logsRequest{ld: testdata.GenerateLogs(2)}
2121
lr2 := &logsRequest{ld: testdata.GenerateLogs(3)}
22-
res, err := lr1.Merge(context.Background(), lr2)
22+
res, err := lr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, lr2)
2323
require.NoError(t, err)
24-
assert.Equal(t, 5, res.(*logsRequest).ld.LogRecordCount())
24+
require.Equal(t, 5, res[0].(*logsRequest).ld.LogRecordCount())
2525
}
2626

2727
func TestMergeLogsInvalidInput(t *testing.T) {
2828
lr1 := &tracesRequest{td: testdata.GenerateTraces(2)}
2929
lr2 := &logsRequest{ld: testdata.GenerateLogs(3)}
30-
_, err := lr1.Merge(context.Background(), lr2)
30+
_, err := lr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, lr2)
3131
require.Error(t, err)
3232
}
3333

exporter/exporterhelper/metrics_batch.go

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,6 @@ import (
1111
"go.opentelemetry.io/collector/pdata/pmetric"
1212
)
1313

14-
// Merge merges the provided metrics request into the current request and returns the merged request.
15-
func (req *metricsRequest) Merge(_ context.Context, r2 Request) (Request, error) {
16-
mr2, ok2 := r2.(*metricsRequest)
17-
if !ok2 {
18-
return nil, errors.New("invalid input type")
19-
}
20-
mr2.md.ResourceMetrics().MoveAndAppendTo(req.md.ResourceMetrics())
21-
return req, nil
22-
}
23-
2414
// MergeSplit splits and/or merges the provided metrics request and the current request into one or more requests
2515
// conforming with the MaxSizeConfig.
2616
func (req *metricsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 Request) ([]Request, error) {
@@ -33,6 +23,11 @@ func (req *metricsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.Max
3323
}
3424
}
3525

26+
if cfg.MaxSizeItems == 0 {
27+
req2.md.ResourceMetrics().MoveAndAppendTo(req.md.ResourceMetrics())
28+
return []Request{req}, nil
29+
}
30+
3631
var (
3732
res []Request
3833
destReq *metricsRequest

exporter/exporterhelper/metrics_batch_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,15 @@ import (
1818
func TestMergeMetrics(t *testing.T) {
1919
mr1 := &metricsRequest{md: testdata.GenerateMetrics(2)}
2020
mr2 := &metricsRequest{md: testdata.GenerateMetrics(3)}
21-
res, err := mr1.Merge(context.Background(), mr2)
21+
res, err := mr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, mr2)
2222
require.NoError(t, err)
23-
assert.Equal(t, 5, res.(*metricsRequest).md.MetricCount())
23+
assert.Equal(t, 5, res[0].(*metricsRequest).md.MetricCount())
2424
}
2525

2626
func TestMergeMetricsInvalidInput(t *testing.T) {
2727
mr1 := &tracesRequest{td: testdata.GenerateTraces(2)}
2828
mr2 := &metricsRequest{md: testdata.GenerateMetrics(3)}
29-
_, err := mr1.Merge(context.Background(), mr2)
29+
_, err := mr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, mr2)
3030
require.Error(t, err)
3131
}
3232

exporter/exporterhelper/traces_batch.go

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,6 @@ import (
1111
"go.opentelemetry.io/collector/pdata/ptrace"
1212
)
1313

14-
// Merge merges the provided traces request into the current request and returns the merged request.
15-
func (req *tracesRequest) Merge(_ context.Context, r2 Request) (Request, error) {
16-
tr2, ok2 := r2.(*tracesRequest)
17-
if !ok2 {
18-
return nil, errors.New("invalid input type")
19-
}
20-
tr2.td.ResourceSpans().MoveAndAppendTo(req.td.ResourceSpans())
21-
return req, nil
22-
}
23-
2414
// MergeSplit splits and/or merges the provided traces request and the current request into one or more requests
2515
// conforming with the MaxSizeConfig.
2616
func (req *tracesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 Request) ([]Request, error) {
@@ -33,6 +23,11 @@ func (req *tracesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxS
3323
}
3424
}
3525

26+
if cfg.MaxSizeItems == 0 {
27+
req2.td.ResourceSpans().MoveAndAppendTo(req.td.ResourceSpans())
28+
return []Request{req}, nil
29+
}
30+
3631
var (
3732
res []Request
3833
destReq *tracesRequest

exporter/exporterhelper/traces_batch_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,15 @@ import (
1818
func TestMergeTraces(t *testing.T) {
1919
tr1 := &tracesRequest{td: testdata.GenerateTraces(2)}
2020
tr2 := &tracesRequest{td: testdata.GenerateTraces(3)}
21-
res, err := tr1.Merge(context.Background(), tr2)
21+
res, err := tr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, tr2)
2222
require.NoError(t, err)
23-
assert.Equal(t, 5, res.(*tracesRequest).td.SpanCount())
23+
assert.Equal(t, 5, res[0].(*tracesRequest).td.SpanCount())
2424
}
2525

2626
func TestMergeTracesInvalidInput(t *testing.T) {
2727
tr1 := &logsRequest{ld: testdata.GenerateLogs(2)}
2828
tr2 := &tracesRequest{td: testdata.GenerateTraces(3)}
29-
_, err := tr1.Merge(context.Background(), tr2)
29+
_, err := tr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, tr2)
3030
require.Error(t, err)
3131
}
3232

exporter/exporterhelper/xexporterhelper/profiles_batch.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,31 +12,31 @@ import (
1212
"go.opentelemetry.io/collector/pdata/pprofile"
1313
)
1414

15-
// Merge merges two profiles requests into one.
16-
func (req *profilesRequest) Merge(_ context.Context, r2 exporterhelper.Request) (exporterhelper.Request, error) {
17-
tr2, ok2 := r2.(*profilesRequest)
18-
if !ok2 {
19-
return nil, errors.New("invalid input type")
20-
}
21-
tr2.pd.ResourceProfiles().MoveAndAppendTo(req.pd.ResourceProfiles())
22-
return req, nil
23-
}
24-
2515
// MergeSplit splits and/or merges the profiles into multiple requests based on the MaxSizeConfig.
2616
func (req *profilesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 exporterhelper.Request) ([]exporterhelper.Request, error) {
17+
var req2 *profilesRequest
18+
if r2 != nil {
19+
var ok bool
20+
req2, ok = r2.(*profilesRequest)
21+
if !ok {
22+
return nil, errors.New("invalid input type")
23+
}
24+
}
25+
26+
if cfg.MaxSizeItems == 0 {
27+
req2.pd.ResourceProfiles().MoveAndAppendTo(req.pd.ResourceProfiles())
28+
return []exporterhelper.Request{req}, nil
29+
}
30+
2731
var (
2832
res []exporterhelper.Request
2933
destReq *profilesRequest
3034
capacityLeft = cfg.MaxSizeItems
3135
)
32-
for _, r := range []exporterhelper.Request{req, r2} {
33-
if r == nil {
36+
for _, srcReq := range []*profilesRequest{req, req2} {
37+
if srcReq == nil {
3438
continue
3539
}
36-
srcReq, ok := r.(*profilesRequest)
37-
if !ok {
38-
return nil, errors.New("invalid input type")
39-
}
4040
if srcReq.pd.SampleCount() <= capacityLeft {
4141
if destReq == nil {
4242
destReq = srcReq

exporter/exporterhelper/xexporterhelper/profiles_batch_test.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,17 @@ import (
2121
func TestMergeProfiles(t *testing.T) {
2222
pr1 := &profilesRequest{pd: testdata.GenerateProfiles(2)}
2323
pr2 := &profilesRequest{pd: testdata.GenerateProfiles(3)}
24-
res, err := pr1.Merge(context.Background(), pr2)
24+
res, err := pr1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, pr2)
2525
require.NoError(t, err)
26-
fmt.Fprintf(os.Stdout, "%#v\n", res.(*profilesRequest).pd)
27-
assert.Equal(t, 5, res.(*profilesRequest).pd.SampleCount())
26+
assert.Len(t, res, 1)
27+
fmt.Fprintf(os.Stdout, "%#v\n", res[0].(*profilesRequest).pd)
28+
assert.Equal(t, 5, res[0].(*profilesRequest).pd.SampleCount())
2829
}
2930

3031
func TestMergeProfilesInvalidInput(t *testing.T) {
3132
pr1 := &dummyRequest{}
3233
pr2 := &profilesRequest{pd: testdata.GenerateProfiles(3)}
33-
_, err := pr2.Merge(context.Background(), pr1)
34+
_, err := pr2.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, pr1)
3435
assert.Error(t, err)
3536
}
3637

@@ -154,10 +155,6 @@ func (req *dummyRequest) ItemsCount() int {
154155
return 1
155156
}
156157

157-
func (req *dummyRequest) Merge(_ context.Context, _ exporterhelper.Request) (exporterhelper.Request, error) {
158-
return nil, nil
159-
}
160-
161158
func (req *dummyRequest) MergeSplit(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ exporterhelper.Request) (
162159
[]exporterhelper.Request, error,
163160
) {

exporter/internal/queue/default_batcher.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,14 +89,15 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
8989
idxList: []uint64{idx},
9090
}
9191
} else {
92-
mergedReq, mergeErr := qb.currentBatch.req.Merge(qb.currentBatch.ctx, req)
92+
// TODO: consolidate implementation for the cases where MaxSizeConfig is specified and the case where it is not specified
93+
mergedReq, mergeErr := qb.currentBatch.req.MergeSplit(qb.currentBatch.ctx, qb.batchCfg.MaxSizeConfig, req)
9394
if mergeErr != nil {
9495
qb.queue.OnProcessingFinished(idx, mergeErr)
9596
qb.currentBatchMu.Unlock()
9697
continue
9798
}
9899
qb.currentBatch = &batch{
99-
req: mergedReq,
100+
req: mergedReq[0],
100101
ctx: qb.currentBatch.ctx,
101102
idxList: append(qb.currentBatch.idxList, idx),
102103
}

exporter/internal/queue/fake_request_test.go

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -53,32 +53,25 @@ func (r *fakeRequest) ItemsCount() int {
5353
return r.items
5454
}
5555

56-
func (r *fakeRequest) Merge(_ context.Context,
57-
r2 internal.Request,
58-
) (internal.Request, error) {
59-
fr2 := r2.(*fakeRequest)
60-
if fr2.mergeErr != nil {
61-
return nil, fr2.mergeErr
62-
}
63-
return &fakeRequest{
64-
items: r.items + fr2.items,
65-
sink: r.sink,
66-
exportErr: fr2.exportErr,
67-
delay: r.delay + fr2.delay,
68-
}, nil
69-
}
70-
71-
func (r *fakeRequest) MergeSplit(ctx context.Context, cfg exporterbatcher.MaxSizeConfig,
72-
r2 internal.Request,
73-
) ([]internal.Request, error) {
56+
func (r *fakeRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 internal.Request) ([]internal.Request, error) {
7457
if r.mergeErr != nil {
7558
return nil, r.mergeErr
7659
}
7760

7861
maxItems := cfg.MaxSizeItems
7962
if maxItems == 0 {
80-
r, err := r.Merge(ctx, r2)
81-
return []internal.Request{r}, err
63+
fr2 := r2.(*fakeRequest)
64+
if fr2.mergeErr != nil {
65+
return nil, fr2.mergeErr
66+
}
67+
return []internal.Request{
68+
&fakeRequest{
69+
items: r.items + fr2.items,
70+
sink: r.sink,
71+
exportErr: fr2.exportErr,
72+
delay: r.delay + fr2.delay,
73+
},
74+
}, nil
8275
}
8376

8477
var fr2 *fakeRequest

exporter/internal/request.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,9 @@ type Request interface {
1919
// sent. For example, for OTLP exporter, this value represents the number of spans,
2020
// metric data points or log records.
2121
ItemsCount() int
22-
// Merge is a function that merges this request with another one into a single request.
23-
// Do not mutate the requests passed to the function if error can be returned after mutation or if the exporter is
24-
// marked as not mutable.
25-
// Experimental: This API is at the early stage of development and may change without backward compatibility
26-
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
27-
Merge(context.Context, Request) (Request, error)
2822
// MergeSplit is a function that merge and/or splits this request with another one into multiple requests based on the
2923
// configured limit provided in MaxSizeConfig.
24+
// MergeSplit does not split if all fields in MaxSizeConfig are not initialized (zero).
3025
// All the returned requests MUST have a number of items that does not exceed the maximum number of items.
3126
// Size of the last returned request MUST be less or equal than the size of any other returned request.
3227
// The original request MUST not be mutated if error is returned after mutation or if the exporter is

0 commit comments

Comments
 (0)