Skip to content

Commit 922b424

Browse files
committed
Replace exporterhelper Request.Export with passing a ConsumeRequest func
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 9204a06 commit 922b424

24 files changed

+650
-667
lines changed

exporter/exporterhelper/constants.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@ var (
1212
errNilConfig = errors.New("nil config")
1313
// errNilLogger is returned when a logger is nil
1414
errNilLogger = errors.New("nil logger")
15-
// errNilPushTraceData is returned when a nil PushTraces is given.
16-
errNilPushTraceData = errors.New("nil PushTraces")
17-
// errNilPushMetricsData is returned when a nil PushMetrics is given.
18-
errNilPushMetricsData = errors.New("nil PushMetrics")
19-
// errNilPushLogsData is returned when a nil PushLogs is given.
20-
errNilPushLogsData = errors.New("nil PushLogs")
15+
// errNilConsumeRequest is returned when a nil PushTraces is given.
16+
errNilConsumeRequest = errors.New("nil RequestConsumeFunc")
17+
// errNilPushTraces is returned when a nil PushTraces is given.
18+
errNilPushTraces = errors.New("nil PushTraces")
19+
// errNilPushMetrics is returned when a nil PushMetrics is given.
20+
errNilPushMetrics = errors.New("nil PushMetrics")
21+
// errNilPushLogs is returned when a nil PushLogs is given.
22+
errNilPushLogs = errors.New("nil PushLogs")
2123
// errNilTracesConverter is returned when a nil RequestFromTracesFunc is given.
2224
errNilTracesConverter = errors.New("nil RequestFromTracesFunc")
2325
// errNilMetricsConverter is returned when a nil RequestFromMetricsFunc is given.

exporter/exporterhelper/internal/base_exporter.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ type BaseExporter struct {
5050
batcherCfg exporterbatcher.Config
5151
}
5252

53-
func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, options ...Option) (*BaseExporter, error) {
53+
func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher func(context.Context, request.Request) error, options ...Option) (*BaseExporter, error) {
5454
be := &BaseExporter{
5555
Set: set,
5656
timeoutCfg: NewDefaultTimeoutConfig(),
@@ -68,9 +68,7 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, options ...O
6868
}
6969

7070
// Consumer Sender is always initialized.
71-
be.firstSender = newSender(func(ctx context.Context, req request.Request) error {
72-
return req.Export(ctx)
73-
})
71+
be.firstSender = newSender(pusher)
7472

7573
// Next setup the timeout Sender since we want the timeout to control only the export functionality.
7674
// Only initialize if not explicitly disabled.

exporter/exporterhelper/internal/base_exporter_test.go

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,8 @@ var (
3636
}()
3737
)
3838

