Skip to content

Commit 3868323

Browse files
authored
[processor/tailsampling] Refactor internal telemetry (#33108)
This PR is the first step in refactoring the internal telemetry for the tail-sampling processor. There are no expected changes in user behavior between this and the current main, but all OpenCensus code is now isolated and can be replaced without affecting other parts of the code. Once this PR is merged, the next step is to create equivalent metrics using the OpenTelemetry Go API. Signed-off-by: Juraci Paixão Kröhling <[email protected]> --------- Signed-off-by: Juraci Paixão Kröhling <[email protected]>
1 parent aee4b75 commit 3868323

File tree

7 files changed

+160
-121
lines changed

7 files changed

+160
-121
lines changed

processor/tailsamplingprocessor/factory.go

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,38 +7,17 @@ package tailsamplingprocessor // import "github.com/open-telemetry/opentelemetry
77

88
import (
99
"context"
10-
"sync"
1110
"time"
1211

13-
"go.opencensus.io/stats/view"
1412
"go.opentelemetry.io/collector/component"
15-
"go.opentelemetry.io/collector/config/configtelemetry"
1613
"go.opentelemetry.io/collector/consumer"
17-
"go.opentelemetry.io/collector/featuregate"
1814
"go.opentelemetry.io/collector/processor"
1915

2016
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/metadata"
2117
)
2218

23-
var onceMetrics sync.Once
24-
25-
var metricStatCountSpansSampledFeatureGate = featuregate.GlobalRegistry().MustRegister(
26-
"processor.tailsamplingprocessor.metricstatcountspanssampled",
27-
featuregate.StageAlpha,
28-
featuregate.WithRegisterDescription("When enabled, a new metric stat_count_spans_sampled will be available in the tail sampling processor. Differently from stat_count_traces_sampled, this metric will count the number of spans sampled or not per sampling policy, where the original counts traces."),
29-
)
30-
31-
func isMetricStatCountSpansSampledEnabled() bool {
32-
return metricStatCountSpansSampledFeatureGate.IsEnabled()
33-
}
34-
3519
// NewFactory returns a new factory for the Tail Sampling processor.
3620
func NewFactory() processor.Factory {
37-
onceMetrics.Do(func() {
38-
// TODO: this is hardcoding the metrics level and skips error handling
39-
_ = view.Register(samplingProcessorMetricViews(configtelemetry.LevelNormal)...)
40-
})
41-
4221
return processor.NewFactory(
4322
metadata.Type,
4423
createDefaultConfig,

processor/tailsamplingprocessor/internal/sampling/package_test.go

Lines changed: 0 additions & 17 deletions
This file was deleted.
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package telemetry // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/telemetry"
5+
6+
import "go.opentelemetry.io/collector/featuregate"
7+
8+
var metricStatCountSpansSampledFeatureGate = featuregate.GlobalRegistry().MustRegister(
9+
"processor.tailsamplingprocessor.metricstatcountspanssampled",
10+
featuregate.StageAlpha,
11+
featuregate.WithRegisterDescription("When enabled, a new metric stat_count_spans_sampled will be available in the tail sampling processor. Differently from stat_count_traces_sampled, this metric will count the number of spans sampled or not per sampling policy, where the original counts traces."),
12+
)
13+
14+
func isMetricStatCountSpansSampledEnabled() bool {
15+
return metricStatCountSpansSampledFeatureGate.IsEnabled()
16+
}

processor/tailsamplingprocessor/metrics.go renamed to processor/tailsamplingprocessor/internal/telemetry/metrics_oc.go

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package tailsamplingprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor"
4+
package telemetry // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/telemetry"
55

66
import (
7+
"context"
8+
79
"go.opencensus.io/stats"
810
"go.opencensus.io/stats/view"
911
"go.opencensus.io/tag"
1012
"go.opentelemetry.io/collector/config/configtelemetry"
1113
"go.opentelemetry.io/collector/processor/processorhelper"
1214

1315
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/metadata"
16+
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling"
1417
)
1518

1619
// Variables related to metrics specific to tail sampling.
@@ -19,6 +22,9 @@ var (
1922
tagSampledKey, _ = tag.NewKey("sampled")
2023
tagSourceFormat, _ = tag.NewKey("source_format")
2124

25+
tagMutatorSampled = []tag.Mutator{tag.Upsert(tagSampledKey, "true")}
26+
tagMutatorNotSampled = []tag.Mutator{tag.Upsert(tagSampledKey, "false")}
27+
2228
statDecisionLatencyMicroSec = stats.Int64("sampling_decision_latency", "Latency (in microseconds) of a given sampling policy", "µs")
2329
statOverallDecisionLatencyUs = stats.Int64("sampling_decision_timer_latency", "Latency (in microseconds) of each run of the sampling decision timer", "µs")
2430

@@ -36,6 +42,74 @@ var (
3642
statTracesOnMemoryGauge = stats.Int64("sampling_traces_on_memory", "Tracks the number of traces current on memory", stats.UnitDimensionless)
3743
)
3844

45+
func contextForPolicyOC(ctx context.Context, configName, format string) (context.Context, error) {
46+
return tag.New(ctx, tag.Upsert(tagPolicyKey, configName), tag.Upsert(tagSourceFormat, format))
47+
}
48+
49+
func recordFinalDecisionOC(ctx context.Context, latencyMicroSec, droppedTooEarly, evaluationErrors, tracesOnMemory int64, decision sampling.Decision) {
50+
stats.Record(ctx,
51+
statOverallDecisionLatencyUs.M(latencyMicroSec),
52+
statDroppedTooEarlyCount.M(droppedTooEarly),
53+
statPolicyEvaluationErrorCount.M(evaluationErrors),
54+
statTracesOnMemoryGauge.M(tracesOnMemory),
55+
)
56+
57+
var mutators []tag.Mutator
58+
switch decision {
59+
case sampling.Sampled:
60+
mutators = tagMutatorSampled
61+
case sampling.NotSampled:
62+
mutators = tagMutatorNotSampled
63+
}
64+
65+
_ = stats.RecordWithTags(
66+
ctx,
67+
mutators,
68+
statCountGlobalTracesSampled.M(int64(1)),
69+
)
70+
}
71+
72+
func recordPolicyLatencyOC(ctx context.Context, latencyMicroSec int64) {
73+
stats.Record(ctx,
74+
statDecisionLatencyMicroSec.M(latencyMicroSec),
75+
)
76+
}
77+
78+
func recordPolicyDecisionOC(ctx context.Context, sampled bool, numSpans int64) {
79+
var mutators []tag.Mutator
80+
if sampled {
81+
mutators = tagMutatorSampled
82+
} else {
83+
mutators = tagMutatorNotSampled
84+
}
85+
86+
_ = stats.RecordWithTags(
87+
ctx,
88+
mutators,
89+
statCountTracesSampled.M(int64(1)),
90+
)
91+
if isMetricStatCountSpansSampledEnabled() {
92+
_ = stats.RecordWithTags(
93+
ctx,
94+
mutators,
95+
statCountSpansSampled.M(numSpans),
96+
)
97+
}
98+
99+
}
100+
101+
func recordNewTraceIDsOC(ctx context.Context, count int64) {
102+
stats.Record(ctx, statNewTraceIDReceivedCount.M(count))
103+
}
104+
105+
func recordLateSpanOC(ctx context.Context, ageSec int64) {
106+
stats.Record(ctx, statLateSpanArrivalAfterDecision.M(ageSec))
107+
}
108+
109+
func recordTraceRemovalAgeOC(ctx context.Context, ageSec int64) {
110+
stats.Record(ctx, statTraceRemovalAgeSec.M(ageSec))
111+
}
112+
39113
// samplingProcessorMetricViews return the metrics views according to given telemetry level.
40114
func samplingProcessorMetricViews(level configtelemetry.Level) []*view.View {
41115
if level == configtelemetry.LevelNone {
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package telemetry // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/telemetry"
5+
6+
import (
7+
"context"
8+
"sync"
9+
10+
"go.opencensus.io/stats/view"
11+
"go.opentelemetry.io/collector/config/configtelemetry"
12+
13+
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling"
14+
)
15+
16+
var onceTelemetry sync.Once
17+
18+
type T struct {
19+
ContextForPolicy func(ctx context.Context, configName, format string) (context.Context, error)
20+
RecordPolicyLatency func(ctx context.Context, latencyMicroSec int64)
21+
RecordPolicyDecision func(ctx context.Context, sampled bool, numSpans int64)
22+
RecordNewTraceIDs func(ctx context.Context, count int64)
23+
RecordLateSpan func(ctx context.Context, ageSec int64)
24+
RecordTraceRemovalAge func(ctx context.Context, ageSec int64)
25+
RecordFinalDecision func(ctx context.Context, latencyMicroSec, droppedTooEarly, evaluationErrors, tracesOnMemory int64, decision sampling.Decision)
26+
}
27+
28+
func New() *T {
29+
onceTelemetry.Do(func() {
30+
_ = view.Register(samplingProcessorMetricViews(configtelemetry.LevelNormal)...)
31+
})
32+
33+
return &T{
34+
ContextForPolicy: contextForPolicyOC,
35+
RecordPolicyLatency: recordPolicyLatencyOC,
36+
RecordPolicyDecision: recordPolicyDecisionOC,
37+
RecordNewTraceIDs: recordNewTraceIDsOC,
38+
RecordLateSpan: recordLateSpanOC,
39+
RecordTraceRemovalAge: recordTraceRemovalAgeOC,
40+
RecordFinalDecision: recordFinalDecisionOC,
41+
}
42+
}

0 commit comments

Comments
 (0)