Skip to content

Commit fc58cef

Browse files
authored
[exporter/elasticsearch] Fix data loss due to metric grouping regression (#37903)
#### Description Fix data loss when the same metric exists across different resources or scopes. Data points / metrics were incorrectly grouped together due to a missing hash on resource / scope, leading to data loss with warning logs e.g. "metric with name '***' has already been serialized". #### Link to tracking issue Fixes #37898 #### Testing #### Documentation
1 parent 77d4e0b commit fc58cef

File tree

5 files changed

+130
-95
lines changed

5 files changed

+130
-95
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: elasticsearchexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Fix data loss caused by incorrect metric grouping in ECS and OTel mode
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [37898]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: Fix data loss when the same metric exists across different resources or scopes. Data points / metrics were incorrectly grouped together, leading to data loss with warning logs e.g. "metric with name '***' has already been serialized".
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/elasticsearchexporter/exporter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ func (e *elasticsearchExporter) pushMetricsData(
285285
groupedDataPoints = make(map[uint32]*dataPointsGroup)
286286
groupedDataPointsByIndex[index] = groupedDataPoints
287287
}
288-
dpHash := hasher.hashDataPoint(dp)
288+
dpHash := hasher.hashDataPoint(resource, scope, dp)
289289
dpGroup, ok := groupedDataPoints[dpHash]
290290
if !ok {
291291
groupedDataPoints[dpHash] = &dataPointsGroup{

exporter/elasticsearchexporter/exporter_test.go

Lines changed: 92 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -931,17 +931,6 @@ func TestExporterMetrics(t *testing.T) {
931931
})
932932

933933
t.Run("publish with metrics grouping", func(t *testing.T) {
934-
rec := newBulkRecorder()
935-
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
936-
rec.Record(docs)
937-
return itemsAllOK(docs)
938-
})
939-
940-
exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) {
941-
cfg.MetricsIndex = "metrics.index"
942-
cfg.Mapping.Mode = "ecs"
943-
})
944-
945934
addToMetricSlice := func(metricSlice pmetric.MetricSlice) {
946935
fooMetric := metricSlice.AppendEmpty()
947936
fooMetric.SetName("metric.foo")
@@ -957,7 +946,7 @@ func TestExporterMetrics(t *testing.T) {
957946
barMetric := metricSlice.AppendEmpty()
958947
barMetric.SetName("metric.bar")
959948
barDps := barMetric.SetEmptyGauge().DataPoints()
960-
barDp := barDps.AppendEmpty()
949+
barDp := barDps.AppendEmpty() // dp without attribute
961950
barDp.SetDoubleValue(1.0)
962951
barOtherDp := barDps.AppendEmpty()
963952
fillAttributeMap(barOtherDp.Attributes(), map[string]any{
@@ -975,62 +964,109 @@ func TestExporterMetrics(t *testing.T) {
975964
bazMetric.SetName("metric.baz")
976965
bazDps := bazMetric.SetEmptyGauge().DataPoints()
977966
bazDp := bazDps.AppendEmpty()
978-
bazDp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(3600, 0)))
967+
bazDp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(3600, 0))) // dp with different timestamp
979968
bazDp.SetDoubleValue(1.0)
980969
}
981970

