Skip to content

Commit dbb0244

Browse files
committed
Allow users to configure different sizers for memory queue
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 58370e5 commit dbb0244

File tree

8 files changed

+79
-42
lines changed

8 files changed

+79
-42
lines changed

.chloggen/allow-sizer.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Allow exporter memory queue to use different types of sizers.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [12708]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [user]

exporter/exporterhelper/internal/queue_sender.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ type QueueBatchSettings[K any] struct {
2828
func NewDefaultQueueConfig() QueueConfig {
2929
return QueueConfig{
3030
Enabled: true,
31+
Sizer: request.SizerTypeRequests,
3132
NumConsumers: 10,
3233
// By default, batches are 8192 spans, for a total of up to 8 million spans in the queue
3334
// This can be estimated at 1-4 GB worth of maximum memory usage
@@ -44,10 +45,16 @@ func NewDefaultQueueConfig() QueueConfig {
4445
type QueueConfig struct {
4546
// Enabled indicates whether to not enqueue batches before exporting.
4647
Enabled bool `mapstructure:"enabled"`
48+
49+
// Sizer determines the type of size measurement used by this component.
50+
// It accepts "requests", "items", or "bytes".
51+
Sizer request.SizerType `mapstructure:"sizer"`
52+
53+
// QueueSize represents the maximum data size allowed for concurrent storage and processing.
54+
QueueSize int `mapstructure:"queue_size"`
55+
4756
// NumConsumers is the number of consumers from the queue.
4857
NumConsumers int `mapstructure:"num_consumers"`
49-
// QueueSize is the maximum number of requests allowed in queue at any given time.
50-
QueueSize int `mapstructure:"queue_size"`
5158
// Blocking controls the queue behavior when full.
5259
// If true it blocks until enough space to add the new request to the queue.
5360
Blocking bool `mapstructure:"blocking"`
@@ -61,12 +68,20 @@ func (qCfg *QueueConfig) Validate() error {
6168
if !qCfg.Enabled {
6269
return nil
6370
}
71+
6472
if qCfg.NumConsumers <= 0 {
6573
return errors.New("`num_consumers` must be positive")
6674
}
75+
6776
if qCfg.QueueSize <= 0 {
6877
return errors.New("`queue_size` must be positive")
6978
}
79+
80+
// Only support request sizer for persistent queue at this moment.
81+
if qCfg.StorageID != nil && qCfg.Sizer != request.SizerTypeRequests {
82+
return errors.New("persistent queue only supports `requests` sizer")
83+
}
84+
7085
return nil
7186
}
7287

@@ -96,7 +111,7 @@ func newQueueBatchConfig(qCfg QueueConfig, bCfg BatcherConfig) queuebatch.Config
96111
qbCfg := queuebatch.Config{
97112
Enabled: true,
98113
WaitForResult: !qCfg.Enabled,
99-
Sizer: request.SizerTypeRequests,
114+
Sizer: qCfg.Sizer,
100115
QueueSize: qCfg.QueueSize,
101116
NumConsumers: qCfg.NumConsumers,
102117
BlockOnOverflow: qCfg.Blocking,

exporter/exporterhelper/internal/queuebatch/queue_batch.go

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhel
66
import (
77
"context"
88
"errors"
9+
"fmt"
910

1011
"go.opentelemetry.io/collector/component"
1112
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
@@ -42,32 +43,35 @@ func NewQueueBatch(
4243
b = newDefaultBatcher(*cfg.Batch, next, cfg.NumConsumers)
4344
}
4445

45-
sizer, ok := qSet.Sizers[request.SizerTypeRequests]
46-
if !ok {
47-
return nil, errors.New("queue_batch: unsupported sizer")
48-
}
49-
5046
var q Queue[request.Request]
5147
switch {
5248
case cfg.WaitForResult:
5349
q = newDisabledQueue(b.Consume)
54-
case cfg.StorageID != nil:
55-
q = newAsyncQueue(newPersistentQueue[request.Request](persistentQueueSettings[request.Request]{
56-
sizer: sizer,
57-
capacity: int64(cfg.QueueSize),
58-
blocking: cfg.BlockOnOverflow,
59-
signal: qSet.Signal,
60-
storageID: *cfg.StorageID,
61-
encoding: qSet.Encoding,
62-
id: qSet.ID,
63-
telemetry: qSet.Telemetry,
64-
}), cfg.NumConsumers, b.Consume)
6550
default:
66-
q = newAsyncQueue(newMemoryQueue[request.Request](memoryQueueSettings[request.Request]{
67-
sizer: sizer,
68-
capacity: int64(cfg.QueueSize),
69-
blocking: cfg.BlockOnOverflow,
70-
}), cfg.NumConsumers, b.Consume)
51+
sizer, ok := qSet.Sizers[cfg.Sizer]
52+
if !ok {
53+
return nil, fmt.Errorf("queue_batch: unsupported sizer %q", cfg.Sizer)
54+
}
55+
56+
switch cfg.StorageID != nil {
57+
case true:
58+
q = newAsyncQueue(newPersistentQueue[request.Request](persistentQueueSettings[request.Request]{
59+
sizer: sizer,
60+
capacity: int64(cfg.QueueSize),
61+
blocking: cfg.BlockOnOverflow,
62+
signal: qSet.Signal,
63+
storageID: *cfg.StorageID,
64+
encoding: qSet.Encoding,
65+
id: qSet.ID,
66+
telemetry: qSet.Telemetry,
67+
}), cfg.NumConsumers, b.Consume)
68+
default:
69+
q = newAsyncQueue(newMemoryQueue[request.Request](memoryQueueSettings[request.Request]{
70+
sizer: sizer,
71+
capacity: int64(cfg.QueueSize),
72+
blocking: cfg.BlockOnOverflow,
73+
}), cfg.NumConsumers, b.Consume)
74+
}
7175
}
7276

7377
oq, err := newObsQueue(qSet, q)

exporter/exporterhelper/internal/queuebatch/queue_batch_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -95,25 +95,25 @@ func TestQueueBatchDoNotPreserveCancellation(t *testing.T) {
9595
}
9696

9797
func TestQueueBatchHappyPath(t *testing.T) {
98-
cfg := Config{
99-
Enabled: true,
100-
QueueSize: 10,
101-
NumConsumers: 1,
102-
}
98+
cfg := newTestConfig()
99+
cfg.BlockOnOverflow = false
100+
cfg.QueueSize = 56
103101
sink := requesttest.NewSink()
104102
qb, err := NewQueueBatch(newFakeRequestSettings(), cfg, sink.Export)
105103
require.NoError(t, err)
106104

107105
for i := 0; i < 10; i++ {
108-
require.NoError(t, qb.Send(context.Background(), &requesttest.FakeRequest{Items: i}))
106+
require.NoError(t, qb.Send(context.Background(), &requesttest.FakeRequest{Items: i + 1}))
109107
}
110108

111109
// expect queue to be full
112110
require.Error(t, qb.Send(context.Background(), &requesttest.FakeRequest{Items: 2}))
113111

114112
require.NoError(t, qb.Start(context.Background(), componenttest.NewNopHost()))
115113
assert.Eventually(t, func() bool {
116-
return sink.RequestsCount() == 10 && sink.ItemsCount() == 45
114+
// Because batching is used, cannot guarantee whether there will be 1 batch or multiple because of the flush interval.
115+
// Check only for total items count.
116+
return sink.ItemsCount() == 55
117117
}, 1*time.Second, 10*time.Millisecond)
118118
require.NoError(t, qb.Shutdown(context.Background()))
119119
}

exporter/exporterhelper/queue_batch.go

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -61,17 +61,7 @@ func WithQueueBatch(cfg QueueBatchConfig, set QueueBatchSettings) Option {
6161

6262
// NewDefaultQueueConfig returns the default config for QueueConfig.
6363
// By default, the queue stores 1000 items of telemetry and is non-blocking when full.
64-
func NewDefaultQueueConfig() QueueConfig {
65-
return QueueConfig{
66-
Enabled: true,
67-
NumConsumers: 10,
68-
// By default, batches are 8192 spans, for a total of up to 8 million spans in the queue
69-
// This can be estimated at 1-4 GB worth of maximum memory usage
70-
// This default is probably still too high, and may be adjusted further down in a future release
71-
QueueSize: 1_000,
72-
Blocking: false,
73-
}
74-
}
64+
var NewDefaultQueueConfig = internal.NewDefaultQueueConfig
7565

7666
// BatcherConfig defines a configuration for batching requests based on a timeout and a minimum number of items.
7767
// Experimental: This API is at the early stage of development and may change without backward compatibility

exporter/exporterqueue/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ type Config = exporterhelper.QueueConfig
1515
func NewDefaultConfig() Config {
1616
return Config{
1717
Enabled: true,
18+
Sizer: exporterhelper.RequestSizerTypeRequests,
1819
NumConsumers: 10,
1920
QueueSize: 1_000,
2021
Blocking: true,

exporter/otlpexporter/config_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ func TestUnmarshalConfig(t *testing.T) {
5252
},
5353
QueueConfig: exporterhelper.QueueConfig{
5454
Enabled: true,
55+
Sizer: exporterhelper.RequestSizerTypeRequests,
5556
NumConsumers: 2,
5657
QueueSize: 10,
5758
},

exporter/otlphttpexporter/config_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ func TestUnmarshalConfig(t *testing.T) {
5454
},
5555
QueueConfig: exporterhelper.QueueConfig{
5656
Enabled: true,
57+
Sizer: exporterhelper.RequestSizerTypeRequests,
5758
NumConsumers: 2,
5859
QueueSize: 10,
5960
},

0 commit comments

Comments
 (0)