Skip to content

Commit b257e39

Browse files
committed
Bytes based batching for metrics
Signed-off-by: Israel Blancas <[email protected]>
1 parent e43c36c commit b257e39

File tree

6 files changed

+542
-105
lines changed

6 files changed

+542
-105
lines changed
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package sizer // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
5+
6+
import (
7+
"go.opentelemetry.io/collector/pdata/pmetric"
8+
) // MetricsCountSizer returns the nunmber of metrics entries.
9+
10+
type MetricsSizer interface {
11+
MetricsSize(md pmetric.Metrics) (count int)
12+
ResourceMetricsSize(rm pmetric.ResourceMetrics) (count int)
13+
ScopeMetricsSize(sm pmetric.ScopeMetrics) (count int)
14+
MetricSize(m pmetric.Metric) int
15+
DeltaSize(newItemSize int) int
16+
NumberDataPointSize(ndp pmetric.NumberDataPoint) int
17+
HistogramDataPointSize(hdp pmetric.HistogramDataPoint) int
18+
ExponentialHistogramDataPointSize(ehdp pmetric.ExponentialHistogramDataPoint) int
19+
SummaryDataPointSize(sdps pmetric.SummaryDataPoint) int
20+
}
21+
22+
type MetricsBytesSizer struct {
23+
pmetric.ProtoMarshaler
24+
}
25+
26+
var _ MetricsSizer = &MetricsBytesSizer{}
27+
28+
// DeltaSize() returns the delta size of a proto slice when a new item is added.
29+
// Example:
30+
//
31+
// prevSize := proto1.Size()
32+
// proto1.RepeatedField().AppendEmpty() = proto2
33+
//
34+
// Then currSize of proto1 can be calculated as
35+
//
36+
// currSize := (prevSize + sizer.DeltaSize(proto2.Size()))
37+
//
38+
// This is derived from opentelemetry-collector/pdata/internal/data/protogen/metrics/v1/metrics.pb.go
39+
// which is generated with gogo/protobuf.
40+
func (s *MetricsBytesSizer) DeltaSize(newItemSize int) int {
41+
return 1 + newItemSize + sov(uint64(newItemSize)) //nolint:gosec // disable G115
42+
}
43+
44+
type MetricsCountSizer struct{}
45+
46+
var _ MetricsSizer = &MetricsCountSizer{}
47+
48+
func (s *MetricsCountSizer) MetricsSize(md pmetric.Metrics) int {
49+
return md.DataPointCount()
50+
}
51+
52+
func (s *MetricsCountSizer) ResourceMetricsSize(rm pmetric.ResourceMetrics) (count int) {
53+
for i := 0; i < rm.ScopeMetrics().Len(); i++ {
54+
count += s.ScopeMetricsSize(rm.ScopeMetrics().At(i))
55+
}
56+
return count
57+
}
58+
59+
func (s *MetricsCountSizer) ScopeMetricsSize(sm pmetric.ScopeMetrics) (count int) {
60+
for i := 0; i < sm.Metrics().Len(); i++ {
61+
count += s.MetricSize(sm.Metrics().At(i))
62+
}
63+
return count
64+
}
65+
66+
func (s *MetricsCountSizer) MetricSize(m pmetric.Metric) int {
67+
switch m.Type() {
68+
case pmetric.MetricTypeGauge:
69+
return m.Gauge().DataPoints().Len()
70+
case pmetric.MetricTypeSum:
71+
return m.Sum().DataPoints().Len()
72+
case pmetric.MetricTypeHistogram:
73+
return m.Histogram().DataPoints().Len()
74+
case pmetric.MetricTypeExponentialHistogram:
75+
return m.ExponentialHistogram().DataPoints().Len()
76+
case pmetric.MetricTypeSummary:
77+
return m.Summary().DataPoints().Len()
78+
}
79+
return 0
80+
}
81+
82+
func (s *MetricsCountSizer) DeltaSize(newItemSize int) int {
83+
return newItemSize
84+
}
85+
86+
func (s *MetricsCountSizer) NumberDataPointSize(_ pmetric.NumberDataPoint) int {
87+
return 1
88+
}
89+
90+
func (s *MetricsCountSizer) HistogramDataPointSize(_ pmetric.HistogramDataPoint) int {
91+
return 1
92+
}
93+
94+
func (s *MetricsCountSizer) ExponentialHistogramDataPointSize(_ pmetric.ExponentialHistogramDataPoint) int {
95+
return 1
96+
}
97+
98+
func (s *MetricsCountSizer) SummaryDataPointSize(_ pmetric.SummaryDataPoint) int {
99+
return 1
100+
}
101+
102+
// func (s *LogsCountSizer) LogsSize(ld plog.Logs) int {
103+
// return ld.LogRecordCount()
104+
// }
105+
106+
// func (s *LogsCountSizer) ResourceLogsSize(rl plog.ResourceLogs) int {
107+
// count := 0
108+
// for k := 0; k < rl.ScopeLogs().Len(); k++ {
109+
// count += rl.ScopeLogs().At(k).LogRecords().Len()
110+
// }
111+
// return count
112+
// }
113+
114+
// func (s *LogsCountSizer) ScopeLogsSize(sl plog.ScopeLogs) int {
115+
// return sl.LogRecords().Len()
116+
// }
117+
118+
// func (s *LogsCountSizer) LogRecordSize(_ plog.LogRecord) int {
119+
// return 1
120+
// }
121+
122+
// func (s *LogsCountSizer) DeltaSize(newItemSize int) int {
123+
// return newItemSize
124+
// }
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
package sizer // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
4+
5+
import (
6+
"testing"
7+
8+
"github.com/stretchr/testify/require"
9+
10+
"go.opentelemetry.io/collector/pdata/testdata"
11+
)
12+
13+
func TestMetricsCountSizer(t *testing.T) {
14+
md := testdata.GenerateMetrics(7)
15+
sizer := MetricsCountSizer{}
16+
require.Equal(t, 14, sizer.MetricsSize(md))
17+
18+
rm := md.ResourceMetrics().At(0)
19+
require.Equal(t, 14, sizer.ResourceMetricsSize(rm))
20+
21+
sm := rm.ScopeMetrics().At(0)
22+
require.Equal(t, 14, sizer.ScopeMetricsSize(sm))
23+
24+
// Test different metric types
25+
require.Equal(t, 2, sizer.MetricSize(sm.Metrics().At(0)))
26+
27+
// Test data point sizes
28+
require.Equal(t, 1, sizer.NumberDataPointSize(sm.Metrics().At(0).Gauge().DataPoints().At(0)))
29+
require.Equal(t, 1, sizer.NumberDataPointSize(sm.Metrics().At(1).Gauge().DataPoints().At(0)))
30+
require.Equal(t, 1, sizer.NumberDataPointSize(sm.Metrics().At(2).Sum().DataPoints().At(0)))
31+
require.Equal(t, 1, sizer.NumberDataPointSize(sm.Metrics().At(3).Sum().DataPoints().At(0)))
32+
require.Equal(t, 1, sizer.HistogramDataPointSize(sm.Metrics().At(4).Histogram().DataPoints().At(0)))
33+
require.Equal(t, 1, sizer.ExponentialHistogramDataPointSize(sm.Metrics().At(5).ExponentialHistogram().DataPoints().At(0)))
34+
require.Equal(t, 1, sizer.SummaryDataPointSize(sm.Metrics().At(6).Summary().DataPoints().At(0)))
35+
36+
prevSize := sizer.ScopeMetricsSize(sm)
37+
sm.Metrics().At(0).CopyTo(sm.Metrics().AppendEmpty())
38+
require.Equal(t, sizer.ScopeMetricsSize(sm), prevSize+sizer.DeltaSize(sizer.MetricSize(sm.Metrics().At(0))))
39+
}
40+
41+
func TestMetricsBytesSizer(t *testing.T) {
42+
md := testdata.GenerateMetrics(7)
43+
sizer := MetricsBytesSizer{}
44+
require.Equal(t, 1594, sizer.MetricsSize(md))
45+
46+
rm := md.ResourceMetrics().At(0)
47+
require.Equal(t, 1591, sizer.ResourceMetricsSize(rm))
48+
49+
sm := rm.ScopeMetrics().At(0)
50+
require.Equal(t, 1546, sizer.ScopeMetricsSize(sm))
51+
52+
// Test different metric types
53+
require.Equal(t, 130, sizer.MetricSize(sm.Metrics().At(0)))
54+
55+
// Test data point sizes
56+
require.Equal(t, 55, sizer.NumberDataPointSize(sm.Metrics().At(0).Gauge().DataPoints().At(0)))
57+
require.Equal(t, 83, sizer.NumberDataPointSize(sm.Metrics().At(1).Gauge().DataPoints().At(0)))
58+
require.Equal(t, 55, sizer.NumberDataPointSize(sm.Metrics().At(2).Sum().DataPoints().At(0)))
59+
require.Equal(t, 83, sizer.NumberDataPointSize(sm.Metrics().At(3).Sum().DataPoints().At(0)))
60+
require.Equal(t, 92, sizer.HistogramDataPointSize(sm.Metrics().At(4).Histogram().DataPoints().At(0)))
61+
require.Equal(t, 119, sizer.ExponentialHistogramDataPointSize(sm.Metrics().At(5).ExponentialHistogram().DataPoints().At(0)))
62+
require.Equal(t, 92, sizer.SummaryDataPointSize(sm.Metrics().At(6).Summary().DataPoints().At(0)))
63+
64+
prevSize := sizer.ScopeMetricsSize(sm)
65+
sm.Metrics().At(0).CopyTo(sm.Metrics().AppendEmpty())
66+
require.Equal(t, sizer.ScopeMetricsSize(sm), prevSize+sizer.DeltaSize(sizer.MetricSize(sm.Metrics().At(0))))
67+
}
68+
69+
func TestMetricsBytesDeltaSize(t *testing.T) {
70+
sizer := MetricsBytesSizer{}
71+
require.Equal(t, 129, sizer.DeltaSize(127))
72+
require.Equal(t, 131, sizer.DeltaSize(128))
73+
require.Equal(t, 242, sizer.DeltaSize(239))
74+
}

