Skip to content

Commit 055572f

Browse files
committed
1 parent 080b765 commit 055572f

File tree

7 files changed

+73
-5
lines changed

7 files changed

+73
-5
lines changed

processor/processorhelper/documentation.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,14 @@
66

77
The following telemetry is emitted by this component.
88

9+
### otelcol_processor_errors
10+
11+
Number of errors emitted from the processor [alpha]
12+
13+
| Unit | Metric Type | Value Type | Monotonic |
14+
| ---- | ----------- | ---------- | --------- |
15+
| {errors} | Sum | Int | true |
16+
917
### otelcol_processor_incoming_items
1018

1119
Number of items passed to the processor. [alpha]
@@ -21,3 +29,11 @@ Number of items emitted from the processor. [alpha]
2129
| Unit | Metric Type | Value Type | Monotonic |
2230
| ---- | ----------- | ---------- | --------- |
2331
| {items} | Sum | Int | true |
32+
33+
### otelcol_processor_skips
34+
35+
Number of skips by processor [alpha]
36+
37+
| Unit | Metric Type | Value Type | Monotonic |
38+
| ---- | ----------- | ---------- | --------- |
39+
| {errors} | Sum | Int | true |

processor/processorhelper/internal/metadata/generated_telemetry.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

processor/processorhelper/logs.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,17 +51,20 @@ func NewLogs(
5151
span.AddEvent("Start processing.", eventOptions)
5252
recordsIn := ld.LogRecordCount()
5353

54+
obs.recordIn(ctx, recordsIn)
5455
var errFunc error
5556
ld, errFunc = logsFunc(ctx, ld)
5657
span.AddEvent("End processing.", eventOptions)
5758
if errFunc != nil {
5859
if errors.Is(errFunc, ErrSkipProcessingData) {
60+
obs.processorSkipped(ctx)
5961
return nil
6062
}
63+
obs.processorError(ctx)
6164
return errFunc
6265
}
6366
recordsOut := ld.LogRecordCount()
64-
obs.recordInOut(ctx, recordsIn, recordsOut)
67+
obs.recordOut(ctx, recordsOut)
6568
return nextConsumer.ConsumeLogs(ctx, ld)
6669
}, bs.consumerOptions...)
6770
if err != nil {

processor/processorhelper/metadata.yaml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,22 @@ telemetry:
2828
sum:
2929
value_type: int
3030
monotonic: true
31+
32+
processor_errors:
33+
enabled: true
34+
stability:
35+
level: alpha
36+
description: Number of errors emitted from the processor
37+
unit: "{errors}"
38+
sum:
39+
value_type: int
40+
monotonic: true
41+
processor_skips:
42+
enabled: true
43+
stability:
44+
level: alpha
45+
description: Number of skips by processor
46+
unit: ""
47+
sum:
48+
value_type: int
49+
monotonic: true

processor/processorhelper/metrics.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,18 +50,21 @@ func NewMetrics(
5050
span := trace.SpanFromContext(ctx)
5151
span.AddEvent("Start processing.", eventOptions)
5252
pointsIn := md.DataPointCount()
53+
obs.recordIn(ctx, pointsIn)
5354

5455
var errFunc error
5556
md, errFunc = metricsFunc(ctx, md)
5657
span.AddEvent("End processing.", eventOptions)
5758
if errFunc != nil {
5859
if errors.Is(errFunc, ErrSkipProcessingData) {
60+
obs.processorSkipped(ctx)
5961
return nil
6062
}
63+
obs.processorError(ctx)
6164
return errFunc
6265
}
6366
pointsOut := md.DataPointCount()
64-
obs.recordInOut(ctx, pointsIn, pointsOut)
67+
obs.recordOut(ctx, pointsOut)
6568
return nextConsumer.ConsumeMetrics(ctx, md)
6669
}, bs.consumerOptions...)
6770
if err != nil {

processor/processorhelper/obsreport.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,18 @@ func newObsReport(set processor.Settings, signal pipeline.Signal) (*obsReport, e
6262
}, nil
6363
}
6464

65-
func (or *obsReport) recordInOut(ctx context.Context, incoming, outgoing int) {
65+
func (or *obsReport) recordIn(ctx context.Context, incoming int) {
6666
or.telemetryBuilder.ProcessorIncomingItems.Add(ctx, int64(incoming), metric.WithAttributes(or.otelAttrs...))
67+
}
68+
69+
func (or *obsReport) recordOut(ctx context.Context, outgoing int) {
6770
or.telemetryBuilder.ProcessorOutgoingItems.Add(ctx, int64(outgoing), metric.WithAttributes(or.otelAttrs...))
6871
}
72+
73+
func (or *obsReport) processorError(ctx context.Context) {
74+
or.telemetryBuilder.ProcessorErrors.Add(ctx, 1, metric.WithAttributes(or.otelAttrs...))
75+
}
76+
77+
func (or *obsReport) processorSkipped(ctx context.Context) {
78+
or.telemetryBuilder.ProcessorSkips.Add(ctx, 1, metric.WithAttributes(or.otelAttrs...))
79+
}

processor/processorhelper/traces.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,20 +51,22 @@ func NewTraces(
5151
span.AddEvent("Start processing.", eventOptions)
5252
spansIn := td.SpanCount()
5353

54+
obs.recordIn(ctx, spansIn)
5455
var errFunc error
5556
td, errFunc = tracesFunc(ctx, td)
5657
span.AddEvent("End processing.", eventOptions)
5758
if errFunc != nil {
5859
if errors.Is(errFunc, ErrSkipProcessingData) {
60+
obs.processorSkipped(ctx)
5961
return nil
6062
}
63+
obs.processorError(ctx)
6164
return errFunc
6265
}
6366
spansOut := td.SpanCount()
64-
obs.recordInOut(ctx, spansIn, spansOut)
67+
obs.recordOut(ctx, spansOut)
6568
return nextConsumer.ConsumeTraces(ctx, td)
6669
}, bs.consumerOptions...)
67-
6870
if err != nil {
6971
return nil, err
7072
}

0 commit comments

Comments
 (0)