Skip to content

Commit 8041156

Browse files
authored
Cleanup interaction of exemplar and aggregation (#5899)
Follow-up to #5861. This is an attempt to: * Limit the API surface of the aggregate package * Try to use predefined types (e.g. exemplar.Filter) over custom functions where possible. * Avoid using nil, and use No-Ops where it makes sense This makes `aggregate.NewFilteredExemplarReservoir` no longer exported, removes the `aggregate.FilteredExemplarReservoir` interface, and removes the `aggregate.dropReservoir`.
1 parent bf6a7e1 commit 8041156

16 files changed

+165
-169
lines changed

sdk/metric/exemplar.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,14 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"
66
import (
77
"runtime"
88

9-
"go.opentelemetry.io/otel/attribute"
109
"go.opentelemetry.io/otel/sdk/metric/exemplar"
11-
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
1210
)
1311

1412
// ExemplarReservoirProviderSelector selects the
1513
// [exemplar.ReservoirProvider] to use
1614
// based on the [Aggregation] of the metric.
1715
type ExemplarReservoirProviderSelector func(Aggregation) exemplar.ReservoirProvider
1816

19-
// reservoirFunc returns the appropriately configured exemplar reservoir
20-
// creation func based on the passed InstrumentKind and filter configuration.
21-
func reservoirFunc[N int64 | float64](provider exemplar.ReservoirProvider, filter exemplar.Filter) func(attribute.Set) aggregate.FilteredExemplarReservoir[N] {
22-
return func(attrs attribute.Set) aggregate.FilteredExemplarReservoir[N] {
23-
return aggregate.NewFilteredExemplarReservoir[N](filter, provider(attrs))
24-
}
25-
}
26-
2717
// DefaultExemplarReservoirProviderSelector returns the default
2818
// [exemplar.ReservoirProvider] for the
2919
// provided [Aggregation].

sdk/metric/instrument_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"testing"
99

1010
"go.opentelemetry.io/otel/attribute"
11+
"go.opentelemetry.io/otel/sdk/metric/exemplar"
1112
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
1213
"go.opentelemetry.io/otel/sdk/metric/metricdata"
1314
)
@@ -22,7 +23,7 @@ func BenchmarkInstrument(b *testing.B) {
2223
}
2324

