
Commit 3febf16

feat/re-allow multiple workers (#36134)
#### Description

This MR does the following:

* Re-adds the ability to allow multiple workers in this exporter because:
  1. Out of Order is no longer an issue now that it is fully supported in Prometheus. Nonetheless, I am setting the default worker count to 1 to avoid OoO with vanilla Prometheus settings.
  2. With a single worker, and for a collector with a large load, this becomes "blocking". Example: imagine a scenario in which a collector is collecting lots of targets; with a slow Prometheus or an unstable network, a single worker can easily bottleneck the off-shipping if retries are enabled.

#### Link to tracking issue

N/A

#### Testing

#### Documentation

Docs auto-updated. Readme.md is now correct in its explanation of `num_consumers`, since it's no longer hard-coded at 1. Additional docs added.
1 parent ce1d15a commit 3febf16

7 files changed: +110 -12 lines changed
Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: prometheusremotewriteexporter
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Re allows the configuration of multiple workers
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [36134]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]

exporter/prometheusremotewriteexporter/README.md

Lines changed: 28 additions & 1 deletion
@@ -54,7 +54,7 @@ The following settings can be optionally configured:
 - `remote_write_queue`: fine tuning for queueing and sending of the outgoing remote writes.
   - `enabled`: enable the sending queue (default: `true`)
   - `queue_size`: number of OTLP metrics that can be queued. Ignored if `enabled` is `false` (default: `10000`)
-  - `num_consumers`: minimum number of workers to use to fan out the outgoing requests. (default: `5`)
+  - `num_consumers`: minimum number of workers to use to fan out the outgoing requests (default: `5`, or `1` if the `exporter.prometheusremotewritexporter.EnableMultipleWorkers` feature gate is enabled).
 - `resource_to_telemetry_conversion`
   - `enabled` (default = false): If `enabled` is `true`, all the resource attributes will be converted to metric labels by default.
 - `target_info`: customize `target_info` metric
@@ -66,6 +66,7 @@ The following settings can be optionally configured:
 - `max_batch_size_bytes` (default = `3000000` -> `~2.861 mb`): Maximum size of a batch of
   samples to be sent to the remote write endpoint. If the batch size is larger
   than this value, it will be split into multiple batches.
+- `max_batch_request_parallelism` (default = `5`): Maximum parallelism allowed for a single request bigger than `max_batch_size_bytes`.

 Example:

@@ -101,12 +102,22 @@ Several helper files are leveraged to provide additional capabilities automatica
 - [Retry and timeout settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md), note that the exporter doesn't support `sending_queue` but provides `remote_write_queue`.

 ### Feature gates
+
+#### RetryOn429
+
 This exporter has feature gate: `exporter.prometheusremotewritexporter.RetryOn429`.
 When this feature gate is enabled, the Prometheus remote write exporter will retry on the 429 HTTP status code with the provided retry configuration.
 It currently doesn't support respecting the HTTP header `Retry-After` if provided, since the retry library used doesn't support this feature.

 To enable it, run the collector with the feature gate `exporter.prometheusremotewritexporter.RetryOn429` enabled. This can be done by passing one additional parameter: `--feature-gates=exporter.prometheusremotewritexporter.RetryOn429`.

+#### EnableMultipleWorkersFeatureGate
+
+This exporter has feature gate: `exporter.prometheusremotewritexporter.EnableMultipleWorkers`.
+
+When this feature gate is enabled, `num_consumers` will be used as the worker count for handling batches from the queue, and `max_batch_request_parallelism` will be used for parallelism on a single batch bigger than `max_batch_size_bytes`.
+Enabling this feature gate with `num_consumers` higher than 1 requires the target destination to support ingestion of OutOfOrder samples. See [Multiple Consumers and OutOfOrder](#multiple-consumers-and-outoforder) for more info.
+
 ## Metric names and labels normalization

 OpenTelemetry metric names and attributes are normalized to be compliant with Prometheus naming rules. [Details on this normalization process are described in the Prometheus translator module](../../pkg/translator/prometheus/).
@@ -149,3 +160,19 @@ sum by (namespace) (app_ads_ad_requests_total)
 [beta]:https://github.com/open-telemetry/opentelemetry-collector#beta
 [contrib]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
 [core]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol
+
+## Multiple Consumers and OutOfOrder
+
+**DISCLAIMER**: This snippet applies only to Prometheus; other remote write destinations using the Prometheus protocol (e.g. Thanos/Grafana Mimir/VictoriaMetrics) may have different settings.
+
+By default, Prometheus expects samples to be ingested sequentially, in temporal order.
+
+When multiple consumers are enabled, the temporal ordering of the samples written to the target destination is not deterministic and can no longer be guaranteed. For example, one worker may push a sample for `t+30s`, and a second worker may then push an additional sample for `t+15s`.
+
+Vanilla Prometheus configurations will reject these unordered samples and you'll receive "out of order" errors.
+
+Out-of-order support in Prometheus must be enabled for multiple consumers.
+This can be done by using the `tsdb.out_of_order_time_window: 10m` setting. Please choose a time window large enough to cover the worst-case "queue" build-up on the sender side.
+
+For more info, see:
+- https://prometheus.io/docs/prometheus/latest/configuration/configuration/#tsdb

exporter/prometheusremotewriteexporter/config.go

Lines changed: 7 additions & 0 deletions
@@ -35,6 +35,9 @@ type Config struct {
 	// maximum size in bytes of time series batch sent to remote storage
 	MaxBatchSizeBytes int `mapstructure:"max_batch_size_bytes"`

+	// maximum amount of parallel requests to do when handling large batch request
+	MaxBatchRequestParallelism *int `mapstructure:"max_batch_request_parallelism"`
+
 	// ResourceToTelemetrySettings is the option for converting resource attributes to telemetry attributes.
 	// "Enabled" - A boolean field to enable/disable this option. Default is `false`.
 	// If enabled, all the resource attributes will be converted to metric labels by default.
@@ -87,6 +90,10 @@ var _ component.Config = (*Config)(nil)

 // Validate checks if the exporter configuration is valid
 func (cfg *Config) Validate() error {
+	if cfg.MaxBatchRequestParallelism != nil && *cfg.MaxBatchRequestParallelism < 1 {
+		return fmt.Errorf("max_batch_request_parallelism can't be set to below 1")
+	}
+
 	if cfg.RemoteWriteQueue.QueueSize < 0 {
 		return fmt.Errorf("remote write queue size can't be negative")
 	}
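
The pointer type on `MaxBatchRequestParallelism` lets the validation distinguish "not set" from an explicit value. A minimal standalone sketch of that pattern follows; the trimmed-down `config` type and the `intPtr` helper are hypothetical stand-ins, not the exporter's real types.

```go
package main

import (
	"errors"
	"fmt"
)

// config is a hypothetical, trimmed-down stand-in for the exporter's Config.
type config struct {
	// nil means the user did not set max_batch_request_parallelism.
	MaxBatchRequestParallelism *int
}

// validate restates the check added in this commit: an explicit value
// below 1 is rejected, while an unset (nil) value passes.
func (c *config) validate() error {
	if c.MaxBatchRequestParallelism != nil && *c.MaxBatchRequestParallelism < 1 {
		return errors.New("max_batch_request_parallelism can't be set to below 1")
	}
	return nil
}

// intPtr is a small illustrative helper for building pointer values.
func intPtr(v int) *int { return &v }

func main() {
	for _, c := range []config{{}, {intPtr(0)}, {intPtr(10)}} {
		fmt.Println(c.validate())
	}
}
```

With a plain `int` field, an omitted setting and an explicit `0` would be indistinguishable, so the zero value could not be rejected without also rejecting the unset case.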

exporter/prometheusremotewriteexporter/config_test.go

Lines changed: 11 additions & 2 deletions
@@ -56,8 +56,9 @@ func TestLoadConfig(t *testing.T) {
 		{
 			id: component.NewIDWithName(metadata.Type, "2"),
 			expected: &Config{
-				MaxBatchSizeBytes: 3000000,
-				TimeoutSettings:   exporterhelper.NewDefaultTimeoutConfig(),
+				MaxBatchSizeBytes:          3000000,
+				MaxBatchRequestParallelism: toPtr(10),
+				TimeoutSettings:            exporterhelper.NewDefaultTimeoutConfig(),
 				BackOffConfig: configretry.BackOffConfig{
 					Enabled:         true,
 					InitialInterval: 10 * time.Second,
@@ -90,6 +91,10 @@ func TestLoadConfig(t *testing.T) {
 			id:           component.NewIDWithName(metadata.Type, "negative_num_consumers"),
 			errorMessage: "remote write consumer number can't be negative",
 		},
+		{
+			id:           component.NewIDWithName(metadata.Type, "less_than_1_max_batch_request_parallelism"),
+			errorMessage: "max_batch_request_parallelism can't be set to below 1",
+		},
 	}

 	for _, tt := range tests {
@@ -136,3 +141,7 @@ func TestDisabledTargetInfo(t *testing.T) {

 	assert.False(t, cfg.(*Config).TargetInfo.Enabled)
 }
+
+func toPtr[T any](val T) *T {
+	return &val
+}

exporter/prometheusremotewriteexporter/exporter.go

Lines changed: 9 additions & 1 deletion
@@ -124,13 +124,21 @@ func newPRWExporter(cfg *Config, set exporter.Settings) (*prwExporter, error) {

 	userAgentHeader := fmt.Sprintf("%s/%s", strings.ReplaceAll(strings.ToLower(set.BuildInfo.Description), " ", "-"), set.BuildInfo.Version)

+	concurrency := 5
+	if !enableMultipleWorkersFeatureGate.IsEnabled() {
+		concurrency = cfg.RemoteWriteQueue.NumConsumers
+	}
+	if cfg.MaxBatchRequestParallelism != nil {
+		concurrency = *cfg.MaxBatchRequestParallelism
+	}
+
 	prwe := &prwExporter{
 		endpointURL:       endpointURL,
 		wg:                new(sync.WaitGroup),
 		closeChan:         make(chan struct{}),
 		userAgentHeader:   userAgentHeader,
 		maxBatchSizeBytes: cfg.MaxBatchSizeBytes,
-		concurrency:       cfg.RemoteWriteQueue.NumConsumers,
+		concurrency:       concurrency,
 		clientSettings:    &cfg.ClientConfig,
 		settings:          set.TelemetrySettings,
 		retrySettings:     cfg.BackOffConfig,
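
The precedence in the hunk above may be easier to read as a pure function. The sketch below restates it under the assumption that the gate state is passed in as a boolean; `resolveConcurrency` is a hypothetical helper name that does not exist in the exporter.

```go
package main

import "fmt"

// resolveConcurrency is a hypothetical helper restating the selection
// logic from newPRWExporter. Precedence, highest first:
//  1. an explicit max_batch_request_parallelism
//  2. num_consumers, only while the feature gate is disabled
//  3. the built-in value of 5
func resolveConcurrency(gateEnabled bool, numConsumers int, maxBatchRequestParallelism *int) int {
	concurrency := 5
	if !gateEnabled {
		concurrency = numConsumers
	}
	if maxBatchRequestParallelism != nil {
		concurrency = *maxBatchRequestParallelism
	}
	return concurrency
}

func main() {
	ten := 10
	fmt.Println(resolveConcurrency(false, 3, nil))  // gate off: num_consumers applies
	fmt.Println(resolveConcurrency(true, 3, nil))   // gate on: built-in value applies
	fmt.Println(resolveConcurrency(true, 3, &ten))  // explicit setting wins
	fmt.Println(resolveConcurrency(false, 3, &ten)) // explicit setting wins
}
```

Note that an explicit `max_batch_request_parallelism` overrides `num_consumers` regardless of the gate, which is what allows users to migrate before the gate becomes the default.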

exporter/prometheusremotewriteexporter/factory.go

Lines changed: 23 additions & 8 deletions
@@ -26,6 +26,13 @@ var retryOn429FeatureGate = featuregate.GlobalRegistry().MustRegister(
 	featuregate.WithRegisterDescription("When enabled, the Prometheus remote write exporter will retry 429 http status code. Requires exporter.prometheusremotewritexporter.metrics.RetryOn429 to be enabled."),
 )

+var enableMultipleWorkersFeatureGate = featuregate.GlobalRegistry().MustRegister(
+	"exporter.prometheusremotewritexporter.EnableMultipleWorkers",
+	featuregate.StageAlpha,
+	featuregate.WithRegisterDescription("When enabled and settings configured, the Prometheus remote exporter will"+
+		" spawn multiple workers/goroutines to handle incoming metrics batches concurrently"),
+)
+
 // NewFactory creates a new Prometheus Remote Write exporter.
 func NewFactory() exporter.Factory {
 	return exporter.NewFactory(
@@ -42,17 +49,19 @@ func createMetricsExporter(ctx context.Context, set exporter.Settings,
 		return nil, errors.New("invalid configuration")
 	}

+	if !enableMultipleWorkersFeatureGate.IsEnabled() && prwCfg.RemoteWriteQueue.NumConsumers != 5 {
+		set.Logger.Warn("`remote_write_queue.num_consumers` will be used to configure processing parallelism, rather than request parallelism in a future release. This may cause out-of-order issues unless you take action. Please migrate to using `max_batch_request_parallelism` to keep your existing behavior.")
+	}
+
 	prwe, err := newPRWExporter(prwCfg, set)
 	if err != nil {
 		return nil, err
 	}

-	// Don't allow users to configure the queue.
-	// See https://github.com/open-telemetry/opentelemetry-collector/issues/2949.
-	// Prometheus remote write samples needs to be in chronological
-	// order for each timeseries. If we shard the incoming metrics
-	// without considering this limitation, we experience
-	// "out of order samples" errors.
+	numConsumers := 1
+	if enableMultipleWorkersFeatureGate.IsEnabled() {
+		numConsumers = prwCfg.RemoteWriteQueue.NumConsumers
+	}
 	exporter, err := exporterhelper.NewMetrics(
 		ctx,
 		set,
@@ -61,7 +70,7 @@ func createMetricsExporter(ctx context.Context, set exporter.Settings,
 		exporterhelper.WithTimeout(prwCfg.TimeoutSettings),
 		exporterhelper.WithQueue(exporterhelper.QueueConfig{
 			Enabled:      prwCfg.RemoteWriteQueue.Enabled,
-			NumConsumers: 1,
+			NumConsumers: numConsumers,
 			QueueSize:    prwCfg.RemoteWriteQueue.QueueSize,
 		}),
 		exporterhelper.WithStart(prwe.Start),
@@ -83,10 +92,16 @@ func createDefaultConfig() component.Config {
 	clientConfig.WriteBufferSize = 512 * 1024
 	clientConfig.Timeout = exporterhelper.NewDefaultTimeoutConfig().Timeout

+	numConsumers := 5
+	if enableMultipleWorkersFeatureGate.IsEnabled() {
+		numConsumers = 1
+	}
 	return &Config{
 		Namespace:         "",
 		ExternalLabels:    map[string]string{},
 		MaxBatchSizeBytes: 3000000,
+		// To set this as default once `exporter.prometheusremotewritexporter.EnableMultipleWorkers` is removed
+		// MaxBatchRequestParallelism: 5,
 		TimeoutSettings:   exporterhelper.NewDefaultTimeoutConfig(),
 		BackOffConfig:     retrySettings,
 		AddMetricSuffixes: true,
@@ -96,7 +111,7 @@ func createDefaultConfig() component.Config {
 		RemoteWriteQueue: RemoteWriteQueue{
 			Enabled:      true,
 			QueueSize:    10000,
-			NumConsumers: 5,
+			NumConsumers: numConsumers,
 		},
 		TargetInfo: &TargetInfo{
 			Enabled: true,
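
Taken together, the factory.go hunks mean the sending queue runs a single worker under default settings whether or not the gate is enabled. The sketch below restates both decisions as pure functions; `defaultNumConsumers` and `queueConsumers` are hypothetical names, and the gate state is assumed to be passed in as a boolean.

```go
package main

import "fmt"

// defaultNumConsumers restates the default picked in createDefaultConfig:
// 5 while the gate is disabled, 1 once it is enabled.
func defaultNumConsumers(gateEnabled bool) int {
	if gateEnabled {
		return 1
	}
	return 5
}

// queueConsumers restates the worker count handed to the sending queue in
// createMetricsExporter: pinned to 1 unless the gate is enabled.
func queueConsumers(gateEnabled bool, configured int) int {
	if gateEnabled {
		return configured
	}
	return 1
}

func main() {
	for _, gate := range []bool{false, true} {
		def := defaultNumConsumers(gate)
		fmt.Printf("gate=%v default num_consumers=%d queue workers=%d\n",
			gate, def, queueConsumers(gate, def))
	}
}
```

Only an explicit `num_consumers` above 1 combined with the enabled gate yields multiple queue workers, which matches the out-of-order caveat in the README.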

exporter/prometheusremotewriteexporter/testdata/config.yaml

Lines changed: 5 additions & 0 deletions
@@ -2,6 +2,7 @@ prometheusremotewrite:

 prometheusremotewrite/2:
   namespace: "test-space"
+  max_batch_request_parallelism: 10
   retry_on_failure:
     enabled: true
     initial_interval: 10s
@@ -38,6 +39,10 @@ prometheusremotewrite/negative_num_consumers:
     queue_size: 5
     num_consumers: -1

+prometheusremotewrite/less_than_1_max_batch_request_parallelism:
+  endpoint: "localhost:8888"
+  max_batch_request_parallelism: 0
+
 prometheusremotewrite/disabled_target_info:
   endpoint: "localhost:8888"
   target_info:
