From dfa9a23df25fae5a7d4127a51ec1642aa62c8d90 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Sat, 1 Feb 2025 13:55:12 -0800 Subject: [PATCH] Cleanup initialization order for exporterhelper Signed-off-by: Bogdan Drutu --- .chloggen/clenup-initialization.yaml | 25 ++++ .../exporterhelper/internal/base_exporter.go | 140 ++++++++++-------- .../internal/base_exporter_test.go | 10 +- .../exporterhelper/internal/batch_sender.go | 18 ++- .../internal/obs_report_sender.go | 11 +- .../exporterhelper/internal/queue_sender.go | 42 +++--- .../internal/queue_sender_test.go | 14 +- .../exporterhelper/internal/request_sender.go | 21 +-- .../exporterhelper/internal/retry_sender.go | 9 +- .../internal/retry_sender_test.go | 10 +- .../exporterhelper/internal/timeout_sender.go | 4 +- 11 files changed, 174 insertions(+), 130 deletions(-) create mode 100644 .chloggen/clenup-initialization.yaml diff --git a/.chloggen/clenup-initialization.yaml b/.chloggen/clenup-initialization.yaml new file mode 100644 index 00000000000..e7ffeed1e73 --- /dev/null +++ b/.chloggen/clenup-initialization.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix bug that the exporter with new batcher may have been marked as non mutation. + +# One or more tracking issues or pull requests related to the change +issues: [12239] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: Only affects users that manually turned on `exporter.UsePullingBasedExporterQueueBatcher` featuregate. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/exporterhelper/internal/base_exporter.go b/exporter/exporterhelper/internal/base_exporter.go index ea68353ab18..a2b2991361b 100644 --- a/exporter/exporterhelper/internal/base_exporter.go +++ b/exporter/exporterhelper/internal/base_exporter.go @@ -32,7 +32,7 @@ var usePullingBasedExporterQueueBatcher = featuregate.GlobalRegistry().MustRegis featuregate.WithRegisterDescription("if set to true, turns on the pulling-based exporter queue bathcer"), ) -type ObsrepSenderFactory = func(obsrep *ObsReport) Sender[internal.Request] +type ObsrepSenderFactory = func(obsrep *ObsReport, next Sender[internal.Request]) Sender[internal.Request] // Option apply changes to BaseExporter. type Option func(*BaseExporter) error @@ -52,17 +52,20 @@ type BaseExporter struct { // Chain of senders that the exporter helper applies before passing the data to the actual exporter. // The data is handled by each sender in the respective order starting from the queueSender. // Most of the senders are optional, and initialized with a no-op path-through sender. - BatchSender Sender[internal.Request] - QueueSender Sender[internal.Request] - ObsrepSender Sender[internal.Request] - RetrySender Sender[internal.Request] - TimeoutSender *TimeoutSender // TimeoutSender is always initialized. + BatchSender Sender[internal.Request] + QueueSender Sender[internal.Request] + ObsrepSender Sender[internal.Request] + RetrySender Sender[internal.Request] + + firstSender Sender[internal.Request] ConsumerOptions []consumer.Option - queueCfg exporterqueue.Config + timeoutCfg TimeoutConfig + retryCfg configretry.BackOffConfig queueFactory exporterqueue.Factory[internal.Request] - BatcherCfg exporterbatcher.Config + queueCfg exporterqueue.Config + batcherCfg exporterbatcher.Config } func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSenderFactory, options ...Option) (*BaseExporter, error) { @@ -72,50 +75,51 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe } be := &BaseExporter{ - BatchSender: &BaseSender[internal.Request]{}, - QueueSender: &BaseSender[internal.Request]{}, - ObsrepSender: osf(obsReport), - RetrySender: &BaseSender[internal.Request]{}, - TimeoutSender: &TimeoutSender{cfg: NewDefaultTimeoutConfig()}, - - Set: set, + timeoutCfg: NewDefaultTimeoutConfig(), + Set: set, } for _, op := range options { - err = multierr.Append(err, op(be)) + if err = op(be); err != nil { + return nil, err + } } - if err != nil { - return nil, err + + // TimeoutSender is always initialized. + be.firstSender = &TimeoutSender{cfg: be.timeoutCfg} + if be.retryCfg.Enabled { + be.RetrySender = newRetrySender(be.retryCfg, set, be.firstSender) + be.firstSender = be.RetrySender + } + + be.ObsrepSender = osf(obsReport, be.firstSender) + be.firstSender = be.ObsrepSender + + if be.batcherCfg.Enabled { + // Batcher mutates the data. + be.ConsumerOptions = append(be.ConsumerOptions, consumer.WithCapabilities(consumer.Capabilities{MutatesData: true})) + } + + if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.batcherCfg.Enabled || + usePullingBasedExporterQueueBatcher.IsEnabled() && be.batcherCfg.Enabled && !be.queueCfg.Enabled { + concurrencyLimit := int64(0) + if be.queueCfg.Enabled { + concurrencyLimit = int64(be.queueCfg.NumConsumers) + } + be.BatchSender = NewBatchSender(be.batcherCfg, set, concurrencyLimit, be.firstSender) + be.firstSender = be.BatchSender } if be.queueCfg.Enabled { qSet := exporterqueue.Settings{ Signal: signal, - ExporterSettings: be.Set, + ExporterSettings: set, } - q := be.queueFactory(context.Background(), qSet, be.queueCfg) - q, err = newObsQueue(qSet, q) + be.QueueSender, err = NewQueueSender(be.queueFactory, qSet, be.queueCfg, be.batcherCfg, be.ExportFailureMessage, be.firstSender) if err != nil { return nil, err } - be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.BatcherCfg) - } - - if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled || - usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled && !be.queueCfg.Enabled { - bs := NewBatchSender(be.BatcherCfg, be.Set) - be.BatchSender = bs - } - - be.connectSenders() - - if bs, ok := be.BatchSender.(*BatchSender); ok { - // If queue sender is enabled assign to the batch sender the same number of workers. - if qs, ok := be.QueueSender.(*QueueSender); ok { - bs.concurrencyLimit = int64(qs.numConsumers) - } - // Batcher sender mutates the data. - be.ConsumerOptions = append(be.ConsumerOptions, consumer.WithCapabilities(consumer.Capabilities{MutatesData: true})) + be.firstSender = be.QueueSender } return be, nil @@ -123,7 +127,7 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe // Send sends the request using the first sender in the chain. func (be *BaseExporter) Send(ctx context.Context, req internal.Request) error { - err := be.QueueSender.Send(ctx, req) + err := be.firstSender.Send(ctx, req) if err != nil { be.Set.Logger.Error("Exporting failed. Rejecting data."+be.ExportFailureMessage, zap.Error(err), zap.Int("rejected_items", req.ItemsCount())) @@ -131,39 +135,47 @@ func (be *BaseExporter) Send(ctx context.Context, req internal.Request) error { return err } -// connectSenders connects the senders in the predefined order. -func (be *BaseExporter) connectSenders() { - be.QueueSender.SetNextSender(be.BatchSender) - be.BatchSender.SetNextSender(be.ObsrepSender) - be.ObsrepSender.SetNextSender(be.RetrySender) - be.RetrySender.SetNextSender(be.TimeoutSender) -} - func (be *BaseExporter) Start(ctx context.Context, host component.Host) error { // First start the wrapped exporter. if err := be.StartFunc.Start(ctx, host); err != nil { return err } - // If no error then start the BatchSender. - if err := be.BatchSender.Start(ctx, host); err != nil { - return err + if be.BatchSender != nil { + // If no error then start the BatchSender. + if err := be.BatchSender.Start(ctx, host); err != nil { + return err + } } // Last start the queueSender. - return be.QueueSender.Start(ctx, host) + if be.QueueSender != nil { + return be.QueueSender.Start(ctx, host) + } + + return nil } func (be *BaseExporter) Shutdown(ctx context.Context) error { - return multierr.Combine( - // First shutdown the retry sender, so the queue sender can flush the queue without retries. - be.RetrySender.Shutdown(ctx), - // Then shutdown the batch sender - be.BatchSender.Shutdown(ctx), - // Then shutdown the queue sender. - be.QueueSender.Shutdown(ctx), - // Last shutdown the wrapped exporter itself. - be.ShutdownFunc.Shutdown(ctx)) + var err error + + // First shutdown the retry sender, so the queue sender can flush the queue without retries. + if be.RetrySender != nil { + err = multierr.Append(err, be.RetrySender.Shutdown(ctx)) + } + + // Then shutdown the batch sender + if be.BatchSender != nil { + err = multierr.Append(err, be.BatchSender.Shutdown(ctx)) + } + + // Then shutdown the queue sender. + if be.QueueSender != nil { + err = multierr.Append(err, be.QueueSender.Shutdown(ctx)) + } + + // Last shutdown the wrapped exporter itself. + return multierr.Append(err, be.ShutdownFunc.Shutdown(ctx)) } // WithStart overrides the default Start function for an exporter. @@ -188,7 +200,7 @@ func WithShutdown(shutdown component.ShutdownFunc) Option { // The default TimeoutConfig is 5 seconds. func WithTimeout(timeoutConfig TimeoutConfig) Option { return func(o *BaseExporter) error { - o.TimeoutSender.cfg = timeoutConfig + o.timeoutCfg = timeoutConfig return nil } } @@ -201,7 +213,7 @@ func WithRetry(config configretry.BackOffConfig) Option { o.ExportFailureMessage += " Try enabling retry_on_failure config option to retry on retryable errors." return nil } - o.RetrySender = newRetrySender(config, o.Set) + o.retryCfg = config return nil } } @@ -268,7 +280,7 @@ func WithCapabilities(capabilities consumer.Capabilities) Option { // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. func WithBatcher(cfg exporterbatcher.Config) Option { return func(o *BaseExporter) error { - o.BatcherCfg = cfg + o.batcherCfg = cfg return nil } } diff --git a/exporter/exporterhelper/internal/base_exporter_test.go b/exporter/exporterhelper/internal/base_exporter_test.go index 12c56bb76a0..11dc47b4d00 100644 --- a/exporter/exporterhelper/internal/base_exporter_test.go +++ b/exporter/exporterhelper/internal/base_exporter_test.go @@ -33,8 +33,14 @@ var ( }() ) -func newNoopObsrepSender(*ObsReport) Sender[internal.Request] { - return &BaseSender[internal.Request]{} +type noopSender struct { + component.StartFunc + component.ShutdownFunc + SendFunc[internal.Request] +} + +func newNoopObsrepSender(_ *ObsReport, next Sender[internal.Request]) Sender[internal.Request] { + return &noopSender{SendFunc: next.Send} } func TestBaseExporter(t *testing.T) { diff --git a/exporter/exporterhelper/internal/batch_sender.go b/exporter/exporterhelper/internal/batch_sender.go index 4cb3ace63b0..51aa700d3ab 100644 --- a/exporter/exporterhelper/internal/batch_sender.go +++ b/exporter/exporterhelper/internal/batch_sender.go @@ -23,8 +23,8 @@ import ( // - cfg.FlushTimeout is elapsed since the timestamp when the previous batch was sent out. // - concurrencyLimit is reached. type BatchSender struct { - BaseSender[internal.Request] - cfg exporterbatcher.Config + cfg exporterbatcher.Config + next Sender[internal.Request] // concurrencyLimit is the maximum number of goroutines that can be blocked by the batcher. // If this number is reached and all the goroutines are busy, the batch will be sent right away. @@ -43,11 +43,13 @@ type BatchSender struct { stopped *atomic.Bool } -// newBatchSender returns a new batch consumer component. -func NewBatchSender(cfg exporterbatcher.Config, set exporter.Settings) *BatchSender { +// NewBatchSender returns a new batch consumer component. +func NewBatchSender(cfg exporterbatcher.Config, set exporter.Settings, concurrencyLimit int64, next Sender[internal.Request]) *BatchSender { bs := &BatchSender{ - activeBatch: newEmptyBatch(), cfg: cfg, + next: next, + concurrencyLimit: concurrencyLimit, + activeBatch: newEmptyBatch(), logger: set.Logger, shutdownCh: nil, shutdownCompleteCh: make(chan struct{}), @@ -119,7 +121,7 @@ func newEmptyBatch() *batch { // Caller must hold the lock. func (bs *BatchSender) exportActiveBatch() { go func(b *batch) { - b.err = bs.NextSender.Send(b.ctx, b.request) + b.err = bs.next.Send(b.ctx, b.request) close(b.done) bs.activeRequests.Add(-b.requestsBlocked) }(bs.activeBatch) @@ -138,7 +140,7 @@ func (bs *BatchSender) isActiveBatchReady() bool { func (bs *BatchSender) Send(ctx context.Context, req internal.Request) error { // Stopped batch sender should act as pass-through to allow the queue to be drained. if bs.stopped.Load() { - return bs.NextSender.Send(ctx, req) + return bs.next.Send(ctx, req) } if bs.cfg.MaxSizeItems > 0 { @@ -190,7 +192,7 @@ func (bs *BatchSender) sendMergeSplitBatch(ctx context.Context, req internal.Req // Intentionally do not put the last request in the active batch to not block it. // TODO: Consider including the partial request in the error to avoid double publishing. for _, r := range reqs { - if err := bs.NextSender.Send(ctx, r); err != nil { + if err := bs.next.Send(ctx, r); err != nil { return err } } diff --git a/exporter/exporterhelper/internal/obs_report_sender.go b/exporter/exporterhelper/internal/obs_report_sender.go index e72e62d7128..d33a43e0e0d 100644 --- a/exporter/exporterhelper/internal/obs_report_sender.go +++ b/exporter/exporterhelper/internal/obs_report_sender.go @@ -6,23 +6,26 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe import ( "context" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter/internal" ) type obsReportSender[K internal.Request] struct { - BaseSender[K] + component.StartFunc + component.ShutdownFunc obsrep *ObsReport + next Sender[K] } -func NewObsReportSender[K internal.Request](obsrep *ObsReport) Sender[K] { - return &obsReportSender[K]{obsrep: obsrep} +func NewObsReportSender[K internal.Request](obsrep *ObsReport, next Sender[K]) Sender[K] { + return &obsReportSender[K]{obsrep: obsrep, next: next} } func (ors *obsReportSender[K]) Send(ctx context.Context, req K) error { c := ors.obsrep.StartOp(ctx) items := req.ItemsCount() // Forward the data to the next consumer (this pusher is the next). - err := ors.NextSender.Send(c, req) + err := ors.next.Send(c, req) ors.obsrep.EndOp(c, items, err) return err } diff --git a/exporter/exporterhelper/internal/queue_sender.go b/exporter/exporterhelper/internal/queue_sender.go index 978c83aa6bf..35d0b45cebb 100644 --- a/exporter/exporterhelper/internal/queue_sender.go +++ b/exporter/exporterhelper/internal/queue_sender.go @@ -11,7 +11,6 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterbatcher" "go.opentelemetry.io/collector/exporter/exporterqueue" "go.opentelemetry.io/collector/exporter/internal" @@ -67,43 +66,42 @@ func (qCfg *QueueConfig) Validate() error { } type QueueSender struct { - BaseSender[internal.Request] - queue exporterqueue.Queue[internal.Request] - numConsumers int - batcher queue.Batcher - consumers *queue.Consumers[internal.Request] - exporterID component.ID - logger *zap.Logger + queue exporterqueue.Queue[internal.Request] + batcher queue.Batcher + consumers *queue.Consumers[internal.Request] } func NewQueueSender( - q exporterqueue.Queue[internal.Request], - set exporter.Settings, - numConsumers int, + qf exporterqueue.Factory[internal.Request], + qSet exporterqueue.Settings, + qCfg exporterqueue.Config, + bCfg exporterbatcher.Config, exportFailureMessage string, - batcherCfg exporterbatcher.Config, -) *QueueSender { + next Sender[internal.Request], +) (*QueueSender, error) { + q, err := newObsQueue(qSet, qf(context.Background(), qSet, qCfg)) + if err != nil { + return nil, err + } + qs := &QueueSender{ - queue: q, - numConsumers: numConsumers, - exporterID: set.ID, - logger: set.Logger, + queue: q, } exportFunc := func(ctx context.Context, req internal.Request) error { - err := qs.NextSender.Send(ctx, req) + err := next.Send(ctx, req) if err != nil { - set.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage, + qSet.ExporterSettings.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage, zap.Error(err), zap.Int("dropped_items", req.ItemsCount())) } return err } if usePullingBasedExporterQueueBatcher.IsEnabled() { - qs.batcher, _ = queue.NewBatcher(batcherCfg, q, exportFunc, numConsumers) + qs.batcher, _ = queue.NewBatcher(bCfg, q, exportFunc, qCfg.NumConsumers) } else { - qs.consumers = queue.NewQueueConsumers[internal.Request](q, numConsumers, exportFunc) + qs.consumers = queue.NewQueueConsumers[internal.Request](q, qCfg.NumConsumers, exportFunc) } - return qs + return qs, nil } // Start is invoked during service startup. diff --git a/exporter/exporterhelper/internal/queue_sender_test.go b/exporter/exporterhelper/internal/queue_sender_test.go index 9d8c1e76fb6..8bc9a080619 100644 --- a/exporter/exporterhelper/internal/queue_sender_test.go +++ b/exporter/exporterhelper/internal/queue_sender_test.go @@ -502,14 +502,12 @@ func TestQueueSenderNoStartShutdown(t *testing.T) { defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)() set := exportertest.NewNopSettings() set.ID = exporterID - queue := exporterqueue.NewMemoryQueueFactory[internal.Request]()( - context.Background(), - exporterqueue.Settings{ - Signal: pipeline.SignalTraces, - ExporterSettings: set, - }, - exporterqueue.NewDefaultConfig()) - qs := NewQueueSender(queue, set, 1, "", exporterbatcher.NewDefaultConfig()) + qSet := exporterqueue.Settings{ + Signal: pipeline.SignalTraces, + ExporterSettings: set, + } + qs, err := NewQueueSender(exporterqueue.NewMemoryQueueFactory[internal.Request](), qSet, exporterqueue.NewDefaultConfig(), exporterbatcher.NewDefaultConfig(), "", noopSender{}) + require.NoError(t, err) assert.NoError(t, qs.Shutdown(context.Background())) }) } diff --git a/exporter/exporterhelper/internal/request_sender.go b/exporter/exporterhelper/internal/request_sender.go index 8ca75f66fb6..5504da5c638 100644 --- a/exporter/exporterhelper/internal/request_sender.go +++ b/exporter/exporterhelper/internal/request_sender.go @@ -7,27 +7,18 @@ import ( "context" // Sender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs). "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/exporter/internal" ) type Sender[K any] interface { component.Component Send(context.Context, K) error - SetNextSender(nextSender Sender[K]) } -type BaseSender[K any] struct { - component.StartFunc - component.ShutdownFunc - NextSender Sender[K] -} - -var _ Sender[internal.Request] = (*BaseSender[internal.Request])(nil) - -func (b *BaseSender[K]) Send(ctx context.Context, req K) error { - return b.NextSender.Send(ctx, req) -} +type SendFunc[K any] func(context.Context, K) error -func (b *BaseSender[K]) SetNextSender(nextSender Sender[K]) { - b.NextSender = nextSender +func (f SendFunc[K]) Send(ctx context.Context, k K) error { + if f == nil { + return nil + } + return f(ctx, k) } diff --git a/exporter/exporterhelper/internal/retry_sender.go b/exporter/exporterhelper/internal/retry_sender.go index 0a81f12503f..2aaf3fd9810 100644 --- a/exporter/exporterhelper/internal/retry_sender.go +++ b/exporter/exporterhelper/internal/retry_sender.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/otel/trace" "go.uber.org/zap" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" @@ -44,17 +45,19 @@ func NewThrottleRetry(err error, delay time.Duration) error { } type retrySender struct { - BaseSender[internal.Request] + component.StartFunc cfg configretry.BackOffConfig stopCh chan struct{} logger *zap.Logger + next Sender[internal.Request] } -func newRetrySender(config configretry.BackOffConfig, set exporter.Settings) *retrySender { +func newRetrySender(config configretry.BackOffConfig, set exporter.Settings, next Sender[internal.Request]) *retrySender { return &retrySender{ cfg: config, stopCh: make(chan struct{}), logger: set.Logger, + next: next, } } @@ -84,7 +87,7 @@ func (rs *retrySender) Send(ctx context.Context, req internal.Request) error { "Sending request.", trace.WithAttributes(attribute.Int64("retry_num", retryNum))) - err := rs.NextSender.Send(ctx, req) + err := rs.next.Send(ctx, req) if err == nil { return nil } diff --git a/exporter/exporterhelper/internal/retry_sender_test.go b/exporter/exporterhelper/internal/retry_sender_test.go index 470f4c62e82..35b64bc9b41 100644 --- a/exporter/exporterhelper/internal/retry_sender_test.go +++ b/exporter/exporterhelper/internal/retry_sender_test.go @@ -16,6 +16,7 @@ import ( "go.uber.org/zap" "go.uber.org/zap/zaptest/observer" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/consumer/consumererror" @@ -477,22 +478,25 @@ func newMockRequest(cnt int, consumeError error) *mockRequest { } type observabilityConsumerSender struct { - BaseSender[internal.Request] + component.StartFunc + component.ShutdownFunc waitGroup *sync.WaitGroup sentItemsCount *atomic.Int64 droppedItemsCount *atomic.Int64 + next Sender[internal.Request] } -func newObservabilityConsumerSender(*ObsReport) Sender[internal.Request] { +func newObservabilityConsumerSender(_ *ObsReport, next Sender[internal.Request]) Sender[internal.Request] { return &observabilityConsumerSender{ waitGroup: new(sync.WaitGroup), droppedItemsCount: &atomic.Int64{}, sentItemsCount: &atomic.Int64{}, + next: next, } } func (ocs *observabilityConsumerSender) Send(ctx context.Context, req internal.Request) error { - err := ocs.NextSender.Send(ctx, req) + err := ocs.next.Send(ctx, req) if err != nil { ocs.droppedItemsCount.Add(int64(req.ItemsCount())) } else { diff --git a/exporter/exporterhelper/internal/timeout_sender.go b/exporter/exporterhelper/internal/timeout_sender.go index a47ddccfb8c..31da1840eff 100644 --- a/exporter/exporterhelper/internal/timeout_sender.go +++ b/exporter/exporterhelper/internal/timeout_sender.go @@ -8,6 +8,7 @@ import ( "errors" "time" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter/internal" ) @@ -35,7 +36,8 @@ func NewDefaultTimeoutConfig() TimeoutConfig { // TimeoutSender is a requestSender that adds a `timeout` to every request that passes this sender. type TimeoutSender struct { - BaseSender[internal.Request] + component.StartFunc + component.ShutdownFunc cfg TimeoutConfig }