982971
metrics := pmetric.NewMetrics()
983-
resourceMetrics := metrics.ResourceMetrics().AppendEmpty()
984-
fillAttributeMap(resourceMetrics.Resource().Attributes(), map[string]any{
972+
resourceA := metrics.ResourceMetrics().AppendEmpty()
973+
fillAttributeMap(resourceA.Resource().Attributes(), map[string]any{
985974
elasticsearch.DataStreamNamespace: "resource.namespace",
986975
})
987-
scopeA := resourceMetrics.ScopeMetrics().AppendEmpty()
988-
addToMetricSlice(scopeA.Metrics())
976+
scopeAA := resourceA.ScopeMetrics().AppendEmpty()
977+
addToMetricSlice(scopeAA.Metrics())
989978

990-
scopeB := resourceMetrics.ScopeMetrics().AppendEmpty()
991-
fillAttributeMap(scopeB.Scope().Attributes(), map[string]any{
992-
elasticsearch.DataStreamDataset: "scope.b",
979+
scopeAB := resourceA.ScopeMetrics().AppendEmpty()
980+
fillAttributeMap(scopeAB.Scope().Attributes(), map[string]any{
981+
elasticsearch.DataStreamDataset: "scope.ab", // routes to a different index and should not be grouped together
993982
})
994-
addToMetricSlice(scopeB.Metrics())
983+
addToMetricSlice(scopeAB.Metrics())
995984

996-
mustSendMetrics(t, exporter, metrics)
985+
scopeAC := resourceA.ScopeMetrics().AppendEmpty()
986+
fillAttributeMap(scopeAC.Scope().Attributes(), map[string]any{
987+
// ecs: scope attributes are ignored, and duplicates are dropped silently.
988+
// otel: scope attributes are dimensions and should result in a separate group.
989+
"some.scope.attribute": "scope.ac",
990+
})
991+
addToMetricSlice(scopeAC.Metrics())
997992

998-
expected := []itemRequest{
999-
{
1000-
Action: []byte(`{"create":{"_index":"metrics-generic-bar"}}`),
1001-
Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic","namespace":"bar","type":"metrics"},"dp":{"attribute":"dp.attribute.value"},"metric":{"bar":1.0}}`),
1002-
},
1003-
{
1004-
Action: []byte(`{"create":{"_index":"metrics-generic-resource.namespace"}}`),
1005-
Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic","namespace":"resource.namespace","type":"metrics"},"dp":{"attribute":"dp.attribute.value"},"metric":{"bar":1.0,"foo":1.0}}`),
1006-
},
1007-
{
1008-
Action: []byte(`{"create":{"_index":"metrics-generic-resource.namespace"}}`),
1009-
Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic","namespace":"resource.namespace","type":"metrics"},"metric":{"bar":1.0,"foo":1}}`),
1010-
},
1011-
{
1012-
Action: []byte(`{"create":{"_index":"metrics-generic-resource.namespace"}}`),
1013-
Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","data_stream":{"dataset":"generic","namespace":"resource.namespace","type":"metrics"},"metric":{"baz":1.0}}`),
1014-
},
1015-
{
1016-
Action: []byte(`{"create":{"_index":"metrics-scope.b-bar"}}`),
1017-
Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"scope.b","namespace":"bar","type":"metrics"},"dp":{"attribute":"dp.attribute.value"},"metric":{"bar":1.0}}`),
1018-
},
1019-
{
1020-
Action: []byte(`{"create":{"_index":"metrics-scope.b-resource.namespace"}}`),
1021-
Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"scope.b","namespace":"resource.namespace","type":"metrics"},"dp":{"attribute":"dp.attribute.value"},"metric":{"bar":1.0,"foo":1.0}}`),
1022-
},
1023-
{
1024-
Action: []byte(`{"create":{"_index":"metrics-scope.b-resource.namespace"}}`),
1025-
Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"scope.b","namespace":"resource.namespace","type":"metrics"},"metric":{"bar":1.0,"foo":1}}`),
1026-
},
1027-
{
1028-
Action: []byte(`{"create":{"_index":"metrics-scope.b-resource.namespace"}}`),
1029-
Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","data_stream":{"dataset":"scope.b","namespace":"resource.namespace","type":"metrics"},"metric":{"baz":1.0}}`),
1030-
},
993+
resourceB := metrics.ResourceMetrics().AppendEmpty()
994+
fillAttributeMap(resourceB.Resource().Attributes(), map[string]any{
995+
"my.resource": "resource.b",
996+
})
997+
scopeBA := resourceB.ScopeMetrics().AppendEmpty()
998+
addToMetricSlice(scopeBA.Metrics())
999+
1000+
scopeBB := resourceB.ScopeMetrics().AppendEmpty()
1001+
scopeBB.Scope().SetName("scope.bb")
1002+
addToMetricSlice(scopeBB.Metrics())
1003+
1004+
// identical resource
1005+
resourceAnotherB := metrics.ResourceMetrics().AppendEmpty()
1006+
fillAttributeMap(resourceAnotherB.Resource().Attributes(), map[string]any{
1007+
"my.resource": "resource.b",
1008+
})
1009+
addToMetricSlice(resourceAnotherB.ScopeMetrics().AppendEmpty().Metrics())
1010+
1011+
assertDocsInIndices := func(t *testing.T, wantDocsPerIndex map[string]int, rec *bulkRecorder) {
1012+
var sum int
1013+
for _, v := range wantDocsPerIndex {
1014+
sum += v
1015+
}
1016+
rec.WaitItems(sum)
1017+
1018+
actualDocsPerIndex := make(map[string]int)
1019+
for _, item := range rec.Items() {
1020+
idx := gjson.GetBytes(item.Action, "create._index")
1021+
actualDocsPerIndex[idx.String()]++
1022+
}
1023+
assert.Equal(t, wantDocsPerIndex, actualDocsPerIndex)
10311024
}
10321025

