Skip to content

Commit ba96c23

Browse files
committed
Replace exporterhelper Request.Export with passing a ConsumeRequest func
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 0e9e259 commit ba96c23

30 files changed

+768
-753
lines changed

.chloggen/rm-request-export.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: breaking
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Remove the Request.Export function in favor of an equivalent request consume func in the New[Traces|Metrics|Logs|Profiles]Request
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [12637]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [api]

exporter/exporterhelper/constants.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@ var (
1212
errNilConfig = errors.New("nil config")
1313
// errNilLogger is returned when a logger is nil
1414
errNilLogger = errors.New("nil logger")
15-
// errNilPushTraceData is returned when a nil PushTraces is given.
16-
errNilPushTraceData = errors.New("nil PushTraces")
17-
// errNilPushMetricsData is returned when a nil PushMetrics is given.
18-
errNilPushMetricsData = errors.New("nil PushMetrics")
19-
// errNilPushLogsData is returned when a nil PushLogs is given.
20-
errNilPushLogsData = errors.New("nil PushLogs")
15+
// errNilConsumeRequest is returned when a nil PushTraces is given.
16+
errNilConsumeRequest = errors.New("nil RequestConsumeFunc")
17+
// errNilPushTraces is returned when a nil PushTraces is given.
18+
errNilPushTraces = errors.New("nil PushTraces")
19+
// errNilPushMetrics is returned when a nil PushMetrics is given.
20+
errNilPushMetrics = errors.New("nil PushMetrics")
21+
// errNilPushLogs is returned when a nil PushLogs is given.
22+
errNilPushLogs = errors.New("nil PushLogs")
2123
// errNilTracesConverter is returned when a nil RequestFromTracesFunc is given.
2224
errNilTracesConverter = errors.New("nil RequestFromTracesFunc")
2325
// errNilMetricsConverter is returned when a nil RequestFromMetricsFunc is given.

exporter/exporterhelper/internal/base_exporter.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ type BaseExporter struct {
5050
batcherCfg exporterbatcher.Config
5151
}
5252

53-
func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, options ...Option) (*BaseExporter, error) {
53+
func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher func(context.Context, request.Request) error, options ...Option) (*BaseExporter, error) {
5454
be := &BaseExporter{
5555
Set: set,
5656
timeoutCfg: NewDefaultTimeoutConfig(),
@@ -62,15 +62,13 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, options ...O
6262
}
6363
}
6464

65-
//nolint: staticcheck
65+
//nolint:staticcheck
6666
if be.batcherCfg.MinSizeItems != nil || be.batcherCfg.MaxSizeItems != nil {
6767
set.Logger.Warn("Using of deprecated fields `min_size_items` and `max_size_items`")
6868
}
6969

7070
// Consumer Sender is always initialized.
71-
be.firstSender = newSender(func(ctx context.Context, req request.Request) error {
72-
return req.Export(ctx)
73-
})
71+
be.firstSender = newSender(pusher)
7472

7573
// Next setup the timeout Sender since we want the timeout to control only the export functionality.
7674
// Only initialize if not explicitly disabled.

exporter/exporterhelper/internal/base_exporter_test.go

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,8 @@ var (
3636
}()
3737
)
3838

