Skip to content

Commit ef0396b

Browse files
authored
Collect Callstacks For Threads Processing Traces (#2207)
* Add the StackTraceSampler interface. * Add a ScheduledExecutor implementation of the StackTraceSampler interface. * Start and stop profiling when entry spans start and end respectively. * Add a noop StagingArea implementation and test builder for the SDK customizer to insulate individual tests from additional fields being added to the customizer. * Use a concurrent map in ScheduledExecutorStackTraceSampler. * Apply spotless code formatting. * Convert SAMPLING_PERIOD constant to Duration. * Remove debug logging from ScheduledExecutorStackTraceSampler. * InMemoryStagingArea doesn't need to be a JUnit extension. * Remove unused JUnit extension method from ObservableStackTraceSampler. * Rename LOGGER to lowercase. * Log explanatory message and stacktrace if something goes wrong collecting callstacks from profilied threads. * Add explanatory comment to SnapshotProfilingSpanProcessor. * Expand the StackTraceSampler interface to accept a SpanContext to eventually track traces, not only thread IDs. * Associate thread profilers with trace IDs to account for different threads managing a spans lifecycle (e.g. async web frameworks) and avoid a resource leak and runaway profiling thread. * Add test verifying that ending a span from a different thread will stop trace profiling. * Only allow a single thread per trace to be profiled at a time. * Rename the 'Profilng' class to 'Snapshotting'. * Attempt to make the explanation about the multithreaded trace limitation more clear.
1 parent 6643a17 commit ef0396b

20 files changed

+722
-49
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright Splunk Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.splunk.opentelemetry.profiler.snapshot;
18+
19+
class NoopStagingArea implements StagingArea {
20+
@Override
21+
public void stage(String traceId, StackTrace stackTrace) {}
22+
23+
@Override
24+
public void empty(String traceId) {}
25+
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright Splunk Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.splunk.opentelemetry.profiler.snapshot;
18+
19+
import com.google.common.annotations.VisibleForTesting;
20+
import io.opentelemetry.api.trace.SpanContext;
21+
import java.lang.management.ManagementFactory;
22+
import java.lang.management.ThreadInfo;
23+
import java.lang.management.ThreadMXBean;
24+
import java.time.Duration;
25+
import java.time.Instant;
26+
import java.util.concurrent.ConcurrentHashMap;
27+
import java.util.concurrent.ConcurrentMap;
28+
import java.util.concurrent.Executors;
29+
import java.util.concurrent.ScheduledExecutorService;
30+
import java.util.concurrent.TimeUnit;
31+
import java.util.function.Supplier;
32+
import java.util.logging.Level;
33+
import java.util.logging.Logger;
34+
35+
class ScheduledExecutorStackTraceSampler implements StackTraceSampler {
36+
private static final Logger logger =
37+
Logger.getLogger(ScheduledExecutorStackTraceSampler.class.getName());
38+
private static final int SCHEDULER_INITIAL_DELAY = 0;
39+
private static final Duration SCHEDULER_PERIOD = Duration.ofMillis(20);
40+
private static final int MAX_ENTRY_DEPTH = 200;
41+
42+
private final ConcurrentMap<String, ScheduledExecutorService> samplers =
43+
new ConcurrentHashMap<>();
44+
private final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
45+
private final StagingArea stagingArea;
46+
private final Duration samplingPeriod;
47+
48+
ScheduledExecutorStackTraceSampler(StagingArea stagingArea) {
49+
this(stagingArea, SCHEDULER_PERIOD);
50+
}
51+
52+
@VisibleForTesting
53+
ScheduledExecutorStackTraceSampler(StagingArea stagingArea, Duration samplingPeriod) {
54+
this.stagingArea = stagingArea;
55+
this.samplingPeriod = samplingPeriod;
56+
}
57+
58+
@Override
59+
public void start(SpanContext spanContext) {
60+
if (samplers.containsKey(spanContext.getTraceId())) {
61+
return;
62+
}
63+
64+
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
65+
samplers.put(spanContext.getTraceId(), scheduler);
66+
scheduler.scheduleAtFixedRate(
67+
new StackTraceGatherer(spanContext.getTraceId(), Thread.currentThread().getId()),
68+
SCHEDULER_INITIAL_DELAY,
69+
samplingPeriod.toMillis(),
70+
TimeUnit.MILLISECONDS);
71+
}
72+
73+
@Override
74+
public void stop(SpanContext spanContext) {
75+
ScheduledExecutorService scheduler = samplers.remove(spanContext.getTraceId());
76+
if (scheduler != null) {
77+
scheduler.shutdown();
78+
}
79+
stagingArea.empty(spanContext.getTraceId());
80+
}
81+
82+
class StackTraceGatherer implements Runnable {
83+
private final String traceId;
84+
private final long threadId;
85+
86+
StackTraceGatherer(String traceId, long threadId) {
87+
this.traceId = traceId;
88+
this.threadId = threadId;
89+
}
90+
91+
@Override
92+
public void run() {
93+
Instant now = Instant.now();
94+
try {
95+
ThreadInfo threadInfo = threadMXBean.getThreadInfo(threadId, MAX_ENTRY_DEPTH);
96+
StackTrace stackTrace = StackTrace.from(now, threadInfo);
97+
stagingArea.stage(traceId, stackTrace);
98+
} catch (Exception e) {
99+
logger.log(Level.SEVERE, e, samplerErrorMessage(traceId, threadId));
100+
}
101+
}
102+
103+
private Supplier<String> samplerErrorMessage(String traceId, long threadId) {
104+
return () ->
105+
"Exception thrown attempting to stage callstacks for trace ID ' "
106+
+ traceId
107+
+ "' on profiled thread "
108+
+ threadId;
109+
}
110+
}
111+
}

profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/SnapshotProfilingSdkCustomizer.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,16 @@
2929
@AutoService(AutoConfigurationCustomizerProvider.class)
3030
public class SnapshotProfilingSdkCustomizer implements AutoConfigurationCustomizerProvider {
3131
private final TraceRegistry registry;
32+
private final StackTraceSampler sampler;
3233

3334
public SnapshotProfilingSdkCustomizer() {
34-
this(new TraceRegistry());
35+
this(new TraceRegistry(), new ScheduledExecutorStackTraceSampler(new NoopStagingArea()));
3536
}
3637

3738
@VisibleForTesting
38-
SnapshotProfilingSdkCustomizer(TraceRegistry registry) {
39+
SnapshotProfilingSdkCustomizer(TraceRegistry registry, StackTraceSampler sampler) {
3940
this.registry = registry;
41+
this.sampler = sampler;
4042
}
4143

4244
@Override
@@ -49,7 +51,7 @@ public void customize(AutoConfigurationCustomizer autoConfigurationCustomizer) {
4951
snapshotProfilingSpanProcessor(TraceRegistry registry) {
5052
return (builder, properties) -> {
5153
if (snapshotProfilingEnabled(properties)) {
52-
return builder.addSpanProcessor(new SnapshotProfilingSpanProcessor(registry));
54+
return builder.addSpanProcessor(new SnapshotProfilingSpanProcessor(registry, sampler));
5355
}
5456
return builder;
5557
};

profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/SnapshotProfilingSpanProcessor.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,24 @@
2424
import io.opentelemetry.sdk.trace.ReadableSpan;
2525
import io.opentelemetry.sdk.trace.SpanProcessor;
2626

27+
/**
28+
* Custom {@link SpanProcessor} implementation that will 1. register traces for snapshot profiling
29+
* and 2. activate profiling for the thread processing the trace. <br>
30+
* <br>
31+
* <b>Implementation Note</b><br>
32+
* The current snapshot profiling extension supports profiling one thread per trace at a time. If
33+
* the trace spans multiple threads -- whether because a service is invoked multiple time
34+
* concurrently by an upstream caller or work is delegated to background threads -- only a single
35+
* thread associated with that trace (specifically the thread associated with the "Entry" span) will
36+
* be profiled at a time.
37+
*/
2738
public class SnapshotProfilingSpanProcessor implements SpanProcessor {
2839
private final TraceRegistry registry;
40+
private final StackTraceSampler sampler;
2941

30-
SnapshotProfilingSpanProcessor(TraceRegistry registry) {
42+
SnapshotProfilingSpanProcessor(TraceRegistry registry, StackTraceSampler sampler) {
3143
this.registry = registry;
44+
this.sampler = sampler;
3245
}
3346

3447
@Override
@@ -41,6 +54,7 @@ public void onStart(Context context, ReadWriteSpan span) {
4154
}
4255

4356
if (isEntry(span) && registry.isRegistered(span.getSpanContext())) {
57+
sampler.start(span.getSpanContext());
4458
span.setAttribute(SNAPSHOT_PROFILING, true);
4559
}
4660
}
@@ -61,6 +75,7 @@ public boolean isStartRequired() {
6175
public void onEnd(ReadableSpan span) {
6276
if (isEntry(span)) {
6377
registry.unregister(span.getSpanContext());
78+
sampler.stop(span.getSpanContext());
6479
}
6580
}
6681

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright Splunk Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.splunk.opentelemetry.profiler.snapshot;
18+
19+
import java.lang.management.ThreadInfo;
20+
import java.time.Instant;
21+
22+
class StackTrace {
23+
static StackTrace from(Instant timestamp, ThreadInfo thread) {
24+
return new StackTrace(
25+
timestamp, thread.getThreadId(), thread.getThreadName(), thread.getStackTrace());
26+
}
27+
28+
private final Instant timestamp;
29+
private final long threadId;
30+
private final String threadName;
31+
private final StackTraceElement[] stackFrames;
32+
33+
private StackTrace(
34+
Instant timestamp, long threadId, String threadName, StackTraceElement[] stackFrames) {
35+
this.timestamp = timestamp;
36+
this.threadId = threadId;
37+
this.threadName = threadName;
38+
this.stackFrames = stackFrames;
39+
}
40+
41+
Instant getTimestamp() {
42+
return timestamp;
43+
}
44+
45+
long getThreadId() {
46+
return threadId;
47+
}
48+
49+
String getThreadName() {
50+
return threadName;
51+
}
52+
53+
StackTraceElement[] getStackFrames() {
54+
return stackFrames;
55+
}
56+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright Splunk Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.splunk.opentelemetry.profiler.snapshot;
18+
19+
import io.opentelemetry.api.trace.SpanContext;
20+
21+
interface StackTraceSampler {
22+
void start(SpanContext spanContext);
23+
24+
void stop(SpanContext spanContext);
25+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright Splunk Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.splunk.opentelemetry.profiler.snapshot;
18+
19+
/**
20+
* Acts as a location to stockpile gathered {@link StackTrace}s segmented by thread ID for bulk
21+
* exportation at some later point in time.
22+
*/
23+
interface StagingArea {
24+
void stage(String traceId, StackTrace stackTrace);
25+
26+
void empty(String traceId);
27+
}

profiler/src/test/java/com/splunk/opentelemetry/profiler/snapshot/DistributedProfilingSignalTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
class DistributedProfilingSignalTest {
3232
private final RecordingTraceRegistry downstreamRegistry = new RecordingTraceRegistry();
3333
private final SnapshotProfilingSdkCustomizer downstreamCustomizer =
34-
new SnapshotProfilingSdkCustomizer(downstreamRegistry);
34+
Snapshotting.customizer().with(downstreamRegistry).build();
3535

3636
@RegisterExtension
3737
public final OpenTelemetrySdkExtension downstreamSdk =
@@ -57,7 +57,7 @@ class DistributedProfilingSignalTest {
5757

5858
private final RecordingTraceRegistry upstreamRegistry = new RecordingTraceRegistry();
5959
private final SnapshotProfilingSdkCustomizer upstreamCustomizer =
60-
new SnapshotProfilingSdkCustomizer(upstreamRegistry);
60+
Snapshotting.customizer().with(upstreamRegistry).build();
6161

6262
@RegisterExtension
6363
public final OpenTelemetrySdkExtension upstreamSdk =
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright Splunk Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.splunk.opentelemetry.profiler.snapshot;
18+
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
import java.util.Map;
22+
import java.util.concurrent.ConcurrentHashMap;
23+
import java.util.stream.Collectors;
24+
25+
/**
26+
* In memory implementation of the {@link StagingArea} interface that allows for direct access to
27+
* the stockpiled {@link StackTrace}s. Intended for testing use only.
28+
*/
29+
class InMemoryStagingArea implements StagingArea {
30+
private final Map<String, List<StackTrace>> stackTraces = new ConcurrentHashMap<>();
31+
32+
@Override
33+
public void stage(String traceId, StackTrace stackTrace) {
34+
stackTraces.compute(
35+
traceId,
36+
(id, stackTraces) -> {
37+
if (stackTraces == null) {
38+
stackTraces = new ArrayList<>();
39+
}
40+
stackTraces.add(stackTrace);
41+
return stackTraces;
42+
});
43+
}
44+
45+
@Override
46+
public void empty(String traceId) {
47+
stackTraces.remove(traceId);
48+
}
49+
50+
public List<StackTrace> allStackTraces() {
51+
return stackTraces.values().stream().flatMap(List::stream).collect(Collectors.toList());
52+
}
53+
}

0 commit comments

Comments
 (0)