39-
func newNoopExportSender() Sender[request.Request] {
40-
return newSender(func(ctx context.Context, req request.Request) error {
41-
select {
42-
case <-ctx.Done():
43-
return ctx.Err() // Returns the cancellation error
44-
default:
45-
return req.Export(ctx)
46-
}
47-
})
48-
}
49-
5039
func TestBaseExporter(t *testing.T) {
51-
be, err := NewBaseExporter(defaultSettings, defaultSignal)
40+
be, err := NewBaseExporter(defaultSettings, defaultSignal, noopExport)
5241
require.NoError(t, err)
5342
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
5443
require.NoError(t, be.Shutdown(context.Background()))
@@ -57,7 +46,7 @@ func TestBaseExporter(t *testing.T) {
5746
func TestBaseExporterWithOptions(t *testing.T) {
5847
want := errors.New("my error")
5948
be, err := NewBaseExporter(
60-
defaultSettings, defaultSignal,
49+
defaultSettings, defaultSignal, noopExport,
6150
WithStart(func(context.Context, component.Host) error { return want }),
6251
WithShutdown(func(context.Context) error { return want }),
6352
WithTimeout(NewDefaultTimeoutConfig()),
@@ -68,18 +57,18 @@ func TestBaseExporterWithOptions(t *testing.T) {
6857
}
6958

7059
func TestQueueOptionsWithRequestExporter(t *testing.T) {
71-
bs, err := NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal,
60+
bs, err := NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal, noopExport,
7261
WithRetry(configretry.NewDefaultBackOffConfig()))
7362
require.NoError(t, err)
7463
require.Nil(t, bs.encoding)
75-
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal,
64+
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal, noopExport,
7665
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(exporterqueue.NewDefaultConfig()))
7766
require.Error(t, err)
7867

7968
qCfg := exporterqueue.NewDefaultConfig()
8069
storageID := component.NewID(component.MustNewType("test"))
8170
qCfg.StorageID = &storageID
82-
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal,
71+
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal, noopExport,
8372
WithEncoding(newFakeEncoding(&requesttest.FakeRequest{Items: 1})),
8473
WithRetry(configretry.NewDefaultBackOffConfig()),
8574
WithRequestQueue(qCfg, nil))
@@ -94,14 +83,13 @@ func TestBaseExporterLogging(t *testing.T) {
9483
rCfg.Enabled = false
9584
qCfg := exporterqueue.NewDefaultConfig()
9685
qCfg.Enabled = false
97-
bs, err := NewBaseExporter(set, defaultSignal,
86+
bs, err := NewBaseExporter(set, defaultSignal, errExport,
9887
WithRequestQueue(qCfg, newFakeEncoding(&requesttest.FakeRequest{})),
9988
WithBatcher(exporterbatcher.NewDefaultConfig()),
10089
WithRetry(rCfg))
10190
require.NoError(t, err)
10291
require.NoError(t, bs.Start(context.Background(), componenttest.NewNopHost()))
103-
sink := requesttest.NewSink()
104-
sendErr := bs.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink, ExportErr: errors.New("my error")})
92+
sendErr := bs.Send(context.Background(), &requesttest.FakeRequest{Items: 2})
10593
require.Error(t, sendErr)
10694

10795
require.Len(t, observed.FilterLevelExact(zap.ErrorLevel).All(), 2)
@@ -155,16 +143,22 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
155143
set := exportertest.NewNopSettings(exportertest.NopType)
156144
logger, observed := observer.New(zap.ErrorLevel)
157145
set.Logger = zap.New(logger)
158-
be, err := NewBaseExporter(set, pipeline.SignalLogs, tt.queueOptions...)
146+
be, err := NewBaseExporter(set, pipeline.SignalLogs, errExport, tt.queueOptions...)
159147
require.NoError(t, err)
160148
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
161-
sink := requesttest.NewSink()
162-
mockR := &requesttest.FakeRequest{Items: 2, Sink: sink, ExportErr: errors.New("some error")}
149+
mockR := &requesttest.FakeRequest{Items: 2}
163150
require.Error(t, be.Send(context.Background(), mockR))
164151
assert.Len(t, observed.All(), 1)
165152
assert.Equal(t, "Exporting failed. Rejecting data. Try enabling sending_queue to survive temporary failures.", observed.All()[0].Message)
166153
require.NoError(t, be.Shutdown(context.Background()))
167-
assert.Empty(t, 0, sink.RequestsCount())
168154
})
169155
}
170156
}
157+
158+
func errExport(context.Context, request.Request) error {
159+
return errors.New("my error")
160+
}
161+
162+
func noopExport(context.Context, request.Request) error {
163+
return nil
164+
}

exporter/exporterhelper/internal/batcher/default_batcher_test.go

Lines changed: 56 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616

1717
"go.opentelemetry.io/collector/component/componenttest"
1818
"go.opentelemetry.io/collector/exporter/exporterbatcher"
19-
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
2019
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
2120
)
2221

@@ -44,26 +43,26 @@ func TestDefaultBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T) {
4443
MinSize: 0,
4544
}
4645

47-
ba, err := NewBatcher(cfg,
48-
func(ctx context.Context, req request.Request) error { return req.Export(ctx) },
49-
tt.maxWorkers)
46+
sink := requesttest.NewSink()
47+
ba, err := NewBatcher(cfg, sink.Export, tt.maxWorkers)
5048
require.NoError(t, err)
5149
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
5250
t.Cleanup(func() {
5351
require.NoError(t, ba.Shutdown(context.Background()))
5452
})
5553

