Skip to content

Commit 3e50386

Browse files
committed
add context-based e2e pipeline processing duration metrics
1 parent 3818191 commit 3e50386

File tree

7 files changed

+152
-5
lines changed

7 files changed

+152
-5
lines changed

cmd/otelcol/config.yaml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,10 @@ service:
4444

4545
traces:
4646
receivers: [otlp, opencensus, jaeger, zipkin]
47-
processors: [batch]
4847
exporters: [logging]
4948

5049
metrics:
5150
receivers: [otlp, opencensus, prometheus]
52-
processors: [batch]
5351
exporters: [logging]
5452

5553
extensions: [health_check, pprof, zpages]

obsreport/obsreport.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,15 @@ func AllViews() (views []*view.View) {
134134
tagKeys = []tag.Key{tagKeyProcessor}
135135
views = append(views, genViews(measures, tagKeys, view.Sum())...)
136136

137+
// Pipeline views.
138+
views = append(views, &view.View{
139+
Name: mPipelineMetricLatencyMetrics.Name(),
140+
Description: mPipelineMetricLatencyMetrics.Description(),
141+
TagKeys: []tag.Key{tagKeyPipeline},
142+
Measure: mPipelineMetricLatencyMetrics,
143+
Aggregation: view.Distribution(0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1),
144+
})
145+
137146
return views
138147
}
139148

obsreport/obsreport_consumer.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
package obsreport
15+
16+
import (
17+
"context"
18+
19+
"go.opencensus.io/tag"
20+
"go.opentelemetry.io/collector/consumer"
21+
"go.opentelemetry.io/collector/consumer/pdata"
22+
)
23+
24+
// NewMetrics wraps a metrics consumer and adds pipeline tags.
25+
func NewMetrics(pipeline string, mc consumer.Metrics) consumer.Metrics {
26+
return metricsConsumer{next: mc, pipeline: pipeline}
27+
}
28+
29+
type metricsConsumer struct {
30+
next consumer.Metrics
31+
pipeline string
32+
}
33+
34+
var _ consumer.Metrics = (*metricsConsumer)(nil)
35+
36+
// ConsumeMetrics exports the pdata.Metrics to the next consumer after tagging.
37+
func (c metricsConsumer) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
38+
ctx, _ = tag.New(ctx,
39+
tag.Upsert(tagKeyPipeline, c.pipeline, tag.WithTTL(tag.TTLNoPropagation)))
40+
return c.next.ConsumeMetrics(ctx, md)
41+
}
42+
43+
// NewTraces wraps a traces consumer and adds pipeline tags.
44+
func NewTraces(pipeline string, tc consumer.Traces) consumer.Traces {
45+
return tracesConsumer{next: tc, pipeline: pipeline}
46+
}
47+
48+
type tracesConsumer struct {
49+
next consumer.Traces
50+
pipeline string
51+
}
52+
53+
var _ consumer.Traces = (*tracesConsumer)(nil)
54+
55+
// ConsumeTraces exports the pdata.Traces to the next consumer after tagging.
56+
func (c tracesConsumer) ConsumeTraces(ctx context.Context, md pdata.Traces) error {
57+
ctx, _ = tag.New(ctx,
58+
tag.Upsert(tagKeyPipeline, c.pipeline, tag.WithTTL(tag.TTLNoPropagation)))
59+
return c.next.ConsumeTraces(ctx, md)
60+
}
61+
62+
// NewLogs wraps a logs consumer and adds pipeline tags.
63+
func NewLogs(pipeline string, lc consumer.Logs) consumer.Logs {
64+
return logsConsumer{next: lc, pipeline: pipeline}
65+
}
66+
67+
type logsConsumer struct {
68+
next consumer.Logs
69+
pipeline string
70+
}
71+
72+
var _ consumer.Logs = (*logsConsumer)(nil)
73+
74+
// ConsumeLogs exports the pdata.Logs to the next consumer after tagging.
75+
func (c logsConsumer) ConsumeLogs(ctx context.Context, md pdata.Logs) error {
76+
ctx, _ = tag.New(ctx,
77+
tag.Upsert(tagKeyPipeline, c.pipeline, tag.WithTTL(tag.TTLNoPropagation)))
78+
return c.next.ConsumeLogs(ctx, md)
79+
}

