Skip to content

Commit fa34718

Browse files
authored
Add ingoing and outgoing counts to processorhelper (#10910)
#### Description Implements ingoing and outgoing counts as described in #10708.
1 parent 2c22ed7 commit fa34718

File tree

12 files changed

+473
-4
lines changed

12 files changed

+473
-4
lines changed
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: processor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add incoming and outgoing counts for processors using processorhelper.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [10910]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
Any processor using the processorhelper package (this is most processors) will automatically report
20+
incoming and outgoing item counts. The new metrics are:
21+
- otelcol_processor_incoming_spans
22+
- otelcol_processor_outgoing_spans
23+
- otelcol_processor_incoming_metric_points
24+
- otelcol_processor_outgoing_metric_points
25+
- otelcol_processor_incoming_log_records
26+
- otelcol_processor_outgoing_log_records
27+
28+
# Optional: The change log or logs in which this entry should be included.
29+
# e.g. '[user]' or '[user, api]'
30+
# Include 'user' if the change is relevant to end users.
31+
# Include 'api' if there is a change to a library API.
32+
# Default: '[user]'
33+
change_logs: []

processor/memorylimiterprocessor/memorylimiter_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ func TestNoDataLoss(t *testing.T) {
5757
require.NoError(t, err)
5858

5959
processor, err := processorhelper.NewLogsProcessor(context.Background(), processor.Settings{
60-
ID: component.MustNewID("nop"),
60+
ID: component.MustNewID("nop"),
61+
TelemetrySettings: componenttest.NewNopTelemetrySettings(),
6162
}, cfg, exporter,
6263
limiter.processLogs,
6364
processorhelper.WithStart(limiter.start),

processor/processorhelper/documentation.md

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,30 @@ Number of spans that were dropped.
5454
| ---- | ----------- | ---------- | --------- |
5555
| {spans} | Sum | Int | true |
5656

57+
### otelcol_processor_incoming_log_records
58+
59+
Number of log records passed to the processor.
60+
61+
| Unit | Metric Type | Value Type | Monotonic |
62+
| ---- | ----------- | ---------- | --------- |
63+
| {records} | Sum | Int | true |
64+
65+
### otelcol_processor_incoming_metric_points
66+
67+
Number of metric points passed to the processor.
68+
69+
| Unit | Metric Type | Value Type | Monotonic |
70+
| ---- | ----------- | ---------- | --------- |
71+
| {datapoints} | Sum | Int | true |
72+
73+
### otelcol_processor_incoming_spans
74+
75+
Number of spans passed to the processor.
76+
77+
| Unit | Metric Type | Value Type | Monotonic |
78+
| ---- | ----------- | ---------- | --------- |
79+
| {spans} | Sum | Int | true |
80+
5781
### otelcol_processor_inserted_log_records
5882

5983
Number of log records that were inserted.
@@ -78,6 +102,30 @@ Number of spans that were inserted.
78102
| ---- | ----------- | ---------- | --------- |
79103
| {spans} | Sum | Int | true |
80104

105+
### otelcol_processor_outgoing_log_records
106+
107+
Number of log records emitted from the processor.
108+
109+
| Unit | Metric Type | Value Type | Monotonic |
110+
| ---- | ----------- | ---------- | --------- |
111+
| {records} | Sum | Int | true |
112+
113+
### otelcol_processor_outgoing_metric_points
114+
115+
Number of metric points emitted from the processor.
116+
117+
| Unit | Metric Type | Value Type | Monotonic |
118+
| ---- | ----------- | ---------- | --------- |
119+
| {datapoints} | Sum | Int | true |
120+
121+
### otelcol_processor_outgoing_spans
122+
123+
Number of spans emitted from the processor.
124+
125+
| Unit | Metric Type | Value Type | Monotonic |
126+
| ---- | ----------- | ---------- | --------- |
127+
| {spans} | Sum | Int | true |
128+
81129
### otelcol_processor_refused_log_records
82130

83131
Number of log records that were rejected by the next component in the pipeline.

processor/processorhelper/internal/metadata/generated_telemetry.go

Lines changed: 42 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

processor/processorhelper/logs.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,21 @@ func NewLogsProcessor(
3939
return nil, errors.New("nil logsFunc")
4040
}
4141

42+
obs, err := newObsReport(ObsReportSettings{
43+
ProcessorID: set.ID,
44+
ProcessorCreateSettings: set,
45+
})
46+
if err != nil {
47+
return nil, err
48+
}
49+
4250
eventOptions := spanAttributes(set.ID)
4351
bs := fromOptions(options)
4452
logsConsumer, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
4553
span := trace.SpanFromContext(ctx)
4654
span.AddEvent("Start processing.", eventOptions)
47-
var err error
55+
recordsIn := ld.LogRecordCount()
56+
4857
ld, err = logsFunc(ctx, ld)
4958
span.AddEvent("End processing.", eventOptions)
5059
if err != nil {
@@ -53,6 +62,8 @@ func NewLogsProcessor(
5362
}
5463
return err
5564
}
65+
recordsOut := ld.LogRecordCount()
66+
obs.recordInOut(ctx, component.DataTypeLogs, recordsIn, recordsOut)
5667
return nextConsumer.ConsumeLogs(ctx, ld)
5768
}, bs.consumerOptions...)
5869
if err != nil {

processor/processorhelper/logs_test.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,20 @@ package processorhelper
66
import (
77
"context"
88
"errors"
9+
"strings"
910
"testing"
1011

1112
"github.com/stretchr/testify/assert"
1213
"github.com/stretchr/testify/require"
14+
"go.opentelemetry.io/otel/attribute"
15+
"go.opentelemetry.io/otel/metric"
16+
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
17+
"go.opentelemetry.io/otel/sdk/metric/metricdata"
18+
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
1319

1420
"go.opentelemetry.io/collector/component"
1521
"go.opentelemetry.io/collector/component/componenttest"
22+
"go.opentelemetry.io/collector/config/configtelemetry"
1623
"go.opentelemetry.io/collector/consumer"
1724
"go.opentelemetry.io/collector/consumer/consumertest"
1825
"go.opentelemetry.io/collector/pdata/plog"
@@ -67,3 +74,77 @@ func newTestLProcessor(retError error) ProcessLogsFunc {
6774
return ld, retError
6875
}
6976
}
77+
78+
func TestLogsProcessor_RecordInOut(t *testing.T) {
79+
// Regardless of how many logs are ingested, emit just one
80+
mockAggregate := func(_ context.Context, _ plog.Logs) (plog.Logs, error) {
81+
ld := plog.NewLogs()
82+
ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
83+
return ld, nil
84+
}
85+
86+
incomingLogs := plog.NewLogs()
87+
incomingLogRecords := incomingLogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords()
88+
89+
// Add 3 records to the incoming
90+
incomingLogRecords.AppendEmpty()
91+
incomingLogRecords.AppendEmpty()
92+
incomingLogRecords.AppendEmpty()
93+
94+
metricReader := sdkmetric.NewManualReader()
95+
set := processortest.NewNopSettings()
96+
set.TelemetrySettings.MetricsLevel = configtelemetry.LevelBasic
97+
set.TelemetrySettings.LeveledMeterProvider = func(level configtelemetry.Level) metric.MeterProvider {
98+
if level >= configtelemetry.LevelBasic {
99+
return sdkmetric.NewMeterProvider(sdkmetric.WithReader(metricReader))
100+
}
101+
return nil
102+
}
103+
104+
lp, err := NewLogsProcessor(context.Background(), set, &testLogsCfg, consumertest.NewNop(), mockAggregate)
105+
require.NoError(t, err)
106+
107+
assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost()))
108+
assert.NoError(t, lp.ConsumeLogs(context.Background(), incomingLogs))
109+
assert.NoError(t, lp.Shutdown(context.Background()))
110+
111+
ownMetrics := new(metricdata.ResourceMetrics)
112+
require.NoError(t, metricReader.Collect(context.Background(), ownMetrics))
113+
114+
require.Len(t, ownMetrics.ScopeMetrics, 1)
115+
require.Len(t, ownMetrics.ScopeMetrics[0].Metrics, 2)
116+
117+
inMetric := ownMetrics.ScopeMetrics[0].Metrics[0]
118+
outMetric := ownMetrics.ScopeMetrics[0].Metrics[1]
119+
if strings.Contains(inMetric.Name, "outgoing") {
120+
inMetric, outMetric = outMetric, inMetric
121+
}
122+
123+
metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{
124+
Temporality: metricdata.CumulativeTemporality,
125+
IsMonotonic: true,
126+
DataPoints: []metricdata.DataPoint[int64]{
127+
{
128+
Attributes: attribute.NewSet(attribute.KeyValue{
129+
Key: attribute.Key("processor"),
130+
Value: attribute.StringValue(set.ID.String()),
131+
}),
132+
Value: 3,
133+
},
134+
},
135+
}, inMetric.Data, metricdatatest.IgnoreTimestamp())
136+
137+
metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{
138+
Temporality: metricdata.CumulativeTemporality,
139+
IsMonotonic: true,
140+
DataPoints: []metricdata.DataPoint[int64]{
141+
{
142+
Attributes: attribute.NewSet(attribute.KeyValue{
143+
Key: attribute.Key("processor"),
144+
Value: attribute.StringValue(set.ID.String()),
145+
}),
146+
Value: 1,
147+
},
148+
},
149+
}, outMetric.Data, metricdatatest.IgnoreTimestamp())
150+
}

