Skip to content

Commit 0222d50

Browse files
carsonipsbylica-splunk
authored andcommitted
[exporter/elasticsearch] Preserve attribute names and metric names on prefix conflict in OTel mapping mode (open-telemetry#35651)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Metric names should be flattened and exported as is, even when one metric name is a prefix of another. Same for attributes for all logs, metrics and traces. <!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. --> #### Link to tracking issue <!--Describe what testing was performed and which tests were added.--> #### Testing <!--Describe the documentation added.--> #### Documentation <!--Please delete paragraphs that you did not use before submitting.-->
1 parent 996f206 commit 0222d50

File tree

5 files changed

+206
-29
lines changed

5 files changed

+206
-29
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: elasticsearchexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Preserve attribute names and metric names on prefix conflict in OTel mapping mode
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [35651]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: e.g. if there are attributes "a" and "a.b", they should be sent to Elasticsearch as is, instead of "a.value" and "a.b", in OTel mapping mode
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/elasticsearchexporter/exporter_test.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -714,6 +714,35 @@ func TestExporterLogs(t *testing.T) {
714714
assert.Equal(t, `{"some.scope.attribute":["foo","bar"]}`, gjson.GetBytes(doc, `scope.attributes`).Raw)
715715
assert.Equal(t, `{"some.resource.attribute":["foo","bar"]}`, gjson.GetBytes(doc, `resource.attributes`).Raw)
716716
})
717+
718+
t.Run("otel mode attribute key prefix conflict", func(t *testing.T) {
719+
rec := newBulkRecorder()
720+
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
721+
rec.Record(docs)
722+
return itemsAllOK(docs)
723+
})
724+
725+
exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) {
726+
cfg.Mapping.Mode = "otel"
727+
})
728+
729+
mustSendLogs(t, exporter, newLogsWithAttributes(map[string]any{
730+
"a": "a",
731+
"a.b": "a.b",
732+
}, map[string]any{
733+
"a": "a",
734+
"a.b": "a.b",
735+
}, map[string]any{
736+
"a": "a",
737+
"a.b": "a.b",
738+
}))
739+
740+
rec.WaitItems(1)
741+
doc := rec.Items()[0].Document
742+
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `attributes`).Raw)
743+
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `scope.attributes`).Raw)
744+
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `resource.attributes`).Raw)
745+
})
717746
}
718747

719748
func TestExporterMetrics(t *testing.T) {
@@ -1300,6 +1329,75 @@ func TestExporterMetrics(t *testing.T) {
13001329
assertItemsEqual(t, expected, rec.Items(), false)
13011330
})
13021331

