Skip to content

Commit 4ebb7af

Browse files
authored
[processor/transform] Replace ParserCollection and add initial support for context inference (open-telemetry#37272)
1 parent 6156ecb commit 4ebb7af

File tree

14 files changed

+1938
-311
lines changed

14 files changed

+1938
-311
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: processor/transformprocessor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Replace parser collection implementations with `ottl.ParserCollection` and add initial support for expressing statement's context via path names.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [29017]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

processor/transformprocessor/config_test.go

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,63 @@ func TestLoadConfig(t *testing.T) {
147147
id: component.NewIDWithName(metadata.Type, "bad_syntax_multi_signal"),
148148
errorLen: 3,
149149
},
150+
{
151+
id: component.NewIDWithName(metadata.Type, "structured_configuration_with_path_context"),
152+
expected: &Config{
153+
ErrorMode: ottl.PropagateError,
154+
TraceStatements: []common.ContextStatements{
155+
{
156+
Context: "span",
157+
Statements: []string{`set(span.name, "bear") where span.attributes["http.path"] == "/animal"`},
158+
},
159+
},
160+
MetricStatements: []common.ContextStatements{
161+
{
162+
Context: "metric",
163+
Statements: []string{`set(metric.name, "bear") where resource.attributes["http.path"] == "/animal"`},
164+
},
165+
},
166+
LogStatements: []common.ContextStatements{
167+
{
168+
Context: "log",
169+
Statements: []string{`set(log.body, "bear") where log.attributes["http.path"] == "/animal"`},
170+
},
171+
},
172+
},
173+
},
174+
{
175+
id: component.NewIDWithName(metadata.Type, "structured_configuration_with_inferred_context"),
176+
expected: &Config{
177+
ErrorMode: ottl.PropagateError,
178+
TraceStatements: []common.ContextStatements{
179+
{
180+
Statements: []string{
181+
`set(span.name, "bear") where span.attributes["http.path"] == "/animal"`,
182+
`set(resource.attributes["name"], "bear")`,
183+
},
184+
},
185+
},
186+
MetricStatements: []common.ContextStatements{
187+
{
188+
Statements: []string{
189+
`set(metric.name, "bear") where resource.attributes["http.path"] == "/animal"`,
190+
`set(resource.attributes["name"], "bear")`,
191+
},
192+
},
193+
},
194+
LogStatements: []common.ContextStatements{
195+
{
196+
Statements: []string{
197+
`set(log.body, "bear") where log.attributes["http.path"] == "/animal"`,
198+
`set(resource.attributes["name"], "bear")`,
199+
},
200+
},
201+
},
202+
},
203+
},
150204
}
151205
for _, tt := range tests {
152-
t.Run(tt.id.String(), func(t *testing.T) {
206+
t.Run(tt.id.Name(), func(t *testing.T) {
153207
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
154208
assert.NoError(t, err)
155209

processor/transformprocessor/internal/common/config.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,12 @@ package common // import "github.com/open-telemetry/opentelemetry-collector-cont
66
import (
77
"fmt"
88
"strings"
9+
10+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
911
)
1012

13+
var _ ottl.StatementsGetter = (*ContextStatements)(nil)
14+
1115
type ContextID string
1216

1317
const (
@@ -36,3 +40,15 @@ type ContextStatements struct {
3640
Conditions []string `mapstructure:"conditions"`
3741
Statements []string `mapstructure:"statements"`
3842
}
43+
44+
func (c ContextStatements) GetStatements() []string {
45+
return c.Statements
46+
}
47+
48+
func toContextStatements(statements any) (*ContextStatements, error) {
49+
contextStatements, ok := statements.(ContextStatements)
50+
if !ok {
51+
return nil, fmt.Errorf("invalid context statements type, expected: common.ContextStatements, got: %T", statements)
52+
}
53+
return &contextStatements, nil
54+
}

processor/transformprocessor/internal/common/logs.go

Lines changed: 41 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -7,38 +7,37 @@ import (
77
"context"
88

99
"go.opentelemetry.io/collector/component"
10-
"go.opentelemetry.io/collector/consumer"
10+
"go.opentelemetry.io/collector/pdata/pcommon"
1111
"go.opentelemetry.io/collector/pdata/plog"
1212

1313
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/expr"
1414
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterottl"
1515
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
1616
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog"
17-
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource"
18-
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlscope"
1917
)
2018

21-
var _ consumer.Logs = &logStatements{}
19+
type LogsConsumer interface {
20+
Context() ContextID
21+
ConsumeLogs(ctx context.Context, ld plog.Logs, cache *pcommon.Map) error
22+
}
2223

2324
type logStatements struct {
2425
ottl.StatementSequence[ottllog.TransformContext]
2526
expr.BoolExpr[ottllog.TransformContext]
2627
}
2728

28-
func (l logStatements) Capabilities() consumer.Capabilities {
29-
return consumer.Capabilities{
30-
MutatesData: true,
31-
}
29+
func (l logStatements) Context() ContextID {
30+
return Log
3231
}
3332

34-
func (l logStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
33+
func (l logStatements) ConsumeLogs(ctx context.Context, ld plog.Logs, cache *pcommon.Map) error {
3534
for i := 0; i < ld.ResourceLogs().Len(); i++ {
3635
rlogs := ld.ResourceLogs().At(i)
3736
for j := 0; j < rlogs.ScopeLogs().Len(); j++ {
3837
slogs := rlogs.ScopeLogs().At(j)
3938
logs := slogs.LogRecords()
4039
for k := 0; k < logs.Len(); k++ {
41-
tCtx := ottllog.NewTransformContext(logs.At(k), slogs.Scope(), rlogs.Resource(), slogs, rlogs)
40+
tCtx := ottllog.NewTransformContext(logs.At(k), slogs.Scope(), rlogs.Resource(), slogs, rlogs, ottllog.WithCache(cache))
4241
condition, err := l.BoolExpr.Eval(ctx, tCtx)
4342
if err != nil {
4443
return err
@@ -55,76 +54,59 @@ func (l logStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
5554
return nil
5655
}
5756

58-
type LogParserCollection struct {
59-
parserCollection
60-
logParser ottl.Parser[ottllog.TransformContext]
61-
}
57+
type LogParserCollection ottl.ParserCollection[LogsConsumer]
6258

63-
type LogParserCollectionOption func(*LogParserCollection) error
59+
type LogParserCollectionOption ottl.ParserCollectionOption[LogsConsumer]
6460

6561
func WithLogParser(functions map[string]ottl.Factory[ottllog.TransformContext]) LogParserCollectionOption {
66-
return func(lp *LogParserCollection) error {
67-
logParser, err := ottllog.NewParser(functions, lp.settings)
62+
return func(pc *ottl.ParserCollection[LogsConsumer]) error {
63+
logParser, err := ottllog.NewParser(functions, pc.Settings, ottllog.EnablePathContextNames())
6864
if err != nil {
6965
return err
7066
}
71-
lp.logParser = logParser
72-
return nil
67+
return ottl.WithParserCollectionContext(ottllog.ContextName, &logParser, convertLogStatements)(pc)
7368
}
7469
}
7570

7671
func WithLogErrorMode(errorMode ottl.ErrorMode) LogParserCollectionOption {
77-
return func(lp *LogParserCollection) error {
78-
lp.errorMode = errorMode
79-
return nil
80-
}
72+
return LogParserCollectionOption(ottl.WithParserCollectionErrorMode[LogsConsumer](errorMode))
8173
}
8274

8375
func NewLogParserCollection(settings component.TelemetrySettings, options ...LogParserCollectionOption) (*LogParserCollection, error) {
84-
rp, err := ottlresource.NewParser(ResourceFunctions(), settings)
76+
pcOptions := []ottl.ParserCollectionOption[LogsConsumer]{
77+
withCommonContextParsers[LogsConsumer](),
78+
}
79+
80+
for _, option := range options {
81+
pcOptions = append(pcOptions, ottl.ParserCollectionOption[LogsConsumer](option))
82+
}
83+
84+
pc, err := ottl.NewParserCollection(settings, pcOptions...)
8585
if err != nil {
8686
return nil, err
8787
}
88-
sp, err := ottlscope.NewParser(ScopeFunctions(), settings)
88+
89+
lpc := LogParserCollection(*pc)
90+
return &lpc, nil
91+
}
92+
93+
func convertLogStatements(pc *ottl.ParserCollection[LogsConsumer], _ *ottl.Parser[ottllog.TransformContext], _ string, statements ottl.StatementsGetter, parsedStatements []*ottl.Statement[ottllog.TransformContext]) (LogsConsumer, error) {
94+
contextStatements, err := toContextStatements(statements)
8995
if err != nil {
9096
return nil, err
9197
}
92-
lpc := &LogParserCollection{
93-
parserCollection: parserCollection{
94-
settings: settings,
95-
resourceParser: rp,
96-
scopeParser: sp,
97-
},
98+
globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForLog, contextStatements.Conditions, pc.ErrorMode, pc.Settings, filterottl.StandardLogFuncs())
99+
if errGlobalBoolExpr != nil {
100+
return nil, errGlobalBoolExpr
98101
}
99-
100-
for _, op := range options {
101-
err := op(lpc)
102-
if err != nil {
103-
return nil, err
104-
}
105-
}
106-
107-
return lpc, nil
102+
lStatements := ottllog.NewStatementSequence(parsedStatements, pc.Settings, ottllog.WithStatementSequenceErrorMode(pc.ErrorMode))
103+
return logStatements{lStatements, globalExpr}, nil
108104
}
109105

110-
func (pc LogParserCollection) ParseContextStatements(contextStatements ContextStatements) (consumer.Logs, error) {
111-
switch contextStatements.Context {
112-
case Log:
113-
parsedStatements, err := pc.logParser.ParseStatements(contextStatements.Statements)
114-
if err != nil {
115-
return nil, err
116-
}
117-
globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForLog, contextStatements.Conditions, pc.parserCollection, filterottl.StandardLogFuncs())
118-
if errGlobalBoolExpr != nil {
119-
return nil, errGlobalBoolExpr
120-
}
121-
lStatements := ottllog.NewStatementSequence(parsedStatements, pc.settings, ottllog.WithStatementSequenceErrorMode(pc.errorMode))
122-
return logStatements{lStatements, globalExpr}, nil
123-
default:
124-
statements, err := pc.parseCommonContextStatements(contextStatements)
125-
if err != nil {
126-
return nil, err
127-
}
128-
return statements, nil
106+
func (lpc *LogParserCollection) ParseContextStatements(contextStatements ContextStatements) (LogsConsumer, error) {
107+
pc := ottl.ParserCollection[LogsConsumer](*lpc)
108+
if contextStatements.Context != "" {
109+
return pc.ParseStatementsWithContext(string(contextStatements.Context), contextStatements, true)
129110
}
111+
return pc.ParseStatements(contextStatements)
130112
}

0 commit comments

Comments
 (0)