5654
done := newFakeDone()
57-
sink := requesttest.NewSink()
58-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 8, Sink: sink}, done)
59-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 8, ExportErr: errors.New("transient error"), Sink: sink}, done)
60-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 17, Sink: sink}, done)
61-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 13, Sink: sink}, done)
62-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 35, Sink: sink}, done)
63-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink}, done)
55+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 8}, done)
56+
sink.SetExportErr(errors.New("transient error"))
57+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 8}, done)
58+
<-time.After(10 * time.Millisecond)
59+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 17}, done)
60+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 13}, done)
61+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 35}, done)
62+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 2}, done)
6463
assert.Eventually(t, func() bool {
6564
return sink.RequestsCount() == 5 && sink.ItemsCount() == 75
66-
}, 30*time.Millisecond, 10*time.Millisecond)
65+
}, 1*time.Second, 10*time.Millisecond)
6766
// Check that done callback is called for the right amount of times.
6867
assert.EqualValues(t, 1, done.errors.Load())
6968
assert.EqualValues(t, 5, done.success.Load())
@@ -95,30 +94,30 @@ func TestDefaultBatcher_NoSplit_TimeoutDisabled(t *testing.T) {
9594
MinSize: 10,
9695
}
9796

98-
ba, err := NewBatcher(cfg,
99-
func(ctx context.Context, req request.Request) error { return req.Export(ctx) },
100-
tt.maxWorkers)
97+
sink := requesttest.NewSink()
98+
ba, err := NewBatcher(cfg, sink.Export, tt.maxWorkers)
10199
require.NoError(t, err)
102100
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
103101

104102
done := newFakeDone()
105-
sink := requesttest.NewSink()
106103
// These two requests will be dropped because of export error.
107-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 8, Sink: sink}, done)
108-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 8, ExportErr: errors.New("transient error"), Sink: sink}, done)
104+
sink.SetExportErr(errors.New("transient error"))
105+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 8}, done)
106+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 8}, done)
107+
<-time.After(10 * time.Millisecond)
109108

110-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 7, Sink: sink}, done)
109+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 7}, done)
111110
// This requests will be dropped because of merge error.
112-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 8, MergeErr: errors.New("transient error"), Sink: sink}, done)
111+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 8, MergeErr: errors.New("transient error")}, done)
113112

114-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 13, Sink: sink}, done)
115-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 35, Sink: sink}, done)
116-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink}, done)
113+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 13}, done)
114+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 35}, done)
115+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 2}, done)
117116

118-
// Only the requests with 13 and 35 will be flushed.
117+
// Only the requests with 7+13 and 35 will be flushed.
119118
assert.Eventually(t, func() bool {
120119
return sink.RequestsCount() == 2 && sink.ItemsCount() == 55
121-
}, 30*time.Millisecond, 10*time.Millisecond)
120+
}, 1*time.Second, 10*time.Millisecond)
122121

123122
require.NoError(t, ba.Shutdown(context.Background()))
124123

@@ -161,28 +160,26 @@ func TestDefaultBatcher_NoSplit_WithTimeout(t *testing.T) {
161160
MinSize: 100,
162161
}
163162

164-
ba, err := NewBatcher(cfg,
165-
func(ctx context.Context, req request.Request) error { return req.Export(ctx) },
166-
tt.maxWorkers)
163+
sink := requesttest.NewSink()
164+
ba, err := NewBatcher(cfg, sink.Export, tt.maxWorkers)
167165
require.NoError(t, err)
168166
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
169167
t.Cleanup(func() {
170168
require.NoError(t, ba.Shutdown(context.Background()))
171169
})
172170

173171
done := newFakeDone()
174-
sink := requesttest.NewSink()
175-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 8, Sink: sink}, done)
176-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 17, Sink: sink}, done)
172+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 8}, done)
173+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 17}, done)
177174
// This requests will be dropped because of merge error.
178-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 8, MergeErr: errors.New("transient error"), Sink: sink}, done)
175+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 8, MergeErr: errors.New("transient error")}, done)
179176

180-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 13, Sink: sink}, done)
181-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 35, Sink: sink}, done)
182-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink}, done)
177+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 13}, done)
178+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 35}, done)
179+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 2}, done)
183180
assert.Eventually(t, func() bool {
184181
return sink.RequestsCount() == 1 && sink.ItemsCount() == 75
185-
}, 100*time.Millisecond, 10*time.Millisecond)
182+
}, 1*time.Second, 10*time.Millisecond)
186183

