Skip to content

Commit 1fd6851

Browse files
authored
[exporter/elasticsearch] Add explicit bounds histogram support to metrics (#34045)
1 parent 1821ecd commit 1fd6851

File tree: 7 files changed (+261 lines, −29 lines)
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: elasticsearchexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add explicit bounds histogram support to metrics
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [34045]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/elasticsearchexporter/data_stream_router.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88

99
"go.opentelemetry.io/collector/pdata/pcommon"
1010
"go.opentelemetry.io/collector/pdata/plog"
11-
"go.opentelemetry.io/collector/pdata/pmetric"
1211
"go.opentelemetry.io/collector/pdata/ptrace"
1312
)
1413

@@ -60,7 +59,7 @@ func routeLogRecord(
6059
// routeDataPoint returns the name of the index to send the data point to according to data stream routing attributes.
6160
// This function may mutate record attributes.
6261
func routeDataPoint(
63-
dataPoint pmetric.NumberDataPoint,
62+
dataPoint dataPoint,
6463
scope pcommon.InstrumentationScope,
6564
resource pcommon.Resource,
6665
fIndex string,

exporter/elasticsearchexporter/exporter.go

Lines changed: 54 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -179,27 +179,64 @@ func (e *elasticsearchExporter) pushMetricsData(
179179
for k := 0; k < scopeMetrics.Metrics().Len(); k++ {
180180
metric := scopeMetrics.Metrics().At(k)
181181

182-
// We only support Sum and Gauge metrics at the moment.
183-
var dataPoints pmetric.NumberDataPointSlice
184-
switch metric.Type() {
185-
case pmetric.MetricTypeSum:
186-
dataPoints = metric.Sum().DataPoints()
187-
case pmetric.MetricTypeGauge:
188-
dataPoints = metric.Gauge().DataPoints()
189-
}
190-
191-
for l := 0; l < dataPoints.Len(); l++ {
192-
dataPoint := dataPoints.At(l)
193-
fIndex, err := e.getMetricDataPointIndex(resource, scope, dataPoint)
182+
upsertDataPoint := func(dp dataPoint, dpValue pcommon.Value) error {
183+
fIndex, err := e.getMetricDataPointIndex(resource, scope, dp)
194184
if err != nil {
195-
errs = append(errs, err)
196-
continue
185+
return err
197186
}
198187
if _, ok := resourceDocs[fIndex]; !ok {
199188
resourceDocs[fIndex] = make(map[uint32]objmodel.Document)
200189
}
201-
if err := e.model.upsertMetricDataPoint(resourceDocs[fIndex], resource, scope, metric, dataPoint); err != nil {
202-
errs = append(errs, err)
190+
191+
if err = e.model.upsertMetricDataPointValue(resourceDocs[fIndex], resource, scope, metric, dp, dpValue); err != nil {
192+
return err
193+
}
194+
return nil
195+
}
196+
197+
// TODO: support exponential histogram
198+
switch metric.Type() {
199+
case pmetric.MetricTypeSum:
200+
dps := metric.Sum().DataPoints()
201+
for l := 0; l < dps.Len(); l++ {
202+
dp := dps.At(l)
203+
val, err := numberToValue(dp)
204+
if err != nil {
205+
errs = append(errs, err)
206+
continue
207+
}
208+
if err := upsertDataPoint(dp, val); err != nil {
209+
errs = append(errs, err)
210+
continue
211+
}
212+
}
213+
case pmetric.MetricTypeGauge:
214+
dps := metric.Gauge().DataPoints()
215+
for l := 0; l < dps.Len(); l++ {
216+
dp := dps.At(l)
217+
val, err := numberToValue(dp)
218+
if err != nil {
219+
errs = append(errs, err)
220+
continue
221+
}
222+
if err := upsertDataPoint(dp, val); err != nil {
223+
errs = append(errs, err)
224+
continue
225+
}
226+
}
227+
case pmetric.MetricTypeHistogram:
228+
dps := metric.Histogram().DataPoints()
229+
for l := 0; l < dps.Len(); l++ {
230+
dp := dps.At(l)
231+
val, err := histogramToValue(dp)
232+
if err != nil {
233+
errs = append(errs, err)
234+
continue
235+
}
236+
if err := upsertDataPoint(dp, val); err != nil {
237+
errs = append(errs, err)
238+
continue
239+
}
203240
}
204241
}
205242
}
@@ -238,7 +275,7 @@ func (e *elasticsearchExporter) pushMetricsData(
238275
func (e *elasticsearchExporter) getMetricDataPointIndex(
239276
resource pcommon.Resource,
240277
scope pcommon.InstrumentationScope,
241-
dataPoint pmetric.NumberDataPoint,
278+
dataPoint dataPoint,
242279
) (string, error) {
243280
fIndex := e.index
244281
if e.dynamicIndex {

exporter/elasticsearchexporter/exporter_test.go

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"encoding/json"
99
"errors"
1010
"fmt"
11+
"math"
1112
"net/http"
1213
"runtime"
1314
"sync"
@@ -495,6 +496,7 @@ func TestExporterMetrics(t *testing.T) {
495496
},
496497
)
497498
metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).SetName("my.metric")
499+
metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).SetEmptySum().DataPoints().AppendEmpty().SetIntValue(0)
498500
mustSendMetrics(t, exporter, metrics)
499501

500502
rec.WaitItems(1)
@@ -633,6 +635,103 @@ func TestExporterMetrics(t *testing.T) {
633635

634636
assertItemsEqual(t, expected, rec.Items(), false)
635637
})
638+
639+
t.Run("publish histogram", func(t *testing.T) {
640+
rec := newBulkRecorder()
641+
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
642+
rec.Record(docs)
643+
return itemsAllOK(docs)
644+
})
645+
646+
exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) {
647+
cfg.Mapping.Mode = "ecs"
648+
})
649+
650+
metrics := pmetric.NewMetrics()
651+
resourceMetrics := metrics.ResourceMetrics().AppendEmpty()
652+
scopeA := resourceMetrics.ScopeMetrics().AppendEmpty()
653+
metricSlice := scopeA.Metrics()
654+
fooMetric := metricSlice.AppendEmpty()
655+
fooMetric.SetName("metric.foo")
656+
fooDps := fooMetric.SetEmptyHistogram().DataPoints()
657+
fooDp := fooDps.AppendEmpty()
658+
fooDp.ExplicitBounds().FromRaw([]float64{1.0, 2.0, 3.0})
659+
fooDp.BucketCounts().FromRaw([]uint64{1, 2, 3, 4})
660+
fooOtherDp := fooDps.AppendEmpty()
661+
fooOtherDp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(3600, 0)))
662+
fooOtherDp.ExplicitBounds().FromRaw([]float64{4.0, 5.0, 6.0})
663+
fooOtherDp.BucketCounts().FromRaw([]uint64{4, 5, 6, 7})
664+
665+
mustSendMetrics(t, exporter, metrics)
666+
667+
rec.WaitItems(2)
668+
669+
expected := []itemRequest{
670+
{
671+
Action: []byte(`{"create":{"_index":"metrics-generic-default"}}`),
672+
Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic","namespace":"default","type":"metrics"},"metric":{"foo":{"counts":[1,2,3,4],"values":[0.5,1.5,2.5,3]}}}`),
673+
},
674+
{
675+
Action: []byte(`{"create":{"_index":"metrics-generic-default"}}`),
676+
Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","data_stream":{"dataset":"generic","namespace":"default","type":"metrics"},"metric":{"foo":{"counts":[4,5,6,7],"values":[2,4.5,5.5,6]}}}`),
677+
},
678+
}
679+
680+
assertItemsEqual(t, expected, rec.Items(), false)
681+
})
682+
683+
t.Run("publish only valid data points", func(t *testing.T) {
684+
rec := newBulkRecorder()
685+
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
686+
rec.Record(docs)
687+
return itemsAllOK(docs)
688+
})
689+
690+
exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) {
691+
cfg.Mapping.Mode = "ecs"
692+
})
693+
694+
metrics := pmetric.NewMetrics()
695+
resourceMetrics := metrics.ResourceMetrics().AppendEmpty()
696+
scopeA := resourceMetrics.ScopeMetrics().AppendEmpty()
697+
metricSlice := scopeA.Metrics()
698+
fooMetric := metricSlice.AppendEmpty()
699+
fooMetric.SetName("metric.foo")
700+
fooDps := fooMetric.SetEmptyHistogram().DataPoints()
701+
fooDp := fooDps.AppendEmpty()
702+
fooDp.ExplicitBounds().FromRaw([]float64{1.0, 2.0, 3.0})
703+
fooDp.BucketCounts().FromRaw([]uint64{})
704+
fooOtherDp := fooDps.AppendEmpty()
705+
fooOtherDp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(3600, 0)))
706+
fooOtherDp.ExplicitBounds().FromRaw([]float64{4.0, 5.0, 6.0})
707+
fooOtherDp.BucketCounts().FromRaw([]uint64{4, 5, 6, 7})
708+
barMetric := metricSlice.AppendEmpty()
709+
barMetric.SetName("metric.bar")
710+
barDps := barMetric.SetEmptySum().DataPoints()
711+
barDp := barDps.AppendEmpty()
712+
barDp.SetDoubleValue(math.Inf(1))
713+
barOtherDp := barDps.AppendEmpty()
714+
barOtherDp.SetDoubleValue(1.0)
715+
716+
err := exporter.ConsumeMetrics(context.Background(), metrics)
717+
require.ErrorContains(t, err, "invalid histogram data point")
718+
require.ErrorContains(t, err, "invalid number data point")
719+
720+
rec.WaitItems(2)
721+
722+
expected := []itemRequest{
723+
{
724+
Action: []byte(`{"create":{"_index":"metrics-generic-default"}}`),
725+
Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic","namespace":"default","type":"metrics"},"metric":{"bar":1}}`),
726+
},
727+
{
728+
Action: []byte(`{"create":{"_index":"metrics-generic-default"}}`),
729+
Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","data_stream":{"dataset":"generic","namespace":"default","type":"metrics"},"metric":{"foo":{"counts":[4,5,6,7],"values":[2,4.5,5.5,6]}}}`),
730+
},
731+
}
732+
733+
assertItemsEqual(t, expected, rec.Items(), false)
734+
})
636735
}
637736

