Skip to content

Commit 1da23ed

Browse files
committed
[rotuingprocessor] Instrument routing processor with non-routed signals counters.
1 parent 23eeb67 commit 1da23ed

File tree

11 files changed

+244
-44
lines changed

11 files changed

+244
-44
lines changed

.chloggen/routing-processor.yaml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: enhancement
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
5+
component: processor/routingprocessor
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: Instrument the routing processor with non-routed spans/metricpoints/logrecords counters (OTel SDK).
9+
10+
# One or more tracking issues related to the change
11+
issues: [21476]
12+
13+
# (Optional) One or more lines of additional information to render under the primary note.
14+
# These lines will be padded with 2 spaces and then inserted directly into the document.
15+
# Use pipe (|) for multiline entries.
16+
subtext:

processor/routingprocessor/factory.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,15 @@ const (
3030
typeStr = "routing"
3131
// The stability level of the processor.
3232
stability = component.StabilityLevelBeta
33+
34+
scopeName = "github.com/open-telemetry/opentelemetry-collector-contrib/processor/routingprocessor"
35+
nameSep = "/"
36+
37+
processorKey = "processor"
38+
metricSep = "_"
39+
nonRoutedSpansKey = "non_routed_spans"
40+
nonRoutedMetricPointsKey = "non_routed_metric_points"
41+
nonRoutedLogRecordsKey = "non_routed_log_records"
3342
)
3443