2425
b.Run("instrumentImpl/aggregate", func(b *testing.B) {
25-
build := aggregate.Builder[int64]{}
26+
build := aggregate.Builder[int64]{ExemplarFilter: exemplar.AlwaysOffFilter, ExemplarReservoirProvider: exemplar.FixedSizeReservoirProvider(0)}
2627
var meas []aggregate.Measure[int64]
2728

2829
build.Temporality = metricdata.CumulativeTemporality
@@ -52,7 +53,7 @@ func BenchmarkInstrument(b *testing.B) {
5253
})
5354

5455
b.Run("observable/observe", func(b *testing.B) {
55-
build := aggregate.Builder[int64]{}
56+
build := aggregate.Builder[int64]{ExemplarFilter: exemplar.AlwaysOffFilter, ExemplarReservoirProvider: exemplar.FixedSizeReservoirProvider(0)}
5657
var meas []aggregate.Measure[int64]
5758

5859
in, _ := build.PrecomputedLastValue()

sdk/metric/internal/aggregate/aggregate.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"time"
99

1010
"go.opentelemetry.io/otel/attribute"
11+
"go.opentelemetry.io/otel/sdk/metric/exemplar"
1112
"go.opentelemetry.io/otel/sdk/metric/metricdata"
1213
)
1314

@@ -33,12 +34,12 @@ type Builder[N int64 | float64] struct {
3334
// Filter is the attribute filter the aggregate function will use on the
3435
// input of measurements.
3536
Filter attribute.Filter
36-
// ReservoirFunc is the factory function used by aggregate functions to
37-
// create new exemplar reservoirs for a new seen attribute set.
38-
//
39-
// If this is not provided a default factory function that returns an
40-
// dropReservoir reservoir will be used.
41-
ReservoirFunc func(attribute.Set) FilteredExemplarReservoir[N]
37+
// ExemplarFilter is the filter applied to measurements before offering
38+
// them to the exemplar Reservoir.
39+
ExemplarFilter exemplar.Filter
40+
// ExemplarReservoirProvider is the factory function used to create a new
41+
// exemplar Reservoir for a given attribute set.
42+
ExemplarReservoirProvider exemplar.ReservoirProvider
4243
// AggregationLimit is the cardinality limit of measurement attributes. Any
4344
// measurement for new attributes once the limit has been reached will be
4445
// aggregated into a single aggregate for the "otel.metric.overflow"
@@ -49,12 +50,10 @@ type Builder[N int64 | float64] struct {
4950
AggregationLimit int
5051
}
5152

52-
func (b Builder[N]) resFunc() func(attribute.Set) FilteredExemplarReservoir[N] {
53-
if b.ReservoirFunc != nil {
54-
return b.ReservoirFunc
53+
func (b Builder[N]) resFunc() func(attribute.Set) *filteredExemplarReservoir[N] {
54+
return func(attrs attribute.Set) *filteredExemplarReservoir[N] {
55+
return newFilteredExemplarReservoir[N](b.ExemplarFilter, b.ExemplarReservoirProvider(attrs))
5556
}
56-
57-
return dropReservoir
5857
}
5958

6059
type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue)

sdk/metric/internal/aggregate/aggregate_test.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/stretchr/testify/assert"
1414

1515
"go.opentelemetry.io/otel/attribute"
16+
"go.opentelemetry.io/otel/sdk/metric/exemplar"
1617
"go.opentelemetry.io/otel/sdk/metric/metricdata"
1718
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
1819
)
@@ -72,8 +73,12 @@ func (c *clock) Register() (unregister func()) {
7273
return func() { now = orig }
7374
}
7475

75-
func dropExemplars[N int64 | float64](attr attribute.Set) FilteredExemplarReservoir[N] {
76-
return dropReservoir[N](attr)
76+
func newNoopReservoir(attribute.Set) exemplar.Reservoir {
77+
return exemplar.NewFixedSizeReservoir(0)
78+
}
79+
80+
func dropExemplars[N int64 | float64](attr attribute.Set) *filteredExemplarReservoir[N] {
81+
return newFilteredExemplarReservoir[N](exemplar.AlwaysOffFilter, newNoopReservoir(attr))
7782
}
7883

7984
func TestBuilderFilter(t *testing.T) {
@@ -99,8 +104,8 @@ func testBuilderFilter[N int64 | float64]() func(t *testing.T) {
99104
}
100105
}
101106

102-
t.Run("NoFilter", run(Builder[N]{}, attr, nil))
103-
t.Run("Filter", run(Builder[N]{Filter: attrFltr}, fltrAlice, []attribute.KeyValue{adminTrue}))
107+
t.Run("NoFilter", run(Builder[N]{ExemplarFilter: exemplar.AlwaysOffFilter, ExemplarReservoirProvider: newNoopReservoir}, attr, nil))
108+
t.Run("Filter", run(Builder[N]{ExemplarFilter: exemplar.AlwaysOffFilter, ExemplarReservoirProvider: newNoopReservoir, Filter: attrFltr}, fltrAlice, []attribute.KeyValue{adminTrue}))
104109
}
105110
}
106111

sdk/metric/internal/aggregate/drop.go

Lines changed: 0 additions & 26 deletions
This file was deleted.

sdk/metric/internal/aggregate/drop_test.go

Lines changed: 0 additions & 27 deletions
This file was deleted.

sdk/metric/internal/aggregate/exponential_histogram.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ const (
3030
// expoHistogramDataPoint is a single data point in an exponential histogram.
3131
type expoHistogramDataPoint[N int64 | float64] struct {
3232
attrs attribute.Set
33-
res FilteredExemplarReservoir[N]
33+
res *filteredExemplarReservoir[N]
3434

3535
count uint64
3636
min N
@@ -283,7 +283,7 @@ func (b *expoBuckets) downscale(delta int32) {
283283
// newExponentialHistogram returns an Aggregator that summarizes a set of
284284
// measurements as an exponential histogram. Each histogram is scoped by attributes
285285
// and the aggregation cycle the measurements were made in.
286-
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *expoHistogram[N] {
286+
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *expoHistogram[N] {
287287
return &expoHistogram[N]{
288288
noSum: noSum,
289289
noMinMax: noMinMax,
@@ -306,7 +306,7 @@ type expoHistogram[N int64 | float64] struct {
306306
maxSize int
307307
maxScale int32
308308

309-
newRes func(attribute.Set) FilteredExemplarReservoir[N]
309+
newRes func(attribute.Set) *filteredExemplarReservoir[N]
310310
limit limiter[*expoHistogramDataPoint[N]]
311311
values map[attribute.Distinct]*expoHistogramDataPoint[N]
312312
valuesMu sync.Mutex

sdk/metric/internal/aggregate/exponential_histogram_test.go

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/stretchr/testify/require"
1414

1515
"go.opentelemetry.io/otel/internal/global"
16+
"go.opentelemetry.io/otel/sdk/metric/exemplar"
1617
"go.opentelemetry.io/otel/sdk/metric/metricdata"
1718
)
1819

@@ -682,22 +683,30 @@ func BenchmarkExponentialHistogram(b *testing.B) {
682683

683684
b.Run("Int64/Cumulative", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) {
684685
return Builder[int64]{
685-
Temporality: metricdata.CumulativeTemporality,
686+
Temporality: metricdata.CumulativeTemporality,
687+
ExemplarFilter: exemplar.AlwaysOffFilter,
688+
ExemplarReservoirProvider: newNoopReservoir,
686689
}.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum)
687690
}))
688691
b.Run("Int64/Delta", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) {
689692
return Builder[int64]{
690-
Temporality: metricdata.DeltaTemporality,
693+
Temporality: metricdata.DeltaTemporality,
694+
ExemplarFilter: exemplar.AlwaysOffFilter,
695+
ExemplarReservoirProvider: newNoopReservoir,
691696
}.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum)
692697
}))
693698
b.Run("Float64/Cumulative", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) {
694699
return Builder[float64]{
695-
Temporality: metricdata.CumulativeTemporality,
700+
Temporality: metricdata.CumulativeTemporality,
701+
ExemplarFilter: exemplar.AlwaysOffFilter,
702+
ExemplarReservoirProvider: newNoopReservoir,
696703
}.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum)
697704
}))
698705
b.Run("Float64/Delta", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) {
699706
return Builder[float64]{
700-
Temporality: metricdata.DeltaTemporality,
707+
Temporality: metricdata.DeltaTemporality,
708+
ExemplarFilter: exemplar.AlwaysOffFilter,
709+
ExemplarReservoirProvider: newNoopReservoir,
701710
}.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum)
702711
}))
703712
}
@@ -744,9 +753,11 @@ func TestExponentialHistogramAggregation(t *testing.T) {
744753

745754
func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) {
746755
in, out := Builder[N]{
747-
Temporality: metricdata.DeltaTemporality,
748-
Filter: attrFltr,
749-
AggregationLimit: 2,
756+
Temporality: metricdata.DeltaTemporality,
757+
Filter: attrFltr,
758+
AggregationLimit: 2,
759+
ExemplarFilter: exemplar.AlwaysOffFilter,
760+
ExemplarReservoirProvider: newNoopReservoir,
750761
}.ExponentialBucketHistogram(4, 20, false, false)
751762
ctx := context.Background()
752763
return test[N](in, out, []teststep[N]{
@@ -871,9 +882,11 @@ func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) {
871882

872883
func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) {
873884
in, out := Builder[N]{
874-
Temporality: metricdata.CumulativeTemporality,
875-
Filter: attrFltr,
876-
AggregationLimit: 2,
885+
Temporality: metricdata.CumulativeTemporality,
886+
Filter: attrFltr,
887+
AggregationLimit: 2,
888+
ExemplarFilter: exemplar.AlwaysOffFilter,
889+
ExemplarReservoirProvider: newNoopReservoir,
877890
}.ExponentialBucketHistogram(4, 20, false, false)
878891
ctx := context.Background()
879892
return test[N](in, out, []teststep[N]{

sdk/metric/internal/aggregate/filtered_reservoir.go

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,29 +11,16 @@ import (
1111
"go.opentelemetry.io/otel/sdk/metric/exemplar"
1212
)
1313

14-
// FilteredExemplarReservoir wraps a [exemplar.Reservoir] with a filter.
15-
type FilteredExemplarReservoir[N int64 | float64] interface {
16-
// Offer accepts the parameters associated with a measurement. The
17-
// parameters will be stored as an exemplar if the filter decides to
18-
// sample the measurement.
19-
//
20-
// The passed ctx needs to contain any baggage or span that were active
21-
// when the measurement was made. This information may be used by the
22-
// Reservoir in making a sampling decision.
23-
Offer(ctx context.Context, val N, attr []attribute.KeyValue)
24-
// Collect returns all the held exemplars in the reservoir.
25-
Collect(dest *[]exemplar.Exemplar)
26-
}
27-
2814
// filteredExemplarReservoir handles the pre-sampled exemplar of measurements made.
2915
type filteredExemplarReservoir[N int64 | float64] struct {
3016
filter exemplar.Filter
3117
reservoir exemplar.Reservoir
3218
}
3319

34-
// NewFilteredExemplarReservoir creates a [FilteredExemplarReservoir] which only offers values
35-
// that are allowed by the filter.
36-
func NewFilteredExemplarReservoir[N int64 | float64](f exemplar.Filter, r exemplar.Reservoir) FilteredExemplarReservoir[N] {
20+
// newFilteredExemplarReservoir creates a [FilteredExemplarReservoir] which
21+
// only offers values that are allowed by the filter. If the provided filter is
22+
// nil, all measurements are dropped..
23+
func newFilteredExemplarReservoir[N int64 | float64](f exemplar.Filter, r exemplar.Reservoir) *filteredExemplarReservoir[N] {
3724
return &filteredExemplarReservoir[N]{
3825
filter: f,
3926
reservoir: r,

sdk/metric/internal/aggregate/histogram.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616

1717
type buckets[N int64 | float64] struct {
1818
attrs attribute.Set
19-
res FilteredExemplarReservoir[N]
19+
res *filteredExemplarReservoir[N]
2020

2121
counts []uint64
2222
count uint64
@@ -47,13 +47,13 @@ type histValues[N int64 | float64] struct {
4747
noSum bool
4848
bounds []float64
4949

50-
newRes func(attribute.Set) FilteredExemplarReservoir[N]
50+
newRes func(attribute.Set) *filteredExemplarReservoir[N]
5151
limit limiter[*buckets[N]]
5252
values map[attribute.Distinct]*buckets[N]
5353
valuesMu sync.Mutex
5454
}
5555

56-
func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *histValues[N] {
56+
func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *histValues[N] {
5757
// The responsibility of keeping all buckets correctly associated with the
5858
// passed boundaries is ultimately this type's responsibility. Make a copy
5959
// here so we can always guarantee this. Or, in the case of failure, have
@@ -108,7 +108,7 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute
108108

109109
// newHistogram returns an Aggregator that summarizes a set of measurements as
110110
// an histogram.
111-
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *histogram[N] {
111+
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *histogram[N] {
112112
return &histogram[N]{
113113
histValues: newHistValues[N](boundaries, noSum, limit, r),
114114
noMinMax: noMinMax,

0 commit comments

Comments
 (0)