Skip to content

Commit 9987999

Browse files
dmathieu, florianl, carsonip, rockdaboot, and andrzej-stencel
authored
[exporter/elasticsearch] Support for profiles export (#37567)
This adds support for exporting profiles within the elasticsearch exporter. --------- Co-authored-by: Florian Lehner <[email protected]> Co-authored-by: Carson Ip <[email protected]> Co-authored-by: Tim Rühsen <[email protected]> Co-authored-by: Andrzej Stencel <[email protected]>
1 parent 19a5f29 commit 9987999

File tree

23 files changed

+2097
-103
lines changed

23 files changed

+2097
-103
lines changed

.chloggen/elasticsearch-profiles.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: elasticsearchexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add profiles support to elasticsearch exporter
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [37567]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/elasticsearchexporter/README.md

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<!-- status autogenerated section -->
44
| Status | |
55
| ------------- |-----------|
6-
| Stability | [development]: metrics |
6+
| Stability | [development]: metrics, profiles |
77
| | [beta]: traces, logs |
88
| Distributions | [contrib] |
99
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aexporter%2Felasticsearch%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aexporter%2Felasticsearch) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aexporter%2Felasticsearch%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aexporter%2Felasticsearch) |
@@ -14,7 +14,7 @@
1414
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
1515
<!-- end autogenerated section -->
1616

