Skip to content

Remove deprecated old batcher config #13003

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .chloggen/rm-old-batcher.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Remove deprecated old batcher config

# One or more tracking issues or pull requests related to the change
issues: [13003]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
19 changes: 3 additions & 16 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type BaseExporter struct {

queueBatchSettings QueueBatchSettings[request.Request]
queueCfg queuebatch.Config
batcherCfg BatcherConfig
}

func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher sender.SendFunc[request.Request], options ...Option) (*BaseExporter, error) {
Expand Down Expand Up @@ -83,20 +82,20 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher sende
return nil, err
}

if be.batcherCfg.Enabled || be.queueCfg.Batch != nil {
if be.queueCfg.Batch != nil {
// Batcher mutates the data.
be.ConsumerOptions = append(be.ConsumerOptions, consumer.WithCapabilities(consumer.Capabilities{MutatesData: true}))
}

if be.queueCfg.Enabled || be.batcherCfg.Enabled {
if be.queueCfg.Enabled {
qSet := queuebatch.Settings[request.Request]{
Signal: signal,
ID: set.ID,
Telemetry: set.TelemetrySettings,
Encoding: be.queueBatchSettings.Encoding,
Sizers: be.queueBatchSettings.Sizers,
}
be.QueueSender, err = NewQueueSender(qSet, be.queueCfg, be.batcherCfg, be.ExportFailureMessage, be.firstSender)
be.QueueSender, err = NewQueueSender(qSet, be.queueCfg, be.ExportFailureMessage, be.firstSender)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -231,18 +230,6 @@ func WithCapabilities(capabilities consumer.Capabilities) Option {
}
}

// WithBatcher enables batching for an exporter based on custom request types.
// For now, it can be used only with the New[Traces|Metrics|Logs|Profiles]Request exporter helpers and
// WithRequestBatchFuncs provided.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithBatcher(cfg BatcherConfig) Option {
return func(o *BaseExporter) error {
o.batcherCfg = cfg
return nil
}
}

// WithQueueBatchSettings is used to set the QueueBatchSettings for the new request based exporter helper.
// It must be provided as the first option when creating a new exporter helper.
func WithQueueBatchSettings(set QueueBatchSettings[request.Request]) Option {
Expand Down
13 changes: 1 addition & 12 deletions exporter/exporterhelper/internal/base_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,10 @@ func TestBaseExporterLogging(t *testing.T) {
rCfg := configretry.NewDefaultBackOffConfig()
rCfg.Enabled = false
qCfg := NewDefaultQueueConfig()
qCfg.Enabled = false
qCfg.WaitForResult = true
bs, err := NewBaseExporter(set, pipeline.SignalMetrics, errExport,
WithQueueBatchSettings(newFakeQueueBatch()),
WithQueue(qCfg),
WithBatcher(NewDefaultBatcherConfig()),
WithRetry(rCfg))
require.NoError(t, err)
require.NoError(t, bs.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -102,11 +101,6 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
qs.Enabled = false
return WithQueue(qs)
}(),
func() Option {
bs := NewDefaultBatcherConfig()
bs.Enabled = false
return WithBatcher(bs)
}(),
},
},
{
Expand All @@ -117,11 +111,6 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
qs.Enabled = false
return WithQueueBatch(qs, newFakeQueueBatch())
}(),
func() Option {
bs := NewDefaultBatcherConfig()
bs.Enabled = false
return WithBatcher(bs)
}(),
},
},
}
Expand Down
108 changes: 1 addition & 107 deletions exporter/exporterhelper/internal/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe

