Skip to content

Commit a3d4bdc

Browse files
axw and carsonip authored
[elasticsearchexporter] [chore] extract a dataPointHasher interface (open-telemetry#37764)
#### Description

Extract a dataPointHasher interface from the mappingModel interface, so we can make the latter purely about encoding, and separate concerns.

#### Link to tracking issue

Part of refactoring for open-telemetry#36092

#### Testing

N/A, non-functional change.

#### Documentation

N/A

---------

Co-authored-by: Carson Ip <[email protected]>
1 parent c4405d7 commit a3d4bdc

File tree

4 files changed

+135
-111
lines changed

4 files changed

+135
-111
lines changed

exporter/elasticsearchexporter/exporter.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ func (e *elasticsearchExporter) pushMetricsData(
249249
) error {
250250
mappingMode := e.config.MappingMode()
251251
router := newDocumentRouter(mappingMode, e.dynamicIndex, e.index, e.config)
252+
hasher := newDataPointHasher(mappingMode)
252253

253254
e.wg.Add(1)
254255
defer e.wg.Done()
@@ -284,7 +285,7 @@ func (e *elasticsearchExporter) pushMetricsData(
284285
groupedDataPoints = make(map[uint32]*dataPointsGroup)
285286
groupedDataPointsByIndex[index] = groupedDataPoints
286287
}
287-
dpHash := e.model.hashDataPoint(dp)
288+
dpHash := hasher.hashDataPoint(dp)
288289
dpGroup, ok := groupedDataPoints[dpHash]
289290
if !ok {
290291
groupedDataPoints[dpHash] = &dataPointsGroup{
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"
5+
6+
import (
7+
"encoding/binary"
8+
"hash"
9+
"hash/fnv"
10+
"math"
11+
"slices"
12+
13+
"go.opentelemetry.io/collector/pdata/pcommon"
14+
15+
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/datapoints"
16+
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/elasticsearch"
17+
)
18+
19+
// dataPointHasher is an interface for hashing data points by their identity,
// for grouping into a single document.
type dataPointHasher interface {
	// hashDataPoint returns a 32-bit hash identifying the group the data
	// point belongs to; points with equal hashes are merged into one document.
	hashDataPoint(datapoints.DataPoint) uint32
}
24+
25+
func newDataPointHasher(mode MappingMode) dataPointHasher {
26+
switch mode {
27+
case MappingOTel:
28+
return otelDataPointHasher{}
29+
default:
30+
// Defaults to ECS for backward compatibility
31+
return ecsDataPointHasher{}
32+
}
33+
}
34+
35+
// TODO use https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/internal/exp/metrics/identity

// Stateless hasher implementations, one per mapping mode.
type (
	ecsDataPointHasher  struct{}
	otelDataPointHasher struct{}
)
41+
42+
func (h ecsDataPointHasher) hashDataPoint(dp datapoints.DataPoint) uint32 {
43+
hasher := fnv.New32a()
44+
45+
timestampBuf := make([]byte, 8)
46+
binary.LittleEndian.PutUint64(timestampBuf, uint64(dp.Timestamp()))
47+
hasher.Write(timestampBuf)
48+
49+
mapHashExcludeReservedAttrs(hasher, dp.Attributes())
50+
51+
return hasher.Sum32()
52+
}
53+
54+
// hashDataPoint computes an FNV-1a hash over the data point's timestamp,
// start timestamp, metric unit, and attributes (reserved data-stream and
// mapping-hint attributes excluded), which is the grouping identity used in
// OTel mapping mode. The write order below is part of the hash identity and
// must not be reordered.
func (h otelDataPointHasher) hashDataPoint(dp datapoints.DataPoint) uint32 {
	hasher := fnv.New32a()

	timestampBuf := make([]byte, 8)
	binary.LittleEndian.PutUint64(timestampBuf, uint64(dp.Timestamp()))
	hasher.Write(timestampBuf)

	// Reuse the same 8-byte buffer for the start timestamp.
	binary.LittleEndian.PutUint64(timestampBuf, uint64(dp.StartTimestamp()))
	hasher.Write(timestampBuf)

	hasher.Write([]byte(dp.Metric().Unit()))

	mapHashExcludeReservedAttrs(hasher, dp.Attributes(), elasticsearch.MappingHintsAttrKey)

	return hasher.Sum32()
}
70+
71+
// mapHashExcludeReservedAttrs is mapHash but ignoring some reserved attributes.
// e.g. index is already considered during routing and DS attributes do not need to be considered in hashing
func mapHashExcludeReservedAttrs(hasher hash.Hash, m pcommon.Map, extra ...string) {
	m.Range(func(k string, v pcommon.Value) bool {
		switch k {
		case elasticsearch.DataStreamType, elasticsearch.DataStreamDataset, elasticsearch.DataStreamNamespace:
			// Data-stream attributes are already reflected in routing; skip.
			return true
		}
		if slices.Contains(extra, k) {
			// Caller-supplied extra keys are excluded from the hash as well.
			return true
		}
		hasher.Write([]byte(k))
		valueHash(hasher, v)

		return true
	})
}
88+
89+
func mapHash(hasher hash.Hash, m pcommon.Map) {
90+
m.Range(func(k string, v pcommon.Value) bool {
91+
hasher.Write([]byte(k))
92+
valueHash(hasher, v)
93+
94+
return true
95+
})
96+
}
97+
98+
// valueHash writes a stable byte representation of v into h, dispatching on
// the pdata value type. Numeric values are encoded as 8 little-endian bytes;
// maps and slices are hashed recursively via mapHash and sliceHash.
func valueHash(h hash.Hash, v pcommon.Value) {
	switch v.Type() {
	case pcommon.ValueTypeEmpty:
		h.Write([]byte{0})
	case pcommon.ValueTypeStr:
		h.Write([]byte(v.Str()))
	case pcommon.ValueTypeBool:
		// Booleans map to a single byte: 1 for true, 0 for false.
		if v.Bool() {
			h.Write([]byte{1})
		} else {
			h.Write([]byte{0})
		}
	case pcommon.ValueTypeDouble:
		buf := make([]byte, 8)
		binary.LittleEndian.PutUint64(buf, math.Float64bits(v.Double()))
		h.Write(buf)
	case pcommon.ValueTypeInt:
		buf := make([]byte, 8)
		binary.LittleEndian.PutUint64(buf, uint64(v.Int()))
		h.Write(buf)
	case pcommon.ValueTypeBytes:
		h.Write(v.Bytes().AsRaw())
	case pcommon.ValueTypeMap:
		mapHash(h, v.Map())
	case pcommon.ValueTypeSlice:
		sliceHash(h, v.Slice())
	}
}
126+
127+
// sliceHash hashes every element of s, in order, into h.
func sliceHash(h hash.Hash, s pcommon.Slice) {
	for i := 0; i < s.Len(); i++ {
		valueHash(h, s.At(i))
	}
}

exporter/elasticsearchexporter/model.go

Lines changed: 0 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,9 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry
55

66
import (
77
"bytes"
8-
"encoding/binary"
98
"encoding/json"
109
"errors"
1110
"fmt"
12-
"hash"
13-
"hash/fnv"
14-
"math"
15-
"slices"
1611
"time"
1712

1813
"go.opentelemetry.io/collector/pdata/pcommon"
@@ -81,7 +76,6 @@ type mappingModel interface {
8176
encodeLog(pcommon.Resource, string, plog.LogRecord, pcommon.InstrumentationScope, string, elasticsearch.Index, *bytes.Buffer) error
8277
encodeSpan(pcommon.Resource, string, ptrace.Span, pcommon.InstrumentationScope, string, elasticsearch.Index, *bytes.Buffer) error
8378
encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string, idx elasticsearch.Index, buf *bytes.Buffer)
84-
hashDataPoint(datapoints.DataPoint) uint32
8579
encodeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []datapoints.DataPoint, validationErrors *[]error, idx elasticsearch.Index, buf *bytes.Buffer) (map[string]string, error)
8680
encodeProfile(pcommon.Resource, pcommon.InstrumentationScope, pprofile.Profile, func(*bytes.Buffer, string, string) error) error
8781
}
@@ -189,17 +183,6 @@ func (m *encodeModel) encodeLogECSMode(resource pcommon.Resource, record plog.Lo
189183
return document
190184
}
191185

192-
// upsertMetricDataPointValue upserts a datapoint value to documents which is already hashed by resource and index
193-
func (m *encodeModel) hashDataPoint(dp datapoints.DataPoint) uint32 {
194-
switch m.mode {
195-
case MappingOTel:
196-
return metricOTelHash(dp, dp.Metric().Unit())
197-
default:
198-
// Defaults to ECS for backward compatibility
199-
return metricECSHash(dp.Timestamp(), dp.Attributes())
200-
}
201-
}
202-
203186
func (m *encodeModel) encodeDataPointsECSMode(resource pcommon.Resource, dataPoints []datapoints.DataPoint, validationErrors *[]error, idx elasticsearch.Index, buf *bytes.Buffer) (map[string]string, error) {
204187
dp0 := dataPoints[0]
205188
var document objmodel.Document
@@ -450,95 +433,3 @@ func encodeLogTimestampECSMode(document *objmodel.Document, record plog.LogRecor
450433

451434
document.AddTimestamp("@timestamp", record.ObservedTimestamp())
452435
}
453-
454-
// TODO use https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/internal/exp/metrics/identity
455-
func metricECSHash(timestamp pcommon.Timestamp, attributes pcommon.Map) uint32 {
456-
hasher := fnv.New32a()
457-
458-
timestampBuf := make([]byte, 8)
459-
binary.LittleEndian.PutUint64(timestampBuf, uint64(timestamp))
460-
hasher.Write(timestampBuf)
461-
462-
mapHashExcludeReservedAttrs(hasher, attributes)
463-
464-
return hasher.Sum32()
465-
}
466-
467-
func metricOTelHash(dp datapoints.DataPoint, unit string) uint32 {
468-
hasher := fnv.New32a()
469-
470-
timestampBuf := make([]byte, 8)
471-
binary.LittleEndian.PutUint64(timestampBuf, uint64(dp.Timestamp()))
472-
hasher.Write(timestampBuf)
473-
474-
binary.LittleEndian.PutUint64(timestampBuf, uint64(dp.StartTimestamp()))
475-
hasher.Write(timestampBuf)
476-
477-
hasher.Write([]byte(unit))
478-
479-
mapHashExcludeReservedAttrs(hasher, dp.Attributes(), elasticsearch.MappingHintsAttrKey)
480-
481-
return hasher.Sum32()
482-
}
483-
484-
// mapHashExcludeReservedAttrs is mapHash but ignoring some reserved attributes.
485-
// e.g. index is already considered during routing and DS attributes do not need to be considered in hashing
486-
func mapHashExcludeReservedAttrs(hasher hash.Hash, m pcommon.Map, extra ...string) {
487-
m.Range(func(k string, v pcommon.Value) bool {
488-
switch k {
489-
case elasticsearch.DataStreamType, elasticsearch.DataStreamDataset, elasticsearch.DataStreamNamespace:
490-
return true
491-
}
492-
if slices.Contains(extra, k) {
493-
return true
494-
}
495-
hasher.Write([]byte(k))
496-
valueHash(hasher, v)
497-
498-
return true
499-
})
500-
}
501-
502-
func mapHash(hasher hash.Hash, m pcommon.Map) {
503-
m.Range(func(k string, v pcommon.Value) bool {
504-
hasher.Write([]byte(k))
505-
valueHash(hasher, v)
506-
507-
return true
508-
})
509-
}
510-
511-
func valueHash(h hash.Hash, v pcommon.Value) {
512-
switch v.Type() {
513-
case pcommon.ValueTypeEmpty:
514-
h.Write([]byte{0})
515-
case pcommon.ValueTypeStr:
516-
h.Write([]byte(v.Str()))
517-
case pcommon.ValueTypeBool:
518-
if v.Bool() {
519-
h.Write([]byte{1})
520-
} else {
521-
h.Write([]byte{0})
522-
}
523-
case pcommon.ValueTypeDouble:
524-
buf := make([]byte, 8)
525-
binary.LittleEndian.PutUint64(buf, math.Float64bits(v.Double()))
526-
h.Write(buf)
527-
case pcommon.ValueTypeInt:
528-
buf := make([]byte, 8)
529-
binary.LittleEndian.PutUint64(buf, uint64(v.Int()))
530-
h.Write(buf)
531-
case pcommon.ValueTypeBytes:
532-
h.Write(v.Bytes().AsRaw())
533-
case pcommon.ValueTypeMap:
534-
mapHash(h, v.Map())
535-
case pcommon.ValueTypeSlice:
536-
sliceHash(h, v.Slice())
537-
}
538-
}
539-
540-
func sliceHash(h hash.Hash, s pcommon.Slice) {
541-
for i := 0; i < s.Len(); i++ {
542-
valueHash(h, s.At(i))
543-
}
544-
}

exporter/elasticsearchexporter/model_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ func TestEncodeMetric(t *testing.T) {
9090
model := &encodeModel{
9191
mode: MappingECS,
9292
}
93+
hasher := newDataPointHasher(model.mode)
9394

9495
groupedDataPoints := make(map[uint32][]datapoints.DataPoint)
9596

@@ -100,7 +101,7 @@ func TestEncodeMetric(t *testing.T) {
100101
dps := m.Sum().DataPoints()
101102
for i := 0; i < dps.Len(); i++ {
102103
dp := datapoints.NewNumber(m, dps.At(i))
103-
dpHash := model.hashDataPoint(dp)
104+
dpHash := hasher.hashDataPoint(dp)
104105
dataPoints, ok := groupedDataPoints[dpHash]
105106
if !ok {
106107
groupedDataPoints[dpHash] = []datapoints.DataPoint{dp}

0 commit comments

Comments (0)