Commit 1ef15b9

[exporter/elasticsearch] Enable gzip compression by default (#35865)
#### Description

Enable gzip compression by default, at the hardcoded level `BestSpeed`. To disable compression, set `compression` to `none`.
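For illustration, a minimal exporter configuration sketch (the endpoint and the second exporter's name are illustrative): with this change, gzip is applied without any extra setting, while `compression: none` restores uncompressed requests:

```yaml
exporters:
  elasticsearch:
    endpoint: https://elastic.example.com:9200
    # gzip (level BestSpeed) is now used by default; no setting needed.
  elasticsearch/uncompressed:
    endpoint: https://elastic.example.com:9200
    compression: none  # opt out of the new default
```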
1 parent 33a5457 commit 1ef15b9

File tree

11 files changed, +169 -24 lines

Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: breaking
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: elasticsearchexporter
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Enable gzip compression by default
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [35865]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: To disable compression, set config `compression` to `none`.
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]

exporter/elasticsearchexporter/README.md

Lines changed: 1 addition & 1 deletion
@@ -71,7 +71,7 @@ service:
 
 ### HTTP settings
 
-The Elasticsearch exporter supports common [HTTP Configuration Settings][confighttp], except for `compression` (all requests are uncompressed).
+The Elasticsearch exporter supports common [HTTP Configuration Settings][confighttp]. Gzip compression is enabled by default. To disable compression, set `compression` to `none`.
 As a consequence of supporting [confighttp], the Elasticsearch exporter also supports common [TLS Configuration Settings][configtls].
 
 The Elasticsearch exporter sets `timeout` (HTTP request timeout) to 90s by default.

exporter/elasticsearchexporter/bulkindexer.go

Lines changed: 7 additions & 0 deletions
@@ -4,6 +4,7 @@
 package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"
 
 import (
+	"compress/gzip"
 	"context"
 	"errors"
 	"io"
@@ -15,6 +16,7 @@ import (
 
 	"github.com/elastic/go-docappender/v2"
 	"github.com/elastic/go-elasticsearch/v7"
+	"go.opentelemetry.io/collector/config/configcompression"
 	"go.uber.org/zap"
 )
 
@@ -68,12 +70,17 @@ func bulkIndexerConfig(client *elasticsearch.Client, config *Config) docappender
 			maxDocRetries = config.Retry.MaxRetries
 		}
 	}
+	var compressionLevel int
+	if config.Compression == configcompression.TypeGzip {
+		compressionLevel = gzip.BestSpeed
+	}
 	return docappender.BulkIndexerConfig{
 		Client:                client,
 		MaxDocumentRetries:    maxDocRetries,
 		Pipeline:              config.Pipeline,
 		RetryOnDocumentStatus: config.Retry.RetryOnStatus,
 		RequireDataStream:     config.MappingMode() == MappingOTel,
+		CompressionLevel:      compressionLevel,
 	}
 }
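For reference, `gzip.BestSpeed` is compression level 1 in Go's standard `compress/gzip` package, the same constant the code above passes through to the bulk indexer configuration. A small standalone sketch of compressing a bulk-style payload at that level — the payload is illustrative, not the exporter's actual encoding path:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"log"
)

func main() {
	// Illustrative NDJSON bulk body, similar in shape to what is sent to /_bulk.
	payload := []byte("{\"create\":{\"_index\":\"foo\"}}\n{\"foo\": \"bar\"}\n")

	var buf bytes.Buffer
	// BestSpeed (level 1) favors low CPU cost over compression ratio,
	// which is the level hardcoded by this change.
	w, err := gzip.NewWriterLevel(&buf, gzip.BestSpeed)
	if err != nil {
		log.Fatal(err)
	}
	if _, err := w.Write(payload); err != nil {
		log.Fatal(err)
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("plain: %d bytes, gzipped: %d bytes\n", len(payload), buf.Len())
}
```

On realistic bulk payloads, BestSpeed trades a somewhat larger compressed body for noticeably lower CPU overhead per request.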

exporter/elasticsearchexporter/bulkindexer_test.go

Lines changed: 80 additions & 14 deletions
@@ -15,6 +15,7 @@ import (
 	"github.com/elastic/go-elasticsearch/v7"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/config/confighttp"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
 	"go.uber.org/zap/zaptest/observer"
@@ -62,13 +63,8 @@ func TestAsyncBulkIndexer_flushOnClose(t *testing.T) {
 	}})
 	require.NoError(t, err)
 
-	bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, &cfg)
-	require.NoError(t, err)
-	session, err := bulkIndexer.StartSession(context.Background())
-	require.NoError(t, err)
+	bulkIndexer := runBulkIndexerOnce(t, &cfg, client)
 
-	assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
-	assert.NoError(t, bulkIndexer.Close(context.Background()))
 	assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load())
 }
 
@@ -157,13 +153,7 @@ func TestAsyncBulkIndexer_requireDataStream(t *testing.T) {
 			}})
 			require.NoError(t, err)
 
-			bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, &tt.config)
-			require.NoError(t, err)
-			session, err := bulkIndexer.StartSession(context.Background())
-			require.NoError(t, err)
-
-			assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
-			assert.NoError(t, bulkIndexer.Close(context.Background()))
+			runBulkIndexerOnce(t, &tt.config, client)
 
 			assert.Equal(t, tt.wantRequireDataStream, <-requireDataStreamCh)
 		})
@@ -234,14 +224,15 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) {
 
 			bulkIndexer, err := newAsyncBulkIndexer(zap.New(core), client, &cfg)
 			require.NoError(t, err)
+			defer bulkIndexer.Close(context.Background())
+
 			session, err := bulkIndexer.StartSession(context.Background())
 			require.NoError(t, err)
 
 			assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
 			// should flush
 			time.Sleep(100 * time.Millisecond)
 			assert.Equal(t, int64(0), bulkIndexer.stats.docsIndexed.Load())
-			assert.NoError(t, bulkIndexer.Close(context.Background()))
 			messages := observed.FilterMessage(tt.wantMessage)
 			require.Equal(t, 1, messages.Len(), "message not found; observed.All()=%v", observed.All())
 			for _, wantField := range tt.wantFields {
@@ -250,3 +241,78 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) {
 		})
 	}
 }
+
+func TestAsyncBulkIndexer_logRoundTrip(t *testing.T) {
+	tests := []struct {
+		name   string
+		config Config
+	}{
+		{
+			name: "compression none",
+			config: Config{
+				NumWorkers:   1,
+				ClientConfig: confighttp.ClientConfig{Compression: "none"},
+			},
+		},
+		{
+			name: "compression gzip",
+			config: Config{
+				NumWorkers:   1,
+				ClientConfig: confighttp.ClientConfig{Compression: "gzip"},
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		tt := tt
+		t.Run(tt.name, func(t *testing.T) {
+			t.Parallel()
+
+			loggerCore, logObserver := observer.New(zap.DebugLevel)
+
+			esLogger := clientLogger{
+				Logger:          zap.New(loggerCore),
+				logRequestBody:  true,
+				logResponseBody: true,
+			}
+
+			client, err := elasticsearch.NewClient(elasticsearch.Config{
+				Transport: &mockTransport{
+					RoundTripFunc: func(*http.Request) (*http.Response, error) {
+						return &http.Response{
+							Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}},
+							Body:   io.NopCloser(strings.NewReader(successResp)),
+						}, nil
+					},
+				},
+				Logger: &esLogger,
+			})
+			require.NoError(t, err)
+
+			runBulkIndexerOnce(t, &tt.config, client)
+
+			records := logObserver.AllUntimed()
+			assert.Len(t, records, 2)
+
+			assert.Equal(t, "/", records[0].ContextMap()["path"])
+			assert.Nil(t, records[0].ContextMap()["request_body"])
+			assert.JSONEq(t, successResp, records[0].ContextMap()["response_body"].(string))
+
+			assert.Equal(t, "/_bulk", records[1].ContextMap()["path"])
+			assert.Equal(t, "{\"create\":{\"_index\":\"foo\"}}\n{\"foo\": \"bar\"}\n", records[1].ContextMap()["request_body"])
+			assert.JSONEq(t, successResp, records[1].ContextMap()["response_body"].(string))
+		})
+	}
+}
+
+func runBulkIndexerOnce(t *testing.T, config *Config, client *elasticsearch.Client) *asyncBulkIndexer {
+	bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, config)
+	require.NoError(t, err)
+	session, err := bulkIndexer.StartSession(context.Background())
+	require.NoError(t, err)
+
+	assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
+	assert.NoError(t, bulkIndexer.Close(context.Background()))
+
+	return bulkIndexer
+}

exporter/elasticsearchexporter/config.go

Lines changed: 3 additions & 3 deletions
@@ -12,6 +12,7 @@ import (
 	"strings"
 	"time"
 
+	"go.opentelemetry.io/collector/config/configcompression"
 	"go.opentelemetry.io/collector/config/confighttp"
 	"go.opentelemetry.io/collector/config/configopaque"
 	"go.opentelemetry.io/collector/exporter/exporterbatcher"
@@ -273,9 +274,8 @@ func (cfg *Config) Validate() error {
 		return fmt.Errorf("unknown mapping mode %q", cfg.Mapping.Mode)
 	}
 
-	if cfg.Compression != "" {
-		// TODO support confighttp.ClientConfig.Compression
-		return errors.New("compression is not currently configurable")
+	if cfg.Compression != "none" && cfg.Compression != configcompression.TypeGzip {
+		return errors.New("compression must be one of [none, gzip]")
 	}
 
 	if cfg.Retry.MaxRequests != 0 && cfg.Retry.MaxRetries != 0 {

exporter/elasticsearchexporter/config_test.go

Lines changed: 27 additions & 3 deletions
@@ -6,6 +6,7 @@ package elasticsearchexporter
 import (
 	"net/http"
 	"path/filepath"
+	"strings"
 	"testing"
 	"time"
 
@@ -38,6 +39,7 @@ func TestConfig(t *testing.T) {
 
 	defaultMaxIdleConns := 100
 	defaultIdleConnTimeout := 90 * time.Second
+	defaultCompression := configcompression.TypeGzip
 
 	tests := []struct {
 		configFile string
@@ -80,6 +82,7 @@ func TestConfig(t *testing.T) {
 				cfg.Headers = map[string]configopaque.String{
 					"myheader": "test",
 				}
+				cfg.Compression = defaultCompression
 			}),
 			Authentication: AuthenticationSettings{
 				User: "elastic",
@@ -150,6 +153,7 @@ func TestConfig(t *testing.T) {
 				cfg.Headers = map[string]configopaque.String{
 					"myheader": "test",
 				}
+				cfg.Compression = defaultCompression
 			}),
 			Authentication: AuthenticationSettings{
 				User: "elastic",
@@ -220,6 +224,7 @@ func TestConfig(t *testing.T) {
 				cfg.Headers = map[string]configopaque.String{
 					"myheader": "test",
 				}
+				cfg.Compression = defaultCompression
 			}),
 			Authentication: AuthenticationSettings{
 				User: "elastic",
@@ -301,10 +306,29 @@ func TestConfig(t *testing.T) {
 				cfg.Batcher.Enabled = &enabled
 			}),
 		},
+		{
+			id:         component.NewIDWithName(metadata.Type, "compression_none"),
+			configFile: "config.yaml",
+			expected: withDefaultConfig(func(cfg *Config) {
+				cfg.Endpoint = "https://elastic.example.com:9200"
+
+				cfg.Compression = "none"
+			}),
+		},
+		{
+			id:         component.NewIDWithName(metadata.Type, "compression_gzip"),
+			configFile: "config.yaml",
+			expected: withDefaultConfig(func(cfg *Config) {
+				cfg.Endpoint = "https://elastic.example.com:9200"

+				cfg.Compression = "gzip"
+			}),
+		},
 	}
 
 	for _, tt := range tests {
-		t.Run(tt.id.String(), func(t *testing.T) {
+		tt := tt
+		t.Run(strings.ReplaceAll(tt.id.String(), "/", "_"), func(t *testing.T) {
 			factory := NewFactory()
 			cfg := factory.CreateDefaultConfig()
 
@@ -387,9 +411,9 @@ func TestConfig_Validate(t *testing.T) {
 		"compression unsupported": {
 			config: withDefaultConfig(func(cfg *Config) {
 				cfg.Endpoints = []string{"http://test:9200"}
-				cfg.Compression = configcompression.TypeGzip
+				cfg.Compression = configcompression.TypeSnappy
 			}),
-			err: `compression is not currently configurable`,
+			err: `compression must be one of [none, gzip]`,
 		},
 		"both max_retries and max_requests specified": {
 			config: withDefaultConfig(func(cfg *Config) {

exporter/elasticsearchexporter/esclient.go

Lines changed: 9 additions & 1 deletion
@@ -11,6 +11,7 @@ import (
 
 	"github.com/cenkalti/backoff/v4"
 	"github.com/elastic/go-elasticsearch/v7"
+	"github.com/klauspost/compress/gzip"
 	"go.opentelemetry.io/collector/component"
 	"go.uber.org/zap"
 
@@ -32,7 +33,14 @@ func (cl *clientLogger) LogRoundTrip(requ *http.Request, resp *http.Response, cl
 
 	var fields []zap.Field
 	if cl.logRequestBody && requ != nil && requ.Body != nil {
-		if b, err := io.ReadAll(requ.Body); err == nil {
+		body := requ.Body
+		if requ.Header.Get("Content-Encoding") == "gzip" {
+			if r, err := gzip.NewReader(body); err == nil {
+				defer r.Close()
+				body = r
+			}
+		}
+		if b, err := io.ReadAll(body); err == nil {
 			fields = append(fields, zap.ByteString("request_body", b))
 		}
 	}
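The change above lets the debug logger read the now gzip-encoded request bodies transparently. A standalone sketch of the same pattern, assuming only the standard library (`github.com/klauspost/compress/gzip` is API-compatible with `compress/gzip`); the URL and payload are illustrative:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"log"
	"net/http"
	"strings"
)

// readBody returns the request body as plain text, transparently
// decompressing it when the request is gzip-encoded.
func readBody(req *http.Request) ([]byte, error) {
	var body io.Reader = req.Body
	if req.Header.Get("Content-Encoding") == "gzip" {
		r, err := gzip.NewReader(body)
		if err != nil {
			return nil, err
		}
		defer r.Close()
		body = r
	}
	return io.ReadAll(body)
}

func main() {
	// Build a gzip-compressed request body, as the exporter now does by default.
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := io.Copy(w, strings.NewReader(`{"foo": "bar"}`)); err != nil {
		log.Fatal(err)
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}

	req, err := http.NewRequest(http.MethodPost, "http://localhost:9200/_bulk", &buf)
	if err != nil {
		log.Fatal(err)
	}
	req.Header.Set("Content-Encoding", "gzip")

	b, err := readBody(req)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(b)) // {"foo": "bar"}
}
```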

exporter/elasticsearchexporter/factory.go

Lines changed: 2 additions & 0 deletions
@@ -11,6 +11,7 @@ import (
 	"time"
 
 	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/config/configcompression"
 	"go.opentelemetry.io/collector/config/confighttp"
 	"go.opentelemetry.io/collector/consumer"
 	"go.opentelemetry.io/collector/exporter"
@@ -44,6 +45,7 @@ func createDefaultConfig() component.Config {
 
 	httpClientConfig := confighttp.NewDefaultClientConfig()
 	httpClientConfig.Timeout = 90 * time.Second
+	httpClientConfig.Compression = configcompression.TypeGzip
 
 	return &Config{
 		QueueSettings: qs,

exporter/elasticsearchexporter/go.mod

Lines changed: 1 addition & 1 deletion
@@ -8,6 +8,7 @@ require (
 	github.com/elastic/go-elasticsearch/v7 v7.17.10
 	github.com/elastic/go-structform v0.0.12
 	github.com/json-iterator/go v1.1.12
+	github.com/klauspost/compress v1.17.10
 	github.com/lestrrat-go/strftime v1.1.0
 	github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.111.0
 	github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.111.0
@@ -44,7 +45,6 @@ require (
 	github.com/golang/snappy v0.0.4 // indirect
 	github.com/google/uuid v1.6.0 // indirect
 	github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect
-	github.com/klauspost/compress v1.17.10 // indirect
 	github.com/knadh/koanf/maps v0.1.1 // indirect
 	github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
 	github.com/knadh/koanf/v2 v2.1.1 // indirect

exporter/elasticsearchexporter/testdata/config.yaml

Lines changed: 6 additions & 0 deletions
@@ -86,3 +86,9 @@ elasticsearch/batcher_disabled:
   endpoint: https://elastic.example.com:9200
   batcher:
     enabled: false
+elasticsearch/compression_none:
+  endpoint: https://elastic.example.com:9200
+  compression: none
+elasticsearch/compression_gzip:
+  endpoint: https://elastic.example.com:9200
+  compression: gzip

exporter/elasticsearchexporter/utils_test.go

Lines changed: 6 additions & 1 deletion
@@ -16,6 +16,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/klauspost/compress/gzip"
 	"github.com/stretchr/testify/assert"
 	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/plog"
@@ -160,7 +161,11 @@ func newESTestServer(t *testing.T, bulkHandler bulkHandler) *httptest.Server {
 		tsStart := time.Now()
 		var items []itemRequest
 
-		dec := json.NewDecoder(req.Body)
+		body := req.Body
+		if req.Header.Get("Content-Encoding") == "gzip" {
+			body, _ = gzip.NewReader(req.Body)
+		}
+		dec := json.NewDecoder(body)
 		for dec.More() {
 			var action, doc json.RawMessage
 			if err := dec.Decode(&action); err != nil {