1033-
assertRecordedItems(t, expected, rec, false)
1026+
t.Run("ecs", func(t *testing.T) {
1027+
rec := newBulkRecorder()
1028+
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
1029+
rec.Record(docs)
1030+
return itemsAllOK(docs)
1031+
})
1032+
1033+
exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) {
1034+
cfg.MetricsIndex = "metrics.index"
1035+
cfg.Mapping.Mode = "ecs"
1036+
})
1037+
1038+
mustSendMetrics(t, exporter, metrics)
1039+
1040+
assertDocsInIndices(t, map[string]int{
1041+
"metrics-generic-bar": 2, // AA, BA
1042+
"metrics-generic-resource.namespace": 3,
1043+
"metrics-scope.ab-bar": 1,
1044+
"metrics-scope.ab-resource.namespace": 3,
1045+
"metrics-generic-default": 3,
1046+
}, rec)
1047+
})
1048+
1049+
t.Run("otel", func(t *testing.T) {
1050+
rec := newBulkRecorder()
1051+
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
1052+
rec.Record(docs)
1053+
return itemsAllOK(docs)
1054+
})
1055+
1056+
exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) {
1057+
cfg.Mapping.Mode = "otel"
1058+
})
1059+
1060+
mustSendMetrics(t, exporter, metrics)
1061+
1062+
assertDocsInIndices(t, map[string]int{
1063+
"metrics-generic.otel-bar": 4, // AA->bar, AC->bar, BA->bar, BB->bar
1064+
"metrics-generic.otel-resource.namespace": 6, // AA, AC
1065+
"metrics-scope.ab.otel-bar": 1, // AB->bar
1066+
"metrics-scope.ab.otel-resource.namespace": 3, // AB
1067+
"metrics-generic.otel-default": 6, // BA, BB
1068+
}, rec)
1069+
})
10341070
})
10351071