187184
// Check that done callback is called for the right amount of times.
188185
assert.EqualValues(t, 1, done.errors.Load())
@@ -220,33 +217,31 @@ func TestDefaultBatcher_Split_TimeoutDisabled(t *testing.T) {
220217
MaxSize: 100,
221218
}
222219

223-
ba, err := NewBatcher(cfg,
224-
func(ctx context.Context, req request.Request) error { return req.Export(ctx) },
225-
tt.maxWorkers)
220+
sink := requesttest.NewSink()
221+
ba, err := NewBatcher(cfg, sink.Export, tt.maxWorkers)
226222
require.NoError(t, err)
227223
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
228224

229225
done := newFakeDone()
230-
sink := requesttest.NewSink()
231226
// This requests will be dropped because of merge error.
232-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 8, MergeErr: errors.New("transient error"), Sink: sink}, done)
233-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 8, Sink: sink}, done)
234-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 17, Sink: sink}, done)
227+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 8, MergeErr: errors.New("transient error")}, done)
228+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 8}, done)
229+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 17}, done)
235230
// This requests will be dropped because of merge error.
236-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 8, MergeErr: errors.New("transient error"), Sink: sink}, done)
231+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 8, MergeErr: errors.New("transient error")}, done)
237232

238-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 13, Sink: sink}, done)
239-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 35, Sink: sink}, done)
240-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink}, done)
241-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 30, Sink: sink}, done)
233+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 13}, done)
234+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 35}, done)
235+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 2}, done)
236+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 30}, done)
242237
assert.Eventually(t, func() bool {
243238
return sink.RequestsCount() == 1 && sink.ItemsCount() == 100
244-
}, 100*time.Millisecond, 10*time.Millisecond)
239+
}, 1*time.Second, 10*time.Millisecond)
245240

246-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 900, Sink: sink}, done)
241+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 900}, done)
247242
assert.Eventually(t, func() bool {
248243
return sink.RequestsCount() == 10 && sink.ItemsCount() == 1000
249-
}, 100*time.Millisecond, 10*time.Millisecond)
244+
}, 1*time.Second, 10*time.Millisecond)
250245

251246
// At this point the 7th not failing request is still pending.
252247
assert.EqualValues(t, 6, done.success.Load())
@@ -269,16 +264,14 @@ func TestDefaultBatcher_Shutdown(t *testing.T) {
269264
batchCfg.MinSize = 10
270265
batchCfg.FlushTimeout = 100 * time.Second
271266

272-
ba, err := NewBatcher(batchCfg,
273-
func(ctx context.Context, req request.Request) error { return req.Export(ctx) },
274-
2)
267+
sink := requesttest.NewSink()
268+
ba, err := NewBatcher(batchCfg, sink.Export, 2)
275269
require.NoError(t, err)
276270
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
277271

278272
done := newFakeDone()
279-
sink := requesttest.NewSink()
280-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 1, Sink: sink}, done)
281-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink}, done)
273+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 1}, done)
274+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 2}, done)
282275

283276
assert.EqualValues(t, 0, sink.RequestsCount())
284277
assert.EqualValues(t, 0, sink.ItemsCount())
@@ -298,19 +291,19 @@ func TestDefaultBatcher_MergeError(t *testing.T) {
298291
batchCfg.MinSize = 5
299292
batchCfg.MaxSize = 7
300293

301-
ba, err := NewBatcher(batchCfg,
302-
func(ctx context.Context, req request.Request) error { return req.Export(ctx) },
303-
2)
294+
sink := requesttest.NewSink()
295+
ba, err := NewBatcher(batchCfg, sink.Export, 2)
296+
304297
require.NoError(t, err)
305298
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
306299
t.Cleanup(func() {
307300
require.NoError(t, ba.Shutdown(context.Background()))
308301
})
309302

310303
done := newFakeDone()
311-
sink := requesttest.NewSink()
312-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 9, Sink: sink}, done)
313-
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 4, Sink: sink, ExportErr: errors.New("transient error")}, done)
304+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 9}, done)
305+
sink.SetExportErr(errors.New("transient error"))
306+
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 4}, done)
314307

315308
assert.Eventually(t, func() bool {
316309
return sink.RequestsCount() == 1 && sink.ItemsCount() == 7

0 commit comments

Comments
 (0)