Skip to content

Commit 88f1be0

Browse files
yvrhdn authored and chengchuanpeng committed
[processor/tailsampling] Fixed sampling decision metrics (open-telemetry#37212)
#### Description Fixes some of the metrics emitted from sampling decisions. I believe `otelcol_processor_tail_sampling_sampling_trace_dropped_too_early` and `otelcol_processor_tail_sampling_sampling_policy_evaluation_error_total` are sometimes overcounted. The bug: `samplingPolicyOnTick` creates a struct `policyMetrics` to hold on to some counters. This struct is shared for all the traces that are evaluated during that tick: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/22c647a3ae134697d90b67c45879227ea54d63be/processor/tailsamplingprocessor/processor.go#L324 Each loop, the values of the counters are added to the metrics: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/22c647a3ae134697d90b67c45879227ea54d63be/processor/tailsamplingprocessor/processor.go#L340-L344 But the counters are not reset in between loops, so if the first evaluated trace could not be found this would set `idNotFoundOnMapCount` to `1`. Every loop after this will add `1` to the `otelcol_processor_tail_sampling_sampling_trace_dropped_too_early` metric, even though the trace was found. I've moved the metrics outside of the for loop so the counters are only added once. #### Testing I have added a dedicated test for each metric, processing multiple traces in one tick. ~~I've added a test for `otelcol_processor_tail_sampling_sampling_trace_dropped_too_early`. I can add one for `sampling_policy_evaluation_error` too, just not sure how to deliberately fail a policy.~~
1 parent 0d98576 commit 88f1be0

File tree

3 files changed

+167
-1
lines changed

3 files changed

+167
-1
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: tailsamplingprocessor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Fixed sampling decision metrics `otelcol_processor_tail_sampling_sampling_trace_dropped_too_early` and `otelcol_processor_tail_sampling_sampling_policy_evaluation_error_total`, these were sometimes overcounted.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [37212]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
As a result of this change non-zero values of `otelcol_processor_tail_sampling_sampling_trace_dropped_too_early`
20+
and `otelcol_processor_tail_sampling_sampling_policy_evaluation_error_total` metrics will be lower.
21+
Before this fix, errors got counted several times depending on the number of traces being processed
22+
that tick and where in the batch the error happened.
23+
Zero values are unaffected.
24+
25+
# If your change doesn't affect end users or the exported elements of any package,
26+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
27+
# Optional: The change log or logs in which this entry should be included.
28+
# e.g. '[user]' or '[user, api]'
29+
# Include 'user' if the change is relevant to end users.
30+
# Include 'api' if there is a change to a library API.
31+
# Default: '[user]'
32+
change_logs: [user]

processor/tailsamplingprocessor/processor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,6 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
344344
decision := tsp.makeDecision(id, trace, &metrics)
345345

346346
tsp.telemetry.ProcessorTailSamplingSamplingDecisionTimerLatency.Record(tsp.ctx, int64(time.Since(startTime)/time.Microsecond))
347-
tsp.telemetry.ProcessorTailSamplingSamplingTracesOnMemory.Record(tsp.ctx, int64(tsp.numTracesOnMap.Load()))
348347
tsp.telemetry.ProcessorTailSamplingGlobalCountTracesSampled.Add(tsp.ctx, 1, decisionToAttribute[decision])
349348

350349
// Sampled or not, remove the batches
@@ -362,6 +361,7 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
362361
}
363362
}
364363

364+
tsp.telemetry.ProcessorTailSamplingSamplingTracesOnMemory.Record(tsp.ctx, int64(tsp.numTracesOnMap.Load()))
365365
tsp.telemetry.ProcessorTailSamplingSamplingTraceDroppedTooEarly.Add(tsp.ctx, metrics.idNotFoundOnMapCount)
366366
tsp.telemetry.ProcessorTailSamplingSamplingPolicyEvaluationError.Add(tsp.ctx, metrics.evaluateErrorCount)
367367

processor/tailsamplingprocessor/processor_telemetry_test.go

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
2121
"go.opentelemetry.io/otel/sdk/metric/metricdata"
2222
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
23+
24+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
2325
)
2426

