
Commit 3d5bb5f

[processor/interval] Support Gauges and Summaries (#34805)

Description: Adds support for Gauges and Summaries.
Link to tracking Issue: #34803
Testing: Unit tests were extended to cover the new behavior.
Documentation: <Describe the documentation added.>

Signed-off-by: Arthur Silva Sens <[email protected]>

1 parent 38c4921 commit 3d5bb5f

File tree

19 files changed: +258 −31 lines changed

Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: processor/interval
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Support for gauge and summary metrics.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [34803]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: Only the last value of a gauge or summary metric is reported in the interval processor, instead of all values.
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]

processor/intervalprocessor/README.md

Lines changed: 6 additions & 2 deletions
@@ -19,19 +19,23 @@ The interval processor (`intervalprocessor`) aggregates metrics and periodically
 * Monotonically increasing, cumulative sums
 * Monotonically increasing, cumulative histograms
 * Monotonically increasing, cumulative exponential histograms
+* Gauges
+* Summaries

 The following metric types will *not* be aggregated, and will instead be passed, unchanged, to the next component in the pipeline:

 * All delta metrics
 * Non-monotonically increasing sums
-* Gauges
-* Summaries
+
+> NOTE: Aggregating data over an interval is an inherently "lossy" process. For monotonically increasing, cumulative sums, histograms, and exponential histograms, you "lose" precision, but you don't lose overall data. For non-monotonically increasing sums, gauges, and summaries, however, aggregation represents actual data loss: you could "lose" the fact that a value increased and then decreased back to its original value. In most cases this "loss" is acceptable, but if you would rather these values be passed through and *not* aggregated, you can set that in the configuration.

 ## Configuration

 The following settings can be optionally configured:

 * `interval`: The interval in which the processor should export the aggregated metrics. Default: 60s
+* `gauge_pass_through`: Whether gauges should pass through as they are to the next component or be aggregated. Default: false
+* `summary_pass_through`: Whether summaries should pass through as they are to the next component or be aggregated. Default: false

 ## Example of metric flows
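The two new options slot into an ordinary collector configuration. A minimal sketch, assuming the processor is wired under its `interval` key in a collector pipeline; values shown are illustrative, not part of this commit:

```yaml
processors:
  interval:
    interval: 60s
    # New in this commit; both default to false (i.e. aggregate).
    # Set to true to forward gauges/summaries unchanged instead.
    gauge_pass_through: true
    summary_pass_through: true
```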

processor/intervalprocessor/config.go

Lines changed: 7 additions & 1 deletion
@@ -18,8 +18,14 @@ var _ component.Config = (*Config)(nil)

 // Config defines the configuration for the processor.
 type Config struct {
-	// Interval is the time
+	// Interval is the time interval at which the processor will aggregate metrics.
 	Interval time.Duration `mapstructure:"interval"`
+	// GaugePassThrough is a flag that determines whether gauge metrics should be passed through
+	// as they are or aggregated.
+	GaugePassThrough bool `mapstructure:"gauge_pass_through"`
+	// SummaryPassThrough is a flag that determines whether summary metrics should be passed through
+	// as they are or aggregated.
+	SummaryPassThrough bool `mapstructure:"summary_pass_through"`
 }

 // Validate checks whether the input configuration has all of the required fields for the processor.

processor/intervalprocessor/factory.go

Lines changed: 3 additions & 1 deletion
@@ -25,7 +25,9 @@ func NewFactory() processor.Factory {

 func createDefaultConfig() component.Config {
 	return &Config{
-		Interval: 60 * time.Second,
+		Interval:           60 * time.Second,
+		GaugePassThrough:   false,
+		SummaryPassThrough: false,
 	}
 }

processor/intervalprocessor/internal/metrics/metrics.go

Lines changed: 0 additions & 3 deletions
@@ -5,7 +5,6 @@ package metrics // import "github.com/open-telemetry/opentelemetry-collector-con

 import (
 	"go.opentelemetry.io/collector/pdata/pcommon"
-	"go.opentelemetry.io/collector/pdata/pmetric"
 )

 type DataPointSlice[DP DataPoint[DP]] interface {
@@ -15,8 +14,6 @@ type DataPointSlice[DP DataPoint[DP]] interface {
 }

 type DataPoint[Self any] interface {
-	pmetric.NumberDataPoint | pmetric.HistogramDataPoint | pmetric.ExponentialHistogramDataPoint
-
 	Timestamp() pcommon.Timestamp
 	Attributes() pcommon.Map
 	CopyTo(dest Self)
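The change above drops the union of concrete pdata types from the `DataPoint` constraint, leaving a pure method set. A sketch of why that matters, using hypothetical toy types rather than the real pdata API: once the constraint is methods-only, any type with those methods (such as a summary data point) satisfies it without the constraint itself ever being touched again.

```go
package main

import "fmt"

// DataPoint mirrors the shape of the constraint after the commit:
// a method set only, no union of concrete types.
type DataPoint[Self any] interface {
	Timestamp() int64
	CopyTo(dest Self)
}

// numberDP and summaryDP are hypothetical stand-ins for
// pmetric.NumberDataPoint and pmetric.SummaryDataPoint.
type numberDP struct {
	ts  int64
	val float64
}

func (d *numberDP) Timestamp() int64      { return d.ts }
func (d *numberDP) CopyTo(dest *numberDP) { *dest = *d }

type summaryDP struct {
	ts int64
}

func (d *summaryDP) Timestamp() int64       { return d.ts }
func (d *summaryDP) CopyTo(dest *summaryDP) { *dest = *d }

// latest returns the data point with the greatest timestamp; it works
// for every type satisfying DataPoint, newly added ones included.
func latest[DP DataPoint[DP]](dps []DP) DP {
	best := dps[0]
	for _, dp := range dps[1:] {
		if dp.Timestamp() > best.Timestamp() {
			best = dp
		}
	}
	return best
}

func main() {
	fmt.Println(latest([]*numberDP{{ts: 20, val: 258}, {ts: 80, val: 178}}).val) // 178
	fmt.Println(latest([]*summaryDP{{ts: 50}, {ts: 80}, {ts: 20}}).Timestamp())  // 80
}
```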

processor/intervalprocessor/processor.go

Lines changed: 25 additions & 4 deletions
@@ -36,8 +36,11 @@ type Processor struct {
 	numberLookup       map[identity.Stream]pmetric.NumberDataPoint
 	histogramLookup    map[identity.Stream]pmetric.HistogramDataPoint
 	expHistogramLookup map[identity.Stream]pmetric.ExponentialHistogramDataPoint
+	summaryLookup      map[identity.Stream]pmetric.SummaryDataPoint

-	exportInterval time.Duration
+	exportInterval     time.Duration
+	gaugePassThrough   bool
+	summaryPassThrough bool

 	nextConsumer consumer.Metrics
 }
@@ -59,8 +62,11 @@ func newProcessor(config *Config, log *zap.Logger, nextConsumer consumer.Metrics
 		numberLookup:       map[identity.Stream]pmetric.NumberDataPoint{},
 		histogramLookup:    map[identity.Stream]pmetric.HistogramDataPoint{},
 		expHistogramLookup: map[identity.Stream]pmetric.ExponentialHistogramDataPoint{},
+		summaryLookup:      map[identity.Stream]pmetric.SummaryDataPoint{},

-		exportInterval: config.Interval,
+		exportInterval:     config.Interval,
+		gaugePassThrough:   config.GaugePassThrough,
+		summaryPassThrough: config.SummaryPassThrough,

 		nextConsumer: nextConsumer,
 	}
@@ -102,8 +108,22 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro
 	rm.ScopeMetrics().RemoveIf(func(sm pmetric.ScopeMetrics) bool {
 		sm.Metrics().RemoveIf(func(m pmetric.Metric) bool {
 			switch m.Type() {
-			case pmetric.MetricTypeGauge, pmetric.MetricTypeSummary:
-				return false
+			case pmetric.MetricTypeSummary:
+				if p.summaryPassThrough {
+					return false
+				}
+
+				mClone, metricID := p.getOrCloneMetric(rm, sm, m)
+				aggregateDataPoints(m.Summary().DataPoints(), mClone.Summary().DataPoints(), metricID, p.summaryLookup)
+				return true
+			case pmetric.MetricTypeGauge:
+				if p.gaugePassThrough {
+					return false
+				}
+
+				mClone, metricID := p.getOrCloneMetric(rm, sm, m)
+				aggregateDataPoints(m.Gauge().DataPoints(), mClone.Gauge().DataPoints(), metricID, p.numberLookup)
+				return true
 			case pmetric.MetricTypeSum:
 				// Check if we care about this value
 				sum := m.Sum()
@@ -202,6 +222,7 @@ func (p *Processor) exportMetrics() {
 	clear(p.numberLookup)
 	clear(p.histogramLookup)
 	clear(p.expHistogramLookup)
+	clear(p.summaryLookup)

 	return out
 }()
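The "last value wins" semantics described in the changelog can be sketched in isolation. This is a simplified model, not the processor's real code: `point` stands in for a pmetric gauge or summary data point, and a string key stands in for `identity.Stream`.

```go
package main

import "fmt"

// point is a hypothetical stand-in for a gauge/summary data point;
// attrs plays the role of the stream identity key.
type point struct {
	attrs string
	ts    int64
	value float64
}

// aggregateLastValue mirrors the interval processor's behavior for
// gauges and summaries: per stream, only the data point with the
// greatest timestamp survives the interval.
func aggregateLastValue(lookup map[string]point, incoming []point) {
	for _, dp := range incoming {
		existing, ok := lookup[dp.attrs]
		if !ok || dp.ts > existing.ts {
			lookup[dp.attrs] = dp
		}
	}
}

func main() {
	lookup := map[string]point{}
	aggregateLastValue(lookup, []point{
		{"aaa=bbb", 50, 345},
		{"aaa=bbb", 20, 258},
		{"aaa=bbb", 80, 178}, // latest timestamp wins, matching the testdata
	})
	fmt.Println(lookup["aaa=bbb"].value) // 178

	// On export the lookup is emptied, like clear(p.summaryLookup) above.
	clear(lookup)
	fmt.Println(len(lookup)) // 0
}
```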

processor/intervalprocessor/processor_test.go

Lines changed: 18 additions & 14 deletions
@@ -21,26 +21,29 @@ import (
 func TestAggregation(t *testing.T) {
 	t.Parallel()

-	testCases := []string{
-		"basic_aggregation",
-		"non_monotonic_sums_are_passed_through",
-		"summaries_are_passed_through",
-		"histograms_are_aggregated",
-		"exp_histograms_are_aggregated",
-		"all_delta_metrics_are_passed_through",
+	testCases := []struct {
+		name        string
+		passThrough bool
+	}{
+		{name: "basic_aggregation"},
+		{name: "histograms_are_aggregated"},
+		{name: "exp_histograms_are_aggregated"},
+		{name: "gauges_are_aggregated"},
+		{name: "summaries_are_aggregated"},
+		{name: "all_delta_metrics_are_passed_through"},  // Deltas are passed through even when aggregation is enabled
+		{name: "non_monotonic_sums_are_passed_through"}, // Non-monotonic sums are passed through even when aggregation is enabled
+		{name: "gauges_are_passed_through", passThrough: true},
+		{name: "summaries_are_passed_through", passThrough: true},
 	}

 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()

-	config := &Config{Interval: time.Second}
-
+	var config *Config
 	for _, tc := range testCases {
-		testName := tc
-
-		t.Run(testName, func(t *testing.T) {
-			t.Parallel()
+		config = &Config{Interval: time.Second, GaugePassThrough: tc.passThrough, SummaryPassThrough: tc.passThrough}

+		t.Run(tc.name, func(t *testing.T) {
 			// next stores the results of the filter metric processor
 			next := &consumertest.MetricsSink{}

@@ -53,7 +56,7 @@ func TestAggregation(t *testing.T) {
 			)
 			require.NoError(t, err)

-			dir := filepath.Join("testdata", testName)
+			dir := filepath.Join("testdata", tc.name)

 			md, err := golden.ReadMetrics(filepath.Join(dir, "input.yaml"))
 			require.NoError(t, err)
@@ -75,6 +78,7 @@ func TestAggregation(t *testing.T) {
 			require.Empty(t, processor.numberLookup)
 			require.Empty(t, processor.histogramLookup)
 			require.Empty(t, processor.expHistogramLookup)
+			require.Empty(t, processor.summaryLookup)

 			// Exporting again should return nothing
 			processor.exportMetrics()
Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@
+resourceMetrics:
+  - schemaUrl: https://test-res-schema.com/schema
+    resource:
+      attributes:
+        - key: asdf
+          value:
+            stringValue: foo
+    scopeMetrics:
+      - schemaUrl: https://test-scope-schema.com/schema
+        scope:
+          name: MyTestInstrument
+          version: "1.2.3"
+          attributes:
+            - key: foo
+              value:
+                stringValue: bar
+        metrics:
+          - name: test.gauge
+            gauge:
+              aggregationTemporality: 2
+              dataPoints:
+                - timeUnixNano: 50
+                  asDouble: 345
+                  attributes:
+                    - key: aaa
+                      value:
+                        stringValue: bbb
+                - timeUnixNano: 20
+                  asDouble: 258
+                  attributes:
+                    - key: aaa
+                      value:
+                        stringValue: bbb
+                # From the interval processor's point of view, only the last datapoint should be passed through.
+                - timeUnixNano: 80
+                  asDouble: 178
+                  attributes:
+                    - key: aaa
+                      value:
+                        stringValue: bbb
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+resourceMetrics: []
Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
+resourceMetrics:
+  - schemaUrl: https://test-res-schema.com/schema
+    resource:
+      attributes:
+        - key: asdf
+          value:
+            stringValue: foo
+    scopeMetrics:
+      - schemaUrl: https://test-scope-schema.com/schema
+        scope:
+          name: MyTestInstrument
+          version: "1.2.3"
+          attributes:
+            - key: foo
+              value:
+                stringValue: bar
+        metrics:
+          - name: test.gauge
+            gauge:
+              aggregationTemporality: 2
+              dataPoints:
+                - timeUnixNano: 80
+                  asDouble: 178
+                  attributes:
+                    - key: aaa
+                      value:
+                        stringValue: bbb

processor/intervalprocessor/testdata/gauges_are_passed_through/input.yaml

Lines changed: 1 addition & 1 deletion
@@ -36,4 +36,4 @@ resourceMetrics:
           attributes:
             - key: aaa
               value:
-                stringValue: bbb
+                stringValue: bbb

processor/intervalprocessor/testdata/gauges_are_passed_through/next.yaml

Lines changed: 1 addition & 1 deletion
@@ -36,4 +36,4 @@ resourceMetrics:
           attributes:
             - key: aaa
               value:
-                stringValue: bbb
+                stringValue: bbb
Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-resourceMetrics: []
+resourceMetrics: []
Lines changed: 63 additions & 0 deletions
@@ -0,0 +1,63 @@
+resourceMetrics:
+  - schemaUrl: https://test-res-schema.com/schema
+    resource:
+      attributes:
+        - key: asdf
+          value:
+            stringValue: foo
+    scopeMetrics:
+      - schemaUrl: https://test-scope-schema.com/schema
+        scope:
+          name: MyTestInstrument
+          version: "1.2.3"
+          attributes:
+            - key: foo
+              value:
+                stringValue: bar
+        metrics:
+          - name: summary.test
+            summary:
+              dataPoints:
+                - timeUnixNano: 50
+                  quantileValues:
+                    - quantile: 0.25
+                      value: 50
+                    - quantile: 0.5
+                      value: 20
+                    - quantile: 0.75
+                      value: 75
+                    - quantile: 0.95
+                      value: 10
+                  attributes:
+                    - key: aaa
+                      value:
+                        stringValue: bbb
+                - timeUnixNano: 20
+                  quantileValues:
+                    - quantile: 0.25
+                      value: 40
+                    - quantile: 0.5
+                      value: 10
+                    - quantile: 0.75
+                      value: 60
+                    - quantile: 0.95
+                      value: 5
+                  attributes:
+                    - key: aaa
+                      value:
+                        stringValue: bbb
+                # Only the last summary should pass through
+                - timeUnixNano: 80
+                  quantileValues:
+                    - quantile: 0.25
+                      value: 80
+                    - quantile: 0.5
+                      value: 35
+                    - quantile: 0.75
+                      value: 90
+                    - quantile: 0.95
+                      value: 15
+                  attributes:
+                    - key: aaa
+                      value:
+                        stringValue: bbb
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+resourceMetrics: []

0 commit comments
