Skip to content

Commit 13366cc

Browse files
aleksmausfelixbarnycarsonip
authored
[exporter/elasticsearch] OTel mode serialization (#33290)
**Description:** Implements OTel (OpenTelemetry-native) mode serialization for the Elasticsearch exporter. This is an initial cut in order to get the discussion going. This approach was tested as an internal POC. It leverages the Elasticsearch ```"passthrough"``` field mapping, initially introduced in Elasticsearch 8.13, which allows users to query the document/scope/resource attributes as top-level fields, making ECS queries compatible with the OTel semantic convention schema. Another benefit is the simplicity of converting a stored document from Elasticsearch back to the OTel data model format. The document/scope/resource attributes are dynamically mapped and stored as flattened keys. Here is an example of index template mappings with ```"passthrough"``` fields: ``` PUT _index_template/logs_otel { "priority": 250, "template": { "settings": { "index": { "lifecycle": { "name": "logs" }, "codec": "best_compression", "mapping": { "ignore_malformed": "true" } } }, "mappings": { "_source": { "enabled": true }, "date_detection": false, "dynamic": "strict", "dynamic_templates": [ { "all_strings_to_keywords": { "mapping": { "ignore_above": 1024, "type": "keyword" }, "match_mapping_type": "string" } }, { "complex_attributes": { "path_match": [ "resource.attributes.*", "scope.attributes.*", "attributes.*" ], "match_mapping_type": "object", "mapping": { "type": "flattened" } } } ], "properties": { "@timestamp": { "type": "date_nanos", "ignore_malformed": false }, "data_stream": { "type": "object", "properties": { "type": { "type": "constant_keyword" }, "dataset": { "type": "constant_keyword" }, "namespace": { "type": "constant_keyword" } } }, "observed_timestamp": { "type": "date_nanos", "ignore_malformed": true }, "severity_number": { "type": "long" }, "severity_text": { "type": "keyword" }, "body_text": { "type": "match_only_text" }, "body_structured": { "type": "flattened" }, "attributes": { "type": "passthrough", "dynamic": true, "priority": 2 }, "dropped_attributes_count": { "type": 
"long" }, "trace_flags": { "type": "byte" }, "trace_id": { "type": "keyword" }, "span_id": { "type": "keyword" }, "scope": { "properties": { "name": { "type": "keyword" }, "version": { "type": "keyword" }, "attributes": { "type": "passthrough", "dynamic": true, "priority": 1 }, "dropped_attributes_count": { "type": "long" }, "schema_url": { "type": "keyword" } } }, "resource": { "properties": { "dropped_attributes_count": { "type": "long" }, "schema_url": { "type": "keyword" }, "attributes": { "type": "passthrough", "dynamic": true, "priority": 0 } } } } } }, "index_patterns": [ "logs-*.otel-*" ], "data_stream": {} } ``` Here is an abbreviated example of an auditd document in Elasticsearch: ``` { "@timestamp": "2024-05-29T13:30:25.085926000Z", "attributes": { "foo": "bar", "some.bool": true }, "body_structured": { "MESSAGE": "AVC apparmor=\"STATUS\" operation=\"profile_replace\" info=\"same as current profile, skipping\" profile=\"unconfined\" name=\"/usr/bin/evince-previewer\" pid=2702 comm=\"apparmor_parser\"", "SYSLOG_FACILITY": "4", "SYSLOG_IDENTIFIER": "audit", "_SOURCE_REALTIME_TIMESTAMP": "1716989425080000", "_TRANSPORT": "audit" }, "dropped_attributes_count": 0, "observed_timestamp": "2024-05-29T14:49:26.534908898Z", "resource": { "attributes": { "data_stream.dataset": "auditd.otel", "data_stream.namespace": "default", "data_stream.type": "logs", "host.arch": "arm64", "host.cpu.cache.l2.size": 0, "host.cpu.family": "", "host.cpu.model.id": "0x000", "host.cpu.model.name": "", "host.cpu.stepping": "0", "host.cpu.vendor.id": "Apple", "host.id": "cae0e0147d454a80971b0b747c8b62b9", "host.ip": [ "172.16.3.131", "fe80::20c:29ff:fe66:3012" ], "host.name": "lebuntu", "host.os.description": "Ubuntu 22.04.4 LTS (Jammy Jellyfish) (Linux lebuntu 5.15.0-107-generic #117-Ubuntu SMP Mon Apr 29 14:37:09 UTC 2024 aarch64)", "host.os.type": "linux", "os.description": "Ubuntu 22.04.4 LTS (Jammy Jellyfish) (Linux lebuntu 5.15.0-107-generic #117-Ubuntu SMP Mon Apr 29 14:37:09 
UTC 2024 aarch64)", "os.type": "linux" }, "dropped_attributes_count": 0, "schema_url": "https://opentelemetry.io/schemas/1.6.1" }, "severity_number": 0, "trace_flags": 0 } ``` Here is an example of ECS compatible query that works on this Otel native schema: ``` GET logs-auditd.otel-default/_search { "query": { "bool": { "must": [ { "match": { "host.name": "lebuntu" } } ] } } } ``` **Link to tracking Issue:** No tracking issue yet. **Testing:** Added unit test for OTel transformation. Tested with journald OTel receiver. **Documentation:** No documentation is added yet. --------- Co-authored-by: Felix Barnsteiner <[email protected]> Co-authored-by: Carson Ip <[email protected]>
1 parent 1c98261 commit 13366cc

File tree

10 files changed

+647
-39
lines changed

10 files changed

+647
-39
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: elasticsearchexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Introduce an experimental OTel native mapping mode for logs
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [33290]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

exporter/elasticsearchexporter/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,10 @@ behaviours, which may be configured through the following settings:
127127
- `mode` (default=none): The field naming mode. Valid modes are:
128128
- `none`: Use original fields and event structure from the OTLP event.
129129
- `ecs`: Try to map fields to [Elastic Common Schema (ECS)][ECS]
130+
- `otel`: Elastic's preferred "OTel-native" mapping mode. Uses original fields and event structure from the OTLP event.
131+
:warning: This mode's behavior is unstable; it is currently experimental and undergoing changes.
132+
There's a special treatment for the following attributes: `data_stream.type`, `data_stream.dataset`, `data_stream.namespace`. Instead of serializing these values under the `*attributes.*` namespace, they're put at the root of the document, to conform with the conventions of the data stream naming scheme that maps these as `constant_keyword` fields.
133+
130134
- `raw`: Omit the `Attributes.` string prefixed to field names for log and
131135
span attributes as well as omit the `Events.` string prefixed to
132136
field names for span events.

exporter/elasticsearchexporter/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ type MappingMode int
179179
const (
180180
MappingNone MappingMode = iota
181181
MappingECS
182+
MappingOTel
182183
MappingRaw
183184
)
184185

@@ -193,6 +194,8 @@ func (m MappingMode) String() string {
193194
return ""
194195
case MappingECS:
195196
return "ecs"
197+
case MappingOTel:
198+
return "otel"
196199
case MappingRaw:
197200
return "raw"
198201
default:
@@ -205,6 +208,7 @@ var mappingModes = func() map[string]MappingMode {
205208
for _, m := range []MappingMode{
206209
MappingNone,
207210
MappingECS,
211+
MappingOTel,
208212
MappingRaw,
209213
} {
210214
table[strings.ToLower(m.String())] = m

exporter/elasticsearchexporter/data_stream_router.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@ func routeWithDefaults(defaultDSType, defaultDSDataset, defaultDSNamespace strin
1616
pcommon.Map,
1717
pcommon.Map,
1818
string,
19+
bool,
1920
) string {
2021
return func(
2122
recordAttr pcommon.Map,
2223
scopeAttr pcommon.Map,
2324
resourceAttr pcommon.Map,
2425
fIndex string,
26+
otel bool,
2527
) string {
2628
// Order:
2729
// 1. read data_stream.* from attributes
@@ -37,6 +39,13 @@ func routeWithDefaults(defaultDSType, defaultDSDataset, defaultDSNamespace strin
3739
return fmt.Sprintf("%s%s%s", prefix, fIndex, suffix)
3840
}
3941
}
42+
43+
// The naming convention for datastream is expected to be "logs-[dataset].otel-[namespace]".
44+
// This is in order to match the soon to be built-in logs-*.otel-* index template.
45+
if otel {
46+
dataset += ".otel"
47+
}
48+
4049
recordAttr.PutStr(dataStreamDataset, dataset)
4150
recordAttr.PutStr(dataStreamNamespace, namespace)
4251
recordAttr.PutStr(dataStreamType, defaultDSType)
@@ -51,9 +60,10 @@ func routeLogRecord(
5160
scope pcommon.InstrumentationScope,
5261
resource pcommon.Resource,
5362
fIndex string,
63+
otel bool,
5464
) string {
5565
route := routeWithDefaults(defaultDataStreamTypeLogs, defaultDataStreamDataset, defaultDataStreamNamespace)
56-
return route(record.Attributes(), scope.Attributes(), resource.Attributes(), fIndex)
66+
return route(record.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel)
5767
}
5868

5969
// routeDataPoint returns the name of the index to send the data point to according to data stream routing attributes.
@@ -63,9 +73,10 @@ func routeDataPoint(
6373
scope pcommon.InstrumentationScope,
6474
resource pcommon.Resource,
6575
fIndex string,
76+
otel bool,
6677
) string {
6778
route := routeWithDefaults(defaultDataStreamTypeMetrics, defaultDataStreamDataset, defaultDataStreamNamespace)
68-
return route(dataPoint.Attributes(), scope.Attributes(), resource.Attributes(), fIndex)
79+
return route(dataPoint.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel)
6980
}
7081

7182
// routeSpan returns the name of the index to send the span to according to data stream routing attributes.
@@ -75,7 +86,8 @@ func routeSpan(
7586
scope pcommon.InstrumentationScope,
7687
resource pcommon.Resource,
7788
fIndex string,
89+
otel bool,
7890
) string {
7991
route := routeWithDefaults(defaultDataStreamTypeTraces, defaultDataStreamDataset, defaultDataStreamNamespace)
80-
return route(span.Attributes(), scope.Attributes(), resource.Attributes(), fIndex)
92+
return route(span.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel)
8193
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package elasticsearchexporter
5+
6+
import (
7+
"fmt"
8+
"testing"
9+
10+
"github.com/stretchr/testify/assert"
11+
"go.opentelemetry.io/collector/pdata/plog"
12+
"go.opentelemetry.io/collector/pdata/pmetric"
13+
"go.opentelemetry.io/collector/pdata/ptrace"
14+
)
15+
16+
type routeTestInfo struct {
17+
name string
18+
otel bool
19+
want string
20+
}
21+
22+
func createRouteTests(dsType string) []routeTestInfo {
23+
renderWantRoute := func(dsType string, otel bool) string {
24+
if otel {
25+
return fmt.Sprintf("%s-%s.otel-%s", dsType, defaultDataStreamDataset, defaultDataStreamNamespace)
26+
}
27+
return fmt.Sprintf("%s-%s-%s", dsType, defaultDataStreamDataset, defaultDataStreamNamespace)
28+
}
29+
30+
return []routeTestInfo{
31+
{
32+
name: "default",
33+
otel: false,
34+
want: renderWantRoute(dsType, false),
35+
},
36+
{
37+
name: "otel",
38+
otel: true,
39+
want: renderWantRoute(dsType, true),
40+
},
41+
}
42+
}
43+
44+
func TestRouteLogRecord(t *testing.T) {
45+
46+
tests := createRouteTests(defaultDataStreamTypeLogs)
47+
48+
for _, tc := range tests {
49+
t.Run(tc.name, func(t *testing.T) {
50+
ds := routeLogRecord(plog.NewLogRecord(), plog.NewScopeLogs().Scope(), plog.NewResourceLogs().Resource(), "", tc.otel)
51+
assert.Equal(t, tc.want, ds)
52+
})
53+
}
54+
}
55+
56+
func TestRouteDataPoint(t *testing.T) {
57+
58+
tests := createRouteTests(defaultDataStreamTypeMetrics)
59+
60+
for _, tc := range tests {
61+
t.Run(tc.name, func(t *testing.T) {
62+
ds := routeDataPoint(pmetric.NewNumberDataPoint(), plog.NewScopeLogs().Scope(), plog.NewResourceLogs().Resource(), "", tc.otel)
63+
assert.Equal(t, tc.want, ds)
64+
})
65+
}
66+
}
67+
68+
func TestRouteSpan(t *testing.T) {
69+
70+
tests := createRouteTests(defaultDataStreamTypeTraces)
71+
72+
for _, tc := range tests {
73+
t.Run(tc.name, func(t *testing.T) {
74+
ds := routeSpan(ptrace.NewSpan(), plog.NewScopeLogs().Scope(), plog.NewResourceLogs().Resource(), "", tc.otel)
75+
assert.Equal(t, tc.want, ds)
76+
})
77+
}
78+
}

exporter/elasticsearchexporter/exporter.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type elasticsearchExporter struct {
3030
logstashFormat LogstashFormatSettings
3131
dynamicIndex bool
3232
model mappingModel
33+
otel bool
3334

3435
bulkIndexer bulkIndexer
3536
}
@@ -49,6 +50,8 @@ func newExporter(
4950
mode: cfg.MappingMode(),
5051
}
5152

53+
otel := model.mode == MappingOTel
54+
5255
userAgent := fmt.Sprintf(
5356
"%s/%s (%s/%s)",
5457
set.BuildInfo.Description,
@@ -66,6 +69,7 @@ func newExporter(
6669
dynamicIndex: dynamicIndex,
6770
model: model,
6871
logstashFormat: cfg.LogstashFormat,
72+
otel: otel,
6973
}, nil
7074
}
7175

@@ -107,7 +111,7 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs)
107111
scope := ill.Scope()
108112
logs := ill.LogRecords()
109113
for k := 0; k < logs.Len(); k++ {
110-
if err := e.pushLogRecord(ctx, resource, logs.At(k), scope, session); err != nil {
114+
if err := e.pushLogRecord(ctx, resource, rl.SchemaUrl(), logs.At(k), scope, ill.SchemaUrl(), session); err != nil {
111115
if cerr := ctx.Err(); cerr != nil {
112116
return cerr
113117
}
@@ -130,13 +134,15 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs)
130134
func (e *elasticsearchExporter) pushLogRecord(
131135
ctx context.Context,
132136
resource pcommon.Resource,
137+
resourceSchemaURL string,
133138
record plog.LogRecord,
134139
scope pcommon.InstrumentationScope,
140+
scopeSchemaURL string,
135141
bulkIndexerSession bulkIndexerSession,
136142
) error {
137143
fIndex := e.index
138144
if e.dynamicIndex {
139-
fIndex = routeLogRecord(record, scope, resource, fIndex)
145+
fIndex = routeLogRecord(record, scope, resource, fIndex, e.otel)
140146
}
141147

142148
if e.logstashFormat.Enabled {
@@ -147,7 +153,7 @@ func (e *elasticsearchExporter) pushLogRecord(
147153
fIndex = formattedIndex
148154
}
149155

150-
document, err := e.model.encodeLog(resource, record, scope)
156+
document, err := e.model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL)
151157
if err != nil {
152158
return fmt.Errorf("failed to encode log event: %w", err)
153159
}
@@ -279,7 +285,7 @@ func (e *elasticsearchExporter) getMetricDataPointIndex(
279285
) (string, error) {
280286
fIndex := e.index
281287
if e.dynamicIndex {
282-
fIndex = routeDataPoint(dataPoint, scope, resource, fIndex)
288+
fIndex = routeDataPoint(dataPoint, scope, resource, fIndex, e.otel)
283289
}
284290

285291
if e.logstashFormat.Enabled {
@@ -342,7 +348,7 @@ func (e *elasticsearchExporter) pushTraceRecord(
342348
) error {
343349
fIndex := e.index
344350
if e.dynamicIndex {
345-
fIndex = routeSpan(span, scope, resource, fIndex)
351+
fIndex = routeSpan(span, scope, resource, fIndex, e.otel)
346352
}
347353

348354
if e.logstashFormat.Enabled {

exporter/elasticsearchexporter/internal/objmodel/objmodel.go

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -244,19 +244,19 @@ func (doc *Document) Dedup() {
244244
// Serialize writes the document to the given writer. The serializer will create nested objects if dedot is true.
245245
//
246246
// NOTE: The document MUST be sorted if dedot is true.
247-
func (doc *Document) Serialize(w io.Writer, dedot bool) error {
247+
func (doc *Document) Serialize(w io.Writer, dedot bool, otel bool) error {
248248
v := json.NewVisitor(w)
249-
return doc.iterJSON(v, dedot)
249+
return doc.iterJSON(v, dedot, otel)
250250
}
251251

252-
func (doc *Document) iterJSON(v *json.Visitor, dedot bool) error {
252+
func (doc *Document) iterJSON(v *json.Visitor, dedot bool, otel bool) error {
253253
if dedot {
254-
return doc.iterJSONDedot(v)
254+
return doc.iterJSONDedot(v, otel)
255255
}
256-
return doc.iterJSONFlat(v)
256+
return doc.iterJSONFlat(v, otel)
257257
}
258258

259-
func (doc *Document) iterJSONFlat(w *json.Visitor) error {
259+
func (doc *Document) iterJSONFlat(w *json.Visitor, otel bool) error {
260260
err := w.OnObjectStart(-1, structform.AnyType)
261261
if err != nil {
262262
return err
@@ -275,15 +275,22 @@ func (doc *Document) iterJSONFlat(w *json.Visitor) error {
275275
return err
276276
}
277277

278-
if err := fld.value.iterJSON(w, true); err != nil {
278+
if err := fld.value.iterJSON(w, true, otel); err != nil {
279279
return err
280280
}
281281
}
282282

283283
return nil
284284
}
285285

286-
func (doc *Document) iterJSONDedot(w *json.Visitor) error {
286+
// Set of prefixes for the OTel attributes that need to stay flattened
287+
var otelPrefixSet = map[string]struct{}{
288+
"attributes.": {},
289+
"resource.attributes.": {},
290+
"scope.attributes.": {},
291+
}
292+
293+
func (doc *Document) iterJSONDedot(w *json.Visitor, otel bool) error {
287294
objPrefix := ""
288295
level := 0
289296

@@ -335,6 +342,16 @@ func (doc *Document) iterJSONDedot(w *json.Visitor) error {
335342

336343
// increase object level up to current field
337344
for {
345+
346+
// Otel mode serialization
347+
if otel {
348+
// Check the prefix
349+
_, isOtelPrefix := otelPrefixSet[objPrefix]
350+
if isOtelPrefix {
351+
break
352+
}
353+
}
354+
338355
start := len(objPrefix)
339356
idx := strings.IndexByte(key[start:], '.')
340357
if idx < 0 {
@@ -357,7 +374,7 @@ func (doc *Document) iterJSONDedot(w *json.Visitor) error {
357374
if err := w.OnKey(fieldName); err != nil {
358375
return err
359376
}
360-
if err := fld.value.iterJSON(w, true); err != nil {
377+
if err := fld.value.iterJSON(w, true, otel); err != nil {
361378
return err
362379
}
363380
}
@@ -460,7 +477,7 @@ func (v *Value) IsEmpty() bool {
460477
}
461478
}
462479

463-
func (v *Value) iterJSON(w *json.Visitor, dedot bool) error {
480+
func (v *Value) iterJSON(w *json.Visitor, dedot bool, otel bool) error {
464481
switch v.kind {
465482
case KindNil:
466483
return w.OnNil()
@@ -483,13 +500,13 @@ func (v *Value) iterJSON(w *json.Visitor, dedot bool) error {
483500
if len(v.doc.fields) == 0 {
484501
return w.OnNil()
485502
}
486-
return v.doc.iterJSON(w, dedot)
503+
return v.doc.iterJSON(w, dedot, otel)
487504
case KindArr:
488505
if err := w.OnArrayStart(-1, structform.AnyType); err != nil {
489506
return err
490507
}
491508
for i := range v.arr {
492-
if err := v.arr[i].iterJSON(w, dedot); err != nil {
509+
if err := v.arr[i].iterJSON(w, dedot, otel); err != nil {
493510
return err
494511
}
495512
}

0 commit comments

Comments
 (0)