import (
"context"
"errors"
"fmt"
"math"
"runtime"
"time"

"go.uber.org/zap"

Expand Down Expand Up @@ -45,7 +40,6 @@ func NewDefaultQueueConfig() queuebatch.Config {
func NewQueueSender(
qSet queuebatch.Settings[request.Request],
qCfg queuebatch.Config,
bCfg BatcherConfig,
exportFailureMessage string,
next sender.Sender[request.Request],
) (sender.Sender[request.Request], error) {
Expand All @@ -61,105 +55,5 @@ func NewQueueSender(
return nil
}

// TODO: Remove this when WithBatcher is removed.
if bCfg.Enabled {
return queuebatch.NewQueueBatchLegacyBatcher(qSet, newQueueBatchConfig(qCfg, bCfg), exportFunc)
}
return queuebatch.NewQueueBatch(qSet, newQueueBatchConfig(qCfg, bCfg), exportFunc)
}

func newQueueBatchConfig(qCfg queuebatch.Config, bCfg BatcherConfig) queuebatch.Config {
// Overwrite configuration with the legacy BatcherConfig configured via WithBatcher.
// TODO: Remove this when WithBatcher is removed.
if !bCfg.Enabled {
return qCfg
}

// User configured queueing, copy all config.
if qCfg.Enabled {
// Overwrite configuration with the legacy BatcherConfig configured via WithBatcher.
// TODO: Remove this when WithBatcher is removed.
qCfg.Batch = &queuebatch.BatchConfig{
FlushTimeout: bCfg.FlushTimeout,
MinSize: bCfg.MinSize,
MaxSize: bCfg.MaxSize,
}
return qCfg
}

// This can happen only if the deprecated way to configure batching is used with a "disabled" queue.
// TODO: Remove this when WithBatcher is removed.
return queuebatch.Config{
Enabled: true,
WaitForResult: true,
Sizer: request.SizerTypeRequests,
QueueSize: math.MaxInt,
NumConsumers: runtime.NumCPU(),
BlockOnOverflow: true,
StorageID: nil,
Batch: &queuebatch.BatchConfig{
FlushTimeout: bCfg.FlushTimeout,
MinSize: bCfg.MinSize,
MaxSize: bCfg.MaxSize,
},
}
}

// BatcherConfig defines a configuration for batching requests based on a timeout and a minimum number of items.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type BatcherConfig struct {
// Enabled indicates whether to not enqueue batches before sending to the consumerSender.
Enabled bool `mapstructure:"enabled"`

// FlushTimeout sets the time after which a batch will be sent regardless of its size.
FlushTimeout time.Duration `mapstructure:"flush_timeout"`

// SizeConfig sets the size limits for a batch.
SizeConfig `mapstructure:",squash"`
}

// SizeConfig sets the size limits for a batch.
type SizeConfig struct {
Sizer request.SizerType `mapstructure:"sizer"`

// MinSize defines the configuration for the minimum size of a batch.
MinSize int64 `mapstructure:"min_size"`
// MaxSize defines the configuration for the maximum size of a batch.
MaxSize int64 `mapstructure:"max_size"`
}

func (c *BatcherConfig) Validate() error {
if !c.Enabled {
return nil
}

if c.FlushTimeout <= 0 {
return errors.New("`flush_timeout` must be greater than zero")
}

if c.Sizer != request.SizerTypeItems {
return fmt.Errorf("unsupported sizer type: %q", c.Sizer)
}
if c.MinSize < 0 {
return errors.New("`min_size` must be greater than or equal to zero")
}
if c.MaxSize < 0 {
return errors.New("`max_size` must be greater than or equal to zero")
}
if c.MaxSize != 0 && c.MaxSize < c.MinSize {
return errors.New("`max_size` must be greater than or equal to mix_size")
}
return nil
}

func NewDefaultBatcherConfig() BatcherConfig {
return BatcherConfig{
Enabled: true,
FlushTimeout: 200 * time.Millisecond,
SizeConfig: SizeConfig{
Sizer: request.SizerTypeItems,
MinSize: 8192,
},
}
return queuebatch.NewQueueBatch(qSet, qCfg, exportFunc)
}
58 changes: 1 addition & 57 deletions exporter/exporterhelper/internal/queue_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -36,7 +35,7 @@ func TestNewQueueSenderFailedRequestDropped(t *testing.T) {
logger, observed := observer.New(zap.ErrorLevel)
qSet.Telemetry.Logger = zap.New(logger)
be, err := NewQueueSender(
qSet, NewDefaultQueueConfig(), BatcherConfig{}, "", sender.NewSender(func(context.Context, request.Request) error { return errors.New("some error") }))
qSet, NewDefaultQueueConfig(), "", sender.NewSender(func(context.Context, request.Request) error { return errors.New("some error") }))
require.NoError(t, err)

require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand All @@ -61,58 +60,3 @@ func TestQueueConfig_Validate(t *testing.T) {
qCfg.Enabled = false
assert.NoError(t, qCfg.Validate())
}

func TestBatcherConfig_Validate(t *testing.T) {
cfg := NewDefaultBatcherConfig()
require.NoError(t, cfg.Validate())

cfg = NewDefaultBatcherConfig()
cfg.FlushTimeout = 0
require.EqualError(t, cfg.Validate(), "`flush_timeout` must be greater than zero")
}

func TestSizeConfig_Validate(t *testing.T) {
cfg := BatcherConfig{
Enabled: true,
FlushTimeout: 200 * time.Millisecond,
SizeConfig: SizeConfig{
Sizer: request.SizerTypeBytes,
MinSize: 100,
MaxSize: 1000,
},
}
require.EqualError(t, cfg.Validate(), "unsupported sizer type: {\"bytes\"}")

cfg = BatcherConfig{
Enabled: true,
FlushTimeout: 200 * time.Millisecond,
SizeConfig: SizeConfig{
Sizer: request.SizerTypeItems,
MinSize: 100,
MaxSize: -1000,
},
}
require.EqualError(t, cfg.Validate(), "`max_size` must be greater than or equal to zero")

cfg = BatcherConfig{
Enabled: true,
FlushTimeout: 200 * time.Millisecond,
SizeConfig: SizeConfig{
Sizer: request.SizerTypeItems,
MinSize: -100,
MaxSize: 1000,
},
}
require.EqualError(t, cfg.Validate(), "`min_size` must be greater than or equal to zero")

cfg = BatcherConfig{
Enabled: true,
FlushTimeout: 200 * time.Millisecond,
SizeConfig: SizeConfig{
Sizer: request.SizerTypeItems,
MinSize: 1000,
MaxSize: 100,
},
}
require.EqualError(t, cfg.Validate(), "`max_size` must be greater than or equal to mix_size")
}
19 changes: 0 additions & 19 deletions exporter/exporterhelper/queue_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,13 @@ import (
// Deprecated: [v0.123.0] use QueueBatchConfig.
type QueueConfig = QueueBatchConfig

// Deprecated: [v0.123.0] use WithQueueBatch.
func WithRequestQueue(cfg QueueBatchConfig, encoding QueueBatchEncoding[Request]) Option {
return WithQueueBatch(cfg, QueueBatchSettings{Encoding: encoding})
}

// WithQueue overrides the default QueueBatchConfig for an exporter.
// The default QueueBatchConfig is to disable queueing.
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
func WithQueue(config QueueBatchConfig) Option {
return internal.WithQueue(config)
}

// Deprecated: [v0.123.0] use WithQueueBatch.
func WithBatcher(cfg BatcherConfig) Option {
return internal.WithBatcher(cfg)
}

// QueueBatchConfig defines configuration for queueing and batching for the exporter.
type QueueBatchConfig = queuebatch.Config

Expand Down Expand Up @@ -61,12 +51,3 @@ func WithQueueBatch(cfg QueueBatchConfig, set QueueBatchSettings) Option {
// NewDefaultQueueConfig returns the default config for QueueBatchConfig.
// By default, the queue stores 1000 requests of telemetry and is non-blocking when full.
var NewDefaultQueueConfig = internal.NewDefaultQueueConfig

// Deprecated: [v0.123.0] use WithQueueBatch.
type BatcherConfig = internal.BatcherConfig

// Deprecated: [v0.123.0] use WithQueueBatch.
type SizeConfig = internal.SizeConfig

// Deprecated: [v0.123.0] use WithQueueBatch.
var NewDefaultBatcherConfig = internal.NewDefaultBatcherConfig
Loading