obsreport/obsreport_exporter.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,8 @@ func (eor *Exporter) recordMetrics(ctx context.Context, numSent, numFailedToSend
167167
eor.mutators,
168168
sentMeasure.M(numSent),
169169
failedToSendMeasure.M(numFailedToSend))
170+
171+
recordPipelineDuration(ctx)
170172
}
171173

172174
func endSpan(ctx context.Context, err error, numSent, numFailedToSend int64, sentItemsKey, failedToSendItemsKey string) {

obsreport/obsreport_pipeline.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package obsreport
16+
17+
import (
18+
"context"
19+
"time"
20+
21+
"go.opencensus.io/stats"
22+
"go.opencensus.io/tag"
23+
)
24+
25+
const (
26+
// PipelineKey the name of the pipeline
27+
PipelineKey = "pipeline"
28+
29+
MetricLatencyKey = "metric_processing_duration_seconds"
30+
)
31+
32+
var (
33+
tagKeyPipeline, _ = tag.NewKey(PipelineKey)
34+
e2ePrefix = PipelineKey + nameSep
35+
mPipelineMetricLatencyMetrics = stats.Float64(
36+
e2ePrefix+MetricLatencyKey,
37+
"Duration of handling a metric in the pipeline.",
38+
stats.UnitSeconds)
39+
)
40+
41+
type pipelineStartContextKey struct{}
42+
43+
func recordPipelineStart(ctx context.Context) context.Context {
44+
return context.WithValue(ctx, pipelineStartContextKey{}, time.Now())
45+
}
46+
47+
func recordPipelineDuration(ctx context.Context) {
48+
startTime, _ := ctx.Value(pipelineStartContextKey{}).(time.Time)
49+
if startTime.IsZero() {
50+
return
51+
}
52+
stats.Record(
53+
ctx,
54+
mPipelineMetricLatencyMetrics.M(time.Since(startTime).Seconds()))
55+
}

obsreport/obsreport_receiver.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ func StartTraceDataReceiveOp(
152152
transport string,
153153
opt ...StartReceiveOption,
154154
) context.Context {
155+
operationCtx = recordPipelineStart(operationCtx)
155156
return traceReceiveOp(
156157
operationCtx,
157158
receiver,
@@ -186,6 +187,7 @@ func StartLogsReceiveOp(
186187
transport string,
187188
opt ...StartReceiveOption,
188189
) context.Context {
190+
operationCtx = recordPipelineStart(operationCtx)
189191
return traceReceiveOp(
190192
operationCtx,
191193
receiver,
@@ -220,6 +222,7 @@ func StartMetricsReceiveOp(
220222
transport string,
221223
opt ...StartReceiveOption,
222224
) context.Context {
225+
operationCtx = recordPipelineStart(operationCtx)
223226
return traceReceiveOp(
224227
operationCtx,
225228
receiver,

service/internal/builder/pipelines_builder.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"go.opentelemetry.io/collector/consumer"
2626
"go.opentelemetry.io/collector/consumer/consumererror"
2727
"go.opentelemetry.io/collector/consumer/fanoutconsumer"
28+
"go.opentelemetry.io/collector/obsreport"
2829
)
2930

3031
// builtPipeline is a pipeline that is built based on a config.
@@ -125,11 +126,11 @@ func (pb *pipelinesBuilder) buildPipeline(ctx context.Context, pipelineCfg *conf
125126

126127
switch pipelineCfg.InputType {
127128
case config.TracesDataType:
128-
tc = pb.buildFanoutExportersTraceConsumer(pipelineCfg.Exporters)
129+
tc = obsreport.NewTraces(pipelineCfg.Name, pb.buildFanoutExportersTraceConsumer(pipelineCfg.Exporters))
129130
case config.MetricsDataType:
130-
mc = pb.buildFanoutExportersMetricsConsumer(pipelineCfg.Exporters)
131+
mc = obsreport.NewMetrics(pipelineCfg.Name, pb.buildFanoutExportersMetricsConsumer(pipelineCfg.Exporters))
131132
case config.LogsDataType:
132-
lc = pb.buildFanoutExportersLogConsumer(pipelineCfg.Exporters)
133+
lc = obsreport.NewLogs(pipelineCfg.Name, pb.buildFanoutExportersLogConsumer(pipelineCfg.Exporters))
133134
}
134135

135136
mutatesConsumedData := false

0 commit comments

Comments
 (0)