Skip to content

Commit e0bac79

Browse files
authored
[elasticsearchexporter] Use new exporterbatcher.SizeConfig (#38243)
#### Description Updated BatcherConfig to support the new SizeConfig, which replaces the now-deprecated MinSizeItems and MaxSizeItems. BatcherConfig now embeds the exporterbatcher.Config and sets an additional unexported field to track whether `batcher::enabled` has been set. See open-telemetry/opentelemetry-collector#12486 #### Link to tracking issue N/A #### Testing Added unit test #### Documentation Updated README.
1 parent 825ef93 commit e0bac79

File tree

8 files changed

+125
-51
lines changed

8 files changed

+125
-51
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: deprecation
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: elasticsearchexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Deprecate `batcher::min_size_items` and `batcher::max_size_items` in favor of `batcher::min_size` and `batcher::max_size`.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [38243]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/elasticsearchexporter/README.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,11 @@ The Elasticsearch exporter supports the [common `batcher` settings](https://gith
9494

9595
- `batcher`:
9696
- `enabled` (default=unset): Enable batching of requests into 1 or more bulk requests. On a batcher flush, it is possible for a batched request to be translated to more than 1 bulk request due to `flush::bytes`.
97-
- `min_size_items` (default=5000): Minimum number of log records / spans / data points in the batched request to immediately trigger a batcher flush.
98-
- `max_size_items` (default=0): Maximum number of log records / spans / data points in a batched request. To limit bulk request size, configure `flush::bytes` instead. :warning: It is recommended to keep `max_size_items` as 0 as a non-zero value may lead to broken metrics grouping and indexing rejections.
97+
- `sizer` (default=items): Unit of `min_size` and `max_size`. Currently supports only "items", in the future will also support "bytes".
98+
- `min_size` (default=5000): Minimum batch size to be exported to Elasticsearch, measured in units according to `batcher::sizer`.
99+
- `max_size` (default=0): Maximum batch size to be exported to Elasticsearch, measured in units according to `batcher::sizer`. To limit bulk request size, configure `flush::bytes` instead. :warning: It is recommended to keep `max_size` as 0 as a non-zero value may lead to broken metrics grouping and indexing rejections.
100+
- `min_size_items` (DEPRECATED, use `batcher::min_size` instead): Minimum number of log records / spans / data points in the batched request to immediately trigger a batcher flush.
101+
- `max_size_items` (DEPRECATED, use `batcher::max_size` instead): Maximum number of log records / spans / data points in a batched request.
99102
- `flush_timeout` (default=30s): Maximum time of the oldest item spent inside the batcher buffer, aka "max age of batcher buffer". A batcher flush will happen regardless of the size of content in batcher buffer.
100103

101104
By default, the exporter will perform its own buffering and batching, as configured through the

exporter/elasticsearchexporter/bulkindexer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ type bulkIndexerSession interface {
5555
const defaultMaxRetries = 2
5656

5757
func newBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Config, requireDataStream bool) (bulkIndexer, error) {
58-
if config.Batcher.Enabled != nil {
58+
if config.Batcher.enabledSet {
5959
return newSyncBulkIndexer(logger, client, config, requireDataStream), nil
6060
}
6161
return newAsyncBulkIndexer(logger, client, config, requireDataStream)

exporter/elasticsearchexporter/config.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"go.opentelemetry.io/collector/config/configcompression"
1616
"go.opentelemetry.io/collector/config/confighttp"
1717
"go.opentelemetry.io/collector/config/configopaque"
18+
"go.opentelemetry.io/collector/confmap"
1819
"go.opentelemetry.io/collector/exporter/exporterbatcher"
1920
"go.opentelemetry.io/collector/exporter/exporterhelper"
2021
"go.uber.org/zap"
@@ -88,16 +89,20 @@ type Config struct {
8889
// This is a slightly modified version of exporterbatcher.Config,
8990
// to enable tri-state Enabled: unset, false, true.
9091
type BatcherConfig struct {
91-
// Enabled indicates whether to enqueue batches before sending
92-
// to the exporter. If Enabled is specified (non-nil),
93-
// then the exporter will not perform any buffering itself.
94-
Enabled *bool `mapstructure:"enabled"`
92+
exporterbatcher.Config `mapstructure:",squash"`
9593

96-
// FlushTimeout sets the time after which a batch will be sent regardless of its size.
97-
FlushTimeout time.Duration `mapstructure:"flush_timeout"`
94+
// enabledSet tracks whether Enabled has been specified.
95+
// If enabledSet is false, the exporter will perform its
96+
// own buffering.
97+
enabledSet bool `mapstructure:"-"`
98+
}
9899

99-
exporterbatcher.MinSizeConfig `mapstructure:",squash"`
100-
exporterbatcher.MaxSizeConfig `mapstructure:",squash"`
100+
func (c *BatcherConfig) Unmarshal(conf *confmap.Conf) error {
101+
if err := conf.Unmarshal(c); err != nil {
102+
return err
103+
}
104+
c.enabledSet = conf.IsSet("enabled")
105+
return nil
101106
}
102107

103108
type TelemetrySettings struct {

exporter/elasticsearchexporter/config_test.go

Lines changed: 50 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -117,12 +117,12 @@ func TestConfig(t *testing.T) {
117117
DateFormat: "%Y.%m.%d",
118118
},
119119
Batcher: BatcherConfig{
120-
FlushTimeout: 30 * time.Second,
121-
MinSizeConfig: exporterbatcher.MinSizeConfig{ //nolint:staticcheck
122-
MinSizeItems: &defaultBatcherMinSizeItems,
123-
},
124-
MaxSizeConfig: exporterbatcher.MaxSizeConfig{ //nolint:staticcheck
125-
MaxSizeItems: nil,
120+
Config: exporterbatcher.Config{
121+
FlushTimeout: 30 * time.Second,
122+
SizeConfig: exporterbatcher.SizeConfig{
123+
Sizer: exporterbatcher.SizerTypeItems,
124+
MinSize: defaultBatcherMinSizeItems,
125+
},
126126
},
127127
},
128128
},
@@ -191,12 +191,12 @@ func TestConfig(t *testing.T) {
191191
DateFormat: "%Y.%m.%d",
192192
},
193193
Batcher: BatcherConfig{
194-
FlushTimeout: 30 * time.Second,
195-
MinSizeConfig: exporterbatcher.MinSizeConfig{ //nolint:staticcheck
196-
MinSizeItems: &defaultBatcherMinSizeItems,
197-
},
198-
MaxSizeConfig: exporterbatcher.MaxSizeConfig{ //nolint:staticcheck
199-
MaxSizeItems: nil,
194+
Config: exporterbatcher.Config{
195+
FlushTimeout: 30 * time.Second,
196+
SizeConfig: exporterbatcher.SizeConfig{
197+
Sizer: exporterbatcher.SizerTypeItems,
198+
MinSize: defaultBatcherMinSizeItems,
199+
},
200200
},
201201
},
202202
},
@@ -265,12 +265,12 @@ func TestConfig(t *testing.T) {
265265
DateFormat: "%Y.%m.%d",
266266
},
267267
Batcher: BatcherConfig{
268-
FlushTimeout: 30 * time.Second,
269-
MinSizeConfig: exporterbatcher.MinSizeConfig{ //nolint:staticcheck
270-
MinSizeItems: &defaultBatcherMinSizeItems,
271-
},
272-
MaxSizeConfig: exporterbatcher.MaxSizeConfig{ //nolint:staticcheck
273-
MaxSizeItems: nil,
268+
Config: exporterbatcher.Config{
269+
FlushTimeout: 30 * time.Second,
270+
SizeConfig: exporterbatcher.SizeConfig{
271+
Sizer: exporterbatcher.SizerTypeItems,
272+
MinSize: defaultBatcherMinSizeItems,
273+
},
274274
},
275275
},
276276
},
@@ -305,8 +305,8 @@ func TestConfig(t *testing.T) {
305305
expected: withDefaultConfig(func(cfg *Config) {
306306
cfg.Endpoint = "https://elastic.example.com:9200"
307307

308-
enabled := false
309-
cfg.Batcher.Enabled = &enabled
308+
cfg.Batcher.Enabled = false
309+
cfg.Batcher.enabledSet = true
310310
}),
311311
},
312312
{
@@ -327,6 +327,36 @@ func TestConfig(t *testing.T) {
327327
cfg.Compression = "gzip"
328328
}),
329329
},
330+
{
331+
id: component.NewIDWithName(metadata.Type, "batcher_minmax_size_items"),
332+
configFile: "config.yaml",
333+
expected: withDefaultConfig(func(cfg *Config) {
334+
cfg.Endpoint = "https://elastic.example.com:9200"
335+
336+
cfg.Batcher.MinSize = 100
337+
cfg.Batcher.MaxSize = 200
338+
cfg.Batcher.MinSizeItems = &cfg.Batcher.MinSize //nolint:staticcheck
339+
cfg.Batcher.MaxSizeItems = &cfg.Batcher.MaxSize //nolint:staticcheck
340+
}),
341+
},
342+
{
343+
id: component.NewIDWithName(metadata.Type, "batcher_minmax_size"),
344+
configFile: "config.yaml",
345+
expected: withDefaultConfig(func(cfg *Config) {
346+
cfg.Endpoint = "https://elastic.example.com:9200"
347+
348+
cfg.Batcher.MinSize = 100
349+
cfg.Batcher.MaxSize = 200
350+
351+
// TODO uncomment setting min/max_size_items in config.yaml
352+
// and uncomment the below, when the fix to ignore those fields
353+
// is brought into contrib.
354+
// minSizeItems := 300
355+
// maxSizeItems := 400
356+
// cfg.Batcher.MinSizeItems = &minSizeItems
357+
// cfg.Batcher.MaxSizeItems = &maxSizeItems
358+
}),
359+
},
330360
}
331361

332362
for _, tt := range tests {

exporter/elasticsearchexporter/exporter_test.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"go.opentelemetry.io/collector/config/configauth"
2525
"go.opentelemetry.io/collector/config/configopaque"
2626
"go.opentelemetry.io/collector/exporter"
27+
"go.opentelemetry.io/collector/exporter/exporterbatcher"
2728
"go.opentelemetry.io/collector/exporter/exportertest"
2829
"go.opentelemetry.io/collector/extension/auth/authtest"
2930
"go.opentelemetry.io/collector/pdata/pcommon"
@@ -772,12 +773,11 @@ func TestExporterLogs(t *testing.T) {
772773

773774
cfgs := map[string]func(*Config){
774775
"async": func(cfg *Config) {
775-
batcherEnabled := false
776-
cfg.Batcher.Enabled = &batcherEnabled
776+
cfg.Batcher.enabledSet = false
777777
},
778778
"sync": func(cfg *Config) {
779-
batcherEnabled := true
780-
cfg.Batcher.Enabled = &batcherEnabled
779+
cfg.Batcher.enabledSet = true
780+
cfg.Batcher.Enabled = true
781781
cfg.Batcher.FlushTimeout = 10 * time.Millisecond
782782
},
783783
}
@@ -1965,9 +1965,12 @@ func TestExporterAuth(t *testing.T) {
19651965
func TestExporterBatcher(t *testing.T) {
19661966
var requests []*http.Request
19671967
testauthID := component.NewID(component.MustNewType("authtest"))
1968-
batcherEnabled := false // sync bulk indexer is used without batching
19691968
exporter := newUnstartedTestLogsExporter(t, "http://testing.invalid", func(cfg *Config) {
1970-
cfg.Batcher = BatcherConfig{Enabled: &batcherEnabled}
1969+
cfg.Batcher = BatcherConfig{
1970+
// sync bulk indexer is used without batching
1971+
Config: exporterbatcher.Config{Enabled: false},
1972+
enabledSet: true,
1973+
}
19711974
cfg.Auth = &configauth.Authentication{AuthenticatorID: testauthID}
19721975
cfg.Retry.Enabled = false
19731976
})

exporter/elasticsearchexporter/factory.go

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -94,12 +94,12 @@ func createDefaultConfig() component.Config {
9494
LogResponseBody: false,
9595
},
9696
Batcher: BatcherConfig{
97-
FlushTimeout: 30 * time.Second,
98-
MinSizeConfig: exporterbatcher.MinSizeConfig{ //nolint:staticcheck
99-
MinSizeItems: &defaultBatcherMinSizeItems,
100-
},
101-
MaxSizeConfig: exporterbatcher.MaxSizeConfig{ //nolint:staticcheck
102-
MaxSizeItems: nil,
97+
Config: exporterbatcher.Config{
98+
FlushTimeout: 30 * time.Second,
99+
SizeConfig: exporterbatcher.SizeConfig{
100+
Sizer: exporterbatcher.SizerTypeItems,
101+
MinSize: defaultBatcherMinSizeItems,
102+
},
103103
},
104104
},
105105
Flush: FlushSettings{
@@ -203,14 +203,8 @@ func exporterhelperOptions(
203203
exporterhelper.WithShutdown(shutdown),
204204
exporterhelper.WithQueue(cfg.QueueSettings),
205205
}
206-
if cfg.Batcher.Enabled != nil {
207-
batcherConfig := exporterbatcher.Config{
208-
Enabled: *cfg.Batcher.Enabled,
209-
FlushTimeout: cfg.Batcher.FlushTimeout,
210-
MinSizeConfig: cfg.Batcher.MinSizeConfig,
211-
MaxSizeConfig: cfg.Batcher.MaxSizeConfig,
212-
}
213-
opts = append(opts, exporterhelper.WithBatcher(batcherConfig))
206+
if cfg.Batcher.enabledSet {
207+
opts = append(opts, exporterhelper.WithBatcher(cfg.Batcher.Config))
214208

215209
// Effectively disable timeout_sender because timeout is enforced in bulk indexer.
216210
//

exporter/elasticsearchexporter/testdata/config.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,3 +89,15 @@ elasticsearch/compression_none:
8989
elasticsearch/compression_gzip:
9090
endpoint: https://elastic.example.com:9200
9191
compression: gzip
92+
elasticsearch/batcher_minmax_size_items:
93+
endpoint: https://elastic.example.com:9200
94+
batcher:
95+
min_size_items: 100
96+
max_size_items: 200
97+
elasticsearch/batcher_minmax_size:
98+
endpoint: https://elastic.example.com:9200
99+
batcher:
100+
min_size: 100
101+
max_size: 200
102+
#min_size_items: 300 # min_size is set, does not carry over
103+
#max_size_items: 400 # max_size is set, does not carry over

0 commit comments

Comments
 (0)