17-
This exporter supports sending logs, metrics and traces to [Elasticsearch](https://www.elastic.co/elasticsearch).
17+
This exporter supports sending logs, metrics, traces and profiles to [Elasticsearch](https://www.elastic.co/elasticsearch).
1818

1919
The Exporter is API-compatible with Elasticsearch 7.17.x and 8.x. Certain features of the exporter,
2020
such as the `otel` mapping mode, may require newer versions of Elasticsearch. Limited effort will
@@ -246,6 +246,26 @@ The metric types supported are:
246246
- Exponential histogram (Delta temporality only)
247247
- Summary
248248

249+
## Exporting profiles
250+
251+
Profiles support is currently in development, and should not be used in
252+
production. Profiles only support the OTel mapping mode.
253+
254+
Example:
255+
256+
```yaml
257+
exporters:
258+
elasticsearch:
259+
endpoint: https://elastic.example.com:9200
260+
mapping:
261+
mode: otel
262+
```
263+
264+
> [!IMPORTANT]
265+
> For the Elasticsearch Exporter to be able to export Profiles data, Universal Profiling needs to be installed in the database.
266+
> See [the Universal Profiling getting started documentation](https://www.elastic.co/guide/en/observability/current/profiling-get-started.html)
267+
> You will need to use the Elasticsearch endpoint, with an [Elasticsearch API key](https://www.elastic.co/guide/en/kibana/current/api-keys.html).
268+
249269
[confighttp]: https://github.com/open-telemetry/opentelemetry-collector/tree/main/config/confighttp/README.md#http-configuration-settings
250270
[configtls]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md#tls-configuration-settings
251271
[configauth]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configauth/README.md#authentication-configuration
@@ -355,7 +375,7 @@ In case the record contains `timestamp`, this value is used. Otherwise, the `obs
355375

356376
## Setting a document id dynamically
357377

358-
The `logs_dynamic_id` setting allows users to set the document ID dynamically based on a log record attribute.
378+
The `logs_dynamic_id` setting allows users to set the document ID dynamically based on a log record attribute.
359379
Besides the ability to control the document ID, this setting also works as a deduplication mechanism, as Elasticsearch will refuse to index a document with the same ID.
360380

361381
The log record attribute `elasticsearch.document_id` can be set explicitly by a processor based on the log record.

exporter/elasticsearchexporter/attribute.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,14 @@ import "go.opentelemetry.io/collector/pdata/pcommon"
77

88
// dynamic index attribute key constants
99
const (
10-
indexPrefix = "elasticsearch.index.prefix"
11-
indexSuffix = "elasticsearch.index.suffix"
12-
defaultDataStreamDataset = "generic"
13-
defaultDataStreamNamespace = "default"
14-
defaultDataStreamTypeLogs = "logs"
15-
defaultDataStreamTypeMetrics = "metrics"
16-
defaultDataStreamTypeTraces = "traces"
10+
indexPrefix = "elasticsearch.index.prefix"
11+
indexSuffix = "elasticsearch.index.suffix"
12+
defaultDataStreamDataset = "generic"
13+
defaultDataStreamNamespace = "default"
14+
defaultDataStreamTypeLogs = "logs"
15+
defaultDataStreamTypeMetrics = "metrics"
16+
defaultDataStreamTypeTraces = "traces"
17+
defaultDataStreamTypeProfiles = "profiles"
1718
)
1819

1920
func getFromAttributes(name string, defaultValue string, attributeMaps ...pcommon.Map) (string, bool) {

exporter/elasticsearchexporter/bulkindexer.go

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ type bulkIndexer interface {
3131

3232
type bulkIndexerSession interface {
3333
// Add adds a document to the bulk indexing session.
34-
Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string) error
34+
Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string, action string) error
3535

3636
// End must be called on the session object once it is no longer
3737
// needed, in order to release any associated resources.
@@ -55,14 +55,14 @@ type bulkIndexerSession interface {
5555

5656
const defaultMaxRetries = 2
5757

58-
func newBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Config) (bulkIndexer, error) {
58+
func newBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Config, requireDataStream bool) (bulkIndexer, error) {
5959
if config.Batcher.Enabled != nil {
60-
return newSyncBulkIndexer(logger, client, config), nil
60+
return newSyncBulkIndexer(logger, client, config, requireDataStream), nil
6161
}
62-
return newAsyncBulkIndexer(logger, client, config)
62+
return newAsyncBulkIndexer(logger, client, config, requireDataStream)
6363
}
6464

65-
func bulkIndexerConfig(client esapi.Transport, config *Config) docappender.BulkIndexerConfig {
65+
func bulkIndexerConfig(client esapi.Transport, config *Config, requireDataStream bool) docappender.BulkIndexerConfig {
6666
var maxDocRetries int
6767
if config.Retry.Enabled {
6868
maxDocRetries = defaultMaxRetries
@@ -79,14 +79,14 @@ func bulkIndexerConfig(client esapi.Transport, config *Config) docappender.BulkI
7979
MaxDocumentRetries: maxDocRetries,
8080
Pipeline: config.Pipeline,
8181
RetryOnDocumentStatus: config.Retry.RetryOnStatus,
82-
RequireDataStream: config.MappingMode() == MappingOTel,
82+
RequireDataStream: requireDataStream,
8383
CompressionLevel: compressionLevel,
8484
}
8585
}
8686

87-
func newSyncBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Config) *syncBulkIndexer {
87+
func newSyncBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Config, requireDataStream bool) *syncBulkIndexer {
8888
return &syncBulkIndexer{
89-
config: bulkIndexerConfig(client, config),
89+
config: bulkIndexerConfig(client, config, requireDataStream),
9090
flushTimeout: config.Timeout,
9191
flushBytes: config.Flush.Bytes,
9292
retryConfig: config.Retry,
@@ -126,8 +126,14 @@ type syncBulkIndexerSession struct {
126126
}
127127

128128
// Add adds an item to the sync bulk indexer session.
129-
func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string) error {
130-
doc := docappender.BulkIndexerItem{Index: index, Body: document, DocumentID: docID, DynamicTemplates: dynamicTemplates}
129+
func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string, action string) error {
130+
doc := docappender.BulkIndexerItem{
131+
Index: index,
132+
Body: document,
133+
DocumentID: docID,
134+
DynamicTemplates: dynamicTemplates,
135+
Action: action,
136+
}
131137
err := s.bi.Add(doc)
132138
if err != nil {
133139
return err
@@ -176,7 +182,7 @@ func (s *syncBulkIndexerSession) Flush(ctx context.Context) error {
176182
}
177183
}
178184

179-
func newAsyncBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Config) (*asyncBulkIndexer, error) {
185+
func newAsyncBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Config, requireDataStream bool) (*asyncBulkIndexer, error) {
180186
numWorkers := config.NumWorkers
181187
if numWorkers == 0 {
182188
numWorkers = runtime.NumCPU()
@@ -190,7 +196,7 @@ func newAsyncBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Con
190196
pool.wg.Add(numWorkers)
191197

192198
for i := 0; i < numWorkers; i++ {
193-
bi, err := docappender.NewBulkIndexer(bulkIndexerConfig(client, config))
199+
bi, err := docappender.NewBulkIndexer(bulkIndexerConfig(client, config, requireDataStream))
194200
if err != nil {
195201
return nil, err
196202
}
@@ -249,12 +255,13 @@ func (a *asyncBulkIndexer) Close(ctx context.Context) error {
249255
// Add adds an item to the async bulk indexer session.
250256
//
251257
// Adding an item after a call to Close() will panic.
252-
func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string) error {
258+
func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string, action string) error {
253259
item := docappender.BulkIndexerItem{
254260
Index: index,
255261
Body: document,
256262
DocumentID: docID,
257263
DynamicTemplates: dynamicTemplates,
264+
Action: action,
258265
}
259266
select {
260267
case <-ctx.Done():

exporter/elasticsearchexporter/bulkindexer_test.go

Lines changed: 9 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"testing"
1414
"time"
1515

16+
"github.com/elastic/go-docappender/v2"
1617
"github.com/elastic/go-elasticsearch/v8"
1718
"github.com/stretchr/testify/assert"
1819
"github.com/stretchr/testify/require"
@@ -97,12 +98,12 @@ func TestAsyncBulkIndexer_flush(t *testing.T) {
9798
}})
9899
require.NoError(t, err)
99100

100-
bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, &tt.config)
101+
bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, &tt.config, false)
101102
require.NoError(t, err)
102103
session, err := bulkIndexer.StartSession(context.Background())
103104
require.NoError(t, err)
104105

105-
assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil))
106+
assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate))
106107
// should flush
107108
time.Sleep(100 * time.Millisecond)
108109
assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load())
@@ -111,56 +112,6 @@ func TestAsyncBulkIndexer_flush(t *testing.T) {
111112
}
112113
}
113114

114-
func TestAsyncBulkIndexer_requireDataStream(t *testing.T) {
115-
tests := []struct {
116-
name string
117-
config Config
118-
wantRequireDataStream bool
119-
}{
120-
{
121-
name: "ecs",
122-
config: Config{
123-
NumWorkers: 1,
124-
Mapping: MappingsSettings{Mode: MappingECS.String()},
125-
Flush: FlushSettings{Interval: time.Hour, Bytes: 1e+8},
126-
},
127-
wantRequireDataStream: false,
128-
},
129-
{
130-
name: "otel",
131-
config: Config{
132-
NumWorkers: 1,
133-
Mapping: MappingsSettings{Mode: MappingOTel.String()},
134-
Flush: FlushSettings{Interval: time.Hour, Bytes: 1e+8},
135-
},
136-
wantRequireDataStream: true,
137-
},
138-
}
139-
140-
for _, tt := range tests {
141-
t.Run(tt.name, func(t *testing.T) {
142-
t.Parallel()
143-
requireDataStreamCh := make(chan bool, 1)
144-
client, err := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{
145-
RoundTripFunc: func(r *http.Request) (*http.Response, error) {
146-
if r.URL.Path == "/_bulk" {
147-
requireDataStreamCh <- r.URL.Query().Get("require_data_stream") == "true"
148-
}
149-
return &http.Response{
150-
Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}},
151-
Body: io.NopCloser(strings.NewReader(successResp)),
152-
}, nil
153-
},
154-
}})
155-
require.NoError(t, err)
156-
157-
runBulkIndexerOnce(t, &tt.config, client)
158-
159-
assert.Equal(t, tt.wantRequireDataStream, <-requireDataStreamCh)
160-
})
161-
}
162-
}
163-
164115
func TestAsyncBulkIndexer_flush_error(t *testing.T) {
165116
tests := []struct {
166117
name string
@@ -222,14 +173,14 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) {
222173
require.NoError(t, err)
223174
core, observed := observer.New(zap.NewAtomicLevelAt(zapcore.DebugLevel))
224175

225-
bulkIndexer, err := newAsyncBulkIndexer(zap.New(core), client, &cfg)
176+
bulkIndexer, err := newAsyncBulkIndexer(zap.New(core), client, &cfg, false)
226177
require.NoError(t, err)
227178
defer bulkIndexer.Close(context.Background())
228179

229180
session, err := bulkIndexer.StartSession(context.Background())
230181
require.NoError(t, err)
231182

232-
assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil))
183+
assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate))
233184
// should flush
234185
time.Sleep(100 * time.Millisecond)
235186
assert.Equal(t, int64(0), bulkIndexer.stats.docsIndexed.Load())
@@ -303,12 +254,12 @@ func TestAsyncBulkIndexer_logRoundTrip(t *testing.T) {
303254
}
304255

305256
func runBulkIndexerOnce(t *testing.T, config *Config, client *elasticsearch.Client) *asyncBulkIndexer {
306-
bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, config)
257+
bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, config, false)
307258
require.NoError(t, err)
308259
session, err := bulkIndexer.StartSession(context.Background())
309260
require.NoError(t, err)
310261

311-
assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil))
262+
assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate))
312263
assert.NoError(t, bulkIndexer.Close(context.Background()))
313264

314265
return bulkIndexer
@@ -331,11 +282,11 @@ func TestSyncBulkIndexer_flushBytes(t *testing.T) {
331282
}})
332283
require.NoError(t, err)
333284

334-
bi := newSyncBulkIndexer(zap.NewNop(), client, &cfg)
285+
bi := newSyncBulkIndexer(zap.NewNop(), client, &cfg, false)
335286
session, err := bi.StartSession(context.Background())
336287
require.NoError(t, err)
337288

338-
assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil))
289+
assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate))
339290
assert.Equal(t, int64(1), reqCnt.Load()) // flush due to flush::bytes
340291
assert.NoError(t, bi.Close(context.Background()))
341292
}

0 commit comments

Comments
 (0)