Skip to content

Commit e45dbe3

Browse files
[exporter/elasticsearch] report connection health via componentstatus (#39562)
#### Description Report component status of the Elasticsearch exporter based on the response code when making ingestion requests. #### Testing Added unit test confirming status is reported on http response. Also tested manually with the collector and confirmed that the error conditions appear when querying collector health status. --------- Co-authored-by: Andrzej Stencel <[email protected]>
1 parent f432d53 commit e45dbe3

File tree

5 files changed

+143
-0
lines changed

5 files changed

+143
-0
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: elasticsearchexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Report Elasticsearch request success / failure via componentstatus
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [39562]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/elasticsearchexporter/esclient.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry
66
import (
77
"context"
88
"errors"
9+
"fmt"
910
"io"
1011
"net/http"
1112
"time"
@@ -15,6 +16,7 @@ import (
1516
"github.com/elastic/go-elasticsearch/v8/esapi"
1617
"github.com/klauspost/compress/gzip"
1718
"go.opentelemetry.io/collector/component"
19+
"go.opentelemetry.io/collector/component/componentstatus"
1820
"go.uber.org/zap"
1921

2022
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/sanitize"
@@ -26,6 +28,7 @@ type clientLogger struct {
2628
*zap.Logger
2729
logRequestBody bool
2830
logResponseBody bool
31+
componentHost component.Host
2932
}
3033

3134
// LogRoundTrip should not modify the request or response, except for consuming and closing the body.
@@ -62,13 +65,25 @@ func (cl *clientLogger) LogRoundTrip(requ *http.Request, resp *http.Response, cl
6265
zap.String("status", resp.Status),
6366
)
6467
zl.Debug("Request roundtrip completed.", fields...)
68+
if resp.StatusCode == http.StatusOK {
69+
// Success
70+
componentstatus.ReportStatus(
71+
cl.componentHost, componentstatus.NewEvent(componentstatus.StatusOK))
72+
} else if httpRecoverableErrorStatus(resp.StatusCode) {
73+
err := fmt.Errorf("Elasticsearch request failed: %v", resp.Status)
74+
componentstatus.ReportStatus(
75+
cl.componentHost, componentstatus.NewRecoverableErrorEvent(err))
76+
}
6577

6678
case clientErr != nil:
6779
fields = append(
6880
fields,
6981
zap.NamedError("reason", clientErr),
7082
)
7183
zl.Debug("Request failed.", fields...)
84+
err := fmt.Errorf("Elasticsearch request failed: %w", clientErr)
85+
componentstatus.ReportStatus(
86+
cl.componentHost, componentstatus.NewRecoverableErrorEvent(err))
7287
}
7388

7489
return nil
@@ -111,6 +126,7 @@ func newElasticsearchClient(
111126
Logger: telemetry.Logger,
112127
logRequestBody: config.LogRequestBody,
113128
logResponseBody: config.LogResponseBody,
129+
componentHost: host,
114130
}
115131

116132
return elasticsearchv8.NewClient(elasticsearchv8.Config{
@@ -169,3 +185,11 @@ func createElasticsearchBackoffFunc(config *RetrySettings) func(int) time.Durati
169185
return expBackoff.NextBackOff()
170186
}
171187
}
188+
189+
func httpRecoverableErrorStatus(statusCode int) bool {
190+
// Elasticsearch uses 409 conflict to report duplicates, which aren't really
191+
// an error state, so those return false (but if we were already in an error
192+
// state, we will still wait until we get an actual 200 OK before changing
193+
// our state back).
194+
return statusCode >= 300 && statusCode != http.StatusConflict
195+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package elasticsearchexporter
5+
6+
import (
7+
"io"
8+
"net/http"
9+
"net/url"
10+
"testing"
11+
"time"
12+
13+
"github.com/stretchr/testify/assert"
14+
"github.com/stretchr/testify/require"
15+
"go.opentelemetry.io/collector/component"
16+
"go.opentelemetry.io/collector/component/componentstatus"
17+
"go.uber.org/zap"
18+
)
19+
20+
func TestComponentStatus(t *testing.T) {
21+
statusChan := make(chan *componentstatus.Event, 1)
22+
reporter := &testStatusReporter{statusChan}
23+
esLogger := clientLogger{
24+
Logger: zap.New(nil),
25+
logRequestBody: false,
26+
logResponseBody: false,
27+
componentHost: reporter,
28+
}
29+
30+
// Pass in an error and make sure it's sent to the component status reporter
31+
_ = esLogger.LogRoundTrip(nil, nil, io.EOF, time.Now(), 0)
32+
select {
33+
case event := <-statusChan:
34+
assert.ErrorIs(t, event.Err(), io.EOF, "LogRoundTrip should report a component status error wrapping its error parameter")
35+
assert.Equal(t, componentstatus.StatusRecoverableError, event.Status(), "LogRoundTrip on an error parameter should report a recoverable error")
36+
default:
37+
require.Fail(t, "LogRoundTrip with an error should report a recoverable error status")
38+
}
39+
40+
// Pass in an http error status and make sure it's sent to the component status reporter
41+
_ = esLogger.LogRoundTrip(
42+
&http.Request{URL: &url.URL{}},
43+
&http.Response{StatusCode: http.StatusUnauthorized, Status: "401 Unauthorized"},
44+
nil, time.Now(), 0)
45+
select {
46+
case event := <-statusChan:
47+
err := event.Err()
48+
require.Error(t, err, "LogRoundTrip with an http error status should report a component status error")
49+
assert.Contains(t, err.Error(), "401 Unauthorized", "LogRoundTrip with an http error status should include the status in its error state")
50+
assert.Equal(t, componentstatus.StatusRecoverableError, event.Status(), "LogRoundTrip with an http error status should report a recoverable error")
51+
default:
52+
require.Fail(t, "LogRoundTrip with an http error code should report a recoverable error status")
53+
}
54+
55+
// Pass in a 409 (duplicate document) and make sure it doesn't report a new status
56+
_ = esLogger.LogRoundTrip(
57+
&http.Request{URL: &url.URL{}},
58+
&http.Response{StatusCode: http.StatusConflict, Status: "409 duplicate"},
59+
nil, time.Now(), 0)
60+
select {
61+
case <-statusChan:
62+
assert.Fail(t, "LogRoundTrip with a 409 should not change the component status")
63+
default:
64+
}
65+
66+
// Pass in an http success status and make sure the component status returns to OK
67+
_ = esLogger.LogRoundTrip(
68+
&http.Request{URL: &url.URL{}},
69+
&http.Response{StatusCode: http.StatusOK}, nil, time.Now(), 0)
70+
select {
71+
case event := <-statusChan:
72+
assert.NoError(t, event.Err(), "LogRoundTrip with a success status shouldn't report a component status error")
73+
assert.Equal(t, componentstatus.StatusOK, event.Status(), "LogRoundTrip with a success status should report component status OK")
74+
default:
75+
require.Fail(t, "LogRoundTrip with an http success should report component status OK")
76+
}
77+
}
78+
79+
type testStatusReporter struct {
80+
statusChan chan *componentstatus.Event
81+
}
82+
83+
func (tsr *testStatusReporter) Report(event *componentstatus.Event) {
84+
tsr.statusChan <- event
85+
}
86+
87+
func (tsr *testStatusReporter) GetExtensions() map[component.ID]component.Component {
88+
return make(map[component.ID]component.Component)
89+
}

exporter/elasticsearchexporter/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ require (
1818
github.com/tidwall/gjson v1.18.0
1919
go.opentelemetry.io/collector/client v1.31.1-0.20250509035855-4d929a9d6a7e
2020
go.opentelemetry.io/collector/component v1.31.1-0.20250509035855-4d929a9d6a7e
21+
go.opentelemetry.io/collector/component/componentstatus v0.125.1-0.20250509035855-4d929a9d6a7e
2122
go.opentelemetry.io/collector/component/componenttest v0.125.1-0.20250509035855-4d929a9d6a7e
2223
go.opentelemetry.io/collector/config/configauth v0.125.1-0.20250509035855-4d929a9d6a7e
2324
go.opentelemetry.io/collector/config/configcompression v1.31.1-0.20250509035855-4d929a9d6a7e

exporter/elasticsearchexporter/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)