Skip to content

Commit 60b7809

Browse files
edmocostaevan-bradley
authored andcommitted
[processor/transform] Add support for flat configuration style (open-telemetry#37444)
Co-authored-by: Evan Bradley <[email protected]>
1 parent 68c7bcb commit 60b7809

File tree

15 files changed

+792
-12
lines changed

15 files changed

+792
-12
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: processor/transformprocessor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add support for flat configuration style.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [29017]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
The flat configuration style allows users to configure statements by providing a list of statements instead of a
20+
structured configuration map. The statement's context is expressed by adding the context's name prefix to path names,
21+
which are used to infer and to select the appropriate context for the statement.
22+
23+
# If your change doesn't affect end users or the exported elements of any package,
24+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
25+
# Optional: The change log or logs in which this entry should be included.
26+
# e.g. '[user]' or '[user, api]'
27+
# Include 'user' if the change is relevant to end users.
28+
# Include 'api' if there is a change to a library API.
29+
# Default: '[user]'
30+
change_logs: [user]

processor/transformprocessor/config.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@ package transformprocessor // import "github.com/open-telemetry/opentelemetry-co
55

66
import (
77
"errors"
8+
"fmt"
9+
"reflect"
810

911
"go.opentelemetry.io/collector/component"
12+
"go.opentelemetry.io/collector/confmap"
1013
"go.opentelemetry.io/collector/featuregate"
1114
"go.uber.org/multierr"
1215
"go.uber.org/zap"
@@ -44,6 +47,86 @@ type Config struct {
4447
logger *zap.Logger
4548
}
4649

50+
// Unmarshal is used internally by mapstructure to parse the transformprocessor configuration (Config),
51+
// adding support to structured and flat configuration styles.
52+
// When the flat configuration style is used, each statement becomes a new common.ContextStatements
53+
// object, with empty [common.ContextStatements.Context] value.
54+
// On the other hand, structured configurations are parsed following the mapstructure Config format.
55+
// Mixed configuration styles are also supported.
56+
//
57+
// Example of flat configuration:
58+
//
59+
// log_statements:
60+
// - set(attributes["service.new_name"], attributes["service.name"])
61+
// - delete_key(attributes, "service.name")
62+
//
63+
// Example of structured configuration:
64+
//
65+
// log_statements:
66+
// - context: "span"
67+
// statements:
68+
// - set(attributes["service.new_name"], attributes["service.name"])
69+
// - delete_key(attributes, "service.name")
70+
func (c *Config) Unmarshal(conf *confmap.Conf) error {
71+
if conf == nil {
72+
return nil
73+
}
74+
75+
contextStatementsFields := map[string]*[]common.ContextStatements{
76+
"trace_statements": &c.TraceStatements,
77+
"metric_statements": &c.MetricStatements,
78+
"log_statements": &c.LogStatements,
79+
}
80+
81+
flatContextStatements := map[string][]int{}
82+
contextStatementsPatch := map[string]any{}
83+
for fieldName := range contextStatementsFields {
84+
if !conf.IsSet(fieldName) {
85+
continue
86+
}
87+
rawVal := conf.Get(fieldName)
88+
values, ok := rawVal.([]any)
89+
if !ok {
90+
return fmt.Errorf("invalid %s type, expected: array, got: %t", fieldName, rawVal)
91+
}
92+
if len(values) == 0 {
93+
continue
94+
}
95+
96+
stmts := make([]any, 0, len(values))
97+
for i, value := range values {
98+
// Array of strings means it's a flat configuration style
99+
if reflect.TypeOf(value).Kind() == reflect.String {
100+
stmts = append(stmts, map[string]any{"statements": []any{value}})
101+
flatContextStatements[fieldName] = append(flatContextStatements[fieldName], i)
102+
} else {
103+
stmts = append(stmts, value)
104+
}
105+
}
106+
contextStatementsPatch[fieldName] = stmts
107+
}
108+
109+
if len(contextStatementsPatch) > 0 {
110+
err := conf.Merge(confmap.NewFromStringMap(contextStatementsPatch))
111+
if err != nil {
112+
return err
113+
}
114+
}
115+
116+
err := conf.Unmarshal(c)
117+
if err != nil {
118+
return err
119+
}
120+
121+
for fieldName, indexes := range flatContextStatements {
122+
for _, i := range indexes {
123+
(*contextStatementsFields[fieldName])[i].SharedCache = true
124+
}
125+
}
126+
127+
return err
128+
}
129+
47130
var _ component.Config = (*Config)(nil)
48131

49132
func (c *Config) Validate() error {

processor/transformprocessor/config_test.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,108 @@ func TestLoadConfig(t *testing.T) {
205205
},
206206
},
207207
},
208+
{
209+
id: component.NewIDWithName(metadata.Type, "flat_configuration"),
210+
expected: &Config{
211+
ErrorMode: ottl.PropagateError,
212+
TraceStatements: []common.ContextStatements{
213+
{
214+
SharedCache: true,
215+
Statements: []string{`set(span.name, "bear") where span.attributes["http.path"] == "/animal"`},
216+
},
217+
{
218+
SharedCache: true,
219+
Statements: []string{`set(resource.attributes["name"], "bear")`},
220+
},
221+
},
222+
MetricStatements: []common.ContextStatements{
223+
{
224+
SharedCache: true,
225+
Statements: []string{`set(metric.name, "bear") where resource.attributes["http.path"] == "/animal"`},
226+
},
227+
{
228+
SharedCache: true,
229+
Statements: []string{`set(resource.attributes["name"], "bear")`},
230+
},
231+
},
232+
LogStatements: []common.ContextStatements{
233+
{
234+
SharedCache: true,
235+
Statements: []string{`set(log.body, "bear") where log.attributes["http.path"] == "/animal"`},
236+
},
237+
{
238+
SharedCache: true,
239+
Statements: []string{`set(resource.attributes["name"], "bear")`},
240+
},
241+
},
242+
},
243+
},
244+
{
245+
id: component.NewIDWithName(metadata.Type, "mixed_configuration_styles"),
246+
expected: &Config{
247+
ErrorMode: ottl.PropagateError,
248+
TraceStatements: []common.ContextStatements{
249+
{
250+
SharedCache: true,
251+
Statements: []string{`set(span.name, "bear") where span.attributes["http.path"] == "/animal"`},
252+
},
253+
{
254+
Context: "span",
255+
Statements: []string{
256+
`set(attributes["name"], "bear")`,
257+
`keep_keys(attributes, ["http.method", "http.path"])`,
258+
},
259+
},
260+
{
261+
Statements: []string{`set(span.attributes["name"], "lion")`},
262+
},
263+
{
264+
SharedCache: true,
265+
Statements: []string{`set(span.name, "lion") where span.attributes["http.path"] == "/animal"`},
266+
},
267+
},
268+
MetricStatements: []common.ContextStatements{
269+
{
270+
SharedCache: true,
271+
Statements: []string{`set(metric.name, "bear") where resource.attributes["http.path"] == "/animal"`},
272+
},
273+
{
274+
Context: "resource",
275+
Statements: []string{
276+
`set(attributes["name"], "bear")`,
277+
`keep_keys(attributes, ["http.method", "http.path"])`,
278+
},
279+
},
280+
{
281+
Statements: []string{`set(metric.name, "lion")`},
282+
},
283+
{
284+
SharedCache: true,
285+
Statements: []string{`set(metric.name, "lion") where resource.attributes["http.path"] == "/animal"`},
286+
},
287+
},
288+
LogStatements: []common.ContextStatements{
289+
{
290+
SharedCache: true,
291+
Statements: []string{`set(log.body, "bear") where log.attributes["http.path"] == "/animal"`},
292+
},
293+
{
294+
Context: "resource",
295+
Statements: []string{
296+
`set(attributes["name"], "bear")`,
297+
`keep_keys(attributes, ["http.method", "http.path"])`,
298+
},
299+
},
300+
{
301+
Statements: []string{`set(log.attributes["name"], "lion")`},
302+
},
303+
{
304+
SharedCache: true,
305+
Statements: []string{`set(log.body, "lion") where log.attributes["http.path"] == "/animal"`},
306+
},
307+
},
308+
},
309+
},
208310
}
209311
for _, tt := range tests {
210312
t.Run(tt.id.Name(), func(t *testing.T) {
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package common // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
5+
6+
import (
7+
"go.opentelemetry.io/collector/pdata/pcommon"
8+
)
9+
10+
// LoadContextCache retrieves or creates a context cache map for the given context ID.
11+
// If the cache is not found, a new map is created and stored in the contextCache map.
12+
func LoadContextCache(contextCache map[ContextID]*pcommon.Map, context ContextID) *pcommon.Map {
13+
v, ok := contextCache[context]
14+
if ok {
15+
return v
16+
}
17+
m := pcommon.NewMap()
18+
contextCache[context] = &m
19+
return &m
20+
}

processor/transformprocessor/internal/common/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ type ContextStatements struct {
3939
Context ContextID `mapstructure:"context"`
4040
Conditions []string `mapstructure:"conditions"`
4141
Statements []string `mapstructure:"statements"`
42+
43+
// `SharedCache` is an experimental feature that may change or be removed in the future.
44+
// When enabled, it allows the statements cache to be shared across all other groups that share the cache.
45+
// This feature is not configurable via `mapstructure` and cannot be set in configuration files.
46+
SharedCache bool `mapstructure:"-"`
4247
}
4348

4449
func (c ContextStatements) GetStatements() []string {

processor/transformprocessor/internal/common/logs.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ func WithLogErrorMode(errorMode ottl.ErrorMode) LogParserCollectionOption {
7575
func NewLogParserCollection(settings component.TelemetrySettings, options ...LogParserCollectionOption) (*LogParserCollection, error) {
7676
pcOptions := []ottl.ParserCollectionOption[LogsConsumer]{
7777
withCommonContextParsers[LogsConsumer](),
78+
ottl.EnableParserCollectionModifiedStatementLogging[LogsConsumer](true),
7879
}
7980

8081
for _, option := range options {

processor/transformprocessor/internal/common/metrics.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ func WithMetricErrorMode(errorMode ottl.ErrorMode) MetricParserCollectionOption
195195
func NewMetricParserCollection(settings component.TelemetrySettings, options ...MetricParserCollectionOption) (*MetricParserCollection, error) {
196196
pcOptions := []ottl.ParserCollectionOption[MetricsConsumer]{
197197
withCommonContextParsers[MetricsConsumer](),
198+
ottl.EnableParserCollectionModifiedStatementLogging[MetricsConsumer](true),
198199
}
199200

200201
for _, option := range options {

processor/transformprocessor/internal/common/traces.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ func WithTraceErrorMode(errorMode ottl.ErrorMode) TraceParserCollectionOption {
123123
func NewTraceParserCollection(settings component.TelemetrySettings, options ...TraceParserCollectionOption) (*TraceParserCollection, error) {
124124
pcOptions := []ottl.ParserCollectionOption[TracesConsumer]{
125125
withCommonContextParsers[TracesConsumer](),
126+
ottl.EnableParserCollectionModifiedStatementLogging[TracesConsumer](true),
126127
}
127128

128129
for _, option := range options {

processor/transformprocessor/internal/logs/processor.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88

99
"go.opentelemetry.io/collector/component"
10+
"go.opentelemetry.io/collector/pdata/pcommon"
1011
"go.opentelemetry.io/collector/pdata/plog"
1112
"go.uber.org/multierr"
1213
"go.uber.org/zap"
@@ -16,8 +17,13 @@ import (
1617
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
1718
)
1819

20+
type parsedContextStatements struct {
21+
common.LogsConsumer
22+
sharedCache bool
23+
}
24+
1925
type Processor struct {
20-
contexts []common.LogsConsumer
26+
contexts []parsedContextStatements
2127
logger *zap.Logger
2228
flatMode bool
2329
}
@@ -28,14 +34,14 @@ func NewProcessor(contextStatements []common.ContextStatements, errorMode ottl.E
2834
return nil, err
2935
}
3036

31-
contexts := make([]common.LogsConsumer, len(contextStatements))
37+
contexts := make([]parsedContextStatements, len(contextStatements))
3238
var errors error
3339
for i, cs := range contextStatements {
3440
context, err := pc.ParseContextStatements(cs)
3541
if err != nil {
3642
errors = multierr.Append(errors, err)
3743
}
38-
contexts[i] = context
44+
contexts[i] = parsedContextStatements{context, cs.SharedCache}
3945
}
4046

4147
if errors != nil {
@@ -54,8 +60,14 @@ func (p *Processor) ProcessLogs(ctx context.Context, ld plog.Logs) (plog.Logs, e
5460
pdatautil.FlattenLogs(ld.ResourceLogs())
5561
defer pdatautil.GroupByResourceLogs(ld.ResourceLogs())
5662
}
63+
64+
sharedContextCache := make(map[common.ContextID]*pcommon.Map, len(p.contexts))
5765
for _, c := range p.contexts {
58-
err := c.ConsumeLogs(ctx, ld, nil)
66+
var cache *pcommon.Map
67+
if c.sharedCache {
68+
cache = common.LoadContextCache(sharedContextCache, c.Context())
69+
}
70+
err := c.ConsumeLogs(ctx, ld, cache)
5971
if err != nil {
6072
p.logger.Error("failed processing logs", zap.Error(err))
6173
return ld, err

0 commit comments

Comments
 (0)