diff --git a/.chloggen/rm-old-batcher.yaml b/.chloggen/rm-old-batcher.yaml new file mode 100644 index 00000000000..928eae27ad4 --- /dev/null +++ b/.chloggen/rm-old-batcher.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Remove deprecated old batcher config + +# One or more tracking issues or pull requests related to the change +issues: [13003] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/exporter/exporterhelper/internal/base_exporter.go b/exporter/exporterhelper/internal/base_exporter.go index 841c173881a..87e7bb2e1d6 100644 --- a/exporter/exporterhelper/internal/base_exporter.go +++ b/exporter/exporterhelper/internal/base_exporter.go @@ -48,7 +48,6 @@ type BaseExporter struct { queueBatchSettings QueueBatchSettings[request.Request] queueCfg queuebatch.Config - batcherCfg BatcherConfig } func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher sender.SendFunc[request.Request], options ...Option) (*BaseExporter, error) { @@ -83,12 +82,12 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher sende return nil, err } - if be.batcherCfg.Enabled || be.queueCfg.Batch != nil { + if be.queueCfg.Batch != nil { // Batcher mutates the data. be.ConsumerOptions = append(be.ConsumerOptions, consumer.WithCapabilities(consumer.Capabilities{MutatesData: true})) } - if be.queueCfg.Enabled || be.batcherCfg.Enabled { + if be.queueCfg.Enabled { qSet := queuebatch.Settings[request.Request]{ Signal: signal, ID: set.ID, @@ -96,7 +95,7 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher sende Encoding: be.queueBatchSettings.Encoding, Sizers: be.queueBatchSettings.Sizers, } - be.QueueSender, err = NewQueueSender(qSet, be.queueCfg, be.batcherCfg, be.ExportFailureMessage, be.firstSender) + be.QueueSender, err = NewQueueSender(qSet, be.queueCfg, be.ExportFailureMessage, be.firstSender) if err != nil { return nil, err } @@ -231,18 +230,6 @@ func WithCapabilities(capabilities consumer.Capabilities) Option { } } -// WithBatcher enables batching for an exporter based on custom request types. -// For now, it can be used only with the New[Traces|Metrics|Logs|Profiles]Request exporter helpers and -// WithRequestBatchFuncs provided. -// This API is at the early stage of development and may change without backward compatibility -// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -func WithBatcher(cfg BatcherConfig) Option { - return func(o *BaseExporter) error { - o.batcherCfg = cfg - return nil - } -} - // WithQueueBatchSettings is used to set the QueueBatchSettings for the new request based exporter helper. // It must be provided as the first option when creating a new exporter helper. func WithQueueBatchSettings(set QueueBatchSettings[request.Request]) Option { diff --git a/exporter/exporterhelper/internal/base_exporter_test.go b/exporter/exporterhelper/internal/base_exporter_test.go index e72740eaaee..5aa492ab179 100644 --- a/exporter/exporterhelper/internal/base_exporter_test.go +++ b/exporter/exporterhelper/internal/base_exporter_test.go @@ -68,11 +68,10 @@ func TestBaseExporterLogging(t *testing.T) { rCfg := configretry.NewDefaultBackOffConfig() rCfg.Enabled = false qCfg := NewDefaultQueueConfig() - qCfg.Enabled = false + qCfg.WaitForResult = true bs, err := NewBaseExporter(set, pipeline.SignalMetrics, errExport, WithQueueBatchSettings(newFakeQueueBatch()), WithQueue(qCfg), - WithBatcher(NewDefaultBatcherConfig()), WithRetry(rCfg)) require.NoError(t, err) require.NoError(t, bs.Start(context.Background(), componenttest.NewNopHost())) @@ -102,11 +101,6 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) { qs.Enabled = false return WithQueue(qs) }(), - func() Option { - bs := NewDefaultBatcherConfig() - bs.Enabled = false - return WithBatcher(bs) - }(), }, }, { @@ -117,11 +111,6 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) { qs.Enabled = false return WithQueueBatch(qs, newFakeQueueBatch()) }(), - func() Option { - bs := NewDefaultBatcherConfig() - bs.Enabled = false - return WithBatcher(bs) - }(), }, }, } diff --git a/exporter/exporterhelper/internal/queue_sender.go b/exporter/exporterhelper/internal/queue_sender.go index 1045aa72064..03d45bcbc9e 100644 --- a/exporter/exporterhelper/internal/queue_sender.go +++ b/exporter/exporterhelper/internal/queue_sender.go @@ -5,11 +5,6 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe import ( "context" - "errors" - "fmt" - "math" - "runtime" - "time" "go.uber.org/zap" @@ -45,7 +40,6 @@ func NewDefaultQueueConfig() queuebatch.Config { func NewQueueSender( qSet queuebatch.Settings[request.Request], qCfg queuebatch.Config, - bCfg BatcherConfig, exportFailureMessage string, next sender.Sender[request.Request], ) (sender.Sender[request.Request], error) { @@ -61,105 +55,5 @@ func NewQueueSender( return nil } - // TODO: Remove this when WithBatcher is removed. - if bCfg.Enabled { - return queuebatch.NewQueueBatchLegacyBatcher(qSet, newQueueBatchConfig(qCfg, bCfg), exportFunc) - } - return queuebatch.NewQueueBatch(qSet, newQueueBatchConfig(qCfg, bCfg), exportFunc) -} - -func newQueueBatchConfig(qCfg queuebatch.Config, bCfg BatcherConfig) queuebatch.Config { - // Overwrite configuration with the legacy BatcherConfig configured via WithBatcher. - // TODO: Remove this when WithBatcher is removed. - if !bCfg.Enabled { - return qCfg - } - - // User configured queueing, copy all config. - if qCfg.Enabled { - // Overwrite configuration with the legacy BatcherConfig configured via WithBatcher. - // TODO: Remove this when WithBatcher is removed. - qCfg.Batch = &queuebatch.BatchConfig{ - FlushTimeout: bCfg.FlushTimeout, - MinSize: bCfg.MinSize, - MaxSize: bCfg.MaxSize, - } - return qCfg - } - - // This can happen only if the deprecated way to configure batching is used with a "disabled" queue. - // TODO: Remove this when WithBatcher is removed. - return queuebatch.Config{ - Enabled: true, - WaitForResult: true, - Sizer: request.SizerTypeRequests, - QueueSize: math.MaxInt, - NumConsumers: runtime.NumCPU(), - BlockOnOverflow: true, - StorageID: nil, - Batch: &queuebatch.BatchConfig{ - FlushTimeout: bCfg.FlushTimeout, - MinSize: bCfg.MinSize, - MaxSize: bCfg.MaxSize, - }, - } -} - -// BatcherConfig defines a configuration for batching requests based on a timeout and a minimum number of items. -// Experimental: This API is at the early stage of development and may change without backward compatibility -// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -type BatcherConfig struct { - // Enabled indicates whether to not enqueue batches before sending to the consumerSender. - Enabled bool `mapstructure:"enabled"` - - // FlushTimeout sets the time after which a batch will be sent regardless of its size. - FlushTimeout time.Duration `mapstructure:"flush_timeout"` - - // SizeConfig sets the size limits for a batch. - SizeConfig `mapstructure:",squash"` -} - -// SizeConfig sets the size limits for a batch. -type SizeConfig struct { - Sizer request.SizerType `mapstructure:"sizer"` - - // MinSize defines the configuration for the minimum size of a batch. - MinSize int64 `mapstructure:"min_size"` - // MaxSize defines the configuration for the maximum size of a batch. - MaxSize int64 `mapstructure:"max_size"` -} - -func (c *BatcherConfig) Validate() error { - if !c.Enabled { - return nil - } - - if c.FlushTimeout <= 0 { - return errors.New("`flush_timeout` must be greater than zero") - } - - if c.Sizer != request.SizerTypeItems { - return fmt.Errorf("unsupported sizer type: %q", c.Sizer) - } - if c.MinSize < 0 { - return errors.New("`min_size` must be greater than or equal to zero") - } - if c.MaxSize < 0 { - return errors.New("`max_size` must be greater than or equal to zero") - } - if c.MaxSize != 0 && c.MaxSize < c.MinSize { - return errors.New("`max_size` must be greater than or equal to mix_size") - } - return nil -} - -func NewDefaultBatcherConfig() BatcherConfig { - return BatcherConfig{ - Enabled: true, - FlushTimeout: 200 * time.Millisecond, - SizeConfig: SizeConfig{ - Sizer: request.SizerTypeItems, - MinSize: 8192, - }, - } + return queuebatch.NewQueueBatch(qSet, qCfg, exportFunc) } diff --git a/exporter/exporterhelper/internal/queue_sender_test.go b/exporter/exporterhelper/internal/queue_sender_test.go index da398e497ee..d06748ea1a1 100644 --- a/exporter/exporterhelper/internal/queue_sender_test.go +++ b/exporter/exporterhelper/internal/queue_sender_test.go @@ -7,7 +7,6 @@ import ( "context" "errors" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -36,7 +35,7 @@ func TestNewQueueSenderFailedRequestDropped(t *testing.T) { logger, observed := observer.New(zap.ErrorLevel) qSet.Telemetry.Logger = zap.New(logger) be, err := NewQueueSender( - qSet, NewDefaultQueueConfig(), BatcherConfig{}, "", sender.NewSender(func(context.Context, request.Request) error { return errors.New("some error") })) + qSet, NewDefaultQueueConfig(), "", sender.NewSender(func(context.Context, request.Request) error { return errors.New("some error") })) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -61,58 +60,3 @@ func TestQueueConfig_Validate(t *testing.T) { qCfg.Enabled = false assert.NoError(t, qCfg.Validate()) } - -func TestBatcherConfig_Validate(t *testing.T) { - cfg := NewDefaultBatcherConfig() - require.NoError(t, cfg.Validate()) - - cfg = NewDefaultBatcherConfig() - cfg.FlushTimeout = 0 - require.EqualError(t, cfg.Validate(), "`flush_timeout` must be greater than zero") -} - -func TestSizeConfig_Validate(t *testing.T) { - cfg := BatcherConfig{ - Enabled: true, - FlushTimeout: 200 * time.Millisecond, - SizeConfig: SizeConfig{ - Sizer: request.SizerTypeBytes, - MinSize: 100, - MaxSize: 1000, - }, - } - require.EqualError(t, cfg.Validate(), "unsupported sizer type: {\"bytes\"}") - - cfg = BatcherConfig{ - Enabled: true, - FlushTimeout: 200 * time.Millisecond, - SizeConfig: SizeConfig{ - Sizer: request.SizerTypeItems, - MinSize: 100, - MaxSize: -1000, - }, - } - require.EqualError(t, cfg.Validate(), "`max_size` must be greater than or equal to zero") - - cfg = BatcherConfig{ - Enabled: true, - FlushTimeout: 200 * time.Millisecond, - SizeConfig: SizeConfig{ - Sizer: request.SizerTypeItems, - MinSize: -100, - MaxSize: 1000, - }, - } - require.EqualError(t, cfg.Validate(), "`min_size` must be greater than or equal to zero") - - cfg = BatcherConfig{ - Enabled: true, - FlushTimeout: 200 * time.Millisecond, - SizeConfig: SizeConfig{ - Sizer: request.SizerTypeItems, - MinSize: 1000, - MaxSize: 100, - }, - } - require.EqualError(t, cfg.Validate(), "`max_size` must be greater than or equal to mix_size") -} diff --git a/exporter/exporterhelper/queue_batch.go b/exporter/exporterhelper/queue_batch.go index d31dd1a31eb..7b11aafef50 100644 --- a/exporter/exporterhelper/queue_batch.go +++ b/exporter/exporterhelper/queue_batch.go @@ -11,11 +11,6 @@ import ( // Deprecated: [v0.123.0] use QueueBatchConfig. type QueueConfig = QueueBatchConfig -// Deprecated: [v0.123.0] use WithQueueBatch. -func WithRequestQueue(cfg QueueBatchConfig, encoding QueueBatchEncoding[Request]) Option { - return WithQueueBatch(cfg, QueueBatchSettings{Encoding: encoding}) -} - // WithQueue overrides the default QueueBatchConfig for an exporter. // The default QueueBatchConfig is to disable queueing. // This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. @@ -23,11 +18,6 @@ func WithQueue(config QueueBatchConfig) Option { return internal.WithQueue(config) } -// Deprecated: [v0.123.0] use WithQueueBatch. -func WithBatcher(cfg BatcherConfig) Option { - return internal.WithBatcher(cfg) -} - // QueueBatchConfig defines configuration for queueing and batching for the exporter. type QueueBatchConfig = queuebatch.Config @@ -61,12 +51,3 @@ func WithQueueBatch(cfg QueueBatchConfig, set QueueBatchSettings) Option { // NewDefaultQueueConfig returns the default config for QueueBatchConfig. // By default, the queue stores 1000 requests of telemetry and is non-blocking when full. var NewDefaultQueueConfig = internal.NewDefaultQueueConfig - -// Deprecated: [v0.123.0] use WithQueueBatch. -type BatcherConfig = internal.BatcherConfig - -// Deprecated: [v0.123.0] use WithQueueBatch. -type SizeConfig = internal.SizeConfig - -// Deprecated: [v0.123.0] use WithQueueBatch. -var NewDefaultBatcherConfig = internal.NewDefaultBatcherConfig