1332+
t.Run("otel mode metric name conflict", func(t *testing.T) {
1333+
rec := newBulkRecorder()
1334+
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
1335+
rec.Record(docs)
1336+
return itemsAllOK(docs)
1337+
})
1338+
1339+
exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) {
1340+
cfg.Mapping.Mode = "otel"
1341+
})
1342+
1343+
metrics := pmetric.NewMetrics()
1344+
resourceMetric := metrics.ResourceMetrics().AppendEmpty()
1345+
scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty()
1346+
1347+
fooBarMetric := scopeMetric.Metrics().AppendEmpty()
1348+
fooBarMetric.SetName("foo.bar")
1349+
fooBarMetric.SetEmptySum().DataPoints().AppendEmpty().SetIntValue(0)
1350+
1351+
fooMetric := scopeMetric.Metrics().AppendEmpty()
1352+
fooMetric.SetName("foo")
1353+
fooMetric.SetEmptySum().DataPoints().AppendEmpty().SetIntValue(0)
1354+
1355+
fooBarBazMetric := scopeMetric.Metrics().AppendEmpty()
1356+
fooBarBazMetric.SetName("foo.bar.baz")
1357+
fooBarBazMetric.SetEmptySum().DataPoints().AppendEmpty().SetIntValue(0)
1358+
1359+
mustSendMetrics(t, exporter, metrics)
1360+
1361+
rec.WaitItems(1)
1362+
expected := []itemRequest{
1363+
{
1364+
Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.foo.bar":"gauge_long","metrics.foo":"gauge_long","metrics.foo.bar.baz":"gauge_long"}}}`),
1365+
Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"foo":0,"foo.bar":0,"foo.bar.baz":0},"resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`),
1366+
},
1367+
}
1368+
1369+
assertItemsEqual(t, expected, rec.Items(), false)
1370+
})
1371+
1372+
t.Run("otel mode attribute key prefix conflict", func(t *testing.T) {
1373+
rec := newBulkRecorder()
1374+
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
1375+
rec.Record(docs)
1376+
return itemsAllOK(docs)
1377+
})
1378+
1379+
exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) {
1380+
cfg.Mapping.Mode = "otel"
1381+
})
1382+
1383+
mustSendMetrics(t, exporter, newMetricsWithAttributes(map[string]any{
1384+
"a": "a",
1385+
"a.b": "a.b",
1386+
}, map[string]any{
1387+
"a": "a",
1388+
"a.b": "a.b",
1389+
}, map[string]any{
1390+
"a": "a",
1391+
"a.b": "a.b",
1392+
}))
1393+
1394+
rec.WaitItems(1)
1395+
doc := rec.Items()[0].Document
1396+
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `attributes`).Raw)
1397+
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `scope.attributes`).Raw)
1398+
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `resource.attributes`).Raw)
1399+
})
1400+
13031401
t.Run("publish summary", func(t *testing.T) {
13041402
rec := newBulkRecorder()
13051403
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
@@ -1600,6 +1698,35 @@ func TestExporterTraces(t *testing.T) {
16001698
assert.Equal(t, `{"some.resource.attribute":["foo","bar"]}`, gjson.GetBytes(doc, `resource.attributes`).Raw)
16011699
}
16021700
})
1701+
1702+
t.Run("otel mode attribute key prefix conflict", func(t *testing.T) {
1703+
rec := newBulkRecorder()
1704+
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
1705+
rec.Record(docs)
1706+
return itemsAllOK(docs)
1707+
})
1708+
1709+
exporter := newTestTracesExporter(t, server.URL, func(cfg *Config) {
1710+
cfg.Mapping.Mode = "otel"
1711+
})
1712+
1713+
mustSendTraces(t, exporter, newTracesWithAttributes(map[string]any{
1714+
"a": "a",
1715+
"a.b": "a.b",
1716+
}, map[string]any{
1717+
"a": "a",
1718+
"a.b": "a.b",
1719+
}, map[string]any{
1720+
"a": "a",
1721+
"a.b": "a.b",
1722+
}))
1723+
1724+
rec.WaitItems(1)
1725+
doc := rec.Items()[0].Document
1726+
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `attributes`).Raw)
1727+
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `scope.attributes`).Raw)
1728+
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `resource.attributes`).Raw)
1729+
})
16031730
}
16041731

16051732
// TestExporterAuth verifies that the Elasticsearch exporter supports

