Skip to content
Closed
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
a0d487f
Add TraceRegistry interface.
tduncan May 6, 2025
e4c0258
Have TraceRegistry and its extenders implement the ITraceRegistry int…
tduncan May 6, 2025
0f35d76
Replace references to TraceRegistry with the ITraceRegistry interface.
tduncan May 6, 2025
2b0700a
Remove TraceRegistry inheritence from TogglableTraceRegistry.
tduncan May 6, 2025
2600c75
Remove TraceRegistry inheritence from RecordingTraceRegistry.
tduncan May 6, 2025
32c296e
Rename TraceRegistry to SimpleTraceRegistry.
tduncan May 6, 2025
466173f
Rename ITraceRegistry to TraceRegistry.
tduncan May 6, 2025
7602fe7
Add @Override annotations in SimpleTraceRegistry.
tduncan May 6, 2025
60dc92e
Add basic TraceRegistry implementation for unregistering stalled traces.
tduncan May 6, 2025
a52393c
Add test verifying that stalled traces are continually detected.
tduncan May 6, 2025
d06cf23
Stop stack trace sampling when stalled trace is automatically unregis…
tduncan May 7, 2025
88e7681
Add configuration property for snapshot profiling stalled trace timeout.
tduncan May 7, 2025
c818959
Expand the TraceRegistry interface extend AutoCloseable.
tduncan May 8, 2025
4e170f2
Unregister all traces when closed.
tduncan May 8, 2025
099096e
Close encapsulated TraceRegistry delegate when StalledTraceDetectingT…
tduncan May 8, 2025
43db5f5
Define a noop TraceRegistry and ConfigurableSupplier.
tduncan May 8, 2025
4924bb2
Access TraceRegistry via a ConfigurableSupplier.
tduncan May 23, 2025
b6e8b76
Use a daemon thread rather than a ScheduledExecutorService for detect…
tduncan May 23, 2025
a534305
Apply spotless code formatting.
tduncan May 23, 2025
8858d7a
If if/else rather than switch.
tduncan May 23, 2025
1054b75
Close TraceRegistry when OpenTelemetry SDK shutdown.
tduncan May 23, 2025
af1beab
Detect orphaned traces using WeakReferences and a ReferenceQueue rath…
tduncan May 27, 2025
221b5d6
Introduce the TraceProfilingContext in ScheduledExecutorStackTraceSam…
tduncan May 27, 2025
0a5e890
Rename class.
tduncan May 28, 2025
e805635
Extract inner class to first class citizen.
tduncan May 28, 2025
a9bce31
Use ProfilingSpanContext reference within ScheduledExecutorStackTrace…
tduncan May 28, 2025
c373d11
Use ProfilingSpanContext in ActiveSpanTracker.
tduncan May 28, 2025
d462bdc
Define an invalid ProfilingSpanContext.
tduncan May 28, 2025
204e9ab
Add test requiring that stack trace sampling be stopped when an orpha…
tduncan May 28, 2025
c8fac10
Apply spotless code formatting.
tduncan May 29, 2025
9b229d7
Update javadoc comment.
tduncan May 29, 2025
0cc5af7
Avoid creating weak references to a SpanContext when a trace is being…
tduncan May 30, 2025
8d4b2ad
Add missing equals method to correctly remove entries from the set in…
tduncan Jun 6, 2025
87db84a
Verify that sampling starts and is later stopped when an orphaned spa…
tduncan Jun 6, 2025
77679ff
Apply spotless code formatting.
tduncan Jun 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,34 @@
import io.opentelemetry.context.ContextStorage;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.internal.cache.Cache;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import javax.annotation.Nullable;

