Skip to content

Commit 6bb304e

Browse files
axwcarsonip
andauthored
[elasticsearchexporter] support dynamic mapping modes (#38114)
#### Description Add support for controlling the mapping mode via a client metadata key (typically an HTTP or gRPC header), X-Elastic-Mapping-Mode. This will override the mode specified in `mapping::mode`, which now serves as the default. We also introduce `mapping::allowed_modes` config which restricts mapping modes to those listed. This will allow an administrator to lock down the allowed mapping modes that can be specified through client metadata. Because we want to set "require_data_stream=true" for otel mode, and we no longer know ahead of time which mapping mode will be used, we now create a BulkIndexer per mapping mode. #### Link to tracking issue Closes #36092 #### Testing - Added unit tests - Tested against a live collector with telemetrygen, passing `--otlp-header X-Elastic-Mapping-Mode=\"<mode>\"` #### Documentation Updated README. --------- Co-authored-by: Carson Ip <[email protected]>
1 parent 536cb72 commit 6bb304e

File tree

10 files changed

+531
-194
lines changed

10 files changed

+531
-194
lines changed

.chloggen/mappingmode-context.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: elasticsearchexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Support specifying mapping mode via client metadata
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [36092]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: Add config `mapping::allowed_modes` to restrict mapping modes configurable from client metadata.
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/elasticsearchexporter/README.md

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -157,28 +157,41 @@ This can be customised through the following settings:
157157
The Elasticsearch exporter supports several document schemas and preprocessing
158158
behaviours, which may be configured through the following settings:
159159

160-
- `mapping`: Events are encoded to JSON. The `mapping` allows users to
161-
configure additional mapping rules.
162-
- `mode` (default=none): The fields naming mode. valid modes are:
163-
- `none`: Use original fields and event structure from the OTLP event.
164-
- `ecs`: Try to map fields to [Elastic Common Schema (ECS)][ECS]
165-
- `otel`: Elastic's preferred "OTel-native" mapping mode. Uses original fields and event structure from the OTLP event.
166-
- There's a special treatment for the following attributes: `data_stream.type`, `data_stream.dataset`, `data_stream.namespace`. Instead of serializing these values under the `*attributes.*` namespace, they're put at the root of the document, to conform with the conventions of the data stream naming scheme that maps these as `constant_keyword` fields.
167-
- `data_stream.dataset` will always be appended with `.otel`. It is recommended to use with `*_dynamic_index.enabled: true` to route documents to data stream `${data_stream.type}-${data_stream.dataset}-${data_stream.namespace}`.
168-
- Span events are stored in separate documents. They will be routed with `data_stream.type` set to `logs` if `traces_dynamic_index::enabled` is `true`.
169-
170-
- `raw`: Omit the `Attributes.` string prefixed to field names for log and
171-
span attributes as well as omit the `Events.` string prefixed to
172-
field names for span events.
173-
- `bodymap`: Provides fine-grained control over the final documents to be ingested.
174-
:warning: This mode's behavior is unstable, it is currently experimental and undergoing changes.
175-
It works only for logs where the log record body is a map. Each LogRecord
176-
body is serialized to JSON as-is and becomes a separate document for ingestion.
177-
If the log record body is not a map, the exporter will log a warning and drop the log record.
160+
- `mapping`:
161+
- `mode` (default=none): The default mapping mode. Valid modes are:
162+
- `none`
163+
- `ecs`
164+
- `otel`
165+
- `raw`
166+
- `bodymap`
167+
- `allowed_modes` (defaults to all mapping modes): A list of allowed mapping modes.
168+
169+
The mapping mode can also be controlled via the client metadata key `X-Elastic-Mapping-Mode`,
170+
e.g. via HTTP headers, gRPC metadata. This will override the configured `mapping::mode`.
171+
It is possible to restrict which mapping modes may be requested by configuring
172+
`mapping::allowed_modes`, which defaults to all mapping modes. Keep in mind that not all
173+
processors or exporter configurations will maintain client
174+
metadata.
175+
176+
See below for a description of each mapping mode.
178177

179178
#### OTel mapping mode
180179

181-
In `otel` mapping mode, the Elasticsearch Exporter stores documents in an OTel-native schema.
180+
In `otel` mapping mode, the Elasticsearch Exporter stores documents in Elastic's preferred
181+
"OTel-native" schema. In this mapping mode, documents use the original attribute names and
182+
closely follows the event structure from the OTLP events.
183+
184+
There is special treatment for the following attributes: `data_stream.type`, `data_stream.dataset`,
185+
and `data_stream.namespace`. Instead of serializing these values under the `*attributes.*` namespace,
186+
they are put at the root of the document, to conform with the conventions of the data stream naming
187+
scheme that maps these as `constant_keyword` fields.
188+
189+
`data_stream.dataset` will always be appended with `.otel`. It is recommended to use with
190+
`*_dynamic_index::enabled: true` (e.g. `logs_dynamic_index::enabled`) to route documents to data stream
191+
`${data_stream.type}-${data_stream.dataset}-${data_stream.namespace}`.
192+
193+
Span events are stored in separate documents. They will be routed with `data_stream.type` set to
194+
`logs` if `traces_dynamic_index::enabled` is `true`.
182195

183196
| Signal | Supported |
184197
| --------- | ------------------ |

exporter/elasticsearchexporter/bulkindexer.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry
66
import (
77
"context"
88
"errors"
9+
"fmt"
910
"io"
1011
"runtime"
1112
"strings"
@@ -15,7 +16,9 @@ import (
1516

1617
"github.com/elastic/go-docappender/v2"
1718
"github.com/elastic/go-elasticsearch/v8/esapi"
19+
"go.opentelemetry.io/collector/component"
1820
"go.opentelemetry.io/collector/config/configcompression"
21+
"go.opentelemetry.io/collector/exporter"
1922
"go.uber.org/zap"
2023
)
2124

@@ -362,3 +365,143 @@ func getErrorHint(index, errorType string) string {
362365
}
363366
return ""
364367
}
368+
369+
type bulkIndexers struct {
370+
// wg tracks active sessions
371+
wg sync.WaitGroup
372+
373+
// NOTE(axw) when we get rid of the async bulk indexer there would be
374+
// no reason for having one per mode or for different document types.
375+
// Instead, the caller can create separate sessions as needed, and we
376+
// can either have one for required_data_stream=true and one for false,
377+
// or callers can set this per document.
378+
379+
modes [NumMappingModes]bulkIndexer
380+
profilingEvents bulkIndexer // For profiling-events-*
381+
profilingStackTraces bulkIndexer // For profiling-stacktraces
382+
profilingStackFrames bulkIndexer // For profiling-stackframes
383+
profilingExecutables bulkIndexer // For profiling-executables
384+
}
385+
386+
func (b *bulkIndexers) start(
387+
ctx context.Context,
388+
cfg *Config,
389+
set exporter.Settings,
390+
host component.Host,
391+
allowedMappingModes map[string]MappingMode,
392+
) error {
393+
userAgent := fmt.Sprintf(
394+
"%s/%s (%s/%s)",
395+
set.BuildInfo.Description,
396+
set.BuildInfo.Version,
397+
runtime.GOOS,
398+
runtime.GOARCH,
399+
)
400+
401+
esClient, err := newElasticsearchClient(ctx, cfg, host, set.TelemetrySettings, userAgent)
402+
if err != nil {
403+
return err
404+
}
405+
406+
for _, mode := range allowedMappingModes {
407+
var bi bulkIndexer
408+
bi, err = newBulkIndexer(set.TelemetrySettings.Logger, esClient, cfg, mode == MappingOTel)
409+
if err != nil {
410+
return err
411+
}
412+
b.modes[mode] = &wgTrackingBulkIndexer{bulkIndexer: bi, wg: &b.wg}
413+
}
414+
415+
profilingEvents, err := newBulkIndexer(set.Logger, esClient, cfg, true)
416+
if err != nil {
417+
return err
418+
}
419+
b.profilingEvents = &wgTrackingBulkIndexer{bulkIndexer: profilingEvents, wg: &b.wg}
420+
421+
profilingStackTraces, err := newBulkIndexer(set.Logger, esClient, cfg, false)
422+
if err != nil {
423+
return err
424+
}
425+
b.profilingStackTraces = &wgTrackingBulkIndexer{bulkIndexer: profilingStackTraces, wg: &b.wg}
426+
427+
profilingStackFrames, err := newBulkIndexer(set.Logger, esClient, cfg, false)
428+
if err != nil {
429+
return err
430+
}
431+
b.profilingStackFrames = &wgTrackingBulkIndexer{bulkIndexer: profilingStackFrames, wg: &b.wg}
432+
433+
profilingExecutables, err := newBulkIndexer(set.Logger, esClient, cfg, false)
434+
if err != nil {
435+
return err
436+
}
437+
b.profilingExecutables = &wgTrackingBulkIndexer{bulkIndexer: profilingExecutables, wg: &b.wg}
438+
return nil
439+
}
440+
441+
func (b *bulkIndexers) shutdown(ctx context.Context) error {
442+
for _, bi := range b.modes {
443+
if bi == nil {
444+
continue
445+
}
446+
if err := bi.Close(ctx); err != nil {
447+
return err
448+
}
449+
}
450+
if b.profilingEvents != nil {
451+
if err := b.profilingEvents.Close(ctx); err != nil {
452+
return err
453+
}
454+
}
455+
if b.profilingStackTraces != nil {
456+
if err := b.profilingStackTraces.Close(ctx); err != nil {
457+
return err
458+
}
459+
}
460+
if b.profilingStackFrames != nil {
461+
if err := b.profilingStackFrames.Close(ctx); err != nil {
462+
return err
463+
}
464+
}
465+
if b.profilingExecutables != nil {
466+
if err := b.profilingExecutables.Close(ctx); err != nil {
467+
return err
468+
}
469+
}
470+
471+
doneCh := make(chan struct{})
472+
go func() {
473+
b.wg.Wait()
474+
close(doneCh)
475+
}()
476+
select {
477+
case <-ctx.Done():
478+
return ctx.Err()
479+
case <-doneCh:
480+
}
481+
return nil
482+
}
483+
484+
type wgTrackingBulkIndexer struct {
485+
bulkIndexer
486+
wg *sync.WaitGroup
487+
}
488+
489+
func (w *wgTrackingBulkIndexer) StartSession(ctx context.Context) (bulkIndexerSession, error) {
490+
w.wg.Add(1)
491+
session, err := w.bulkIndexer.StartSession(ctx)
492+
if err != nil {
493+
w.wg.Done()
494+
return nil, err
495+
}
496+
return &wgTrackingBulkIndexerSession{bulkIndexerSession: session, wg: w.wg}, nil
497+
}
498+
499+
type wgTrackingBulkIndexerSession struct {
500+
bulkIndexerSession
501+
wg *sync.WaitGroup
502+
}
503+
504+
func (w *wgTrackingBulkIndexerSession) End() {
505+
defer w.wg.Done()
506+
w.bulkIndexerSession.End()
507+
}

0 commit comments

Comments
 (0)