Skip to content

Commit c777fed

Browse files
bacherflzeck-ops
authored andcommitted
[processor/cumulativetodelta] Add metric type filter (open-telemetry#34407)
**Description:** Add metric type filter for cumulativetodelta processor **Link to tracking Issue:** open-telemetry#33673 **Testing:** Added unit tests **Documentation:** Extended the readme of this component to describe this new filter --------- Signed-off-by: Florian Bacher <[email protected]>
1 parent b96ac1d commit c777fed

File tree

11 files changed

+455
-73
lines changed

11 files changed

+455
-73
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: cumulativetodeltaprocessor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add metric type filter for cumulativetodelta processor
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [33673]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

processor/cumulativetodeltaprocessor/README.md

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ Configuration is specified through a list of metrics. The processor uses metric
2323

2424
The following settings can be optionally configured:
2525

26-
- `include`: List of metrics names or patterns to convert to delta.
27-
- `exclude`: List of metrics names or patterns to not convert to delta. **If a metric name matches both include and exclude, exclude takes precedence.**
26+
- `include`: List of metrics names (case-insensitive), patterns or metric types to convert to delta. Valid values are: `sum`, `histogram`.
27+
- `exclude`: List of metrics names (case-insensitive), patterns or metric types to not convert to delta. **If a metric name matches both include and exclude, exclude takes precedence.** Valid values are: `sum`, `histogram`.
2828
- `max_staleness`: The total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. Default: 0
2929
- `initial_value`: Handling of the first observed point for a given metric identity.
3030
When the collector (re)starts, there's no record of how much of a given cumulative counter has already been converted to delta values.
@@ -56,6 +56,17 @@ processors:
5656
match_type: strict
5757
```
5858
59+
```yaml
60+
processors:
61+
# processor name: cumulativetodelta
62+
cumulativetodelta:
63+
64+
# Convert all sum metrics
65+
include:
66+
metric_types:
67+
- sum
68+
```
69+
5970
```yaml
6071
processors:
6172
# processor name: cumulativetodelta
@@ -69,6 +80,21 @@ processors:
6980
match_type: regexp
7081
```
7182
83+
```yaml
84+
processors:
85+
# processor name: cumulativetodelta
86+
cumulativetodelta:
87+
88+
# Convert cumulative sum metrics to delta
89+
# if and only if 'metric' is in the name
90+
include:
91+
metrics:
92+
- ".*metric.*"
93+
match_type: regexp
94+
metric_types:
95+
- sum
96+
```
97+
7298
```yaml
7399
processors:
74100
# processor name: cumulativetodelta
@@ -82,6 +108,22 @@ processors:
82108
match_type: regexp
83109
```
84110
111+
```yaml
112+
processors:
113+
# processor name: cumulativetodelta
114+
cumulativetodelta:
115+
116+
# Convert cumulative sum metrics with 'metric' in their name,
117+
# but exclude histogram metrics
118+
include:
119+
metrics:
120+
- ".*metric.*"
121+
match_type: regexp
122+
exclude:
123+
metric_types:
124+
- histogram
125+
```
126+
85127
```yaml
86128
processors:
87129
# processor name: cumulativetodelta

processor/cumulativetodeltaprocessor/config.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,24 @@ package cumulativetodeltaprocessor // import "github.com/open-telemetry/opentele
55

66
import (
77
"fmt"
8+
"strings"
89
"time"
910

1011
"go.opentelemetry.io/collector/component"
12+
"go.opentelemetry.io/collector/pdata/pmetric"
13+
"golang.org/x/exp/maps"
1114

1215
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterset"
1316
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor/internal/tracking"
1417
)
1518

19+
var validMetricTypes = map[string]bool{
20+
strings.ToLower(pmetric.MetricTypeSum.String()): true,
21+
strings.ToLower(pmetric.MetricTypeHistogram.String()): true,
22+
}
23+
24+
var validMetricTypeList = maps.Keys(validMetricTypes)
25+
1626
// Config defines the configuration for the processor.
1727
type Config struct {
1828
// MaxStaleness is the total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely.
@@ -37,6 +47,8 @@ type MatchMetrics struct {
3747
filterset.Config `mapstructure:",squash"`
3848

3949
Metrics []string `mapstructure:"metrics"`
50+
51+
MetricTypes []string `mapstructure:"metric_types"`
4052
}
4153

4254
var _ component.Config = (*Config)(nil)
@@ -52,5 +64,24 @@ func (config *Config) Validate() error {
5264
(len(config.Exclude.MatchType) > 0 && len(config.Exclude.Metrics) == 0) {
5365
return fmt.Errorf("metrics must be supplied if match_type is set")
5466
}
67+
68+
for _, metricType := range config.Exclude.MetricTypes {
69+
if valid := validMetricTypes[strings.ToLower(metricType)]; !valid {
70+
return fmt.Errorf(
71+
"found invalid metric type in exclude.metric_types: %s. Valid values are %s",
72+
metricType,
73+
validMetricTypeList,
74+
)
75+
}
76+
}
77+
for _, metricType := range config.Include.MetricTypes {
78+
if valid := validMetricTypes[strings.ToLower(metricType)]; !valid {
79+
return fmt.Errorf(
80+
"found invalid metric type in include.metric_types: %s. Valid values are %s",
81+
metricType,
82+
validMetricTypeList,
83+
)
84+
}
85+
}
5586
return nil
5687
}

processor/cumulativetodeltaprocessor/config_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package cumulativetodeltaprocessor
55

66
import (
7+
"fmt"
78
"path/filepath"
89
"testing"
910
"time"
@@ -82,6 +83,45 @@ func TestLoadConfig(t *testing.T) {
8283
InitialValue: tracking.InitialValueAuto,
8384
},
8485
},
86+
{
87+
id: component.NewIDWithName(metadata.Type, "metric_type_filter"),
88+
expected: &Config{
89+
Include: MatchMetrics{
90+
Metrics: []string{
91+
"a*",
92+
},
93+
Config: filterset.Config{
94+
MatchType: "regexp",
95+
RegexpConfig: nil,
96+
},
97+
MetricTypes: []string{
98+
"sum",
99+
},
100+
},
101+
Exclude: MatchMetrics{
102+
Metrics: []string{
103+
"b*",
104+
},
105+
Config: filterset.Config{
106+
MatchType: "regexp",
107+
RegexpConfig: nil,
108+
},
109+
MetricTypes: []string{
110+
"histogram",
111+
},
112+
},
113+
MaxStaleness: 10 * time.Second,
114+
InitialValue: tracking.InitialValueAuto,
115+
},
116+
},
117+
{
118+
id: component.NewIDWithName(metadata.Type, "invalid_include_metric_type_filter"),
119+
errorMessage: fmt.Sprintf("found invalid metric type in include.metric_types: gauge. Valid values are %s", validMetricTypeList),
120+
},
121+
{
122+
id: component.NewIDWithName(metadata.Type, "invalid_exclude_metric_type_filter"),
123+
errorMessage: fmt.Sprintf("found invalid metric type in exclude.metric_types: Invalid. Valid values are %s", validMetricTypeList),
124+
},
85125
{
86126
id: component.NewIDWithName(metadata.Type, "missing_match_type"),
87127
errorMessage: "match_type must be set if metrics are supplied",

processor/cumulativetodeltaprocessor/factory.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,10 @@ func createMetricsProcessor(
4040
return nil, fmt.Errorf("configuration parsing error")
4141
}
4242

43-
metricsProcessor := newCumulativeToDeltaProcessor(processorConfig, set.Logger)
43+
metricsProcessor, err := newCumulativeToDeltaProcessor(processorConfig, set.Logger)
44+
if err != nil {
45+
return nil, err
46+
}
4447

4548
return processorhelper.NewMetrics(
4649
ctx,

processor/cumulativetodeltaprocessor/factory_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package cumulativetodeltaprocessor
66
import (
77
"context"
88
"path/filepath"
9+
"strings"
910
"testing"
1011

1112
"github.com/stretchr/testify/assert"
@@ -61,6 +62,12 @@ func TestCreateProcessors(t *testing.T) {
6162
processortest.NewNopSettings(),
6263
cfg,
6364
consumertest.NewNop())
65+
66+
if strings.Contains(k, "invalid") {
67+
assert.Error(t, mErr)
68+
assert.Nil(t, mp)
69+
return
70+
}
6471
assert.NotNil(t, mp)
6572
assert.NoError(t, mErr)
6673
assert.NoError(t, mp.Shutdown(context.Background()))

processor/cumulativetodeltaprocessor/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ require (
1616
go.opentelemetry.io/collector/processor/processortest v0.118.1-0.20250121185328-fbefb22cc2b3
1717
go.uber.org/goleak v1.3.0
1818
go.uber.org/zap v1.27.0
19+
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842
1920
)
2021

2122
require (

processor/cumulativetodeltaprocessor/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

processor/cumulativetodeltaprocessor/processor.go

Lines changed: 54 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ package cumulativetodeltaprocessor // import "github.com/open-telemetry/opentele
55

66
import (
77
"context"
8+
"fmt"
89
"math"
10+
"strings"
911

1012
"go.opentelemetry.io/collector/pdata/pmetric"
1113
"go.uber.org/zap"
@@ -15,35 +17,71 @@ import (
1517
)
1618

1719
type cumulativeToDeltaProcessor struct {
18-
includeFS filterset.FilterSet
19-
excludeFS filterset.FilterSet
20-
logger *zap.Logger
21-
deltaCalculator *tracking.MetricTracker
22-
cancelFunc context.CancelFunc
20+
includeFS filterset.FilterSet
21+
excludeFS filterset.FilterSet
22+
includeMetricTypes map[pmetric.MetricType]bool
23+
excludeMetricTypes map[pmetric.MetricType]bool
24+
logger *zap.Logger
25+
deltaCalculator *tracking.MetricTracker
26+
cancelFunc context.CancelFunc
2327
}
2428

25-
func newCumulativeToDeltaProcessor(config *Config, logger *zap.Logger) *cumulativeToDeltaProcessor {
29+
func newCumulativeToDeltaProcessor(config *Config, logger *zap.Logger) (*cumulativeToDeltaProcessor, error) {
2630
ctx, cancel := context.WithCancel(context.Background())
31+
2732
p := &cumulativeToDeltaProcessor{
28-
logger: logger,
29-
deltaCalculator: tracking.NewMetricTracker(ctx, logger, config.MaxStaleness, config.InitialValue),
30-
cancelFunc: cancel,
33+
logger: logger,
34+
cancelFunc: cancel,
3135
}
3236
if len(config.Include.Metrics) > 0 {
3337
p.includeFS, _ = filterset.CreateFilterSet(config.Include.Metrics, &config.Include.Config)
3438
}
3539
if len(config.Exclude.Metrics) > 0 {
3640
p.excludeFS, _ = filterset.CreateFilterSet(config.Exclude.Metrics, &config.Exclude.Config)
3741
}
38-
return p
42+
43+
if len(config.Include.MetricTypes) > 0 {
44+
includeMetricTypeFilter, err := getMetricTypeFilter(config.Include.MetricTypes)
45+
if err != nil {
46+
return nil, err
47+
}
48+
p.includeMetricTypes = includeMetricTypeFilter
49+
}
50+
51+
if len(config.Exclude.MetricTypes) > 0 {
52+
excludeMetricTypeFilter, err := getMetricTypeFilter(config.Exclude.MetricTypes)
53+
if err != nil {
54+
return nil, err
55+
}
56+
p.excludeMetricTypes = excludeMetricTypeFilter
57+
}
58+
59+
p.deltaCalculator = tracking.NewMetricTracker(ctx, logger, config.MaxStaleness, config.InitialValue)
60+
61+
return p, nil
62+
}
63+
64+
func getMetricTypeFilter(types []string) (map[pmetric.MetricType]bool, error) {
65+
res := map[pmetric.MetricType]bool{}
66+
for _, t := range types {
67+
switch strings.ToLower(t) {
68+
case strings.ToLower(pmetric.MetricTypeSum.String()):
69+
res[pmetric.MetricTypeSum] = true
70+
case strings.ToLower(pmetric.MetricTypeHistogram.String()):
71+
res[pmetric.MetricTypeHistogram] = true
72+
default:
73+
return nil, fmt.Errorf("unsupported metric type filter: %s", t)
74+
}
75+
}
76+
return res, nil
3977
}
4078

4179
// processMetrics implements the ProcessMetricsFunc type.
4280
func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
4381
md.ResourceMetrics().RemoveIf(func(rm pmetric.ResourceMetrics) bool {
4482
rm.ScopeMetrics().RemoveIf(func(ilm pmetric.ScopeMetrics) bool {
4583
ilm.Metrics().RemoveIf(func(m pmetric.Metric) bool {
46-
if !ctdp.shouldConvertMetric(m.Name()) {
84+
if !ctdp.shouldConvertMetric(m) {
4785
return false
4886
}
4987
switch m.Type() {
@@ -111,9 +149,11 @@ func (ctdp *cumulativeToDeltaProcessor) shutdown(context.Context) error {
111149
return nil
112150
}
113151

114-
func (ctdp *cumulativeToDeltaProcessor) shouldConvertMetric(metricName string) bool {
115-
return (ctdp.includeFS == nil || ctdp.includeFS.Matches(metricName)) &&
116-
(ctdp.excludeFS == nil || !ctdp.excludeFS.Matches(metricName))
152+
func (ctdp *cumulativeToDeltaProcessor) shouldConvertMetric(metric pmetric.Metric) bool {
153+
return (ctdp.includeFS == nil || ctdp.includeFS.Matches(metric.Name())) &&
154+
(len(ctdp.includeMetricTypes) == 0 || ctdp.includeMetricTypes[metric.Type()]) &&
155+
(ctdp.excludeFS == nil || !ctdp.excludeFS.Matches(metric.Name())) &&
156+
(len(ctdp.excludeMetricTypes) == 0 || !ctdp.excludeMetricTypes[metric.Type()])
117157
}
118158

119159
func (ctdp *cumulativeToDeltaProcessor) convertNumberDataPoints(dps pmetric.NumberDataPointSlice, baseIdentity tracking.MetricIdentity) {

0 commit comments

Comments
 (0)