exporter/exporterhelper/metrics.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"go.opentelemetry.io/collector/consumer/consumererror"
1515
"go.opentelemetry.io/collector/exporter"
1616
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
17+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
1718
"go.opentelemetry.io/collector/pdata/pmetric"
1819
"go.opentelemetry.io/collector/pipeline"
1920
)
@@ -24,16 +25,16 @@ var (
2425
)
2526

2627
type metricsRequest struct {
27-
md pmetric.Metrics
28-
pusher consumer.ConsumeMetricsFunc
29-
cachedItemsCount int
28+
md pmetric.Metrics
29+
pusher consumer.ConsumeMetricsFunc
30+
cachedSize int
3031
}
3132

3233
func newMetricsRequest(md pmetric.Metrics, pusher consumer.ConsumeMetricsFunc) Request {
3334
return &metricsRequest{
34-
md: md,
35-
pusher: pusher,
36-
cachedItemsCount: md.DataPointCount(),
35+
md: md,
36+
pusher: pusher,
37+
cachedSize: -1,
3738
}
3839
}
3940

@@ -66,11 +67,18 @@ func (req *metricsRequest) Export(ctx context.Context) error {
6667
}
6768

6869
func (req *metricsRequest) ItemsCount() int {
69-
return req.cachedItemsCount
70+
return req.md.DataPointCount()
7071
}
7172

72-
func (req *metricsRequest) setCachedItemsCount(count int) {
73-
req.cachedItemsCount = count
73+
func (req *metricsRequest) Size(sizer sizer.MetricsSizer) int {
74+
if req.cachedSize == -1 {
75+
req.cachedSize = sizer.MetricsSize(req.md)
76+
}
77+
return req.cachedSize
78+
}
79+
80+
func (req *metricsRequest) setCachedSize(count int) {
81+
req.cachedSize = count
7482
}
7583

7684
type metricsExporter struct {

0 commit comments

Comments
 (0)