diff --git a/.chloggen/allow-sizer.yaml b/.chloggen/allow-sizer.yaml new file mode 100644 index 00000000000..3073f63c3c5 --- /dev/null +++ b/.chloggen/allow-sizer.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Allow exporter memory queue to use different types of sizers. + +# One or more tracking issues or pull requests related to the change +issues: [12708] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [user] diff --git a/exporter/exporterhelper/internal/queue_sender.go b/exporter/exporterhelper/internal/queue_sender.go index a4455addc25..043078819f2 100644 --- a/exporter/exporterhelper/internal/queue_sender.go +++ b/exporter/exporterhelper/internal/queue_sender.go @@ -28,6 +28,7 @@ type QueueBatchSettings[K any] struct { func NewDefaultQueueConfig() QueueConfig { return QueueConfig{ Enabled: true, + Sizer: request.SizerTypeRequests, NumConsumers: 10, // By default, batches are 8192 spans, for a total of up to 8 million spans in the queue // This can be estimated at 1-4 GB worth of maximum memory usage @@ -44,10 +45,16 @@ func NewDefaultQueueConfig() QueueConfig { type QueueConfig struct { // Enabled indicates whether to not enqueue batches before exporting. Enabled bool `mapstructure:"enabled"` + + // Sizer determines the type of size measurement used by this component. + // It accepts "requests", "items", or "bytes". + Sizer request.SizerType `mapstructure:"sizer"` + + // QueueSize represents the maximum data size allowed for concurrent storage and processing. + QueueSize int `mapstructure:"queue_size"` + // NumConsumers is the number of consumers from the queue. NumConsumers int `mapstructure:"num_consumers"` - // QueueSize is the maximum number of requests allowed in queue at any given time. - QueueSize int `mapstructure:"queue_size"` // Blocking controls the queue behavior when full. // If true it blocks until enough space to add the new request to the queue. Blocking bool `mapstructure:"blocking"` @@ -61,12 +68,20 @@ func (qCfg *QueueConfig) Validate() error { if !qCfg.Enabled { return nil } + if qCfg.NumConsumers <= 0 { return errors.New("`num_consumers` must be positive") } + if qCfg.QueueSize <= 0 { return errors.New("`queue_size` must be positive") } + + // Only support request sizer for persistent queue at this moment. 
+ if qCfg.StorageID != nil && qCfg.Sizer != request.SizerTypeRequests { + return errors.New("persistent queue only supports `requests` sizer") + } + return nil } @@ -96,7 +111,7 @@ func newQueueBatchConfig(qCfg QueueConfig, bCfg BatcherConfig) queuebatch.Config qbCfg := queuebatch.Config{ Enabled: true, WaitForResult: !qCfg.Enabled, - Sizer: request.SizerTypeRequests, + Sizer: qCfg.Sizer, QueueSize: qCfg.QueueSize, NumConsumers: qCfg.NumConsumers, BlockOnOverflow: qCfg.Blocking, diff --git a/exporter/exporterhelper/internal/queuebatch/queue_batch.go b/exporter/exporterhelper/internal/queuebatch/queue_batch.go index 76ca2ac1269..b2f866cd402 100644 --- a/exporter/exporterhelper/internal/queuebatch/queue_batch.go +++ b/exporter/exporterhelper/internal/queuebatch/queue_batch.go @@ -6,6 +6,7 @@ package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhel import ( "context" "errors" + "fmt" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" @@ -42,32 +43,35 @@ func NewQueueBatch( b = newDefaultBatcher(*cfg.Batch, next, cfg.NumConsumers) } - sizer, ok := qSet.Sizers[request.SizerTypeRequests] - if !ok { - return nil, errors.New("queue_batch: unsupported sizer") - } - var q Queue[request.Request] switch { case cfg.WaitForResult: q = newDisabledQueue(b.Consume) - case cfg.StorageID != nil: - q = newAsyncQueue(newPersistentQueue[request.Request](persistentQueueSettings[request.Request]{ - sizer: sizer, - capacity: int64(cfg.QueueSize), - blocking: cfg.BlockOnOverflow, - signal: qSet.Signal, - storageID: *cfg.StorageID, - encoding: qSet.Encoding, - id: qSet.ID, - telemetry: qSet.Telemetry, - }), cfg.NumConsumers, b.Consume) default: - q = newAsyncQueue(newMemoryQueue[request.Request](memoryQueueSettings[request.Request]{ - sizer: sizer, - capacity: int64(cfg.QueueSize), - blocking: cfg.BlockOnOverflow, - }), cfg.NumConsumers, b.Consume) + sizer, ok := qSet.Sizers[cfg.Sizer] + if !ok { + return 
nil, fmt.Errorf("queue_batch: unsupported sizer %q", cfg.Sizer) + } + + switch cfg.StorageID != nil { + case true: + q = newAsyncQueue(newPersistentQueue[request.Request](persistentQueueSettings[request.Request]{ + sizer: sizer, + capacity: int64(cfg.QueueSize), + blocking: cfg.BlockOnOverflow, + signal: qSet.Signal, + storageID: *cfg.StorageID, + encoding: qSet.Encoding, + id: qSet.ID, + telemetry: qSet.Telemetry, + }), cfg.NumConsumers, b.Consume) + default: + q = newAsyncQueue(newMemoryQueue[request.Request](memoryQueueSettings[request.Request]{ + sizer: sizer, + capacity: int64(cfg.QueueSize), + blocking: cfg.BlockOnOverflow, + }), cfg.NumConsumers, b.Consume) + } } oq, err := newObsQueue(qSet, q) diff --git a/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go b/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go index 728272ab14f..5f51a4bfead 100644 --- a/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go +++ b/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go @@ -95,17 +95,15 @@ func TestQueueBatchDoNotPreserveCancellation(t *testing.T) { } func TestQueueBatchHappyPath(t *testing.T) { - cfg := Config{ - Enabled: true, - QueueSize: 10, - NumConsumers: 1, - } + cfg := newTestConfig() + cfg.BlockOnOverflow = false + cfg.QueueSize = 56 sink := requesttest.NewSink() qb, err := NewQueueBatch(newFakeRequestSettings(), cfg, sink.Export) require.NoError(t, err) for i := 0; i < 10; i++ { - require.NoError(t, qb.Send(context.Background(), &requesttest.FakeRequest{Items: i})) + require.NoError(t, qb.Send(context.Background(), &requesttest.FakeRequest{Items: i + 1})) } // expect queue to be full @@ -113,7 +111,9 @@ func TestQueueBatchHappyPath(t *testing.T) { require.NoError(t, qb.Start(context.Background(), componenttest.NewNopHost())) assert.Eventually(t, func() bool { - return sink.RequestsCount() == 10 && sink.ItemsCount() == 45 + // Because batching is used, cannot guarantee that will be 1 batch or multiple because 
of the flush interval. + // Check only for total items count. + return sink.ItemsCount() == 55 }, 1*time.Second, 10*time.Millisecond) require.NoError(t, qb.Shutdown(context.Background())) } diff --git a/exporter/exporterhelper/queue_batch.go b/exporter/exporterhelper/queue_batch.go index 9bf84b3b389..ec5f47af893 100644 --- a/exporter/exporterhelper/queue_batch.go +++ b/exporter/exporterhelper/queue_batch.go @@ -61,17 +61,7 @@ func WithQueueBatch(cfg QueueBatchConfig, set QueueBatchSettings) Option { // NewDefaultQueueConfig returns the default config for QueueConfig. // By default, the queue stores 1000 items of telemetry and is non-blocking when full. -func NewDefaultQueueConfig() QueueConfig { - return QueueConfig{ - Enabled: true, - NumConsumers: 10, - // By default, batches are 8192 spans, for a total of up to 8 million spans in the queue - // This can be estimated at 1-4 GB worth of maximum memory usage - // This default is probably still too high, and may be adjusted further down in a future release - QueueSize: 1_000, - Blocking: false, - } -} +var NewDefaultQueueConfig = internal.NewDefaultQueueConfig // BatcherConfig defines a configuration for batching requests based on a timeout and a minimum number of items. 
// Experimental: This API is at the early stage of development and may change without backward compatibility diff --git a/exporter/exporterqueue/config.go b/exporter/exporterqueue/config.go index c5d943d53a7..20b6e3edc3f 100644 --- a/exporter/exporterqueue/config.go +++ b/exporter/exporterqueue/config.go @@ -15,6 +15,7 @@ type Config = exporterhelper.QueueConfig func NewDefaultConfig() Config { return Config{ Enabled: true, + Sizer: exporterhelper.RequestSizerTypeRequests, NumConsumers: 10, QueueSize: 1_000, Blocking: true, diff --git a/exporter/otlpexporter/config_test.go b/exporter/otlpexporter/config_test.go index 4a56ef693c2..b13b7bb6c1f 100644 --- a/exporter/otlpexporter/config_test.go +++ b/exporter/otlpexporter/config_test.go @@ -52,6 +52,7 @@ func TestUnmarshalConfig(t *testing.T) { }, QueueConfig: exporterhelper.QueueConfig{ Enabled: true, + Sizer: exporterhelper.RequestSizerTypeRequests, NumConsumers: 2, QueueSize: 10, }, diff --git a/exporter/otlphttpexporter/config_test.go b/exporter/otlphttpexporter/config_test.go index 8e07baa283d..15b1eead7a2 100644 --- a/exporter/otlphttpexporter/config_test.go +++ b/exporter/otlphttpexporter/config_test.go @@ -54,6 +54,7 @@ func TestUnmarshalConfig(t *testing.T) { }, QueueConfig: exporterhelper.QueueConfig{ Enabled: true, + Sizer: exporterhelper.RequestSizerTypeRequests, NumConsumers: 2, QueueSize: 10, },