Skip to content

Commit 9985b42

Browse files
carsonipsbylica-splunk
authored andcommitted
[exporter/elasticsearch] Set body.* for log body in OTel mode (open-telemetry#35771)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Update OTel mode to implementation to serialize log body into body.* fields <!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. --> #### Link to tracking issue <!--Describe what testing was performed and which tests were added.--> #### Testing <!--Describe the documentation added.--> #### Documentation <!--Please delete paragraphs that you did not use before submitting.-->
1 parent c3c6313 commit 9985b42

File tree

3 files changed

+153
-43
lines changed

3 files changed

+153
-43
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: breaking
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: elasticsearchexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Set body.* for log body in OTel mode
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [35771]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: Log record body in OTel mapping mode will be stored in body.text, body.structured, body.flattened based on body value type and presence of event.name attribute
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/elasticsearchexporter/exporter_test.go

Lines changed: 92 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -293,38 +293,104 @@ func TestExporterLogs(t *testing.T) {
293293
})
294294

295295
t.Run("publish otel mapping mode", func(t *testing.T) {
296-
rec := newBulkRecorder()
297-
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
298-
rec.Record(docs)
299-
return itemsAllOK(docs)
300-
})
301-
302-
exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) {
303-
cfg.LogsDynamicIndex.Enabled = true
304-
cfg.Mapping.Mode = "otel"
305-
})
306-
mustSendLogs(t, exporter, newLogsWithAttributes(
307-
map[string]any{
308-
"data_stream.dataset": "attr.dataset",
309-
"attr.foo": "attr.foo.value",
296+
for _, tc := range []struct {
297+
body pcommon.Value
298+
isEvent bool
299+
wantDocument []byte
300+
}{
301+
{
302+
body: func() pcommon.Value {
303+
return pcommon.NewValueStr("foo")
304+
}(),
305+
wantDocument: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0,"body":{"text":"foo"}}`),
310306
},
311-
nil,
312-
map[string]any{
313-
"data_stream.dataset": "resource.attribute.dataset",
314-
"data_stream.namespace": "resource.attribute.namespace",
315-
"resource.attr.foo": "resource.attr.foo.value",
307+
{
308+
body: func() pcommon.Value {
309+
vm := pcommon.NewValueMap()
310+
m := vm.SetEmptyMap()
311+
m.PutBool("true", true)
312+
m.PutBool("false", false)
313+
m.PutEmptyMap("inner").PutStr("foo", "bar")
314+
return vm
315+
}(),
316+
wantDocument: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0,"body":{"flattened":{"true":true,"false":false,"inner":{"foo":"bar"}}}}`),
317+
},
318+
{
319+
body: func() pcommon.Value {
320+
vm := pcommon.NewValueMap()
321+
m := vm.SetEmptyMap()
322+
m.PutBool("true", true)
323+
m.PutBool("false", false)
324+
m.PutEmptyMap("inner").PutStr("foo", "bar")
325+
return vm
326+
}(),
327+
isEvent: true,
328+
wantDocument: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value","event.name":"foo"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0,"body":{"structured":{"true":true,"false":false,"inner":{"foo":"bar"}}}}`),
316329
},
317-
))
318-
rec.WaitItems(1)
319-
320-
expected := []itemRequest{
321330
{
322-
Action: []byte(`{"create":{"_index":"logs-attr.dataset.otel-resource.attribute.namespace"}}`),
323-
Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0}`),
331+
body: func() pcommon.Value {
332+
vs := pcommon.NewValueSlice()
333+
s := vs.Slice()
334+
s.AppendEmpty().SetStr("foo")
335+
s.AppendEmpty().SetBool(false)
336+
s.AppendEmpty().SetEmptyMap().PutStr("foo", "bar")
337+
return vs
338+
}(),
339+
wantDocument: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0,"body":{"flattened":{"value":["foo",false,{"foo":"bar"}]}}}`),
324340
},
341+
{
342+
body: func() pcommon.Value {
343+
vs := pcommon.NewValueSlice()
344+
s := vs.Slice()
345+
s.AppendEmpty().SetStr("foo")
346+
s.AppendEmpty().SetBool(false)
347+
s.AppendEmpty().SetEmptyMap().PutStr("foo", "bar")
348+
return vs
349+
}(),
350+
isEvent: true,
351+
wantDocument: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value","event.name":"foo"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0,"body":{"structured":{"value":["foo",false,{"foo":"bar"}]}}}`),
352+
},
353+
} {
354+
rec := newBulkRecorder()
355+
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
356+
rec.Record(docs)
357+
return itemsAllOK(docs)
358+
})
359+
360+
exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) {
361+
cfg.LogsDynamicIndex.Enabled = true
362+
cfg.Mapping.Mode = "otel"
363+
})
364+
recordAttrs := map[string]any{
365+
"data_stream.dataset": "attr.dataset",
366+
"attr.foo": "attr.foo.value",
367+
}
368+
if tc.isEvent {
369+
recordAttrs["event.name"] = "foo"
370+
}
371+
logs := newLogsWithAttributes(
372+
recordAttrs,
373+
nil,
374+
map[string]any{
375+
"data_stream.dataset": "resource.attribute.dataset",
376+
"data_stream.namespace": "resource.attribute.namespace",
377+
"resource.attr.foo": "resource.attr.foo.value",
378+
},
379+
)
380+
tc.body.CopyTo(logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body())
381+
mustSendLogs(t, exporter, logs)
382+
rec.WaitItems(1)
383+
384+
expected := []itemRequest{
385+
{
386+
Action: []byte(`{"create":{"_index":"logs-attr.dataset.otel-resource.attribute.namespace"}}`),
387+
Document: tc.wantDocument,
388+
},
389+
}
390+
391+
assertItemsEqual(t, expected, rec.Items(), false)
325392
}
326393