2527
func TestMetricsAfterOneEvaluation(t *testing.T) {
@@ -522,6 +524,138 @@ func TestProcessorTailSamplingSamplingLateSpanAge(t *testing.T) {
522524
metricdatatest.AssertEqual(t, m, got, metricdatatest.IgnoreTimestamp())
523525
}
524526

527+
func TestProcessorTailSamplingSamplingTraceDroppedTooEarly(t *testing.T) {
528+
// prepare
529+
s := setupTestTelemetry()
530+
b := newSyncIDBatcher()
531+
syncBatcher := b.(*syncIDBatcher)
532+
533+
cfg := Config{
534+
DecisionWait: 1,
535+
NumTraces: 2,
536+
PolicyCfgs: []PolicyCfg{
537+
{
538+
sharedPolicyCfg: sharedPolicyCfg{
539+
Name: "always",
540+
Type: AlwaysSample,
541+
},
542+
},
543+
},
544+
}
545+
cs := &consumertest.TracesSink{}
546+
ct := s.newSettings()
547+
proc, err := newTracesProcessor(context.Background(), ct, cs, cfg, withDecisionBatcher(syncBatcher))
548+
require.NoError(t, err)
549+
defer func() {
550+
err = proc.Shutdown(context.Background())
551+
require.NoError(t, err)
552+
}()
553+
554+
err = proc.Start(context.Background(), componenttest.NewNopHost())
555+
require.NoError(t, err)
556+
557+
// test
558+
_, batches := generateIDsAndBatches(3)
559+
for _, batch := range batches {
560+
err = proc.ConsumeTraces(context.Background(), batch)
561+
require.NoError(t, err)
562+
}
563+
564+
tsp := proc.(*tailSamplingSpanProcessor)
565+
tsp.policyTicker.OnTick() // the first tick always gets an empty batch
566+
tsp.policyTicker.OnTick()
567+
568+
// verify
569+
var md metricdata.ResourceMetrics
570+
require.NoError(t, s.reader.Collect(context.Background(), &md))
571+
572+
m := metricdata.Metrics{
573+
Name: "otelcol_processor_tail_sampling_sampling_trace_dropped_too_early",
574+
Description: "Count of traces that needed to be dropped before the configured wait time",
575+
Unit: "{traces}",
576+
Data: metricdata.Sum[int64]{
577+
IsMonotonic: true,
578+
Temporality: metricdata.CumulativeTemporality,
579+
DataPoints: []metricdata.DataPoint[int64]{
580+
{
581+
Value: 1,
582+
},
583+
},
584+
},
585+
}
586+
587+
got := s.getMetric(m.Name, md)
588+
metricdatatest.AssertEqual(t, m, got, metricdatatest.IgnoreTimestamp())
589+
}
590+
591+
func TestProcessorTailSamplingSamplingPolicyEvaluationError(t *testing.T) {
592+
// prepare
593+
s := setupTestTelemetry()
594+
b := newSyncIDBatcher()
595+
syncBatcher := b.(*syncIDBatcher)
596+
597+
cfg := Config{
598+
DecisionWait: 1,
599+
NumTraces: 100,
600+
PolicyCfgs: []PolicyCfg{
601+
{
602+
sharedPolicyCfg: sharedPolicyCfg{
603+
Name: "ottl",
604+
Type: OTTLCondition,
605+
OTTLConditionCfg: OTTLConditionCfg{
606+
ErrorMode: ottl.PropagateError,
607+
SpanConditions: []string{"attributes[1] == \"test\""},
608+
},
609+
},
610+
},
611+
},
612+
}
613+
cs := &consumertest.TracesSink{}
614+
ct := s.newSettings()
615+
proc, err := newTracesProcessor(context.Background(), ct, cs, cfg, withDecisionBatcher(syncBatcher))
616+
require.NoError(t, err)
617+
defer func() {
618+
err = proc.Shutdown(context.Background())
619+
require.NoError(t, err)
620+
}()
621+
622+
err = proc.Start(context.Background(), componenttest.NewNopHost())
623+
require.NoError(t, err)
624+
625+
// test
626+
_, batches := generateIDsAndBatches(2)
627+
for _, batch := range batches {
628+
err = proc.ConsumeTraces(context.Background(), batch)
629+
require.NoError(t, err)
630+
}
631+
632+
tsp := proc.(*tailSamplingSpanProcessor)
633+
tsp.policyTicker.OnTick() // the first tick always gets an empty batch
634+
tsp.policyTicker.OnTick()
635+
636+
// verify
637+
var md metricdata.ResourceMetrics
638+
require.NoError(t, s.reader.Collect(context.Background(), &md))
639+
640+
m := metricdata.Metrics{
641+
Name: "otelcol_processor_tail_sampling_sampling_policy_evaluation_error",
642+
Description: "Count of sampling policy evaluation errors",
643+
Unit: "{errors}",
644+
Data: metricdata.Sum[int64]{
645+
IsMonotonic: true,
646+
Temporality: metricdata.CumulativeTemporality,
647+
DataPoints: []metricdata.DataPoint[int64]{
648+
{
649+
Value: 2,
650+
},
651+
},
652+
},
653+
}
654+
655+
got := s.getMetric(m.Name, md)
656+
metricdatatest.AssertEqual(t, m, got, metricdatatest.IgnoreTimestamp())
657+
}
658+
525659
type testTelemetry struct {
526660
reader *sdkmetric.ManualReader
527661
meterProvider *sdkmetric.MeterProvider

0 commit comments

Comments
 (0)