Skip to content

Commit 7a19fb3

Browse files
carsoniplahsivjarcodeboten
authored
[exporter/elasticsearch] Add retry.retry_on_status config (#32585)
Previously, the status codes that trigger retries were hardcoded to be 429, 500, 502, 503, 504. It is now configurable using `retry.retry_on_status`, and defaults to `[429, 500, 502, 503, 504]` to avoid a breaking change. To avoid duplicates, it is recommended to configure `retry.retry_on_status` to `[429]`, which would be the default in a future version. Part of #32584 --------- Co-authored-by: Vishal Raj <[email protected]> Co-authored-by: Alex Boten <[email protected]>
1 parent e6fd2f9 commit 7a19fb3

File tree

12 files changed

+73
-13
lines changed

12 files changed

+73
-13
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: elasticsearchexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add retry.retry_on_status config
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [32584]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
Previously, the status codes that trigger retries were hardcoded to be 429, 500, 502, 503, 504.
20+
It is now configurable using `retry.retry_on_status`, and defaults to `[429, 500, 502, 503, 504]` to avoid a breaking change.
21+
To avoid duplicates, it is recommended to configure `retry.retry_on_status` to `[429]`, which would be the default in a future version.
22+
23+
# If your change doesn't affect end users or the exported elements of any package,
24+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
25+
# Optional: The change log or logs in which this entry should be included.
26+
# e.g. '[user]' or '[user, api]'
27+
# Include 'user' if the change is relevant to end users.
28+
# Include 'api' if there is a change to a library API.
29+
# Default: '[user]'
30+
change_logs: [user]

exporter/elasticsearchexporter/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ This exporter supports sending OpenTelemetry logs and traces to [Elasticsearch](
5858
- `max_requests` (default=3): Number of HTTP request retries.
5959
- `initial_interval` (default=100ms): Initial waiting time if a HTTP request failed.
6060
- `max_interval` (default=1m): Max waiting time if a HTTP request failed.
61+
- `retry_on_status` (default=[429, 500, 502, 503, 504]): Status codes that trigger request or document level retries. Request level retry and document level retry status codes are shared and cannot be configured separately. To avoid duplicates, it is recommended to set it to `[429]`. WARNING: The default will be changed to `[429]` in the future.
6162
- `mapping`: Events are encoded to JSON. The `mapping` allows users to
6263
configure additional mapping rules.
6364
- `mode` (default=none): The fields naming mode. valid modes are:

exporter/elasticsearchexporter/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,9 @@ type RetrySettings struct {
151151

152152
// MaxInterval configures the max waiting time if consecutive requests failed.
153153
MaxInterval time.Duration `mapstructure:"max_interval"`
154+
155+
// RetryOnStatus configures the status codes that trigger request or document level retries.
156+
RetryOnStatus []int `mapstructure:"retry_on_status"`
154157
}
155158

156159
type MappingsSettings struct {

exporter/elasticsearchexporter/config_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package elasticsearchexporter
55

66
import (
7+
"net/http"
78
"path/filepath"
89
"testing"
910
"time"
@@ -61,6 +62,13 @@ func TestLoad_DeprecatedIndexConfigOption(t *testing.T) {
6162
MaxRequests: 5,
6263
InitialInterval: 100 * time.Millisecond,
6364
MaxInterval: 1 * time.Minute,
65+
RetryOnStatus: []int{
66+
http.StatusTooManyRequests,
67+
http.StatusInternalServerError,
68+
http.StatusBadGateway,
69+
http.StatusServiceUnavailable,
70+
http.StatusGatewayTimeout,
71+
},
6472
},
6573
Mapping: MappingsSettings{
6674
Mode: "none",
@@ -136,6 +144,7 @@ func TestLoadConfig(t *testing.T) {
136144
MaxRequests: 5,
137145
InitialInterval: 100 * time.Millisecond,
138146
MaxInterval: 1 * time.Minute,
147+
RetryOnStatus: []int{http.StatusTooManyRequests, http.StatusInternalServerError},
139148
},
140149
Mapping: MappingsSettings{
141150
Mode: "none",
@@ -186,6 +195,7 @@ func TestLoadConfig(t *testing.T) {
186195
MaxRequests: 5,
187196
InitialInterval: 100 * time.Millisecond,
188197
MaxInterval: 1 * time.Minute,
198+
RetryOnStatus: []int{http.StatusTooManyRequests, http.StatusInternalServerError},
189199
},
190200
Mapping: MappingsSettings{
191201
Mode: "none",

exporter/elasticsearchexporter/elasticsearch_bulk.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func newElasticsearchClient(logger *zap.Logger, config *Config) (*esClientCurren
103103
Header: headers,
104104

105105
// configure retry behavior
106-
RetryOnStatus: retryOnStatus,
106+
RetryOnStatus: config.Retry.RetryOnStatus,
107107
DisableRetry: retryDisabled,
108108
EnableRetryOnTimeout: config.Retry.Enabled,
109109
//RetryOnError: retryOnError, // should be used from esclient version 8 onwards
@@ -175,7 +175,7 @@ func createElasticsearchBackoffFunc(config *RetrySettings) func(int) time.Durati
175175
}
176176
}
177177

178-
func shouldRetryEvent(status int) bool {
178+
func shouldRetryEvent(status int, retryOnStatus []int) bool {
179179
for _, retryable := range retryOnStatus {
180180
if status == retryable {
181181
return true
@@ -184,15 +184,15 @@ func shouldRetryEvent(status int) bool {
184184
return false
185185
}
186186

187-
func pushDocuments(ctx context.Context, logger *zap.Logger, index string, document []byte, bulkIndexer esBulkIndexerCurrent, maxAttempts int) error {
187+
func pushDocuments(ctx context.Context, logger *zap.Logger, index string, document []byte, bulkIndexer esBulkIndexerCurrent, maxAttempts int, retryOnStatus []int) error {
188188
attempts := 1
189189
body := bytes.NewReader(document)
190190
item := esBulkIndexerItem{Action: createAction, Index: index, Body: body}
191191
// Setup error handler. The handler handles the per item response status based on the
192192
// selective ACKing in the bulk response.
193193
item.OnFailure = func(ctx context.Context, item esBulkIndexerItem, resp esBulkIndexerResponseItem, err error) {
194194
switch {
195-
case attempts < maxAttempts && shouldRetryEvent(resp.Status):
195+
case attempts < maxAttempts && shouldRetryEvent(resp.Status, retryOnStatus):
196196
logger.Debug("Retrying to index",
197197
zap.String("name", index),
198198
zap.Int("attempt", attempts),

exporter/elasticsearchexporter/factory.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry
88
import (
99
"context"
1010
"fmt"
11+
"net/http"
1112
"runtime"
1213
"time"
1314

@@ -51,6 +52,13 @@ func createDefaultConfig() component.Config {
5152
MaxRequests: 3,
5253
InitialInterval: 100 * time.Millisecond,
5354
MaxInterval: 1 * time.Minute,
55+
RetryOnStatus: []int{
56+
http.StatusTooManyRequests,
57+
http.StatusInternalServerError,
58+
http.StatusBadGateway,
59+
http.StatusServiceUnavailable,
60+
http.StatusGatewayTimeout,
61+
},
5462
},
5563
Mapping: MappingsSettings{
5664
Mode: "none",

exporter/elasticsearchexporter/integrationtest/datareceiver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ func newMockESReceiver(params receiver.CreateSettings, cfg *config, next consume
137137

138138
if err := next.ConsumeLogs(context.Background(), logs); err != nil {
139139
response.HasErrors = true
140-
item.Status = http.StatusInternalServerError
140+
item.Status = http.StatusTooManyRequests
141141
item.Error.Type = "simulated_es_error"
142142
item.Error.Reason = err.Error()
143143
}

exporter/elasticsearchexporter/logs_exporter.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,13 @@ type elasticsearchLogsExporter struct {
2323
logstashFormat LogstashFormatSettings
2424
dynamicIndex bool
2525
maxAttempts int
26+
retryOnStatus []int
2627

2728
client *esClientCurrent
2829
bulkIndexer esBulkIndexerCurrent
2930
model mappingModel
3031
}
3132

32-
var retryOnStatus = []int{500, 502, 503, 504, 429}
33-
3433
const createAction = "create"
3534

3635
func newLogsExporter(logger *zap.Logger, cfg *Config) (*elasticsearchLogsExporter, error) {
@@ -71,6 +70,7 @@ func newLogsExporter(logger *zap.Logger, cfg *Config) (*elasticsearchLogsExporte
7170
index: indexStr,
7271
dynamicIndex: cfg.LogsDynamicIndex.Enabled,
7372
maxAttempts: maxAttempts,
73+
retryOnStatus: cfg.Retry.RetryOnStatus,
7474
model: model,
7575
logstashFormat: cfg.LogstashFormat,
7676
}
@@ -129,5 +129,5 @@ func (e *elasticsearchLogsExporter) pushLogRecord(ctx context.Context, resource
129129
if err != nil {
130130
return fmt.Errorf("Failed to encode log event: %w", err)
131131
}
132-
return pushDocuments(ctx, e.logger, fIndex, document, e.bulkIndexer, e.maxAttempts)
132+
return pushDocuments(ctx, e.logger, fIndex, document, e.bulkIndexer, e.maxAttempts, e.retryOnStatus)
133133
}

exporter/elasticsearchexporter/logs_exporter_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ func TestExporter_PushEvent(t *testing.T) {
328328
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
329329
if failures == 0 {
330330
failures++
331-
return nil, &httpTestError{message: "oops"}
331+
return nil, &httpTestError{status: http.StatusTooManyRequests, message: "oops"}
332332
}
333333

334334
rec.Record(docs)
@@ -510,7 +510,7 @@ func withTestExporterConfig(fns ...func(*Config)) func(string) *Config {
510510
}
511511

512512
func mustSend(t *testing.T, exporter *elasticsearchLogsExporter, contents string) {
513-
err := pushDocuments(context.TODO(), zap.L(), exporter.index, []byte(contents), exporter.bulkIndexer, exporter.maxAttempts)
513+
err := pushDocuments(context.TODO(), zap.L(), exporter.index, []byte(contents), exporter.bulkIndexer, exporter.maxAttempts, exporter.retryOnStatus)
514514
require.NoError(t, err)
515515
}
516516

exporter/elasticsearchexporter/testdata/config.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ elasticsearch/trace:
1919
bytes: 10485760
2020
retry:
2121
max_requests: 5
22+
retry_on_status:
23+
- 429
24+
- 500
2225
elasticsearch/log:
2326
tls:
2427
insecure: false
@@ -38,6 +41,9 @@ elasticsearch/log:
3841
bytes: 10485760
3942
retry:
4043
max_requests: 5
44+
retry_on_status:
45+
- 429
46+
- 500
4147
sending_queue:
4248
enabled: true
4349
elasticsearch/logstash_format:

0 commit comments

Comments
 (0)