638737
func TestExporterTraces(t *testing.T) {

exporter/elasticsearchexporter/model.go

Lines changed: 72 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"bytes"
88
"encoding/binary"
99
"encoding/json"
10+
"errors"
1011
"fmt"
1112
"hash"
1213
"hash/fnv"
@@ -65,7 +66,7 @@ var resourceAttrsToPreserve = map[string]bool{
6566
type mappingModel interface {
6667
encodeLog(pcommon.Resource, plog.LogRecord, pcommon.InstrumentationScope) ([]byte, error)
6768
encodeSpan(pcommon.Resource, ptrace.Span, pcommon.InstrumentationScope) ([]byte, error)
68-
upsertMetricDataPoint(map[uint32]objmodel.Document, pcommon.Resource, pcommon.InstrumentationScope, pmetric.Metric, pmetric.NumberDataPoint) error
69+
upsertMetricDataPointValue(map[uint32]objmodel.Document, pcommon.Resource, pcommon.InstrumentationScope, pmetric.Metric, dataPoint, pcommon.Value) error
6970
encodeDocument(objmodel.Document) ([]byte, error)
7071
}
7172

@@ -80,6 +81,11 @@ type encodeModel struct {
8081
mode MappingMode
8182
}
8283

84+
type dataPoint interface {
85+
Timestamp() pcommon.Timestamp
86+
Attributes() pcommon.Map
87+
}
88+
8389
const (
8490
traceIDField = "traceID"
8591
spanIDField = "spanID"
@@ -176,7 +182,7 @@ func (m *encodeModel) encodeDocument(document objmodel.Document) ([]byte, error)
176182
return buf.Bytes(), nil
177183
}
178184

179-
func (m *encodeModel) upsertMetricDataPoint(documents map[uint32]objmodel.Document, resource pcommon.Resource, _ pcommon.InstrumentationScope, metric pmetric.Metric, dp pmetric.NumberDataPoint) error {
185+
func (m *encodeModel) upsertMetricDataPointValue(documents map[uint32]objmodel.Document, resource pcommon.Resource, _ pcommon.InstrumentationScope, metric pmetric.Metric, dp dataPoint, value pcommon.Value) error {
180186
hash := metricHash(dp.Timestamp(), dp.Attributes())
181187
var (
182188
document objmodel.Document
@@ -188,15 +194,74 @@ func (m *encodeModel) upsertMetricDataPoint(documents map[uint32]objmodel.Docume
188194
document.AddAttributes("", dp.Attributes())
189195
}
190196

197+
document.AddAttribute(metric.Name(), value)
198+
199+
documents[hash] = document
200+
return nil
201+
}
202+
203+
func histogramToValue(dp pmetric.HistogramDataPoint) (pcommon.Value, error) {
204+
// Histogram conversion function is from
205+
// https://github.com/elastic/apm-data/blob/3b28495c3cbdc0902983134276eb114231730249/input/otlp/metrics.go#L277
206+
bucketCounts := dp.BucketCounts()
207+
explicitBounds := dp.ExplicitBounds()
208+
if bucketCounts.Len() != explicitBounds.Len()+1 || explicitBounds.Len() == 0 {
209+
return pcommon.Value{}, errors.New("invalid histogram data point")
210+
}
211+
212+
vm := pcommon.NewValueMap()
213+
m := vm.Map()
214+
counts := m.PutEmptySlice("counts")
215+
values := m.PutEmptySlice("values")
216+
217+
values.EnsureCapacity(bucketCounts.Len())
218+
counts.EnsureCapacity(bucketCounts.Len())
219+
for i := 0; i < bucketCounts.Len(); i++ {
220+
count := bucketCounts.At(i)
221+
if count == 0 {
222+
continue
223+
}
224+
225+
var value float64
226+
switch i {
227+
// (-infinity, explicit_bounds[i]]
228+
case 0:
229+
value = explicitBounds.At(i)
230+
if value > 0 {
231+
value /= 2
232+
}
233+
234+
// (explicit_bounds[i], +infinity)
235+
case bucketCounts.Len() - 1:
236+
value = explicitBounds.At(i - 1)
237+
238+
// [explicit_bounds[i-1], explicit_bounds[i])
239+
default:
240+
// Use the midpoint between the boundaries.
241+
value = explicitBounds.At(i-1) + (explicitBounds.At(i)-explicitBounds.At(i-1))/2.0
242+
}
243+
244+
counts.AppendEmpty().SetInt(int64(count))
245+
values.AppendEmpty().SetDouble(value)
246+
}
247+
248+
return vm, nil
249+
}
250+
251+
var errInvalidNumberDataPoint = errors.New("invalid number data point")
252+
253+
func numberToValue(dp pmetric.NumberDataPoint) (pcommon.Value, error) {
191254
switch dp.ValueType() {
192255
case pmetric.NumberDataPointValueTypeDouble:
193-
document.AddAttribute(metric.Name(), pcommon.NewValueDouble(dp.DoubleValue()))
256+
value := dp.DoubleValue()
257+
if math.IsNaN(value) || math.IsInf(value, 0) {
258+
return pcommon.Value{}, errInvalidNumberDataPoint
259+
}
260+
return pcommon.NewValueDouble(value), nil
194261
case pmetric.NumberDataPointValueTypeInt:
195-
document.AddAttribute(metric.Name(), pcommon.NewValueInt(dp.IntValue()))
262+
return pcommon.NewValueInt(dp.IntValue()), nil
196263
}
197-
198-
documents[hash] = document
199-
return nil
264+
return pcommon.Value{}, errInvalidNumberDataPoint
200265
}
201266

202267
func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) ([]byte, error) {

exporter/elasticsearchexporter/model_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,11 +99,14 @@ func TestEncodeMetric(t *testing.T) {
9999

100100
var docsBytes [][]byte
101101
for i := 0; i < metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().Len(); i++ {
102-
err := model.upsertMetricDataPoint(docs,
102+
val, err := numberToValue(metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(i))
103+
require.NoError(t, err)
104+
err = model.upsertMetricDataPointValue(docs,
103105
metrics.ResourceMetrics().At(0).Resource(),
104106
metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope(),
105107
metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0),
106-
metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(i))
108+
metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(i),
109+
val)
107110
require.NoError(t, err)
108111
}
109112

Commit comments: 0