Skip to content

Commit d6f2dfa

Browse files
rapphilcparkins
authored andcommitted
[exporter/prometheusremotewrite] Fix: Validate context is canceled during retries (open-telemetry#30308)
**Description:** Validates that the context is canceled in order to avoid unnecessary retries in the prometheus remote write exporter **Link to tracking Issue:** No tracking issue. **Testing:** Unit tests were added. --------- Signed-off-by: Raphael Silva <[email protected]>
1 parent 4561135 commit d6f2dfa

File tree

3 files changed

+110
-57
lines changed

3 files changed

+110
-57
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: prometheusremotewriteexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Check if the context was canceled by a timeout in the component level to avoid unnecessary retries.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [30308]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

exporter/prometheusremotewriteexporter/exporter.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,15 @@ func (prwe *prwExporter) execute(ctx context.Context, writeReq *prompb.WriteRequ
236236

237237
// executeFunc can be used for backoff and non backoff scenarios.
238238
executeFunc := func() error {
239+
// check there was no timeout in the component level to avoid retries
240+
// to continue to run after a timeout
241+
select {
242+
case <-ctx.Done():
243+
return backoff.Permanent(ctx.Err())
244+
default:
245+
// continue
246+
}
247+
239248
// Create the HTTP POST request to send to the endpoint
240249
req, err := http.NewRequestWithContext(ctx, "POST", prwe.endpointURL.String(), bytes.NewReader(compressedData))
241250
if err != nil {

exporter/prometheusremotewriteexporter/exporter_test.go

Lines changed: 74 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -998,69 +998,86 @@ func TestWALOnExporterRoundTrip(t *testing.T) {
998998
assert.Equal(t, gotFromWAL, gotFromUpload)
999999
}
10001000

1001-
func TestRetryOn5xx(t *testing.T) {
1002-
// Create a mock HTTP server with a counter to simulate a 5xx error on the first attempt and a 2xx success on the second attempt
1003-
attempts := 0
1004-
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
1005-
if attempts < 4 {
1006-
attempts++
1007-
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
1008-
} else {
1009-
w.WriteHeader(http.StatusOK)
1010-
}
1011-
}))
1012-
defer mockServer.Close()
1013-
1014-
endpointURL, err := url.Parse(mockServer.URL)
1015-
require.NoError(t, err)
1016-
1017-
// Create the prwExporter
1018-
exporter := &prwExporter{
1019-
endpointURL: endpointURL,
1020-
client: http.DefaultClient,
1021-
retrySettings: configretry.BackOffConfig{
1022-
Enabled: true,
1023-
},
1024-
}
1025-
1026-
ctx := context.Background()
1027-
1028-
// Execute the write request and verify that the exporter returns a non-permanent error on the first attempt.
1029-
err = exporter.execute(ctx, &prompb.WriteRequest{})
1030-
assert.NoError(t, err)
1031-
assert.Equal(t, 4, attempts)
1001+
func canceledContext() context.Context {
1002+
ctx, cancel := context.WithCancel(context.Background())
1003+
cancel()
1004+
return ctx
10321005
}
10331006

1034-
func TestNoRetryOn4xx(t *testing.T) {
1035-
// Create a mock HTTP server with a counter to simulate a 4xx error
1036-
attempts := 0
1037-
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
1038-
if attempts < 1 {
1039-
attempts++
1040-
http.Error(w, "Bad Request", http.StatusBadRequest)
1041-
} else {
1042-
w.WriteHeader(http.StatusOK)
1043-
}
1044-
}))
1045-
defer mockServer.Close()
1007+
func assertPermanentConsumerError(t assert.TestingT, err error, _ ...any) bool {
1008+
return assert.True(t, consumererror.IsPermanent(err), "error should be consumererror.Permanent")
1009+
}
10461010

1047-
endpointURL, err := url.Parse(mockServer.URL)
1048-
require.NoError(t, err)
1011+
func TestRetries(t *testing.T) {
10491012

1050-
// Create the prwExporter
1051-
exporter := &prwExporter{
1052-
endpointURL: endpointURL,
1053-
client: http.DefaultClient,
1054-
retrySettings: configretry.BackOffConfig{
1055-
Enabled: true,
1013+
tts := []struct {
1014+
name string
1015+
serverErrorCount int // number of times server should return error
1016+
expectedAttempts int
1017+
httpStatus int
1018+
assertError assert.ErrorAssertionFunc
1019+
assertErrorType assert.ErrorAssertionFunc
1020+
ctx context.Context
1021+
}{
1022+
{
1023+
"test 5xx should retry",
1024+
3,
1025+
4,
1026+
http.StatusInternalServerError,
1027+
assert.NoError,
1028+
assert.NoError,
1029+
context.Background(),
1030+
},
1031+
{
1032+
"test 4xx should not retry",
1033+
4,
1034+
1,
1035+
http.StatusBadRequest,
1036+
assert.Error,
1037+
assertPermanentConsumerError,
1038+
context.Background(),
1039+
},
1040+
{
1041+
"test timeout context should not execute",
1042+
4,
1043+
0,
1044+
http.StatusInternalServerError,
1045+
assert.Error,
1046+
assertPermanentConsumerError,
1047+
canceledContext(),
10561048
},
10571049
}
10581050

1059-
ctx := context.Background()
1051+
for _, tt := range tts {
1052+
t.Run(tt.name, func(t *testing.T) {
1053+
totalAttempts := 0
1054+
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
1055+
if totalAttempts < tt.serverErrorCount {
1056+
http.Error(w, http.StatusText(tt.httpStatus), tt.httpStatus)
1057+
} else {
1058+
w.WriteHeader(http.StatusOK)
1059+
}
1060+
totalAttempts++
1061+
},
1062+
))
1063+
defer mockServer.Close()
1064+
1065+
endpointURL, err := url.Parse(mockServer.URL)
1066+
require.NoError(t, err)
1067+
1068+
// Create the prwExporter
1069+
exporter := &prwExporter{
1070+
endpointURL: endpointURL,
1071+
client: http.DefaultClient,
1072+
retrySettings: configretry.BackOffConfig{
1073+
Enabled: true,
1074+
},
1075+
}
10601076

1061-
// Execute the write request and verify that the exporter returns an error due to the 4xx response.
1062-
err = exporter.execute(ctx, &prompb.WriteRequest{})
1063-
assert.Error(t, err)
1064-
assert.True(t, consumererror.IsPermanent(err))
1065-
assert.Equal(t, 1, attempts)
1077+
err = exporter.execute(tt.ctx, &prompb.WriteRequest{})
1078+
tt.assertError(t, err)
1079+
tt.assertErrorType(t, err)
1080+
assert.Equal(t, tt.expectedAttempts, totalAttempts)
1081+
})
1082+
}
10661083
}

0 commit comments

Comments
 (0)