Skip to content

Commit bf6ff84

Browse files
djaglowskizeck-ops
authored andcommitted
[connector/routing] Disconnect 'match_once' parameter (open-telemetry#37095)
1 parent 8fb97e2 commit bf6ff84

File tree

11 files changed

+65
-351
lines changed

11 files changed

+65
-351
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: breaking
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: routingconnector
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Change `match_once` parameter from `bool` to `*bool`.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [29882]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
Boolean values should still unmarshal successfully, but direct instantiation in code will fail.
20+
The change allows us to check for usage and warn of the upcoming removal in v0.120.0.
21+
22+
# If your change doesn't affect end users or the exported elements of any package,
23+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
24+
# Optional: The change log or logs in which this entry should be included.
25+
# e.g. '[user]' or '[user, api]'
26+
# Include 'user' if the change is relevant to end users.
27+
# Include 'api' if there is a change to a library API.
28+
# Default: '[user]'
29+
change_logs: [api]
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: breaking
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: routingconnector
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Disconnect `match_once` parameter from functionality.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [29882]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
The parameter will be ignored, except to trigger a warning log about its upcoming removal in v0.120.0.
20+
21+
# If your change doesn't affect end users or the exported elements of any package,
22+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
23+
# Optional: The change log or logs in which this entry should be included.
24+
# e.g. '[user]' or '[user, api]'
25+
# Include 'user' if the change is relevant to end users.
26+
# Include 'api' if there is a change to a library API.
27+
# Default: '[user]'
28+
change_logs: []

connector/routingconnector/config.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ type Config struct {
4343
Table []RoutingTableItem `mapstructure:"table"`
4444

4545
// MatchOnce determines whether the connector matches multiple statements.
46-
// Optional.
47-
MatchOnce bool `mapstructure:"match_once"`
46+
// Unused. Deprecated in v0.116.0. Will be removed in v0.120.0.
47+
MatchOnce *bool `mapstructure:"match_once"`
4848
}
4949

5050
// Validate checks if the processor configuration is valid.
@@ -77,10 +77,6 @@ func (c *Config) Validate() error {
7777
return err
7878
}
7979
fallthrough
80-
case "span", "metric", "datapoint", "log": // ok
81-
if !c.MatchOnce {
82-
return fmt.Errorf(`%q context is not supported with "match_once: false"`, item.Context)
83-
}
8480
default:
8581
return errors.New("invalid context: " + item.Context)
8682
}

connector/routingconnector/config_test.go