class ActiveSpanTracker implements ContextStorage, SpanTracker {
private final Cache<Thread, SpanContext> cache = Cache.weak();
private final Cache<Thread, ProfilingSpanContext> cache = Cache.weak();

private final ContextStorage delegate;
private final TraceRegistry registry;
private final Supplier<TraceRegistry> registry;

ActiveSpanTracker(ContextStorage delegate, TraceRegistry registry) {
ActiveSpanTracker(ContextStorage delegate, Supplier<TraceRegistry> registry) {
this.delegate = delegate;
this.registry = registry;
}

@Override
public Scope attach(Context toAttach) {
Scope scope = delegate.attach(toAttach);
SpanContext newSpanContext = Span.fromContext(toAttach).getSpanContext();
if (doNotTrack(newSpanContext)) {
SpanContext spanContext = Span.fromContext(toAttach).getSpanContext();
if (doNotTrack(spanContext)) {
return scope;
}

Thread thread = Thread.currentThread();
SpanContext oldSpanContext = cache.get(thread);
if (oldSpanContext == newSpanContext) {
ProfilingSpanContext oldSpanContext = cache.get(thread);
ProfilingSpanContext newSpanContext = ProfilingSpanContext.from(spanContext);
if (Objects.equals(oldSpanContext, newSpanContext)) {
return scope;
}

Expand All @@ -62,7 +65,7 @@ public Scope attach(Context toAttach) {
}

private boolean doNotTrack(SpanContext spanContext) {
return !spanContext.isSampled() || !registry.isRegistered(spanContext);
return !spanContext.isSampled() || !registry.get().isRegistered(spanContext);
}

@Nullable
Expand All @@ -71,7 +74,8 @@ public Context current() {
return delegate.current();
}

public Optional<SpanContext> getActiveSpan(Thread thread) {
@Override
public Optional<ProfilingSpanContext> getActiveSpan(Thread thread) {
return Optional.ofNullable(cache.get(thread));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.annotations.VisibleForTesting;
import io.opentelemetry.context.ContextStorage;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

class InterceptingContextStorageSpanTrackingActivator implements SpanTrackingActivator {
Expand All @@ -35,7 +36,7 @@ class InterceptingContextStorageSpanTrackingActivator implements SpanTrackingAct
}

@Override
public void activate(TraceRegistry registry) {
public void activate(Supplier<TraceRegistry> registry) {
contextStorageWrappingFunction.accept(
contextStorage -> {
ActiveSpanTracker tracker = new ActiveSpanTracker(contextStorage, registry);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright Splunk Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.splunk.opentelemetry.profiler.snapshot;

import io.opentelemetry.api.trace.SpanContext;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

class OrphanedTraceDetectingTraceRegistry implements TraceRegistry {
private final Set<Key> traces = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final ReferenceQueue<Object> referenceQueue = new ReferenceQueue<>();

private final TraceRegistry delegate;
private final Supplier<StackTraceSampler> sampler;
private final Thread thread;

OrphanedTraceDetectingTraceRegistry(TraceRegistry delegate, Supplier<StackTraceSampler> sampler) {
this.delegate = delegate;
this.sampler = sampler;

thread = new Thread(this::unregisterOrphanedTraces);
thread.setName("orphaned-trace-detector");
thread.setDaemon(true);
thread.start();
}

@Override
public void register(SpanContext spanContext) {
delegate.register(spanContext);
traces.add(new WeakSpanContext(spanContext, referenceQueue));
}

@Override
public boolean isRegistered(SpanContext spanContext) {
return delegate.isRegistered(spanContext);
}

@Override
public void unregister(SpanContext spanContext) {
delegate.unregister(spanContext);
traces.remove(new LookupKey(spanContext));
}

public void unregisterOrphanedTraces() {
while (!Thread.interrupted()) {
try {
Object reference = referenceQueue.remove();
if (reference != null) {
Key key = (Key) reference;
traces.remove(key);
delegate.unregister(key.getSpanContext());
sampler.get().stop(key.getSpanContext());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

@Override
public void close() {
delegate.close();
thread.interrupt();
}

interface Key {
SpanContext getSpanContext();
}

private static class LookupKey implements Key {
private final SpanContext spanContext;

private LookupKey(SpanContext spanContext) {
this.spanContext = spanContext;
}

@Override
public SpanContext getSpanContext() {
return spanContext;
}
}

private static class WeakSpanContext extends WeakReference<SpanContext> implements Key {
private final SpanContext spanContext;

public WeakSpanContext(SpanContext referent, ReferenceQueue<Object> q) {
super(referent, q);
this.spanContext =
SpanContext.create(
referent.getTraceId(),
referent.getSpanId(),
referent.getTraceFlags(),
referent.getTraceState());
}

@Override
public SpanContext getSpanContext() {
return spanContext;
}

@Override
public int hashCode() {
return Objects.hashCode(spanContext);
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
WeakSpanContext that = (WeakSpanContext) o;
return Objects.equals(spanContext, that.spanContext);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright Splunk Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.splunk.opentelemetry.profiler.snapshot;

import io.opentelemetry.api.trace.SpanContext;
import java.util.Objects;

class ProfilingSpanContext {
static final ProfilingSpanContext INVALID = from(SpanContext.getInvalid());

static ProfilingSpanContext from(SpanContext spanContext) {
return new ProfilingSpanContext(spanContext.getTraceId(), spanContext.getSpanId());
}

private final String traceId;
private final String spanId;

private ProfilingSpanContext(String traceId, String spanId) {
this.traceId = traceId;
this.spanId = spanId;
}

public String getTraceId() {
return traceId;
}

public String getSpanId() {
return spanId;
}

@Override
public int hashCode() {
return Objects.hash(traceId, spanId);
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
ProfilingSpanContext that = (ProfilingSpanContext) o;
return Objects.equals(traceId, that.traceId) && Objects.equals(spanId, that.spanId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,17 @@ public void start(SpanContext spanContext) {
}

samplers.computeIfAbsent(
spanContext.getTraceId(), id -> new ThreadSampler(spanContext, samplingPeriod));
spanContext.getTraceId(),
id -> new ThreadSampler(ProfilingSpanContext.from(spanContext), samplingPeriod));
}

@Override
public void stop(SpanContext spanContext) {
samplers.computeIfPresent(
spanContext.getTraceId(),
(traceId, sampler) -> {
if (spanContext.equals(sampler.getSpanContext())) {
ProfilingSpanContext context = ProfilingSpanContext.from(spanContext);
if (context.equals(sampler.context)) {
sampler.shutdown();
waitForShutdown(sampler);
return null;
Expand Down Expand Up @@ -98,17 +100,15 @@ private void waitForShutdown(ThreadSampler sampler) {

private class ThreadSampler {
private final ScheduledExecutorService scheduler;
private final SpanContext spanContext;
private final ProfilingSpanContext context;
private final StackTraceGatherer gatherer;

ThreadSampler(SpanContext spanContext, Duration samplingPeriod) {
this.spanContext = spanContext;
gatherer =
new StackTraceGatherer(
spanContext.getTraceId(), Thread.currentThread(), System.nanoTime());
ThreadSampler(ProfilingSpanContext context, Duration samplingPeriod) {
this.context = context;
gatherer = new StackTraceGatherer(context, Thread.currentThread(), System.nanoTime());
scheduler =
HelpfulExecutors.newSingleThreadedScheduledExecutor(
"stack-trace-sampler-" + spanContext.getTraceId());
"stack-trace-sampler-" + context.getSpanId());
scheduler.scheduleAtFixedRate(
gatherer, SCHEDULER_INITIAL_DELAY, samplingPeriod.toMillis(), TimeUnit.MILLISECONDS);
}
Expand All @@ -125,19 +125,15 @@ void shutdownNow() {
boolean awaitTermination(Duration timeout) throws InterruptedException {
return scheduler.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
}

SpanContext getSpanContext() {
return spanContext;
}
}

private class StackTraceGatherer implements Runnable {
private final String traceId;
private final ProfilingSpanContext context;
private final Thread thread;
private volatile long timestampNanos;

StackTraceGatherer(String traceId, Thread thread, long timestampNanos) {
this.traceId = traceId;
StackTraceGatherer(ProfilingSpanContext context, Thread thread, long timestampNanos) {
this.context = context;
this.thread = thread;
this.timestampNanos = timestampNanos;
}
Expand All @@ -151,23 +147,24 @@ public void run() {
Duration samplingPeriod = Duration.ofNanos(currentSampleTimestamp - previousTimestampNanos);
String spanId = retrieveActiveSpan(thread).getSpanId();
StackTrace stackTrace =
StackTrace.from(Instant.now(), samplingPeriod, threadInfo, traceId, spanId);
StackTrace.from(
Instant.now(), samplingPeriod, threadInfo, context.getTraceId(), spanId);
stagingArea.get().stage(stackTrace);
} catch (Exception e) {
logger.log(Level.SEVERE, e, samplerErrorMessage(traceId, thread.getId()));
logger.log(Level.SEVERE, e, samplerErrorMessage(context, thread.getId()));
} finally {
timestampNanos = currentSampleTimestamp;
}
}

private SpanContext retrieveActiveSpan(Thread thread) {
return spanTracker.get().getActiveSpan(thread).orElse(SpanContext.getInvalid());
private ProfilingSpanContext retrieveActiveSpan(Thread thread) {
return spanTracker.get().getActiveSpan(thread).orElse(ProfilingSpanContext.INVALID);
}

private Supplier<String> samplerErrorMessage(String traceId, long threadId) {
private Supplier<String> samplerErrorMessage(ProfilingSpanContext context, long threadId) {
return () ->
"Exception thrown attempting to stage callstacks for trace ID ' "
+ traceId
+ context.getTraceId()
+ "' on profiled thread "
+ threadId;
}
Expand Down
Loading