39-
func newNoopExportSender() Sender[request.Request] {
40-
return newSender(func(ctx context.Context, req request.Request) error {
41-
select {
42-
case <-ctx.Done():
43-
return ctx.Err() // Returns the cancellation error
44-
default:
45-
return req.Export(ctx)
46-
}
47-
})
48-
}
49-
5039
func TestBaseExporter(t *testing.T) {
51-
be, err := NewBaseExporter(defaultSettings, defaultSignal)
40+
be, err := NewBaseExporter(defaultSettings, defaultSignal, noopExport)
5241
require.NoError(t, err)
5342
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
5443
require.NoError(t, be.Shutdown(context.Background()))
@@ -57,7 +46,7 @@ func TestBaseExporter(t *testing.T) {
5746
func TestBaseExporterWithOptions(t *testing.T) {
5847
want := errors.New("my error")
5948
be, err := NewBaseExporter(
60-
defaultSettings, defaultSignal,
49+
defaultSettings, defaultSignal, noopExport,
6150
WithStart(func(context.Context, component.Host) error { return want }),
6251
WithShutdown(func(context.Context) error { return want }),
6352
WithTimeout(NewDefaultTimeoutConfig()),
@@ -68,18 +57,18 @@ func TestBaseExporterWithOptions(t *testing.T) {
6857
}
6958

7059
func TestQueueOptionsWithRequestExporter(t *testing.T) {
71-
bs, err := NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal,
60+
bs, err := NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal, noopExport,
7261
WithRetry(configretry.NewDefaultBackOffConfig()))
7362
require.NoError(t, err)
7463
require.Nil(t, bs.encoding)
75-
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal,
64+
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal, noopExport,
7665
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(exporterqueue.NewDefaultConfig()))
7766
require.Error(t, err)
7867

7968
qCfg := exporterqueue.NewDefaultConfig()
8069
storageID := component.NewID(component.MustNewType("test"))
8170
qCfg.StorageID = &storageID
82-
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal,
71+
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal, noopExport,
8372
WithEncoding(newFakeEncoding(&requesttest.FakeRequest{Items: 1})),
8473
WithRetry(configretry.NewDefaultBackOffConfig()),
8574
WithRequestQueue(qCfg, nil))
@@ -94,14 +83,13 @@ func TestBaseExporterLogging(t *testing.T) {
9483
rCfg.Enabled = false
9584
qCfg := exporterqueue.NewDefaultConfig()
9685
qCfg.Enabled = false
97-
bs, err := NewBaseExporter(set, defaultSignal,
86+
bs, err := NewBaseExporter(set, defaultSignal, errExport,
9887
WithRequestQueue(qCfg, newFakeEncoding(&requesttest.FakeRequest{})),
9988
WithBatcher(exporterbatcher.NewDefaultConfig()),
10089
WithRetry(rCfg))
10190
require.NoError(t, err)
10291
require.NoError(t, bs.Start(context.Background(), componenttest.NewNopHost()))
103-
sink := requesttest.NewSink()
104-
sendErr := bs.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink, ExportErr: errors.New("my error")})
92+
sendErr := bs.Send(context.Background(), &requesttest.FakeRequest{Items: 2})
10593
require.Error(t, sendErr)
10694

10795
require.Len(t, observed.FilterLevelExact(zap.ErrorLevel).All(), 2)
@@ -155,16 +143,22 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
155143
set := exportertest.NewNopSettings(exportertest.NopType)
156144
logger, observed := observer.New(zap.ErrorLevel)
157145
set.Logger = zap.New(logger)
158-
be, err := NewBaseExporter(set, pipeline.SignalLogs, tt.queueOptions...)
146+
be, err := NewBaseExporter(set, pipeline.SignalLogs, errExport, tt.queueOptions...)
159147
require.NoError(t, err)
160148
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
161-
sink := requesttest.NewSink()
162-
mockR := &requesttest.FakeRequest{Items: 2, Sink: sink, ExportErr: errors.New("some error")}
149+
mockR := &requesttest.FakeRequest{Items: 2}
163150
require.Error(t, be.Send(context.Background(), mockR))
164151
assert.Len(t, observed.All(), 1)
165152
assert.Equal(t, "Exporting failed. Rejecting data. Try enabling sending_queue to survive temporary failures.", observed.All()[0].Message)
166153
require.NoError(t, be.Shutdown(context.Background()))
167-
assert.Empty(t, 0, sink.RequestsCount())
168154
})
169155
}
170156
}
157+
158+
func errExport(context.Context, request.Request) error {
159+
return errors.New("my error")
160+
}
161+
162+
func noopExport(context.Context, request.Request) error {
163+
return nil
164+
}

0 commit comments

Comments
 (0)