Lines changed: 0 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ func TestLoadConfig(t *testing.T) {
2727
configPath: filepath.Join("testdata", "config", "traces.yaml"),
2828
id: component.NewIDWithName(metadata.Type, ""),
2929
expected: &Config{
30-
MatchOnce: true,
3130
DefaultPipelines: []pipeline.ID{
3231
pipeline.NewIDWithName(pipeline.SignalTraces, "otlp-all"),
3332
},
@@ -53,7 +52,6 @@ func TestLoadConfig(t *testing.T) {
5352
configPath: filepath.Join("testdata", "config", "metrics.yaml"),
5453
id: component.NewIDWithName(metadata.Type, ""),
5554
expected: &Config{
56-
MatchOnce: true,
5755
DefaultPipelines: []pipeline.ID{
5856
pipeline.NewIDWithName(pipeline.SignalMetrics, "otlp-all"),
5957
},
@@ -79,7 +77,6 @@ func TestLoadConfig(t *testing.T) {
7977
configPath: filepath.Join("testdata", "config", "logs.yaml"),
8078
id: component.NewIDWithName(metadata.Type, ""),
8179
expected: &Config{
82-
MatchOnce: true,
8380
DefaultPipelines: []pipeline.ID{
8481
pipeline.NewIDWithName(pipeline.SignalLogs, "otlp-all"),
8582
},
@@ -221,70 +218,6 @@ func TestValidateConfig(t *testing.T) {
221218
},
222219
error: "invalid context: invalid",
223220
},
224-
{
225-
name: "span context with match_once false",
226-
config: &Config{
227-
MatchOnce: false,
228-
Table: []RoutingTableItem{
229-
{
230-
Context: "span",
231-
Statement: `route() where attributes["attr"] == "acme"`,
232-
Pipelines: []pipeline.ID{
233-
pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"),
234-
},
235-
},
236-
},
237-
},
238-
error: `"span" context is not supported with "match_once: false"`,
239-
},
240-
{
241-
name: "metric context with match_once false",
242-
config: &Config{
243-
MatchOnce: false,
244-
Table: []RoutingTableItem{
245-
{
246-
Context: "metric",
247-
Statement: `route() where attributes["attr"] == "acme"`,
248-
Pipelines: []pipeline.ID{
249-
pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"),
250-
},
251-
},
252-
},
253-
},
254-
error: `"metric" context is not supported with "match_once: false"`,
255-
},
256-
{
257-
name: "datapoint context with match_once false",
258-
config: &Config{
259-
MatchOnce: false,
260-
Table: []RoutingTableItem{
261-
{
262-
Context: "datapoint",
263-
Statement: `route() where attributes["attr"] == "acme"`,
264-
Pipelines: []pipeline.ID{
265-
pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"),
266-
},
267-
},
268-
},
269-
},
270-
error: `"datapoint" context is not supported with "match_once: false"`,
271-
},
272-
{
273-
name: "log context with match_once false",
274-
config: &Config{
275-
MatchOnce: false,
276-
Table: []RoutingTableItem{
277-
{
278-
Context: "log",
279-
Statement: `route() where attributes["attr"] == "acme"`,
280-
Pipelines: []pipeline.ID{
281-
pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"),
282-
},
283-
},
284-
},
285-
},
286-
error: `"log" context is not supported with "match_once: false"`,
287-
},
288221
{
289222
name: "request context with statement",
290223
config: &Config{
@@ -349,7 +282,6 @@ func withDefault(pipelines ...pipeline.ID) testConfigOption {
349282

350283
func testConfig(opts ...testConfigOption) *Config {
351284
cfg := createDefaultConfig().(*Config)
352-
cfg.MatchOnce = true
353285
for _, opt := range opts {
354286
opt(cfg)
355287
}

connector/routingconnector/factory.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ func NewFactory() connector.Factory {
3131
func createDefaultConfig() component.Config {
3232
return &Config{
3333
ErrorMode: ottl.PropagateError,
34-
MatchOnce: true,
3534
}
3635
}
3736

connector/routingconnector/logs.go

Lines changed: 2 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ func newLogsConnector(
3535
) (*logsConnector, error) {
3636
cfg := config.(*Config)
3737

38-
if !cfg.MatchOnce {
39-
set.Logger.Error("The 'match_once' field has been deprecated and will be removed in v0.120.0. Remove usage of the parameter to suppress this warning.")
38+
if cfg.MatchOnce != nil {
39+
set.Logger.Error("The 'match_once' field has been deprecated and no longer has any effect. It will be removed in v0.120.0.")
4040
}
4141

4242
lr, ok := logs.(connector.LogsRouterAndConsumer)
@@ -65,15 +65,6 @@ func (c *logsConnector) Capabilities() consumer.Capabilities {
6565
}
6666

6767
func (c *logsConnector) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
68-
if c.config.MatchOnce {
69-
return c.switchLogs(ctx, ld)
70-
}
71-
return c.matchAllLogs(ctx, ld)
72-
}
73-
74-
// switchLogs removes items from the original plog.Logs as they are matched,
75-
// and sends them to the appropriate consumer.
76-
func (c *logsConnector) switchLogs(ctx context.Context, ld plog.Logs) error {
7768
groups := make(map[consumer.Logs]plog.Logs)
7869
var errs error
7970
for i := 0; i < len(c.router.routeSlice) && ld.ResourceLogs().Len() > 0; i++ {
@@ -120,42 +111,6 @@ func (c *logsConnector) switchLogs(ctx context.Context, ld plog.Logs) error {
120111
return errs
121112
}
122113

123-
func (c *logsConnector) matchAllLogs(ctx context.Context, ld plog.Logs) error {
124-
// routingEntry is used to group plog.ResourceLogs that are routed to
125-
// the same set of exporters.
126-
// This way we're not ending up with all the logs split up which would cause
127-
// higher CPU usage.
128-
groups := make(map[consumer.Logs]plog.Logs)
129-
var errs error
130-
for i := 0; i < ld.ResourceLogs().Len(); i++ {
131-
rlogs := ld.ResourceLogs().At(i)
132-
rtx := ottlresource.NewTransformContext(rlogs.Resource(), rlogs)
133-
noRoutesMatch := true
134-
for _, route := range c.router.routeSlice {
135-
_, isMatch, err := route.resourceStatement.Execute(ctx, rtx)
136-
if err != nil {
137-
if c.config.ErrorMode == ottl.PropagateError {
138-
return err
139-
}
140-
groupLogs(groups, c.router.defaultConsumer, rlogs)
141-
continue
142-
}
143-
if isMatch {
144-
noRoutesMatch = false
145-
groupLogs(groups, route.consumer, rlogs)
146-
}
147-
}
148-
if noRoutesMatch {
149-
// no route conditions are matched, add resource logs to default exporters group
150-
groupLogs(groups, c.router.defaultConsumer, rlogs)
151-
}
152-
}
153-
for consumer, group := range groups {
154-
errs = errors.Join(errs, consumer.ConsumeLogs(ctx, group))
155-
}
156-
return errs
157-
}
158-
159114
func groupAllLogs(
160115
groups map[consumer.Logs]plog.Logs,
161116
cons consumer.Logs,

connector/routingconnector/logs_test.go

Lines changed: 0 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -160,57 +160,6 @@ func TestLogsAreCorrectlySplitPerResourceAttributeWithOTTL(t *testing.T) {
160160
assert.Empty(t, sink1.AllLogs())
161161
})
162162

163-
t.Run("logs matched by two expressions", func(t *testing.T) {
164-
resetSinks()
165-
166-
l := plog.NewLogs()
167-
168-
rl := l.ResourceLogs().AppendEmpty()
169-
rl.Resource().Attributes().PutStr("X-Tenant", "x_acme")
170-
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
171-
172-
rl = l.ResourceLogs().AppendEmpty()
173-
rl.Resource().Attributes().PutStr("X-Tenant", "_acme")
174-
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
175-
176-
require.NoError(t, conn.ConsumeLogs(context.Background(), l))
177-
178-
assert.Empty(t, defaultSink.AllLogs())
179-
assert.Len(t, sink0.AllLogs(), 1)
180-
assert.Len(t, sink1.AllLogs(), 1)
181-
182-
assert.Equal(t, 2, sink0.AllLogs()[0].LogRecordCount())
183-
assert.Equal(t, 2, sink1.AllLogs()[0].LogRecordCount())
184-
assert.Equal(t, sink0.AllLogs(), sink1.AllLogs())
185-
})
186-
187-
t.Run("one log matched by multiple expressions, other matched none", func(t *testing.T) {
188-
resetSinks()
189-
190-
l := plog.NewLogs()
191-
192-
rl := l.ResourceLogs().AppendEmpty()
193-
rl.Resource().Attributes().PutStr("X-Tenant", "_acme")
194-
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
195-
196-
rl = l.ResourceLogs().AppendEmpty()
197-
rl.Resource().Attributes().PutStr("X-Tenant", "something-else")
198-
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
199-
200-
require.NoError(t, conn.ConsumeLogs(context.Background(), l))
201-
202-
assert.Len(t, defaultSink.AllLogs(), 1)
203-
assert.Len(t, sink0.AllLogs(), 1)
204-
assert.Len(t, sink1.AllLogs(), 1)
205-
206-
assert.Equal(t, sink0.AllLogs(), sink1.AllLogs())
207-
208-
rlog := defaultSink.AllLogs()[0].ResourceLogs().At(0)
209-
attr, ok := rlog.Resource().Attributes().Get("X-Tenant")
210-
assert.True(t, ok, "routing attribute must exists")
211-
assert.Equal(t, "something-else", attr.AsString())
212-
})
213-
214163
t.Run("logs matched by one expression, multiple pipelines", func(t *testing.T) {
215164
resetSinks()
216165

@@ -253,7 +202,6 @@ func TestLogsAreCorrectlyMatchOnceWithOTTL(t *testing.T) {
253202
Pipelines: []pipeline.ID{logsDefault, logs0},
254203
},
255204
},
256-
MatchOnce: true,
257205
}
258206

259207
var defaultSink, sink0, sink1 consumertest.LogsSink

connector/routingconnector/metrics.go

Lines changed: 2 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ func newMetricsConnector(
3636
) (*metricsConnector, error) {
3737
cfg := config.(*Config)
3838

39-
if !cfg.MatchOnce {
40-
set.Logger.Error("The 'match_once' field has been deprecated and will be removed in v0.120.0. Remove usage of the parameter to suppress this warning.")
39+
if cfg.MatchOnce != nil {
40+
set.Logger.Error("The 'match_once' field has been deprecated and no longer has any effect. It will be removed in v0.120.0.")
4141
}
4242

4343
mr, ok := metrics.(connector.MetricsRouterAndConsumer)
@@ -66,13 +66,6 @@ func (c *metricsConnector) Capabilities() consumer.Capabilities {
6666
}
6767

6868
func (c *metricsConnector) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
69-
if c.config.MatchOnce {
70-
return c.switchMetrics(ctx, md)
71-
}
72-
return c.matchAllMetrics(ctx, md)
73-
}
74-
75-
func (c *metricsConnector) switchMetrics(ctx context.Context, md pmetric.Metrics) error {
7669
groups := make(map[consumer.Metrics]pmetric.Metrics)
7770
var errs error
7871
for i := 0; i < len(c.router.routeSlice) && md.ResourceMetrics().Len() > 0; i++ {
@@ -128,43 +121,6 @@ func (c *metricsConnector) switchMetrics(ctx context.Context, md pmetric.Metrics
128121
return errs
129122
}
130123

131-
func (c *metricsConnector) matchAllMetrics(ctx context.Context, md pmetric.Metrics) error {
132-
// groups is used to group pmetric.ResourceMetrics that are routed to
133-
// the same set of exporters. This way we're not ending up with all the
134-
// metrics split up which would cause higher CPU usage.
135-
groups := make(map[consumer.Metrics]pmetric.Metrics)
136-
137-
var errs error
138-
for i := 0; i < md.ResourceMetrics().Len(); i++ {
139-
rmetrics := md.ResourceMetrics().At(i)
140-
rtx := ottlresource.NewTransformContext(rmetrics.Resource(), rmetrics)
141-
142-
noRoutesMatch := true
143-
for _, route := range c.router.routeSlice {
144-
_, isMatch, err := route.resourceStatement.Execute(ctx, rtx)
145-
if err != nil {
146-
if c.config.ErrorMode == ottl.PropagateError {
147-
return err
148-
}
149-
groupMetrics(groups, c.router.defaultConsumer, rmetrics)
150-
continue
151-
}
152-
if isMatch {
153-
noRoutesMatch = false
154-
groupMetrics(groups, route.consumer, rmetrics)
155-
}
156-
}
157-
if noRoutesMatch {
158-
// no route conditions are matched, add resource metrics to default exporters group
159-
groupMetrics(groups, c.router.defaultConsumer, rmetrics)
160-
}
161-
}
162-
for consumer, group := range groups {
163-
errs = errors.Join(errs, consumer.ConsumeMetrics(ctx, group))
164-
}
165-
return errs
166-
}
167-
168124
func groupAllMetrics(
169125
groups map[consumer.Metrics]pmetric.Metrics,
170126
cons consumer.Metrics,

0 commit comments

Comments
 (0)