processor/processorhelper/metadata.yaml

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,55 @@ status:
99

1010
telemetry:
1111
metrics:
12+
13+
processor_incoming_spans:
14+
enabled: true
15+
description: Number of spans passed to the processor.
16+
unit: "{spans}"
17+
sum:
18+
value_type: int
19+
monotonic: true
20+
21+
processor_outgoing_spans:
22+
enabled: true
23+
description: Number of spans emitted from the processor.
24+
unit: "{spans}"
25+
sum:
26+
value_type: int
27+
monotonic: true
28+
29+
processor_incoming_metric_points:
30+
enabled: true
31+
description: Number of metric points passed to the processor.
32+
unit: "{datapoints}"
33+
sum:
34+
value_type: int
35+
monotonic: true
36+
37+
processor_outgoing_metric_points:
38+
enabled: true
39+
description: Number of metric points emitted from the processor.
40+
unit: "{datapoints}"
41+
sum:
42+
value_type: int
43+
monotonic: true
44+
45+
processor_incoming_log_records:
46+
enabled: true
47+
description: Number of log records passed to the processor.
48+
unit: "{records}"
49+
sum:
50+
value_type: int
51+
monotonic: true
52+
53+
processor_outgoing_log_records:
54+
enabled: true
55+
description: Number of log records emitted from the processor.
56+
unit: "{records}"
57+
sum:
58+
value_type: int
59+
monotonic: true
60+
1261
processor_accepted_spans:
1362
enabled: true
1463
description: Number of spans successfully pushed into the next component in the pipeline.

processor/processorhelper/metrics.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,21 @@ func NewMetricsProcessor(
3939
return nil, errors.New("nil metricsFunc")
4040
}
4141

42+
obs, err := newObsReport(ObsReportSettings{
43+
ProcessorID: set.ID,
44+
ProcessorCreateSettings: set,
45+
})
46+
if err != nil {
47+
return nil, err
48+
}
49+
4250
eventOptions := spanAttributes(set.ID)
4351
bs := fromOptions(options)
4452
metricsConsumer, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error {
4553
span := trace.SpanFromContext(ctx)
4654
span.AddEvent("Start processing.", eventOptions)
47-
var err error
55+
pointsIn := md.DataPointCount()
56+
4857
md, err = metricsFunc(ctx, md)
4958
span.AddEvent("End processing.", eventOptions)
5059
if err != nil {
@@ -53,6 +62,8 @@ func NewMetricsProcessor(
5362
}
5463
return err
5564
}
65+
pointsOut := md.DataPointCount()
66+
obs.recordInOut(ctx, component.DataTypeMetrics, pointsIn, pointsOut)
5667
return nextConsumer.ConsumeMetrics(ctx, md)
5768
}, bs.consumerOptions...)
5869
if err != nil {

0 commit comments

Comments
 (0)