exporter/elasticsearchexporter/internal/objmodel/objmodel.go

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -209,12 +209,12 @@ func (doc *Document) sort() {
209209
// The filtering only keeps the last value for a key.
210210
//
211211
// Dedup ensure that keys are sorted.
212-
func (doc *Document) Dedup() {
212+
func (doc *Document) Dedup(appendValueOnConflict bool) {
213213
// 1. Always ensure the fields are sorted, Dedup support requires
214214
// Fields to be sorted.
215215
doc.sort()
216216

217-
// 2. rename fields if a primitive value is overwritten by an object.
217+
// 2. rename fields if a primitive value is overwritten by an object if appendValueOnConflict.
218218
// For example the pair (path.x=1, path.x.a="test") becomes:
219219
// (path.x.value=1, path.x.a="test").
220220
//
@@ -227,16 +227,18 @@ func (doc *Document) Dedup() {
227227
// field in favor of the `value` field in the document.
228228
//
229229
// This step removes potential conflicts when dedotting and serializing fields.
230-
var renamed bool
231-
for i := 0; i < len(doc.fields)-1; i++ {
232-
key, nextKey := doc.fields[i].key, doc.fields[i+1].key
233-
if len(key) < len(nextKey) && strings.HasPrefix(nextKey, key) && nextKey[len(key)] == '.' {
234-
renamed = true
235-
doc.fields[i].key = key + ".value"
230+
if appendValueOnConflict {
231+
var renamed bool
232+
for i := 0; i < len(doc.fields)-1; i++ {
233+
key, nextKey := doc.fields[i].key, doc.fields[i+1].key
234+
if len(key) < len(nextKey) && strings.HasPrefix(nextKey, key) && nextKey[len(key)] == '.' {
235+
renamed = true
236+
doc.fields[i].key = key + ".value"
237+
}
238+
}
239+
if renamed {
240+
doc.sort()
236241
}
237-
}
238-
if renamed {
239-
doc.sort()
240242
}
241243

242244
// 3. mark duplicates as 'ignore'
@@ -251,7 +253,7 @@ func (doc *Document) Dedup() {
251253

252254
// 4. fix objects that might be stored in arrays
253255
for i := range doc.fields {
254-
doc.fields[i].value.Dedup()
256+
doc.fields[i].value.Dedup(appendValueOnConflict)
255257
}
256258
}
257259

@@ -487,13 +489,13 @@ func (v *Value) sort() {
487489
// Dedup recursively dedups keys in stored documents.
488490
//
489491
// NOTE: The value MUST be sorted.
490-
func (v *Value) Dedup() {
492+
func (v *Value) Dedup(appendValueOnConflict bool) {
491493
switch v.kind {
492494
case KindObject:
493-
v.doc.Dedup()
495+
v.doc.Dedup(appendValueOnConflict)
494496
case KindArr:
495497
for i := range v.arr {
496-
v.arr[i].Dedup()
498+
v.arr[i].Dedup(appendValueOnConflict)
497499
}
498500
}
499501
}

exporter/elasticsearchexporter/internal/objmodel/objmodel_test.go

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -86,16 +86,18 @@ func TestObjectModel_CreateMap(t *testing.T) {
8686

8787
func TestObjectModel_Dedup(t *testing.T) {
8888
tests := map[string]struct {
89-
build func() Document
90-
want Document
89+
build func() Document
90+
appendValueOnConflict bool
91+
want Document
9192
}{
9293
"no duplicates": {
9394
build: func() (doc Document) {
9495
doc.AddInt("a", 1)
9596
doc.AddInt("c", 3)
9697
return doc
9798
},
98-
want: Document{fields: []field{{"a", IntValue(1)}, {"c", IntValue(3)}}},
99+
appendValueOnConflict: true,
100+
want: Document{fields: []field{{"a", IntValue(1)}, {"c", IntValue(3)}}},
99101
},
100102
"duplicate keys": {
101103
build: func() (doc Document) {
@@ -104,7 +106,8 @@ func TestObjectModel_Dedup(t *testing.T) {
104106
doc.AddInt("a", 2)
105107
return doc
106108
},
107-
want: Document{fields: []field{{"a", ignoreValue}, {"a", IntValue(2)}, {"c", IntValue(3)}}},
109+
appendValueOnConflict: true,
110+
want: Document{fields: []field{{"a", ignoreValue}, {"a", IntValue(2)}, {"c", IntValue(3)}}},
108111
},
109112
"duplicate after flattening from map: namespace object at end": {
110113
build: func() Document {
@@ -114,7 +117,8 @@ func TestObjectModel_Dedup(t *testing.T) {
114117
am.PutEmptyMap("namespace").PutInt("a", 23)
115118
return DocumentFromAttributes(am)
116119
},
117-
want: Document{fields: []field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(23)}, {"toplevel", StringValue("test")}}},
120+
appendValueOnConflict: true,
121+
want: Document{fields: []field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(23)}, {"toplevel", StringValue("test")}}},
118122
},
119123
"duplicate after flattening from map: namespace object at beginning": {
120124
build: func() Document {
@@ -124,7 +128,8 @@ func TestObjectModel_Dedup(t *testing.T) {
124128
am.PutStr("toplevel", "test")
125129
return DocumentFromAttributes(am)
126130
},
127-
want: Document{fields: []field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(42)}, {"toplevel", StringValue("test")}}},
131+
appendValueOnConflict: true,
132+
want: Document{fields: []field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(42)}, {"toplevel", StringValue("test")}}},
128133
},
129134
"dedup in arrays": {
130135
build: func() (doc Document) {
@@ -136,6 +141,7 @@ func TestObjectModel_Dedup(t *testing.T) {
136141
doc.Add("arr", ArrValue(Value{kind: KindObject, doc: embedded}))
137142
return doc
138143
},
144+
appendValueOnConflict: true,
139145
want: Document{fields: []field{{"arr", ArrValue(Value{kind: KindObject, doc: Document{fields: []field{
140146
{"a", ignoreValue},
141147
{"a", IntValue(2)},
@@ -148,7 +154,8 @@ func TestObjectModel_Dedup(t *testing.T) {
148154
doc.AddInt("namespace.a", 2)
149155
return doc
150156
},
151-
want: Document{fields: []field{{"namespace.a", IntValue(2)}, {"namespace.value", IntValue(1)}}},
157+
appendValueOnConflict: true,
158+
want: Document{fields: []field{{"namespace.a", IntValue(2)}, {"namespace.value", IntValue(1)}}},
152159
},
153160
"dedup removes primitive if value exists": {
154161
build: func() (doc Document) {
@@ -157,14 +164,25 @@ func TestObjectModel_Dedup(t *testing.T) {
157164
doc.AddInt("namespace.value", 3)
158165
return doc
159166
},
160-
want: Document{fields: []field{{"namespace.a", IntValue(2)}, {"namespace.value", ignoreValue}, {"namespace.value", IntValue(3)}}},
167+
appendValueOnConflict: true,
168+
want: Document{fields: []field{{"namespace.a", IntValue(2)}, {"namespace.value", ignoreValue}, {"namespace.value", IntValue(3)}}},
169+
},
170+
"dedup without append value on conflict": {
171+
build: func() (doc Document) {
172+
doc.AddInt("namespace", 1)
173+
doc.AddInt("namespace.a", 2)
174+
doc.AddInt("namespace.value", 3)
175+
return doc
176+
},
177+
appendValueOnConflict: false,
178+
want: Document{fields: []field{{"namespace", IntValue(1)}, {"namespace.a", IntValue(2)}, {"namespace.value", IntValue(3)}}},
161179
},
162180
}
163181

164182
for name, test := range tests {
165183
t.Run(name, func(t *testing.T) {
166184
doc := test.build()
167-
doc.Dedup()
185+
doc.Dedup(test.appendValueOnConflict)
168186
assert.Equal(t, test.want, doc)
169187
})
170188
}
@@ -282,7 +300,7 @@ func TestDocument_Serialize_Flat(t *testing.T) {
282300
m := pcommon.NewMap()
283301
assert.NoError(t, m.FromRaw(test.attrs))
284302
doc := DocumentFromAttributes(m)
285-
doc.Dedup()
303+
doc.Dedup(true)
286304
err := doc.Serialize(&buf, false, false)
287305
require.NoError(t, err)
288306

@@ -343,7 +361,7 @@ func TestDocument_Serialize_Dedot(t *testing.T) {
343361
m := pcommon.NewMap()
344362
assert.NoError(t, m.FromRaw(test.attrs))
345363
doc := DocumentFromAttributes(m)
346-
doc.Dedup()
364+
doc.Dedup(true)
347365
err := doc.Serialize(&buf, true, false)
348366
require.NoError(t, err)
349367

exporter/elasticsearchexporter/model.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ func (m *encodeModel) encodeLog(resource pcommon.Resource, resourceSchemaURL str
115115
default:
116116
document = m.encodeLogDefaultMode(resource, record, scope)
117117
}
118-
document.Dedup()
118+
// For OTel mode, prefix conflicts are not a problem as otel-data has subobjects: false
119+
document.Dedup(m.mode != MappingOTel)
119120

120121
var buf bytes.Buffer
121122
err := document.Serialize(&buf, m.dedot, m.mode == MappingOTel)
@@ -267,7 +268,8 @@ func (m *encodeModel) encodeLogECSMode(resource pcommon.Resource, record plog.Lo
267268
}
268269

269270
func (m *encodeModel) encodeDocument(document objmodel.Document) ([]byte, error) {
270-
document.Dedup()
271+
// For OTel mode, prefix conflicts are not a problem as otel-data has subobjects: false
272+
document.Dedup(m.mode != MappingOTel)
271273

272274
var buf bytes.Buffer
273275
err := document.Serialize(&buf, m.dedot, m.mode == MappingOTel)
@@ -646,7 +648,8 @@ func (m *encodeModel) encodeSpan(resource pcommon.Resource, resourceSchemaURL st
646648
default:
647649
document = m.encodeSpanDefaultMode(resource, span, scope)
648650
}
649-
document.Dedup()
651+
// For OTel mode, prefix conflicts are not a problem as otel-data has subobjects: false
652+
document.Dedup(m.mode != MappingOTel)
650653
var buf bytes.Buffer
651654
err := document.Serialize(&buf, m.dedot, m.mode == MappingOTel)
652655
return buf.Bytes(), err

0 commit comments

Comments
 (0)