Skip to content

Commit e484a74

Browse files
committed
move EncodeSpan to the encoder
1 parent ac063cc commit e484a74

File tree

5 files changed

+181
-186
lines changed

5 files changed

+181
-186
lines changed

exporter/elasticsearchexporter/internal/mapping/default.go

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,29 @@
44
package mapping // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/mapping"
55

66
import (
7+
"encoding/json"
8+
"time"
9+
710
"go.opentelemetry.io/collector/pdata/pcommon"
811
"go.opentelemetry.io/collector/pdata/plog"
12+
"go.opentelemetry.io/collector/pdata/ptrace"
913

1014
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel"
15+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil"
16+
)
17+
18+
const (
19+
traceIDField = "traceID"
20+
spanIDField = "spanID"
21+
attributeField = "attribute"
1122
)
1223

1324
// DefaultEncoder is an encoder that handles the `none` and `raw` encoding modes.
1425
type DefaultEncoder struct {
1526
Mode
1627
}
1728

18-
func (e DefaultEncoder) EncodeLog(resource pcommon.Resource, record plog.LogRecord, scopeLogs plog.ScopeLogs) objmodel.Document {
29+
func (e DefaultEncoder) EncodeLog(resource pcommon.Resource, scopeLogs plog.ScopeLogs, record plog.LogRecord) objmodel.Document {
1930
var document objmodel.Document
2031

2132
docTimeStamp := record.Timestamp()
@@ -36,6 +47,27 @@ func (e DefaultEncoder) EncodeLog(resource pcommon.Resource, record plog.LogReco
3647
return document
3748
}
3849

50+
func (e DefaultEncoder) EncodeSpan(resourceSpans ptrace.ResourceSpans, scopeSpans ptrace.ScopeSpans, span ptrace.Span) objmodel.Document {
51+
var document objmodel.Document
52+
53+
document.AddTimestamp("@timestamp", span.StartTimestamp()) // We use @timestamp in order to ensure that we can index if the default data stream logs template is used.
54+
document.AddTimestamp("EndTimestamp", span.EndTimestamp())
55+
document.AddTraceID("TraceId", span.TraceID())
56+
document.AddSpanID("SpanId", span.SpanID())
57+
document.AddSpanID("ParentSpanId", span.ParentSpanID())
58+
document.AddString("Name", span.Name())
59+
document.AddString("Kind", traceutil.SpanKindStr(span.Kind()))
60+
document.AddInt("TraceStatus", int64(span.Status().Code()))
61+
document.AddString("TraceStatusDescription", span.Status().Message())
62+
document.AddString("Link", spanLinksToString(span.Links()))
63+
encodeAttributes(&document, e.Mode, span.Attributes())
64+
document.AddAttributes("Resource", resourceSpans.Resource().Attributes())
65+
encodeEvents(&document, e.Mode, span.Events())
66+
document.AddInt("Duration", durationAsMicroseconds(span.StartTimestamp().AsTime(), span.EndTimestamp().AsTime())) // unit is microseconds
67+
document.AddAttributes("Scope", scopeToAttributes(scopeSpans.Scope()))
68+
return document
69+
}
70+
3971
func encodeAttributes(document *objmodel.Document, m Mode, attributes pcommon.Map) {
4072
key := "Attributes"
4173
if m == ModeRaw {
@@ -44,6 +76,14 @@ func encodeAttributes(document *objmodel.Document, m Mode, attributes pcommon.Ma
4476
document.AddAttributes(key, attributes)
4577
}
4678

79+
func encodeEvents(document *objmodel.Document, m Mode, events ptrace.SpanEventSlice) {
80+
key := "Events"
81+
if m == ModeRaw {
82+
key = ""
83+
}
84+
document.AddEvents(key, events)
85+
}
86+
4787
func scopeToAttributes(scope pcommon.InstrumentationScope) pcommon.Map {
4888
attrs := pcommon.NewMap()
4989
attrs.PutStr("name", scope.Name())
@@ -53,3 +93,23 @@ func scopeToAttributes(scope pcommon.InstrumentationScope) pcommon.Map {
5393
}
5494
return attrs
5595
}
96+
97+
func spanLinksToString(spanLinkSlice ptrace.SpanLinkSlice) string {
98+
linkArray := make([]map[string]any, 0, spanLinkSlice.Len())
99+
for i := 0; i < spanLinkSlice.Len(); i++ {
100+
spanLink := spanLinkSlice.At(i)
101+
link := map[string]any{}
102+
link[spanIDField] = traceutil.SpanIDToHexOrEmptyString(spanLink.SpanID())
103+
link[traceIDField] = traceutil.TraceIDToHexOrEmptyString(spanLink.TraceID())
104+
link[attributeField] = spanLink.Attributes().AsRaw()
105+
linkArray = append(linkArray, link)
106+
}
107+
linkArrayBytes, _ := json.Marshal(&linkArray)
108+
return string(linkArrayBytes)
109+
}
110+
111+
// durationAsMicroseconds calculate span duration through end - start nanoseconds and converts time.Time to microseconds,
112+
// which is the format the Duration field is stored in the Span.
113+
func durationAsMicroseconds(start, end time.Time) int64 {
114+
return (end.UnixNano() - start.UnixNano()) / 1000
115+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package mapping
5+
6+
import (
7+
"fmt"
8+
"testing"
9+
"time"
10+
11+
"github.com/stretchr/testify/require"
12+
"go.opentelemetry.io/collector/pdata/pcommon"
13+
"go.opentelemetry.io/collector/pdata/ptrace"
14+
15+
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel"
16+
)
17+
18+
func TestEncodeAttributes(t *testing.T) {
19+
t.Parallel()
20+
21+
attributes := pcommon.NewMap()
22+
err := attributes.FromRaw(map[string]any{
23+
"s": "baz",
24+
"o": map[string]any{
25+
"sub_i": 19,
26+
},
27+
})
28+
require.NoError(t, err)
29+
30+
tests := map[string]struct {
31+
mappingMode Mode
32+
want func() objmodel.Document
33+
}{
34+
"raw": {
35+
mappingMode: ModeRaw,
36+
want: func() objmodel.Document {
37+
return objmodel.DocumentFromAttributes(attributes)
38+
},
39+
},
40+
"none": {
41+
mappingMode: ModeNone,
42+
want: func() objmodel.Document {
43+
doc := objmodel.Document{}
44+
doc.AddAttributes("Attributes", attributes)
45+
return doc
46+
},
47+
},
48+
"ecs": {
49+
mappingMode: ModeECS,
50+
want: func() objmodel.Document {
51+
doc := objmodel.Document{}
52+
doc.AddAttributes("Attributes", attributes)
53+
return doc
54+
},
55+
},
56+
}
57+
58+
for name, test := range tests {
59+
t.Run(name, func(t *testing.T) {
60+
doc := objmodel.Document{}
61+
encodeAttributes(&doc, test.mappingMode, attributes)
62+
require.Equal(t, test.want(), doc)
63+
})
64+
}
65+
}
66+
67+
func TestEncodeEvents(t *testing.T) {
68+
t.Parallel()
69+
70+
events := ptrace.NewSpanEventSlice()
71+
events.EnsureCapacity(4)
72+
for i := 0; i < 4; i++ {
73+
event := events.AppendEmpty()
74+
event.SetTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(time.Duration(i) * time.Minute)))
75+
event.SetName(fmt.Sprintf("event_%d", i))
76+
}
77+
78+
tests := map[string]struct {
79+
mappingMode Mode
80+
want func() objmodel.Document
81+
}{
82+
"raw": {
83+
mappingMode: ModeRaw,
84+
want: func() objmodel.Document {
85+
doc := objmodel.Document{}
86+
doc.AddEvents("", events)
87+
return doc
88+
},
89+
},
90+
"none": {
91+
mappingMode: ModeNone,
92+
want: func() objmodel.Document {
93+
doc := objmodel.Document{}
94+
doc.AddEvents("Events", events)
95+
return doc
96+
},
97+
},
98+
"ecs": {
99+
mappingMode: ModeECS,
100+
want: func() objmodel.Document {
101+
doc := objmodel.Document{}
102+
doc.AddEvents("Events", events)
103+
return doc
104+
},
105+
},
106+
}
107+
108+
for name, test := range tests {
109+
t.Run(name, func(t *testing.T) {
110+
doc := objmodel.Document{}
111+
encodeEvents(&doc, test.mappingMode, events)
112+
require.Equal(t, test.want(), doc)
113+
})
114+
}
115+
}

exporter/elasticsearchexporter/internal/mapping/encoder.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@ package mapping // import "github.com/open-telemetry/opentelemetry-collector-con
66
import (
77
"go.opentelemetry.io/collector/pdata/pcommon"
88
"go.opentelemetry.io/collector/pdata/plog"
9+
"go.opentelemetry.io/collector/pdata/ptrace"
910

1011
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel"
1112
)
1213

1314
// Encoder provider an interface for all mapping encoders
1415
type Encoder interface {
15-
EncodeLog(pcommon.Resource, plog.LogRecord, plog.ScopeLogs) objmodel.Document
16+
EncodeLog(pcommon.Resource, plog.ScopeLogs, plog.LogRecord) objmodel.Document
17+
EncodeSpan(ptrace.ResourceSpans, ptrace.ScopeSpans, ptrace.Span) objmodel.Document
1618
}

exporter/elasticsearchexporter/model.go

Lines changed: 2 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,13 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry
66
import (
77
"bytes"
88
"encoding/binary"
9-
"encoding/json"
109
"errors"
1110
"fmt"
1211
"hash"
1312
"hash/fnv"
1413
"math"
1514
"slices"
1615
"strings"
17-
"time"
1816

1917
jsoniter "github.com/json-iterator/go"
2018
"go.opentelemetry.io/collector/pdata/pcommon"
@@ -26,7 +24,6 @@ import (
2624
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/exphistogram"
2725
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/mapping"
2826
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel"
29-
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil"
3027
)
3128

3229
// resourceAttrsConversionMap contains conversions for resource-level attributes
@@ -106,12 +103,6 @@ type dataPoint interface {
106103
HasMappingHint(mappingHint) bool
107104
}
108105

109-
const (
110-
traceIDField = "traceID"
111-
spanIDField = "spanID"
112-
attributeField = "attribute"
113-
)
114-
115106
func (m *encodeModel) encodeLog(resource pcommon.Resource, resourceSchemaURL string, record plog.LogRecord, scopeLogs plog.ScopeLogs) ([]byte, error) {
116107
var document objmodel.Document
117108
switch m.mode {
@@ -122,7 +113,7 @@ func (m *encodeModel) encodeLog(resource pcommon.Resource, resourceSchemaURL str
122113
case mapping.ModeBodyMap:
123114
return m.encodeLogBodyMapMode(record)
124115
default:
125-
document = mapping.DefaultEncoder{Mode: m.mode}.EncodeLog(resource, record, scopeLogs)
116+
document = mapping.DefaultEncoder{Mode: m.mode}.EncodeLog(resource, scopeLogs, record)
126117
}
127118
// For OTel mode, prefix conflicts are not a problem as otel-data has subobjects: false
128119
document.Dedup(m.mode != mapping.ModeOTel)
@@ -644,7 +635,7 @@ func (m *encodeModel) encodeSpan(resourceSpans ptrace.ResourceSpans, scopeSpans
644635
case mapping.ModeOTel:
645636
document = m.encodeSpanOTelMode(resourceSpans.Resource(), resourceSpans.SchemaUrl(), span, scopeSpans.Scope(), scopeSpans.SchemaUrl())
646637
default:
647-
document = m.encodeSpanDefaultMode(resourceSpans.Resource(), span, scopeSpans.Scope())
638+
document = mapping.DefaultEncoder{Mode: m.mode}.EncodeSpan(resourceSpans, scopeSpans, span)
648639
}
649640
// For OTel mode, prefix conflicts are not a problem as otel-data has subobjects: false
650641
document.Dedup(m.mode != mapping.ModeOTel)
@@ -694,26 +685,6 @@ func (m *encodeModel) encodeSpanOTelMode(resource pcommon.Resource, resourceSche
694685
return document
695686
}
696687

697-
func (m *encodeModel) encodeSpanDefaultMode(resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) objmodel.Document {
698-
var document objmodel.Document
699-
document.AddTimestamp("@timestamp", span.StartTimestamp()) // We use @timestamp in order to ensure that we can index if the default data stream logs template is used.
700-
document.AddTimestamp("EndTimestamp", span.EndTimestamp())
701-
document.AddTraceID("TraceId", span.TraceID())
702-
document.AddSpanID("SpanId", span.SpanID())
703-
document.AddSpanID("ParentSpanId", span.ParentSpanID())
704-
document.AddString("Name", span.Name())
705-
document.AddString("Kind", traceutil.SpanKindStr(span.Kind()))
706-
document.AddInt("TraceStatus", int64(span.Status().Code()))
707-
document.AddString("TraceStatusDescription", span.Status().Message())
708-
document.AddString("Link", spanLinksToString(span.Links()))
709-
m.encodeAttributes(&document, span.Attributes())
710-
document.AddAttributes("Resource", resource.Attributes())
711-
m.encodeEvents(&document, span.Events())
712-
document.AddInt("Duration", durationAsMicroseconds(span.StartTimestamp().AsTime(), span.EndTimestamp().AsTime())) // unit is microseconds
713-
document.AddAttributes("Scope", scopeToAttributes(scope))
714-
return document
715-
}
716-
717688
func (m *encodeModel) encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string) *objmodel.Document {
718689
if m.mode != mapping.ModeOTel {
719690
// Currently span events are stored separately only in OTel mapping mode.
@@ -736,52 +707,6 @@ func (m *encodeModel) encodeSpanEvent(resource pcommon.Resource, resourceSchemaU
736707
return &document
737708
}
738709

739-
func (m *encodeModel) encodeAttributes(document *objmodel.Document, attributes pcommon.Map) {
740-
key := "Attributes"
741-
if m.mode == mapping.ModeRaw {
742-
key = ""
743-
}
744-
document.AddAttributes(key, attributes)
745-
}
746-
747-
func (m *encodeModel) encodeEvents(document *objmodel.Document, events ptrace.SpanEventSlice) {
748-
key := "Events"
749-
if m.mode == mapping.ModeRaw {
750-
key = ""
751-
}
752-
document.AddEvents(key, events)
753-
}
754-
755-
func spanLinksToString(spanLinkSlice ptrace.SpanLinkSlice) string {
756-
linkArray := make([]map[string]any, 0, spanLinkSlice.Len())
757-
for i := 0; i < spanLinkSlice.Len(); i++ {
758-
spanLink := spanLinkSlice.At(i)
759-
link := map[string]any{}
760-
link[spanIDField] = traceutil.SpanIDToHexOrEmptyString(spanLink.SpanID())
761-
link[traceIDField] = traceutil.TraceIDToHexOrEmptyString(spanLink.TraceID())
762-
link[attributeField] = spanLink.Attributes().AsRaw()
763-
linkArray = append(linkArray, link)
764-
}
765-
linkArrayBytes, _ := json.Marshal(&linkArray)
766-
return string(linkArrayBytes)
767-
}
768-
769-
// durationAsMicroseconds calculate span duration through end - start nanoseconds and converts time.Time to microseconds,
770-
// which is the format the Duration field is stored in the Span.
771-
func durationAsMicroseconds(start, end time.Time) int64 {
772-
return (end.UnixNano() - start.UnixNano()) / 1000
773-
}
774-
775-
func scopeToAttributes(scope pcommon.InstrumentationScope) pcommon.Map {
776-
attrs := pcommon.NewMap()
777-
attrs.PutStr("name", scope.Name())
778-
attrs.PutStr("version", scope.Version())
779-
for k, v := range scope.Attributes().AsRaw() {
780-
attrs.PutStr(k, v.(string))
781-
}
782-
return attrs
783-
}
784-
785710
func encodeAttributesECSMode(document *objmodel.Document, attrs pcommon.Map, conversionMap map[string]string, preserveMap map[string]bool) {
786711
if len(conversionMap) == 0 {
787712
// No conversions to be done; add all attributes at top level of

0 commit comments

Comments
 (0)