Skip to content

Commit f09817e

Browse files
authored
Add batching capability to the old QueueConfig (#12746)
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent ea0a4c6 commit f09817e

File tree

15 files changed

+179
-169
lines changed

15 files changed

+179
-169
lines changed

.chloggen/add-batch-to-config.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add support to configure batching in the sending queue.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [12746]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [user]

.chloggen/deprecate_QueueConfig.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: deprecation
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Deprecate `QueueConfig` in favor of `QueueBatchConfig`.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [12746]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [api]

exporter/exporterhelper/internal/base_exporter.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ type BaseExporter struct {
4747
retryCfg configretry.BackOffConfig
4848

4949
queueBatchSettings QueueBatchSettings[request.Request]
50-
queueCfg QueueConfig
50+
queueCfg queuebatch.Config
5151
batcherCfg BatcherConfig
5252
}
5353

@@ -190,10 +190,10 @@ func WithRetry(config configretry.BackOffConfig) Option {
190190
}
191191
}
192192

193-
// WithQueue overrides the default QueueConfig for an exporter.
194-
// The default QueueConfig is to disable queueing.
193+
// WithQueue overrides the default queuebatch.Config for an exporter.
194+
// The default queuebatch.Config is to disable queueing.
195195
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
196-
func WithQueue(cfg QueueConfig) Option {
196+
func WithQueue(cfg queuebatch.Config) Option {
197197
return func(o *BaseExporter) error {
198198
if o.queueBatchSettings.Encoding == nil {
199199
return errors.New("WithQueue option is not available for the new request exporters, use WithQueueBatch instead")
@@ -206,7 +206,7 @@ func WithQueue(cfg QueueConfig) Option {
206206
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
207207
// Experimental: This API is at the early stage of development and may change without backward compatibility
208208
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
209-
func WithQueueBatch(cfg QueueConfig, set QueueBatchSettings[request.Request]) Option {
209+
func WithQueueBatch(cfg queuebatch.Config, set QueueBatchSettings[request.Request]) Option {
210210
return func(o *BaseExporter) error {
211211
if !cfg.Enabled {
212212
o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures."

exporter/exporterhelper/internal/queue_sender.go

Lines changed: 36 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@ import (
1313

1414
"go.uber.org/zap"
1515

16-
"go.opentelemetry.io/collector/component"
17-
"go.opentelemetry.io/collector/confmap"
1816
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
1917
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
2018
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
@@ -26,10 +24,10 @@ type QueueBatchSettings[K any] struct {
2624
Sizers map[request.SizerType]request.Sizer[K]
2725
}
2826

29-
// NewDefaultQueueConfig returns the default config for QueueConfig.
27+
// NewDefaultQueueConfig returns the default config for queuebatch.Config.
3028
// By default, the queue stores 1000 items of telemetry and is non-blocking when full.
31-
func NewDefaultQueueConfig() QueueConfig {
32-
return QueueConfig{
29+
func NewDefaultQueueConfig() queuebatch.Config {
30+
return queuebatch.Config{
3331
Enabled: true,
3432
Sizer: request.SizerTypeRequests,
3533
NumConsumers: 10,
@@ -38,94 +36,18 @@ func NewDefaultQueueConfig() QueueConfig {
3836
// This default is probably still too high, and may be adjusted further down in a future release
3937
QueueSize: 1_000,
4038
BlockOnOverflow: false,
39+
StorageID: nil,
40+
Batch: nil,
4141
}
4242
}
4343

44-
// QueueConfig defines configuration for queueing requests before exporting.
45-
// It's supposed to be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
46-
// Experimental: This API is at the early stage of development and may change without backward compatibility
47-
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
48-
type QueueConfig struct {
49-
// Enabled indicates whether to not enqueue batches before exporting.
50-
Enabled bool `mapstructure:"enabled"`
51-
52-
// WaitForResult determines if incoming requests are blocked until the request is processed or not.
53-
// Currently, this option is not available when persistent queue is configured using the storage configuration.
54-
WaitForResult bool `mapstructure:"wait_for_result"`
55-
56-
// Sizer determines the type of size measurement used by this component.
57-
// It accepts "requests", "items", or "bytes".
58-
Sizer request.SizerType `mapstructure:"sizer"`
59-
60-
// QueueSize represents the maximum data size allowed for concurrent storage and processing.
61-
QueueSize int64 `mapstructure:"queue_size"`
62-
63-
// NumConsumers is the number of consumers from the queue.
64-
NumConsumers int `mapstructure:"num_consumers"`
65-
66-
// Deprecated: [v0.123.0] use `block_on_overflow`.
67-
Blocking bool `mapstructure:"blocking"`
68-
69-
// BlockOnOverflow determines the behavior when the component's QueueSize limit is reached.
70-
// If true, the component will wait for space; otherwise, operations will immediately return a retryable error.
71-
BlockOnOverflow bool `mapstructure:"block_on_overflow"`
72-
73-
// StorageID if not empty, enables the persistent storage and uses the component specified
74-
// as a storage extension for the persistent queue
75-
StorageID *component.ID `mapstructure:"storage"`
76-
77-
hasBlocking bool
78-
}
79-
80-
func (qCfg *QueueConfig) Unmarshal(conf *confmap.Conf) error {
81-
if err := conf.Unmarshal(qCfg); err != nil {
82-
return err
83-
}
84-
85-
// If user still uses the old blocking, override and will log error during initialization.
86-
if conf.IsSet("blocking") {
87-
qCfg.hasBlocking = true
88-
qCfg.BlockOnOverflow = qCfg.Blocking
89-
}
90-
91-
return nil
92-
}
93-
94-
// Validate checks if the Config is valid
95-
func (qCfg *QueueConfig) Validate() error {
96-
if !qCfg.Enabled {
97-
return nil
98-
}
99-
100-
if qCfg.NumConsumers <= 0 {
101-
return errors.New("`num_consumers` must be positive")
102-
}
103-
104-
if qCfg.QueueSize <= 0 {
105-
return errors.New("`queue_size` must be positive")
106-
}
107-
108-
if qCfg.StorageID != nil && qCfg.WaitForResult {
109-
return errors.New("`wait_for_result` is not supported with a persistent queue configured with `storage`")
110-
}
111-
112-
// Only support request sizer for persistent queue at this moment.
113-
if qCfg.StorageID != nil && qCfg.Sizer != request.SizerTypeRequests {
114-
return errors.New("persistent queue configured with `storage` only supports `requests` sizer")
115-
}
116-
return nil
117-
}
118-
11944
func NewQueueSender(
12045
qSet queuebatch.Settings[request.Request],
121-
qCfg QueueConfig,
46+
qCfg queuebatch.Config,
12247
bCfg BatcherConfig,
12348
exportFailureMessage string,
12449
next sender.Sender[request.Request],
12550
) (sender.Sender[request.Request], error) {
126-
if qCfg.hasBlocking {
127-
qSet.Telemetry.Logger.Error("using deprecated field `blocking`")
128-
}
12951
exportFunc := func(ctx context.Context, req request.Request) error {
13052
// Have to read the number of items before sending the request since the request can
13153
// be modified by the downstream components like the batcher.
@@ -141,47 +63,41 @@ func NewQueueSender(
14163
return queuebatch.NewQueueBatch(qSet, newQueueBatchConfig(qCfg, bCfg), exportFunc)
14264
}
14365

144-
func newQueueBatchConfig(qCfg QueueConfig, bCfg BatcherConfig) queuebatch.Config {
145-
var qbCfg queuebatch.Config
66+
func newQueueBatchConfig(qCfg queuebatch.Config, bCfg BatcherConfig) queuebatch.Config {
67+
// Overwrite configuration with the legacy BatcherConfig configured via WithBatcher.
68+
// TODO: Remove this when WithBatcher is removed.
69+
if !bCfg.Enabled {
70+
return qCfg
71+
}
72+
14673
// User configured queueing, copy all config.
14774
if qCfg.Enabled {
148-
qbCfg = queuebatch.Config{
149-
Enabled: true,
150-
WaitForResult: qCfg.WaitForResult,
151-
Sizer: qCfg.Sizer,
152-
QueueSize: qCfg.QueueSize,
153-
NumConsumers: qCfg.NumConsumers,
154-
BlockOnOverflow: qCfg.BlockOnOverflow,
155-
StorageID: qCfg.StorageID,
156-
// TODO: Copy batching configuration as well when available.
157-
}
75+
// Overwrite configuration with the legacy BatcherConfig configured via WithBatcher.
15876
// TODO: Remove this when WithBatcher is removed.
159-
if bCfg.Enabled {
160-
qbCfg.Batch = &queuebatch.BatchConfig{
161-
FlushTimeout: bCfg.FlushTimeout,
162-
MinSize: bCfg.MinSize,
163-
MaxSize: bCfg.MaxSize,
164-
}
165-
}
166-
} else {
167-
// This can happen only if the deprecated way to configure batching is used with a "disabled" queue.
168-
// TODO: Remove this when WithBatcher is removed.
169-
qbCfg = queuebatch.Config{
170-
Enabled: true,
171-
WaitForResult: true,
172-
Sizer: request.SizerTypeRequests,
173-
QueueSize: math.MaxInt,
174-
NumConsumers: runtime.NumCPU(),
175-
BlockOnOverflow: true,
176-
StorageID: nil,
177-
Batch: &queuebatch.BatchConfig{
178-
FlushTimeout: bCfg.FlushTimeout,
179-
MinSize: bCfg.MinSize,
180-
MaxSize: bCfg.MaxSize,
181-
},
77+
qCfg.Batch = &queuebatch.BatchConfig{
78+
FlushTimeout: bCfg.FlushTimeout,
79+
MinSize: bCfg.MinSize,
80+
MaxSize: bCfg.MaxSize,
18281
}
82+
return qCfg
83+
}
84+
85+
// This can happen only if the deprecated way to configure batching is used with a "disabled" queue.
86+
// TODO: Remove this when WithBatcher is removed.
87+
return queuebatch.Config{
88+
Enabled: true,
89+
WaitForResult: true,
90+
Sizer: request.SizerTypeRequests,
91+
QueueSize: math.MaxInt,
92+
NumConsumers: runtime.NumCPU(),
93+
BlockOnOverflow: true,
94+
StorageID: nil,
95+
Batch: &queuebatch.BatchConfig{
96+
FlushTimeout: bCfg.FlushTimeout,
97+
MinSize: bCfg.MinSize,
98+
MaxSize: bCfg.MaxSize,
99+
},
183100
}
184-
return qbCfg
185101
}
186102

187103
// BatcherConfig defines a configuration for batching requests based on a timeout and a minimum number of items.

exporter/exporterhelper/internal/queue_sender_test.go

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515

1616
"go.opentelemetry.io/collector/component"
1717
"go.opentelemetry.io/collector/component/componenttest"
18-
"go.opentelemetry.io/collector/confmap"
1918
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
2019
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
2120
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
@@ -37,30 +36,15 @@ func TestNewQueueSenderFailedRequestDropped(t *testing.T) {
3736
qSet.Telemetry.Logger = zap.New(logger)
3837
be, err := NewQueueSender(
3938
qSet, NewDefaultQueueConfig(), BatcherConfig{}, "", sender.NewSender(func(context.Context, request.Request) error { return errors.New("some error") }))
40-
4139
require.NoError(t, err)
40+
4241
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
4342
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2}))
4443
require.NoError(t, be.Shutdown(context.Background()))
4544
assert.Len(t, observed.All(), 1)
4645
assert.Equal(t, "Exporting failed. Dropping data.", observed.All()[0].Message)
4746
}
4847

49-
func TestQueueConfig_DeprecatedBlockingUnmarshal(t *testing.T) {
50-
conf := confmap.NewFromStringMap(map[string]any{
51-
"enabled": true,
52-
"num_consumers": 2,
53-
"queue_size": 100,
54-
"blocking": true,
55-
})
56-
57-
qCfg := QueueConfig{}
58-
assert.False(t, qCfg.BlockOnOverflow)
59-
require.NoError(t, conf.Unmarshal(&qCfg))
60-
assert.True(t, qCfg.BlockOnOverflow)
61-
assert.True(t, qCfg.hasBlocking)
62-
}
63-
6448
func TestQueueConfig_Validate(t *testing.T) {
6549
qCfg := NewDefaultQueueConfig()
6650
require.NoError(t, qCfg.Validate())

exporter/exporterhelper/internal/queuebatch/config.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"time"
99

1010
"go.opentelemetry.io/collector/component"
11+
"go.opentelemetry.io/collector/confmap"
1112
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1213
)
1314

@@ -31,6 +32,9 @@ type Config struct {
3132
// If true, the component will wait for space; otherwise, operations will immediately return a retryable error.
3233
BlockOnOverflow bool `mapstructure:"block_on_overflow"`
3334

35+
// Deprecated: [v0.123.0] use `block_on_overflow`.
36+
Blocking bool `mapstructure:"blocking"`
37+
3438
// StorageID if not empty, enables the persistent storage and uses the component specified
3539
// as a storage extension for the persistent queue.
3640
// TODO: This will be changed to Optional when available.
@@ -45,6 +49,23 @@ type Config struct {
4549
// BatchConfig it configures how the requests are consumed from the queue and batch together during consumption.
4650
// TODO: This will be changed to Optional when available.
4751
Batch *BatchConfig `mapstructure:"batch"`
52+
53+
// TODO: Remove when deprecated "blocking" is removed.
54+
hasBlocking bool
55+
}
56+
57+
func (cfg *Config) Unmarshal(conf *confmap.Conf) error {
58+
if err := conf.Unmarshal(cfg); err != nil {
59+
return err
60+
}
61+
62+
// If user still uses the old blocking, override and will log error during initialization.
63+
if conf.IsSet("blocking") {
64+
cfg.hasBlocking = true
65+
cfg.BlockOnOverflow = cfg.Blocking
66+
}
67+
68+
return nil
4869
}
4970

5071
// Validate checks if the Config is valid
@@ -61,6 +82,7 @@ func (cfg *Config) Validate() error {
6182
return errors.New("`queue_size` must be positive")
6283
}
6384

85+
// Only support request sizer for persistent queue at this moment.
6486
if cfg.StorageID != nil && cfg.WaitForResult {
6587
return errors.New("`wait_for_result` is not supported with a persistent queue configured with `storage`")
6688
}
@@ -70,6 +92,11 @@ func (cfg *Config) Validate() error {
7092
return errors.New("persistent queue configured with `storage` only supports `requests` sizer")
7193
}
7294

95+
// Only support items sizer for batch at this moment.
96+
if cfg.Batch != nil && cfg.Sizer != request.SizerTypeItems {
97+
return errors.New("`batch` supports only `items` sizer")
98+
}
99+
73100
return nil
74101
}
75102

0 commit comments

Comments
 (0)