327-
assertItemsEqual(t, expected, rec.Items(), false)
328394
})
329395

330396
t.Run("retry http request", func(t *testing.T) {

exporter/elasticsearchexporter/model.go

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -160,36 +160,53 @@ func (m *encodeModel) encodeLogOTelMode(resource pcommon.Resource, resourceSchem
160160
m.encodeScopeOTelMode(&document, scope, scopeSchemaURL)
161161

162162
// Body
163-
setOTelLogBody(&document, record.Body())
163+
setOTelLogBody(&document, record.Body(), record.Attributes())
164164

165165
return document
166166
}
167167

168-
func setOTelLogBody(doc *objmodel.Document, body pcommon.Value) {
168+
func setOTelLogBody(doc *objmodel.Document, body pcommon.Value, attributes pcommon.Map) {
169+
// Determine if this log record is an event, as they are mapped differently
170+
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/general/events.md
171+
_, isEvent := attributes.Get("event.name")
172+
169173
switch body.Type() {
170174
case pcommon.ValueTypeMap:
171-
doc.AddAttribute("body_structured", body)
175+
if isEvent {
176+
doc.AddAttribute("body.structured", body)
177+
} else {
178+
doc.AddAttribute("body.flattened", body)
179+
}
172180
case pcommon.ValueTypeSlice:
173-
slice := body.Slice()
174-
for i := 0; i < slice.Len(); i++ {
175-
switch slice.At(i).Type() {
176-
case pcommon.ValueTypeMap, pcommon.ValueTypeSlice:
177-
doc.AddAttribute("body_structured", body)
178-
return
181+
// output must be an array of objects due to ES limitations
182+
// otherwise, wrap the array in an object
183+
s := body.Slice()
184+
allMaps := true
185+
for i := 0; i < s.Len(); i++ {
186+
if s.At(i).Type() != pcommon.ValueTypeMap {
187+
allMaps = false
179188
}
180189
}
181190

182-
bodyTextVal := pcommon.NewValueSlice()
183-
bodyTextSlice := bodyTextVal.Slice()
184-
bodyTextSlice.EnsureCapacity(slice.Len())
191+
var outVal pcommon.Value
192+
if allMaps {
193+
outVal = body
194+
} else {
195+
vm := pcommon.NewValueMap()
196+
m := vm.SetEmptyMap()
197+
body.Slice().CopyTo(m.PutEmptySlice("value"))
198+
outVal = vm
199+
}
185200

186-
for i := 0; i < slice.Len(); i++ {
187-
elem := slice.At(i)
188-
bodyTextSlice.AppendEmpty().SetStr(elem.AsString())
201+
if isEvent {
202+
doc.AddAttribute("body.structured", outVal)
203+
} else {
204+
doc.AddAttribute("body.flattened", outVal)
189205
}
190-
doc.AddAttribute("body_text", bodyTextVal)
206+
case pcommon.ValueTypeStr:
207+
doc.AddString("body.text", body.Str())
191208
default:
192-
doc.AddString("body_text", body.AsString())
209+
doc.AddString("body.text", body.AsString())
193210
}
194211
}
195212

0 commit comments

Comments
 (0)