Skip to content

Commit aaf9cb1

Browse files
committed
Bytes based batching for metrics
Signed-off-by: Israel Blancas <[email protected]>
1 parent 7d3e03e commit aaf9cb1

File tree

11 files changed

+864
-134
lines changed

11 files changed

+864
-134
lines changed

exporter/exporterhelper/internal/sizer/logs_sizer.go

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
package sizer // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
55

66
import (
7-
math_bits "math/bits"
8-
97
"go.opentelemetry.io/collector/pdata/plog"
108
)
119

@@ -22,22 +20,7 @@ type LogsSizer interface {
2220
// LogsBytesSizer returns the byte size of serialized protos.
2321
type LogsBytesSizer struct {
2422
plog.ProtoMarshaler
25-
}
26-
27-
// DeltaSize returns the delta size of a proto slice when a new item is added.
28-
// Example:
29-
//
30-
// prevSize := proto1.Size()
31-
// proto1.RepeatedField().AppendEmpty() = proto2
32-
//
33-
// Then currSize of proto1 can be calculated as
34-
//
35-
// currSize := (prevSize + sizer.DeltaSize(proto2.Size()))
36-
//
37-
// This is derived from opentelemetry-collector/pdata/internal/data/protogen/logs/v1/logs.pb.go
38-
// which is generated with gogo/protobuf.
39-
func (s *LogsBytesSizer) DeltaSize(newItemSize int) int {
40-
return 1 + newItemSize + sov(uint64(newItemSize)) //nolint:gosec // disable G115
23+
protoDeltaSizer
4124
}
4225

4326
// LogsCountSizer returns the nunmber of logs entries.
@@ -66,7 +49,3 @@ func (s *LogsCountSizer) LogRecordSize(_ plog.LogRecord) int {
6649
func (s *LogsCountSizer) DeltaSize(newItemSize int) int {
6750
return newItemSize
6851
}
69-
70-
func sov(x uint64) int {
71-
return (math_bits.Len64(x|1) + 6) / 7
72-
}

exporter/exporterhelper/internal/sizer/logs_sizer_test.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,3 @@ func TestLogsBytesSizer(t *testing.T) {
5555
lr.CopyTo(sl.LogRecords().AppendEmpty())
5656
require.Equal(t, sizer.ScopeLogsSize(sl), prevSize+sizer.DeltaSize(sizer.LogRecordSize(lr)))
5757
}
58-
59-
func TestLogsBytesDeltaSize(t *testing.T) {
60-
sizer := LogsBytesSizer{}
61-
require.Equal(t, 129, sizer.DeltaSize(127))
62-
require.Equal(t, 131, sizer.DeltaSize(128))
63-
require.Equal(t, 242, sizer.DeltaSize(239))
64-
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package sizer // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
5+
6+
import (
7+
"go.opentelemetry.io/collector/pdata/pmetric"
8+
) // MetricsCountSizer returns the nunmber of metrics entries.
9+
10+
type MetricsSizer interface {
11+
MetricsSize(md pmetric.Metrics) (count int)
12+
ResourceMetricsSize(rm pmetric.ResourceMetrics) (count int)
13+
ScopeMetricsSize(sm pmetric.ScopeMetrics) (count int)
14+
MetricSize(m pmetric.Metric) int
15+
DeltaSize(newItemSize int) int
16+
NumberDataPointSize(ndp pmetric.NumberDataPoint) int
17+
HistogramDataPointSize(hdp pmetric.HistogramDataPoint) int
18+
ExponentialHistogramDataPointSize(ehdp pmetric.ExponentialHistogramDataPoint) int
19+
SummaryDataPointSize(sdps pmetric.SummaryDataPoint) int
20+
}
21+
22+
type MetricsBytesSizer struct {
23+
pmetric.ProtoMarshaler
24+
protoDeltaSizer
25+
}
26+
27+
var _ MetricsSizer = &MetricsBytesSizer{}
28+
29+
type MetricsCountSizer struct{}
30+
31+
var _ MetricsSizer = &MetricsCountSizer{}
32+
33+
func (s *MetricsCountSizer) MetricsSize(md pmetric.Metrics) int {
34+
return md.DataPointCount()
35+
}
36+
37+
func (s *MetricsCountSizer) ResourceMetricsSize(rm pmetric.ResourceMetrics) (count int) {
38+
for i := 0; i < rm.ScopeMetrics().Len(); i++ {
39+
count += s.ScopeMetricsSize(rm.ScopeMetrics().At(i))
40+
}
41+
return count
42+
}
43+
44+
func (s *MetricsCountSizer) ScopeMetricsSize(sm pmetric.ScopeMetrics) (count int) {
45+
for i := 0; i < sm.Metrics().Len(); i++ {
46+
count += s.MetricSize(sm.Metrics().At(i))
47+
}
48+
return count
49+
}
50+
51+
func (s *MetricsCountSizer) MetricSize(m pmetric.Metric) int {
52+
switch m.Type() {
53+
case pmetric.MetricTypeGauge:
54+
return m.Gauge().DataPoints().Len()
55+
case pmetric.MetricTypeSum:
56+
return m.Sum().DataPoints().Len()
57+
case pmetric.MetricTypeHistogram:
58+
return m.Histogram().DataPoints().Len()
59+
case pmetric.MetricTypeExponentialHistogram:
60+
return m.ExponentialHistogram().DataPoints().Len()
61+
case pmetric.MetricTypeSummary:
62+
return m.Summary().DataPoints().Len()
63+
}
64+
return 0
65+
}
66+
67+
func (s *MetricsCountSizer) DeltaSize(newItemSize int) int {
68+
return newItemSize
69+
}
70+
71+
func (s *MetricsCountSizer) NumberDataPointSize(_ pmetric.NumberDataPoint) int {
72+
return 1
73+
}
74+
75+
func (s *MetricsCountSizer) HistogramDataPointSize(_ pmetric.HistogramDataPoint) int {
76+
return 1
77+
}
78+
79+
func (s *MetricsCountSizer) ExponentialHistogramDataPointSize(_ pmetric.ExponentialHistogramDataPoint) int {
80+
return 1
81+
}
82+
83+
func (s *MetricsCountSizer) SummaryDataPointSize(_ pmetric.SummaryDataPoint) int {
84+
return 1
85+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
package sizer // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
4+
5+
import (
6+
"testing"
7+
8+
"github.com/stretchr/testify/require"
9+
10+
"go.opentelemetry.io/collector/pdata/testdata"
11+
)
12+
13+
func TestMetricsCountSizer(t *testing.T) {
14+
md := testdata.GenerateMetrics(7)
15+
sizer := MetricsCountSizer{}
16+
require.Equal(t, 14, sizer.MetricsSize(md))
17+
18+
rm := md.ResourceMetrics().At(0)
19+
require.Equal(t, 14, sizer.ResourceMetricsSize(rm))
20+
21+
sm := rm.ScopeMetrics().At(0)
22+
require.Equal(t, 14, sizer.ScopeMetricsSize(sm))
23+
24+
// Test different metric types
25+
require.Equal(t, 2, sizer.MetricSize(sm.Metrics().At(0)))
26+
27+
// Test data point sizes
28+
require.Equal(t, 1, sizer.NumberDataPointSize(sm.Metrics().At(0).Gauge().DataPoints().At(0)))
29+
require.Equal(t, 1, sizer.NumberDataPointSize(sm.Metrics().At(1).Gauge().DataPoints().At(0)))
30+
require.Equal(t, 1, sizer.NumberDataPointSize(sm.Metrics().At(2).Sum().DataPoints().At(0)))
31+
require.Equal(t, 1, sizer.NumberDataPointSize(sm.Metrics().At(3).Sum().DataPoints().At(0)))
32+
require.Equal(t, 1, sizer.HistogramDataPointSize(sm.Metrics().At(4).Histogram().DataPoints().At(0)))
33+
require.Equal(t, 1, sizer.ExponentialHistogramDataPointSize(sm.Metrics().At(5).ExponentialHistogram().DataPoints().At(0)))
34+
require.Equal(t, 1, sizer.SummaryDataPointSize(sm.Metrics().At(6).Summary().DataPoints().At(0)))
35+
36+
prevSize := sizer.ScopeMetricsSize(sm)
37+
sm.Metrics().At(0).CopyTo(sm.Metrics().AppendEmpty())
38+
require.Equal(t, sizer.ScopeMetricsSize(sm), prevSize+sizer.DeltaSize(sizer.MetricSize(sm.Metrics().At(0))))
39+
}
40+
41+
func TestMetricsBytesSizer(t *testing.T) {
42+
md := testdata.GenerateMetrics(7)
43+
sizer := MetricsBytesSizer{}
44+
require.Equal(t, 1594, sizer.MetricsSize(md))
45+
46+
rm := md.ResourceMetrics().At(0)
47+
require.Equal(t, 1591, sizer.ResourceMetricsSize(rm))
48+
49+
sm := rm.ScopeMetrics().At(0)
50+
require.Equal(t, 1546, sizer.ScopeMetricsSize(sm))
51+
52+
// Test different metric types
53+
require.Equal(t, 130, sizer.MetricSize(sm.Metrics().At(0)))
54+
55+
// Test data point sizes
56+
require.Equal(t, 55, sizer.NumberDataPointSize(sm.Metrics().At(0).Gauge().DataPoints().At(0)))
57+
require.Equal(t, 83, sizer.NumberDataPointSize(sm.Metrics().At(1).Gauge().DataPoints().At(0)))
58+
require.Equal(t, 55, sizer.NumberDataPointSize(sm.Metrics().At(2).Sum().DataPoints().At(0)))
59+
require.Equal(t, 83, sizer.NumberDataPointSize(sm.Metrics().At(3).Sum().DataPoints().At(0)))
60+
require.Equal(t, 92, sizer.HistogramDataPointSize(sm.Metrics().At(4).Histogram().DataPoints().At(0)))
61+
require.Equal(t, 119, sizer.ExponentialHistogramDataPointSize(sm.Metrics().At(5).ExponentialHistogram().DataPoints().At(0)))
62+
require.Equal(t, 92, sizer.SummaryDataPointSize(sm.Metrics().At(6).Summary().DataPoints().At(0)))
63+
64+
prevSize := sizer.ScopeMetricsSize(sm)
65+
sm.Metrics().At(0).CopyTo(sm.Metrics().AppendEmpty())
66+
require.Equal(t, sizer.ScopeMetricsSize(sm), prevSize+sizer.DeltaSize(sizer.MetricSize(sm.Metrics().At(0))))
67+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package sizer // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
5+
6+
import (
7+
math_bits "math/bits"
8+
)
9+
10+
type protoDeltaSizer struct{}
11+
12+
// DeltaSize() returns the delta size of a proto slice when a new item is added.
13+
// Example:
14+
//
15+
// prevSize := proto1.Size()
16+
// proto1.RepeatedField().AppendEmpty() = proto2
17+
//
18+
// Then currSize of proto1 can be calculated as
19+
//
20+
// currSize := (prevSize + sizer.DeltaSize(proto2.Size()))
21+
//
22+
// This is derived from:
23+
// - opentelemetry-collector/pdata/internal/data/protogen/metrics/v1/metrics.pb.go
24+
// - opentelemetry-collector/pdata/internal/data/protogen/logs/v1/logs.pb.go
25+
// - opentelemetry-collector/pdata/internal/data/protogen/traces/v1/traces.pb.go
26+
// - opentelemetry-collector/pdata/internal/data/protogen/profiles/v1development/profiles.pb.go
27+
// which is generated with gogo/protobuf.
28+
func (s *protoDeltaSizer) DeltaSize(newItemSize int) int {
29+
return 1 + newItemSize + sov(uint64(newItemSize)) //nolint:gosec // disable G115
30+
}
31+
32+
func sov(x uint64) int {
33+
return (math_bits.Len64(x|1) + 6) / 7
34+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package sizer
5+
6+
import (
7+
"testing"
8+
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestMetricsBytesDeltaSize(t *testing.T) {
13+
sizer := protoDeltaSizer{}
14+
require.Equal(t, 129, sizer.DeltaSize(127))
15+
require.Equal(t, 131, sizer.DeltaSize(128))
16+
require.Equal(t, 242, sizer.DeltaSize(239))
17+
}

exporter/exporterhelper/logs_batch_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,3 +371,20 @@ func BenchmarkSplittingBasedOnByteSizeHugeLogs(b *testing.B) {
371371
assert.Len(b, merged, 10)
372372
}
373373
}
374+
375+
func TestLogsRequest_MergeSplit_UnknownSizerType(t *testing.T) {
376+
// Create a logs request
377+
req := newLogsRequest(plog.NewLogs(), nil)
378+
379+
// Create config with invalid sizer type by using zero value
380+
cfg := exporterbatcher.SizeConfig{
381+
Sizer: exporterbatcher.SizerType{}, // Empty struct will have empty string as val
382+
}
383+
384+
// Call MergeSplit with invalid sizer
385+
result, err := req.MergeSplit(context.Background(), cfg, nil)
386+
387+
// Verify results
388+
assert.Nil(t, result)
389+
assert.EqualError(t, err, "unknown sizer type")
390+
}

exporter/exporterhelper/metrics.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"go.opentelemetry.io/collector/consumer/consumererror"
1515
"go.opentelemetry.io/collector/exporter"
1616
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
17+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
1718
"go.opentelemetry.io/collector/pdata/pmetric"
1819
"go.opentelemetry.io/collector/pipeline"
1920
)
@@ -24,16 +25,16 @@ var (
2425
)
2526

2627
type metricsRequest struct {
27-
md pmetric.Metrics
28-
pusher consumer.ConsumeMetricsFunc
29-
cachedItemsCount int
28+
md pmetric.Metrics
29+
pusher consumer.ConsumeMetricsFunc
30+
cachedSize int
3031
}
3132

3233
func newMetricsRequest(md pmetric.Metrics, pusher consumer.ConsumeMetricsFunc) Request {
3334
return &metricsRequest{
34-
md: md,
35-
pusher: pusher,
36-
cachedItemsCount: md.DataPointCount(),
35+
md: md,
36+
pusher: pusher,
37+
cachedSize: -1,
3738
}
3839
}
3940

@@ -66,11 +67,18 @@ func (req *metricsRequest) Export(ctx context.Context) error {
6667
}
6768

6869
func (req *metricsRequest) ItemsCount() int {
69-
return req.cachedItemsCount
70+
return req.md.DataPointCount()
7071
}
7172

72-
func (req *metricsRequest) setCachedItemsCount(count int) {
73-
req.cachedItemsCount = count
73+
func (req *metricsRequest) size(sizer sizer.MetricsSizer) int {
74+
if req.cachedSize == -1 {
75+
req.cachedSize = sizer.MetricsSize(req.md)
76+
}
77+
return req.cachedSize
78+
}
79+
80+
func (req *metricsRequest) setCachedSize(count int) {
81+
req.cachedSize = count
7482
}
7583

7684
type metricsExporter struct {

0 commit comments

Comments
 (0)