3544
// NewFactory creates a factory for the routing processor.
@@ -52,17 +61,17 @@ func createDefaultConfig() component.Config {
5261

5362
func createTracesProcessor(_ context.Context, params processor.CreateSettings, cfg component.Config, nextConsumer consumer.Traces) (processor.Traces, error) {
5463
warnIfNotLastInPipeline(nextConsumer, params.Logger)
55-
return newTracesProcessor(params.TelemetrySettings, cfg), nil
64+
return newTracesProcessor(params.TelemetrySettings, cfg)
5665
}
5766

5867
func createMetricsProcessor(_ context.Context, params processor.CreateSettings, cfg component.Config, nextConsumer consumer.Metrics) (processor.Metrics, error) {
5968
warnIfNotLastInPipeline(nextConsumer, params.Logger)
60-
return newMetricProcessor(params.TelemetrySettings, cfg), nil
69+
return newMetricProcessor(params.TelemetrySettings, cfg)
6170
}
6271

6372
func createLogsProcessor(_ context.Context, params processor.CreateSettings, cfg component.Config, nextConsumer consumer.Logs) (processor.Logs, error) {
6473
warnIfNotLastInPipeline(nextConsumer, params.Logger)
65-
return newLogProcessor(params.TelemetrySettings, cfg), nil
74+
return newLogProcessor(params.TelemetrySettings, cfg)
6675
}
6776

6877
func warnIfNotLastInPipeline(nextConsumer interface{}, logger *zap.Logger) {

processor/routingprocessor/factory_test.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,17 @@ import (
3131
"go.opentelemetry.io/collector/pdata/ptrace"
3232
"go.opentelemetry.io/collector/processor/processorhelper"
3333
"go.opentelemetry.io/collector/processor/processortest"
34+
"go.opentelemetry.io/otel/metric/noop"
35+
"go.opentelemetry.io/otel/trace"
3436
"go.uber.org/zap"
3537
)
3638

39+
var noopTelemetrySettings = component.TelemetrySettings{
40+
TracerProvider: trace.NewNoopTracerProvider(),
41+
MeterProvider: noop.NewMeterProvider(),
42+
Logger: zap.NewNop(),
43+
}
44+
3745
func TestProcessorGetsCreatedWithValidConfiguration(t *testing.T) {
3846
// prepare
3947
factory := NewFactory()
@@ -174,7 +182,9 @@ func TestProcessorDoesNotFailToBuildExportersWithMultiplePipelines(t *testing.T)
174182
require.NoError(t, err)
175183
require.NoError(t, component.UnmarshalConfig(sub, cfg))
176184

177-
exp := newMetricProcessor(component.TelemetrySettings{Logger: zap.NewNop()}, cfg)
185+
exp, err := newMetricProcessor(noopTelemetrySettings, cfg)
186+
require.NoError(t, err)
187+
178188
err = exp.Start(context.Background(), host)
179189
// assert that no error is thrown due to multiple pipelines and exporters not using the routing processor
180190
assert.NoError(t, err)

processor/routingprocessor/go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ require (
1212
go.opentelemetry.io/collector/exporter v0.77.0
1313
go.opentelemetry.io/collector/exporter/otlpexporter v0.77.0
1414
go.opentelemetry.io/collector/pdata v1.0.0-rcv0011
15+
go.opentelemetry.io/otel v1.15.1
16+
go.opentelemetry.io/otel/metric v0.38.1
17+
go.opentelemetry.io/otel/trace v1.15.1
1518
go.uber.org/multierr v1.11.0
1619
go.uber.org/zap v1.24.0
1720
google.golang.org/grpc v1.55.0
@@ -43,9 +46,6 @@ require (
4346
go.opentelemetry.io/collector/featuregate v0.77.0 // indirect
4447
go.opentelemetry.io/collector/receiver v0.77.0 // indirect
4548
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.41.1 // indirect
46-
go.opentelemetry.io/otel v1.15.1 // indirect
47-
go.opentelemetry.io/otel/metric v0.38.1 // indirect
48-
go.opentelemetry.io/otel/trace v1.15.1 // indirect
4949
go.uber.org/atomic v1.10.0 // indirect
5050
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect
5151
golang.org/x/net v0.10.0 // indirect

processor/routingprocessor/logs.go

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
"go.opentelemetry.io/collector/pdata/pcommon"
2424
"go.opentelemetry.io/collector/pdata/plog"
2525
"go.opentelemetry.io/collector/processor"
26+
"go.opentelemetry.io/otel/attribute"
27+
"go.opentelemetry.io/otel/metric"
2628
"go.uber.org/multierr"
2729
"go.uber.org/zap"
2830

@@ -39,12 +41,26 @@ type logProcessor struct {
3941

4042
extractor extractor
4143
router router[exporter.Logs, ottllog.TransformContext]
44+
45+
nonRoutedLogRecordsCounter metric.Int64Counter
4246
}
4347

44-
func newLogProcessor(settings component.TelemetrySettings, config component.Config) *logProcessor {
48+
func newLogProcessor(settings component.TelemetrySettings, config component.Config) (*logProcessor, error) {
4549
cfg := rewriteRoutingEntriesToOTTL(config.(*Config))
4650

47-
logParser, _ := ottllog.NewParser(common.Functions[ottllog.TransformContext](), settings)
51+
logParser, err := ottllog.NewParser(common.Functions[ottllog.TransformContext](), settings)
52+
if err != nil {
53+
return nil, err
54+
}
55+
56+
meter := settings.MeterProvider.Meter(scopeName + nameSep + "logs")
57+
nonRoutedLogRecordsCounter, err := meter.Int64Counter(
58+
typeStr+metricSep+processorKey+metricSep+nonRoutedLogRecordsKey,
59+
metric.WithDescription("Number of log records that were not routed to some or all exporters"),
60+
)
61+
if err != nil {
62+
return nil, err
63+
}
4864

4965
return &logProcessor{
5066
logger: settings.Logger,
@@ -55,8 +71,9 @@ func newLogProcessor(settings component.TelemetrySettings, config component.Conf
5571
settings,
5672
logParser,
5773
),
58-
extractor: newExtractor(cfg.FromAttribute, settings.Logger),
59-
}
74+
extractor: newExtractor(cfg.FromAttribute, settings.Logger),
75+
nonRoutedLogRecordsCounter: nonRoutedLogRecordsCounter,
76+
}, nil
6077
}
6178

6279
func (p *logProcessor) Start(_ context.Context, host component.Host) error {
@@ -111,6 +128,7 @@ func (p *logProcessor) route(ctx context.Context, l plog.Logs) error {
111128
return err
112129
}
113130
p.group("", groups, p.router.defaultExporters, rlogs)
131+
p.recordNonRoutedResourceLogs(ctx, key, rlogs)
114132
continue
115133
}
116134
if !isMatch {
@@ -123,6 +141,7 @@ func (p *logProcessor) route(ctx context.Context, l plog.Logs) error {
123141
if matchCount == 0 {
124142
// no route conditions are matched, add resource logs to default exporters group
125143
p.group("", groups, p.router.defaultExporters, rlogs)
144+
p.recordNonRoutedResourceLogs(ctx, "", rlogs)
126145
}
127146
}
128147
for _, g := range groups {
@@ -148,9 +167,34 @@ func (p *logProcessor) group(
148167
groups[key] = group
149168
}
150169

170+
func (p *logProcessor) recordNonRoutedResourceLogs(ctx context.Context, routingKey string, rlogs plog.ResourceLogs) {
171+
logRecordsCount := 0
172+
sl := rlogs.ScopeLogs()
173+
for j := 0; j < sl.Len(); j++ {
174+
logRecordsCount += sl.At(j).LogRecords().Len()
175+
}
176+
177+
p.nonRoutedLogRecordsCounter.Add(
178+
ctx,
179+
int64(logRecordsCount),
180+
metric.WithAttributes(
181+
attribute.String("routing_key", routingKey),
182+
),
183+
)
184+
}
185+
151186
func (p *logProcessor) routeForContext(ctx context.Context, l plog.Logs) error {
152187
value := p.extractor.extractFromContext(ctx)
153188
exporters := p.router.getExporters(value)
189+
if value == "" { // "" is a key for default exporters
190+
p.nonRoutedLogRecordsCounter.Add(
191+
ctx,
192+
int64(l.LogRecordCount()),
193+
metric.WithAttributes(
194+
attribute.String("routing_key", p.extractor.fromAttr),
195+
),
196+
)
197+
}
154198

155199
var errs error
156200
for _, e := range exporters {

processor/routingprocessor/logs_test.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"go.opentelemetry.io/collector/component"
2424
"go.opentelemetry.io/collector/consumer/consumertest"
2525
"go.opentelemetry.io/collector/pdata/plog"
26-
"go.uber.org/zap"
2726
"google.golang.org/grpc/metadata"
2827
)
2928

@@ -38,7 +37,8 @@ func TestLogProcessorCapabilities(t *testing.T) {
3837
}
3938

4039
// test
41-
p := newLogProcessor(component.TelemetrySettings{Logger: zap.NewNop()}, config)
40+
p, err := newLogProcessor(noopTelemetrySettings, config)
41+
require.NoError(t, err)
4242
require.NotNil(t, p)
4343

4444
// verify
@@ -56,7 +56,7 @@ func TestLogs_RoutingWorks_Context(t *testing.T) {
5656
},
5757
})
5858

59-
exp := newLogProcessor(component.TelemetrySettings{Logger: zap.NewNop()}, &Config{
59+
exp, err := newLogProcessor(noopTelemetrySettings, &Config{
6060
FromAttribute: "X-Tenant",
6161
AttributeSource: contextAttributeSource,
6262
DefaultExporters: []string{"otlp"},
@@ -67,6 +67,8 @@ func TestLogs_RoutingWorks_Context(t *testing.T) {
6767
},
6868
},
6969
})
70+
require.NoError(t, err)
71+
7072
require.NoError(t, exp.Start(context.Background(), host))
7173

7274
l := plog.NewLogs()
@@ -115,7 +117,7 @@ func TestLogs_RoutingWorks_ResourceAttribute(t *testing.T) {
115117
},
116118
})
117119

118-
exp := newLogProcessor(component.TelemetrySettings{Logger: zap.NewNop()}, &Config{
120+
exp, err := newLogProcessor(noopTelemetrySettings, &Config{
119121
FromAttribute: "X-Tenant",
120122
AttributeSource: resourceAttributeSource,
121123
DefaultExporters: []string{"otlp"},
@@ -126,6 +128,8 @@ func TestLogs_RoutingWorks_ResourceAttribute(t *testing.T) {
126128
},
127129
},
128130
})
131+
require.NoError(t, err)
132+
129133
require.NoError(t, exp.Start(context.Background(), host))
130134

131135
t.Run("non default route is properly used", func(t *testing.T) {
@@ -168,7 +172,7 @@ func TestLogs_RoutingWorks_ResourceAttribute_DropsRoutingAttribute(t *testing.T)
168172
},
169173
})
170174

171-
exp := newLogProcessor(component.TelemetrySettings{Logger: zap.NewNop()}, &Config{
175+
exp, err := newLogProcessor(noopTelemetrySettings, &Config{
172176
AttributeSource: resourceAttributeSource,
173177
FromAttribute: "X-Tenant",
174178
DropRoutingResourceAttribute: true,
@@ -180,6 +184,8 @@ func TestLogs_RoutingWorks_ResourceAttribute_DropsRoutingAttribute(t *testing.T)
180184
},
181185
},
182186
})
187+
require.NoError(t, err)
188+
183189
require.NoError(t, exp.Start(context.Background(), host))
184190

185191
l := plog.NewLogs()
@@ -210,7 +216,7 @@ func TestLogs_AreCorrectlySplitPerResourceAttributeRouting(t *testing.T) {
210216
},
211217
})
212218

213-
exp := newLogProcessor(component.TelemetrySettings{Logger: zap.NewNop()}, &Config{
219+
exp, err := newLogProcessor(noopTelemetrySettings, &Config{
214220
FromAttribute: "X-Tenant",
215221
AttributeSource: resourceAttributeSource,
216222
DefaultExporters: []string{"otlp"},
@@ -221,6 +227,7 @@ func TestLogs_AreCorrectlySplitPerResourceAttributeRouting(t *testing.T) {
221227
},
222228
},
223229
})
230+
require.NoError(t, err)
224231

225232
l := plog.NewLogs()
226233

@@ -264,7 +271,7 @@ func TestLogsAreCorrectlySplitPerResourceAttributeWithOTTL(t *testing.T) {
264271
},
265272
})
266273

267-
exp := newLogProcessor(component.TelemetrySettings{Logger: zap.NewNop()}, &Config{
274+
exp, err := newLogProcessor(noopTelemetrySettings, &Config{
268275
DefaultExporters: []string{"otlp"},
269276
Table: []RoutingTableItem{
270277
{
@@ -277,6 +284,7 @@ func TestLogsAreCorrectlySplitPerResourceAttributeWithOTTL(t *testing.T) {
277284
},
278285
},
279286
})
287+
require.NoError(t, err)
280288

281289
require.NoError(t, exp.Start(context.Background(), host))
282290

0 commit comments

Comments
 (0)