Skip to content

Commit 8fcd99b

Browse files
committed
Log validation error when metric with same name has already been serialized
1 parent 69ae5ad commit 8fcd99b

File tree

2 files changed

+58
-0
lines changed

2 files changed

+58
-0
lines changed

exporter/elasticsearchexporter/pdata_serializer.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry
66
import (
77
"bytes"
88
"encoding/hex"
9+
"fmt"
910
"strings"
1011

1112
"github.com/elastic/go-structform"
@@ -48,8 +49,21 @@ func serializeDataPoints(v *json.Visitor, dataPoints []dataPoint, validationErro
4849

4950
dynamicTemplates := make(map[string]string, len(dataPoints))
5051
var docCount uint64
52+
metricNames := make(map[string]bool, len(dataPoints))
5153
for _, dp := range dataPoints {
5254
metric := dp.Metric()
55+
if _, present := metricNames[metric.Name()]; present {
56+
*validationErrors = append(
57+
*validationErrors,
58+
fmt.Errorf(
59+
"metric with name '%s' has already been serialized in document with timestamp %s",
60+
metric.Name(),
61+
dp.Timestamp().AsTime().UTC().Format(tsLayout),
62+
),
63+
)
64+
continue
65+
}
66+
metricNames[metric.Name()] = true
5367
value, err := dp.Value()
5468
if dp.HasMappingHint(hintDocCount) {
5569
docCount = dp.DocCount()

exporter/elasticsearchexporter/pdata_serializer_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@ package elasticsearchexporter
66
import (
77
"bytes"
88
"encoding/json"
9+
"fmt"
910
"testing"
1011

1112
"github.com/stretchr/testify/assert"
1213
"github.com/stretchr/testify/require"
1314
"go.opentelemetry.io/collector/pdata/pcommon"
1415
"go.opentelemetry.io/collector/pdata/plog"
16+
"go.opentelemetry.io/collector/pdata/pmetric"
1517
)
1618

1719
func TestSerializeLog(t *testing.T) {
@@ -159,6 +161,48 @@ func TestSerializeLog(t *testing.T) {
159161
}
160162
}
161163

164+
func TestSerializeMetricsConflict(t *testing.T) {
165+
resourceMetrics := pmetric.NewResourceMetrics()
166+
scopeMetrics := resourceMetrics.ScopeMetrics().AppendEmpty()
167+
var dataPoints []dataPoint
168+
metric1 := scopeMetrics.Metrics().AppendEmpty()
169+
metric2 := scopeMetrics.Metrics().AppendEmpty()
170+
for _, m := range []pmetric.Metric{metric1, metric2} {
171+
m.SetName("foo")
172+
dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
173+
dp.SetIntValue(42)
174+
dataPoints = append(dataPoints, newNumberDataPoint(m, dp))
175+
}
176+
177+
var validationErrors []error
178+
var buf bytes.Buffer
179+
_, err := serializeMetrics(resourceMetrics.Resource(), "", scopeMetrics.Scope(), "", dataPoints, &validationErrors, &buf)
180+
if err != nil {
181+
t.Errorf("serializeMetrics() error = %v", err)
182+
}
183+
b := buf.Bytes()
184+
eventAsJSON := string(b)
185+
var result any
186+
decoder := json.NewDecoder(bytes.NewBuffer(b))
187+
decoder.UseNumber()
188+
if err := decoder.Decode(&result); err != nil {
189+
t.Error(err)
190+
}
191+
192+
assert.Len(t, validationErrors, 1)
193+
assert.Equal(t, fmt.Errorf("metric with name 'foo' has already been serialized in document with timestamp 1970-01-01T00:00:00.000000000Z"), validationErrors[0])
194+
195+
assert.Equal(t, map[string]any{
196+
"@timestamp": "1970-01-01T00:00:00.000000000Z",
197+
"data_stream": map[string]any{},
198+
"resource": map[string]any{},
199+
"scope": map[string]any{},
200+
"metrics": map[string]any{
201+
"foo": json.Number("42"),
202+
},
203+
}, result, eventAsJSON)
204+
}
205+
162206
func TestMergeGeolocation(t *testing.T) {
163207
attributes := map[string]any{
164208
"geo.location.lon": 1.1,

0 commit comments

Comments
 (0)