-
Notifications
You must be signed in to change notification settings - Fork 3k
[exporter/elasticsearch] OTel mode serialization #33290
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 34 commits
3f2f30f
d00a835
51e0c62
f308e88
5d80a84
bd189d2
87d9929
eedd947
3fba3f7
3efdbec
ca8a2ec
460138c
26a374b
7362669
174f6d8
8d8ce19
018cce9
76824b2
59e272d
39b9cb4
e914547
5b0b4ba
dd78d3f
2537852
182eb37
64d0bd5
54a52f0
5bcdf7e
e24cd4c
74c1d77
f411f0d
10370e9
8f027f5
b7a5def
abd9159
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
# Use this changelog template to create an entry for release notes. | ||
|
||
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' | ||
change_type: enhancement | ||
|
||
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) | ||
component: elasticsearchexporter | ||
|
||
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). | ||
note: Introduce an experimental OTel native mapping mode for logs | ||
|
||
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. | ||
issues: [33290] | ||
|
||
# (Optional) One or more lines of additional information to render under the primary note. | ||
# These lines will be padded with 2 spaces and then inserted directly into the document. | ||
# Use pipe (|) for multiline entries. | ||
subtext: | ||
|
||
# If your change doesn't affect end users or the exported elements of any package, | ||
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. | ||
# Optional: The change log or logs in which this entry should be included. | ||
# e.g. '[user]' or '[user, api]' | ||
# Include 'user' if the change is relevant to end users. | ||
# Include 'api' if there is a change to a library API. | ||
# Default: '[user]' | ||
change_logs: [] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package elasticsearchexporter | ||
|
||
import ( | ||
"fmt" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"go.opentelemetry.io/collector/pdata/plog" | ||
"go.opentelemetry.io/collector/pdata/pmetric" | ||
"go.opentelemetry.io/collector/pdata/ptrace" | ||
) | ||
|
||
type routeTestInfo struct { | ||
name string | ||
otel bool | ||
want string | ||
} | ||
|
||
func createRouteTests(dsType string) []routeTestInfo { | ||
renderWantRoute := func(dsType string, otel bool) string { | ||
if otel { | ||
return fmt.Sprintf("%s-%s.otel-%s", dsType, defaultDataStreamDataset, defaultDataStreamNamespace) | ||
} | ||
return fmt.Sprintf("%s-%s-%s", dsType, defaultDataStreamDataset, defaultDataStreamNamespace) | ||
} | ||
|
||
return []routeTestInfo{ | ||
{ | ||
name: "default", | ||
otel: false, | ||
want: renderWantRoute(dsType, false), | ||
}, | ||
{ | ||
name: "otel", | ||
otel: true, | ||
want: renderWantRoute(dsType, true), | ||
}, | ||
} | ||
} | ||
|
||
func TestRouteLogRecord(t *testing.T) { | ||
|
||
tests := createRouteTests(defaultDataStreamTypeLogs) | ||
|
||
for _, tc := range tests { | ||
t.Run(tc.name, func(t *testing.T) { | ||
ds := routeLogRecord(plog.NewLogRecord(), plog.NewScopeLogs().Scope(), plog.NewResourceLogs().Resource(), "", tc.otel) | ||
assert.Equal(t, tc.want, ds) | ||
}) | ||
} | ||
} | ||
|
||
func TestRouteDataPoint(t *testing.T) { | ||
|
||
tests := createRouteTests(defaultDataStreamTypeMetrics) | ||
|
||
for _, tc := range tests { | ||
t.Run(tc.name, func(t *testing.T) { | ||
ds := routeDataPoint(pmetric.NewNumberDataPoint(), plog.NewScopeLogs().Scope(), plog.NewResourceLogs().Resource(), "", tc.otel) | ||
assert.Equal(t, tc.want, ds) | ||
}) | ||
} | ||
} | ||
|
||
func TestRouteSpan(t *testing.T) { | ||
|
||
tests := createRouteTests(defaultDataStreamTypeTraces) | ||
|
||
for _, tc := range tests { | ||
t.Run(tc.name, func(t *testing.T) { | ||
ds := routeSpan(ptrace.NewSpan(), plog.NewScopeLogs().Scope(), plog.NewResourceLogs().Resource(), "", tc.otel) | ||
assert.Equal(t, tc.want, ds) | ||
}) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -244,19 +244,19 @@ func (doc *Document) Dedup() { | |
// Serialize writes the document to the given writer. The serializer will create nested objects if dedot is true. | ||
// | ||
// NOTE: The documented MUST be sorted if dedot is true. | ||
func (doc *Document) Serialize(w io.Writer, dedot bool) error { | ||
func (doc *Document) Serialize(w io.Writer, dedot bool, otel bool) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we have to change objmodel for otel mode? Is there a reason why it isn't done like https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/elasticsearchexporter/model.go#L124 where we just add the attributes under a key? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As far as I remember, at the time of implementing this feature, I stumbled on the issue where the document could only be serialized as completely flat from the root or "dedotted" (by default https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/elasticsearchexporter/factory.go#L80), all the comma separated properties were unwrapped into the nested object, which was not stored correctly "flattened" with the passthrough attributes in ES. With OTel-native serialization the serialized document structure is a mix, needed to keep only the ".attributes*" flattened, while the rest of the document from the root is not flattened. Let me know if anything changed in that area recently, this PR was open for some time. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the context. I'm just learning about how passthrough field type works, and what you describe here makes sense. I wonder if it could be structured better in the code, but I don't have any good ideas at the moment and it is not a blocker. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe it makes sense to revisit this once we remove the dedot option: #33772. |
||
v := json.NewVisitor(w) | ||
return doc.iterJSON(v, dedot) | ||
return doc.iterJSON(v, dedot, otel) | ||
} | ||
|
||
func (doc *Document) iterJSON(v *json.Visitor, dedot bool) error { | ||
func (doc *Document) iterJSON(v *json.Visitor, dedot bool, otel bool) error { | ||
if dedot { | ||
return doc.iterJSONDedot(v) | ||
return doc.iterJSONDedot(v, otel) | ||
} | ||
return doc.iterJSONFlat(v) | ||
return doc.iterJSONFlat(v, otel) | ||
} | ||
|
||
func (doc *Document) iterJSONFlat(w *json.Visitor) error { | ||
func (doc *Document) iterJSONFlat(w *json.Visitor, otel bool) error { | ||
err := w.OnObjectStart(-1, structform.AnyType) | ||
if err != nil { | ||
return err | ||
|
@@ -275,15 +275,22 @@ func (doc *Document) iterJSONFlat(w *json.Visitor) error { | |
return err | ||
} | ||
|
||
if err := fld.value.iterJSON(w, true); err != nil { | ||
if err := fld.value.iterJSON(w, true, otel); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (doc *Document) iterJSONDedot(w *json.Visitor) error { | ||
// Set of prefixes for the OTel attributes that needs to stay flattened | ||
var otelPrefixSet = map[string]struct{}{ | ||
"attributes.": {}, | ||
"resource.attributes.": {}, | ||
"scope.attributes.": {}, | ||
} | ||
|
||
func (doc *Document) iterJSONDedot(w *json.Visitor, otel bool) error { | ||
objPrefix := "" | ||
level := 0 | ||
|
||
|
@@ -335,6 +342,16 @@ func (doc *Document) iterJSONDedot(w *json.Visitor) error { | |
|
||
// increase object level up to current field | ||
for { | ||
|
||
// Otel mode serialization | ||
if otel { | ||
// Check the prefix | ||
_, isOtelPrefix := otelPrefixSet[objPrefix] | ||
if isOtelPrefix { | ||
break | ||
} | ||
} | ||
|
||
start := len(objPrefix) | ||
idx := strings.IndexByte(key[start:], '.') | ||
if idx < 0 { | ||
|
@@ -357,7 +374,7 @@ func (doc *Document) iterJSONDedot(w *json.Visitor) error { | |
if err := w.OnKey(fieldName); err != nil { | ||
return err | ||
} | ||
if err := fld.value.iterJSON(w, true); err != nil { | ||
if err := fld.value.iterJSON(w, true, otel); err != nil { | ||
return err | ||
} | ||
} | ||
|
@@ -460,7 +477,7 @@ func (v *Value) IsEmpty() bool { | |
} | ||
} | ||
|
||
func (v *Value) iterJSON(w *json.Visitor, dedot bool) error { | ||
func (v *Value) iterJSON(w *json.Visitor, dedot bool, otel bool) error { | ||
switch v.kind { | ||
case KindNil: | ||
return w.OnNil() | ||
|
@@ -483,13 +500,13 @@ func (v *Value) iterJSON(w *json.Visitor, dedot bool) error { | |
if len(v.doc.fields) == 0 { | ||
return w.OnNil() | ||
} | ||
return v.doc.iterJSON(w, dedot) | ||
return v.doc.iterJSON(w, dedot, otel) | ||
case KindArr: | ||
if err := w.OnArrayStart(-1, structform.AnyType); err != nil { | ||
return err | ||
} | ||
for i := range v.arr { | ||
if err := v.arr[i].iterJSON(w, dedot); err != nil { | ||
if err := v.arr[i].iterJSON(w, dedot, otel); err != nil { | ||
return err | ||
} | ||
} | ||
|
Uh oh!
There was an error while loading. Please reload this page.