95 changes: 81 additions & 14 deletions ddtrace/_trace/processor/__init__.py
@@ -4,7 +4,6 @@
from os import environ
from threading import RLock
from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional

@@ -128,7 +127,13 @@ class TraceSamplingProcessor(TraceProcessor):
Agent even if the dropped trace is not (as is the case when trace stats computation is enabled).
"""

def __init__(self, compute_stats_enabled: bool, single_span_rules: List[SpanSamplingRule], apm_opt_out: bool):
def __init__(
self,
compute_stats_enabled: bool,
single_span_rules: List[SpanSamplingRule],
apm_opt_out: bool,
agent_based_samplers: Optional[dict] = None,
):
super(TraceSamplingProcessor, self).__init__()
self._compute_stats_enabled = compute_stats_enabled
self.single_span_rules = single_span_rules
@@ -137,9 +142,14 @@ def __init__(self, compute_stats_enabled: bool, single_span_rules: List[SpanSamp
# If ASM is enabled but tracing is disabled,
# we need to set the rate limiting to 1 trace per minute
# for the backend to consider the service as alive.
self.sampler = DatadogSampler(rate_limit=1, rate_limit_window=60e9, rate_limit_always_on=True)
self.sampler = DatadogSampler(
rate_limit=1,
rate_limit_window=60e9,
rate_limit_always_on=True,
agent_based_samplers=agent_based_samplers,
)
else:
self.sampler = DatadogSampler()
self.sampler = DatadogSampler(agent_based_samplers=agent_based_samplers)

def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:
if trace:
@@ -260,8 +270,8 @@ def __init__(
self,
partial_flush_enabled: bool,
partial_flush_min_spans: int,
trace_processors: Iterable[TraceProcessor],
writer: Optional[TraceWriter] = None,
dd_processors: Optional[List[TraceProcessor]] = None,
user_processors: Optional[List[TraceProcessor]] = None,
):
# Set partial flushing
self.partial_flush_enabled = partial_flush_enabled
@@ -271,12 +281,10 @@ def __init__(
config._trace_compute_stats, get_span_sampling_rules(), asm_config._apm_opt_out
)
self.tags_processor = TraceTagsProcessor()
self.trace_processors = trace_processors
# Initialize writer
if writer is not None:
self.writer: TraceWriter = writer
elif SpanAggregator._use_log_writer():
self.writer = LogWriter()
self.dd_processors = dd_processors or []
self.user_processors = user_processors or []
if SpanAggregator._use_log_writer():
self.writer: TraceWriter = LogWriter()
else:
verify_url(agent_config.trace_agent_url)
self.writer = AgentWriter(
@@ -307,7 +315,9 @@ def __repr__(self) -> str:
f"{self.partial_flush_min_spans}, "
f"{self.sampling_processor},"
f"{self.tags_processor},"
f"{self.trace_processors}, "
f"{self.dd_processors}, "
f"{self.user_processors}, "
f"{self._span_metrics}, "
f"{self.writer})"
)

@@ -369,7 +379,9 @@ def on_span_finish(self, span: Span) -> None:
finished[0].set_metric("_dd.py.partial_flush", num_finished)

spans: Optional[List[Span]] = finished
for tp in chain(self.trace_processors, [self.sampling_processor, self.tags_processor]):
for tp in chain(
self.dd_processors, self.user_processors, [self.sampling_processor, self.tags_processor]
):
try:
if spans is None:
return
@@ -480,3 +492,58 @@ def _queue_span_count_metrics(self, metric_name: str, tag_name: str, min_count:
TELEMETRY_NAMESPACE.TRACERS, metric_name, count, tags=((tag_name, tag_value),)
)
self._span_metrics[metric_name] = defaultdict(int)

def reset(
self,
user_processors: Optional[List[TraceProcessor]] = None,
compute_stats: Optional[bool] = None,
apm_opt_out: Optional[bool] = None,
appsec_enabled: Optional[bool] = None,
reset_buffer: bool = True,
) -> None:
"""
Resets the internal state of the SpanAggregator, including the writer, sampling processor,
user-defined processors, and optionally the trace buffer and span metrics.

This method is typically used after a process fork or during runtime reconfiguration.
Arguments that are None will not override existing values.
"""
try:
# Stop the writer to ensure it is not running while we reconfigure it.
self.writer.stop()
except ServiceStatusError:
# Writers like AgentWriter may not start until the first trace is encoded.
# Stopping them before that will raise a ServiceStatusError.
pass

if isinstance(self.writer, AgentWriter) and appsec_enabled:
# Ensure AppSec metadata is encoded by setting the API version to v0.4.
self.writer._api_version = "v0.4"
# Re-create the writer to ensure it is consistent with updated configurations (ex: api_version)
self.writer = self.writer.recreate()

# Recreate the sampling processor using new or existing config values.
# If an argument is None, the current value is preserved.
if compute_stats is None:
compute_stats = self.sampling_processor._compute_stats_enabled
if apm_opt_out is None:
apm_opt_out = self.sampling_processor.apm_opt_out
self.sampling_processor = TraceSamplingProcessor(
compute_stats,
get_span_sampling_rules(),
apm_opt_out,
self.sampling_processor.sampler._agent_based_samplers,
)

# Update user processors if provided.
if user_processors is not None:
self.user_processors = user_processors

# Reset the trace buffer and span metrics.
# Useful when forking to prevent sending duplicate spans from parent and child processes.
if reset_buffer:
self._traces = defaultdict(lambda: _Trace())
self._span_metrics = {
"spans_created": defaultdict(int),
"spans_finished": defaultdict(int),
}
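
A minimal usage sketch (not part of the diff) of the new `reset` seam, assuming an existing `SpanAggregator` instance; per the docstring above, arguments left as `None` preserve current values:

```python
def on_child_after_fork(aggregator):
    # In a forked child, drop traces buffered by the parent so they are not
    # flushed twice, while keeping processors, sampling, and writer config.
    aggregator.reset(
        user_processors=None,  # None keeps the existing user processors
        compute_stats=None,    # None keeps the current stats setting
        apm_opt_out=None,
        appsec_enabled=None,
        reset_buffer=True,     # clear inherited, unflushed traces
    )
```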
15 changes: 8 additions & 7 deletions ddtrace/_trace/sampler.py
@@ -75,7 +75,7 @@ class DatadogSampler:
"limiter",
"rules",
"_rate_limit_always_on",
"_by_service_samplers",
"_agent_based_samplers",
)
_default_key = "service:,env:"

@@ -85,6 +85,7 @@ def __init__(
rate_limit: Optional[int] = None,
rate_limit_window: float = 1e9,
rate_limit_always_on: bool = False,
agent_based_samplers: Optional[Dict[str, RateSampler]] = None,
):
"""
Constructor for DatadogSampler sampler
@@ -101,7 +102,7 @@
else:
self.rules: List[SamplingRule] = rules or []
# Set Agent based samplers
self._by_service_samplers: Dict[str, RateSampler] = {}
self._agent_based_samplers = agent_based_samplers or {}
# Set rate limiter
self._rate_limit_always_on: bool = rate_limit_always_on
if rate_limit is None:
@@ -119,10 +120,10 @@ def update_rate_by_service_sample_rates(self, rate_by_service: Dict[str, float])
samplers: Dict[str, RateSampler] = {}
for key, sample_rate in rate_by_service.items():
samplers[key] = RateSampler(sample_rate)
self._by_service_samplers = samplers
self._agent_based_samplers = samplers

def __str__(self):
rates = {key: sampler.sample_rate for key, sampler in self._by_service_samplers.items()}
rates = {key: sampler.sample_rate for key, sampler in self._agent_based_samplers.items()}
return "{}(agent_rates={!r}, limiter={!r}, rules={!r}), rate_limit_always_on={!r}".format(
self.__class__.__name__,
rates,
@@ -181,11 +182,11 @@ def sample(self, span: Span) -> bool:
sample_rate = matched_rule.sample_rate
else:
key = self._key(span.service, span.get_tag(ENV_KEY))
if key in self._by_service_samplers:
if key in self._agent_based_samplers:
# Agent service based sampling
agent_service_based = True
sampled = self._by_service_samplers[key].sample(span)
sample_rate = self._by_service_samplers[key].sample_rate
sampled = self._agent_based_samplers[key].sample(span)
sample_rate = self._agent_based_samplers[key].sample_rate

if matched_rule or self._rate_limit_always_on:
# Avoid rate limiting when trace sample rules and/or sample rates are NOT provided
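To make the rename concrete, here is a small sketch, using only names from this file's diff, of how agent rates learned by one sampler instance can now seed a new one through the added `agent_based_samplers` argument:

```python
from ddtrace._trace.sampler import DatadogSampler

old_sampler = DatadogSampler()
# Rates as reported by the agent, keyed "service:<name>,env:<env>"
# (see _default_key = "service:,env:" above).
old_sampler.update_rate_by_service_sample_rates({"service:web,env:prod": 0.25})

# A recreated sampler starts with the learned per-service rates instead of {}.
new_sampler = DatadogSampler(agent_based_samplers=old_sampler._agent_based_samplers)
assert "service:web,env:prod" in new_sampler._agent_based_samplers
```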
86 changes: 30 additions & 56 deletions ddtrace/_trace/tracer.py
@@ -46,12 +46,10 @@
from ddtrace.internal.processor.endpoint_call_counter import EndpointCallCounterProcessor
from ddtrace.internal.runtime import get_runtime_id
from ddtrace.internal.schema.processor import BaseServiceProcessor
from ddtrace.internal.service import ServiceStatusError
from ddtrace.internal.utils import _get_metas_to_propagate
from ddtrace.internal.utils.formats import format_trace_id
from ddtrace.internal.writer import AgentWriter
from ddtrace.internal.writer import HTTPWriter
from ddtrace.internal.writer import TraceWriter
from ddtrace.settings._config import config
from ddtrace.settings.asm import config as asm_config
from ddtrace.settings.peer_service import _ps_config
@@ -89,20 +87,9 @@ def _start_appsec_processor() -> Optional["AppSecSpanProcessor"]:


def _default_span_processors_factory(
trace_filters: List[TraceProcessor],
writer: Optional[TraceWriter],
partial_flush_enabled: bool,
partial_flush_min_spans: int,
profiling_span_processor: EndpointCallCounterProcessor,
) -> Tuple[List[SpanProcessor], Optional["AppSecSpanProcessor"], SpanAggregator]:
) -> Tuple[List[SpanProcessor], Optional["AppSecSpanProcessor"]]:
"""Construct the default list of span processors to use."""
trace_processors: List[TraceProcessor] = []
trace_processors += [
PeerServiceProcessor(_ps_config),
BaseServiceProcessor(),
]
trace_processors += trace_filters

span_processors: List[SpanProcessor] = []
span_processors += [TopLevelSpanProcessor()]

@@ -140,14 +127,7 @@ def _default_span_processors_factory(

span_processors.append(profiling_span_processor)

# These need to run after all the other processors
span_aggregagtor = SpanAggregator(
partial_flush_enabled=partial_flush_enabled,
partial_flush_min_spans=partial_flush_min_spans,
trace_processors=trace_processors,
writer=writer,
)
return span_processors, appsec_processor, span_aggregagtor
return span_processors, appsec_processor


class Tracer(object):
@@ -181,8 +161,6 @@ def __init__(self) -> None:
"Initializing multiple Tracer instances is not supported. Use ``ddtrace.trace.tracer`` instead.",
)

self._user_trace_processors: List[TraceProcessor] = []

# globally set tags
self._tags = config.tags.copy()

@@ -199,12 +177,13 @@ def __init__(self) -> None:
config._trace_compute_stats = False
# Direct link to the appsec processor
self._endpoint_call_counter_span_processor = EndpointCallCounterProcessor()
self._span_processors, self._appsec_processor, self._span_aggregator = _default_span_processors_factory(
self._user_trace_processors,
None,
config._partial_flush_enabled,
config._partial_flush_min_spans,
self._endpoint_call_counter_span_processor,
self._span_processors, self._appsec_processor = _default_span_processors_factory(
self._endpoint_call_counter_span_processor
)
self._span_aggregator = SpanAggregator(
partial_flush_enabled=config._partial_flush_enabled,
partial_flush_min_spans=config._partial_flush_min_spans,
dd_processors=[PeerServiceProcessor(_ps_config), BaseServiceProcessor()],
)
if config._data_streams_enabled:
# Inline the import to avoid pulling in ddsketch or protobuf
@@ -389,13 +368,6 @@ def configure(
if compute_stats_enabled is not None:
config._trace_compute_stats = compute_stats_enabled

if isinstance(self._span_aggregator.writer, AgentWriter):
if appsec_enabled:
self._span_aggregator.writer._api_version = "v0.4"

if trace_processors:
self._user_trace_processors = trace_processors

if any(
x is not None
for x in [
@@ -405,7 +377,9 @@
iast_enabled,
]
):
self._recreate()
self._recreate(
trace_processors, compute_stats_enabled, asm_config._apm_opt_out, appsec_enabled, reset_buffer=False
)

if context_provider is not None:
self.context_provider = context_provider
@@ -433,31 +407,31 @@ def _generate_diagnostic_logs(self):

def _child_after_fork(self):
self._pid = getpid()
self._recreate()
self._recreate(reset_buffer=True)
self._new_process = True

def _recreate(self):
"""Re-initialize the tracer's processors and trace writer. This method should only be used in tests."""
def _recreate(
self,
trace_processors: Optional[List[TraceProcessor]] = None,
compute_stats_enabled: Optional[bool] = None,
apm_opt_out: Optional[bool] = None,
appsec_enabled: Optional[bool] = None,
reset_buffer: bool = True,
) -> None:
"""Re-initialize the tracer's processors and trace writer"""
# Stop the writer.
# This will stop the periodic thread in HTTPWriters, preventing memory leaks and unnecessary I/O.
try:
self._span_aggregator.writer.stop()
except ServiceStatusError:
# Some writers (ex: AgentWriter), start when the first trace chunk is encoded. Stopping
# the writer before that point will raise a ServiceStatusError.
pass
# Re-create the background writer thread
rules = self._span_aggregator.sampling_processor.sampler._by_service_samplers
self._span_aggregator.writer = self._span_aggregator.writer.recreate()
self.enabled = config._tracing_enabled
self._span_processors, self._appsec_processor, self._span_aggregator = _default_span_processors_factory(
self._user_trace_processors,
self._span_aggregator.writer,
self._span_aggregator.partial_flush_enabled,
self._span_aggregator.partial_flush_min_spans,
self._span_aggregator.reset(
user_processors=trace_processors,
compute_stats=compute_stats_enabled,
apm_opt_out=apm_opt_out,
appsec_enabled=appsec_enabled,
reset_buffer=reset_buffer,
)
self._span_processors, self._appsec_processor = _default_span_processors_factory(
self._endpoint_call_counter_span_processor,
)
self._span_aggregator.sampling_processor.sampler._by_service_samplers = rules.copy()

def _start_span_after_shutdown(
self,
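A hedged end-to-end sketch of the reconfiguration path after this refactor, using only names visible in the diff (`DropHealthChecks` is a hypothetical user processor):

```python
from ddtrace.trace import tracer

class DropHealthChecks:
    """Hypothetical user trace processor exposing the process_trace interface."""
    def process_trace(self, trace):
        return None if trace and trace[0].name == "health.check" else trace

# configure() now routes into SpanAggregator.reset() via _recreate(), so the
# writer, buffered traces (reset_buffer=False), and agent-based sampler state
# all survive, instead of the aggregator being rebuilt from scratch.
tracer.configure(trace_processors=[DropHealthChecks()])
```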
2 changes: 1 addition & 1 deletion ddtrace/internal/ci_visibility/recorder.py
@@ -630,7 +630,7 @@ def disable(cls) -> None:
log.debug("%s disabled", cls.__name__)

def _start_service(self) -> None:
tracer_filters = self.tracer._user_trace_processors
tracer_filters = self.tracer._span_aggregator.user_processors
if not any(isinstance(tracer_filter, TraceCiVisibilityFilter) for tracer_filter in tracer_filters):
tracer_filters += [TraceCiVisibilityFilter(self._tags, self._service)] # type: ignore[arg-type]
self.tracer.configure(trace_processors=tracer_filters)
1 change: 1 addition & 0 deletions ddtrace/internal/writer/writer.py
@@ -553,6 +553,7 @@ def recreate(self) -> HTTPWriter:
api_version=self._api_version,
headers=self._headers,
report_metrics=self._report_metrics,
response_callback=self._response_cb,
)
return new_instance

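Context for this one-line fix, stated with some hedging: the writer's `response_callback` is what feeds the agent's `rate_by_service` response back into the sampler, and before this change `recreate()` (used on fork and reconfiguration) silently dropped it, so a recreated writer stopped refreshing agent-based rates. A rough illustration of the wiring, with hypothetical names:

```python
def on_agent_response(agent_response, sampler):
    # Hypothetical shape: the trace agent's v0.4 response carries
    # {"rate_by_service": {"service:web,env:prod": 0.25, ...}}.
    sampler.update_rate_by_service_sample_rates(
        agent_response.get("rate_by_service", {})
    )

# Passing response_callback through recreate() keeps this feedback loop
# alive in the new writer instance.
```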
2 changes: 1 addition & 1 deletion ddtrace/opentracer/tracer.py
@@ -104,7 +104,7 @@ def __init__(
trace_processors = None
if isinstance(self._config.get(keys.SETTINGS), dict) and self._config[keys.SETTINGS].get("FILTERS"): # type: ignore[union-attr]
trace_processors = self._config[keys.SETTINGS]["FILTERS"] # type: ignore[index]
self._dd_tracer._user_trace_processors = trace_processors
self._dd_tracer._span_aggregator.user_processors = trace_processors

if self._config[keys.ENABLED]:
self._dd_tracer.enabled = self._config[keys.ENABLED]
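For completeness, a sketch of the OpenTracing configuration that reaches this line; the `"settings"`/`"FILTERS"` keys mirror `keys.SETTINGS` and `"FILTERS"` in the diff, and `KeepAll` is a hypothetical filter:

```python
from ddtrace.opentracer import Tracer

class KeepAll:
    def process_trace(self, trace):
        return trace  # pass every trace through unchanged

ot_tracer = Tracer(
    service_name="my-service",
    config={"settings": {"FILTERS": [KeepAll()]}},
)
# These filters now land on _dd_tracer._span_aggregator.user_processors
# rather than the removed _user_trace_processors attribute.
```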
@@ -0,0 +1,4 @@
---
fixes:
- |
tracing: Resolves a sampling issue where agent-based sampling rates were not correctly applied after a process forked or the tracer was reconfigured.