10361072
t.Run("publish histogram", func(t *testing.T) {
@@ -1365,40 +1401,6 @@ func TestExporterMetrics(t *testing.T) {
13651401
assertRecordedItems(t, expected, rec, false)
13661402
})
13671403

1368-
t.Run("otel mode grouping of equal resources", func(t *testing.T) {
1369-
rec := newBulkRecorder()
1370-
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
1371-
rec.Record(docs)
1372-
return itemsAllOK(docs)
1373-
})
1374-
1375-
exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) {
1376-
cfg.Mapping.Mode = "otel"
1377-
})
1378-
1379-
metrics := pmetric.NewMetrics()
1380-
for _, n := range []string{"m1", "m2"} {
1381-
resourceMetric := metrics.ResourceMetrics().AppendEmpty()
1382-
scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty()
1383-
1384-
sumMetric := scopeMetric.Metrics().AppendEmpty()
1385-
sumMetric.SetName(n)
1386-
sumDP := sumMetric.SetEmptySum().DataPoints().AppendEmpty()
1387-
sumDP.SetIntValue(0)
1388-
}
1389-
1390-
mustSendMetrics(t, exporter, metrics)
1391-
1392-
expected := []itemRequest{
1393-
{
1394-
Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.m1":"gauge_long","metrics.m2":"gauge_long"}}}`),
1395-
Document: []byte(`{"@timestamp":"0.0","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"m1":0,"m2":0},"resource":{},"scope":{}}`),
1396-
},
1397-
}
1398-
1399-
assertRecordedItems(t, expected, rec, false)
1400-
})
1401-
14021404
t.Run("otel mode aggregate_metric_double hint", func(t *testing.T) {
14031405
rec := newBulkRecorder()
14041406
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {

exporter/elasticsearchexporter/metric_grouping.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919
// dataPointHasher is an interface for hashing data points by their identity,
2020
// for grouping into a single document.
2121
type dataPointHasher interface {
22-
hashDataPoint(datapoints.DataPoint) uint32
22+
hashDataPoint(pcommon.Resource, pcommon.InstrumentationScope, datapoints.DataPoint) uint32
2323
}
2424

2525
func newDataPointHasher(mode MappingMode) dataPointHasher {
@@ -39,9 +39,11 @@ type (
3939
otelDataPointHasher struct{}
4040
)
4141

42-
func (h ecsDataPointHasher) hashDataPoint(dp datapoints.DataPoint) uint32 {
42+
func (h ecsDataPointHasher) hashDataPoint(resource pcommon.Resource, _ pcommon.InstrumentationScope, dp datapoints.DataPoint) uint32 {
4343
hasher := fnv.New32a()
4444

45+
mapHashExcludeReservedAttrs(hasher, resource.Attributes())
46+
4547
timestampBuf := make([]byte, 8)
4648
binary.LittleEndian.PutUint64(timestampBuf, uint64(dp.Timestamp()))
4749
hasher.Write(timestampBuf)
@@ -51,9 +53,13 @@ func (h ecsDataPointHasher) hashDataPoint(dp datapoints.DataPoint) uint32 {
5153
return hasher.Sum32()
5254
}
5355

54-
func (h otelDataPointHasher) hashDataPoint(dp datapoints.DataPoint) uint32 {
56+
func (h otelDataPointHasher) hashDataPoint(resource pcommon.Resource, scope pcommon.InstrumentationScope, dp datapoints.DataPoint) uint32 {
5557
hasher := fnv.New32a()
5658

59+
mapHashExcludeReservedAttrs(hasher, resource.Attributes(), elasticsearch.MappingHintsAttrKey)
60+
hasher.Write([]byte(scope.Name()))
61+
mapHashExcludeReservedAttrs(hasher, scope.Attributes(), elasticsearch.MappingHintsAttrKey)
62+
5763
timestampBuf := make([]byte, 8)
5864
binary.LittleEndian.PutUint64(timestampBuf, uint64(dp.Timestamp()))
5965
hasher.Write(timestampBuf)

exporter/elasticsearchexporter/model_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func TestEncodeMetric(t *testing.T) {
101101
dps := m.Sum().DataPoints()
102102
for i := 0; i < dps.Len(); i++ {
103103
dp := datapoints.NewNumber(m, dps.At(i))
104-
dpHash := hasher.hashDataPoint(dp)
104+
dpHash := hasher.hashDataPoint(rm.Resource(), sm.Scope(), dp)
105105
dataPoints, ok := groupedDataPoints[dpHash]
106106
if !ok {
107107
groupedDataPoints[dpHash] = []datapoints.DataPoint{dp}

0 commit comments

Comments
 (0)