Skip to content

Commit 092eaaa

Browse files
tduncan and laurit authored
Stop Snapshot Profiler On OpenTelemetry SDK Shutdown (#2262)
* Define StackTraceSampler as Closeable. * Stop all running samplers and prevent new trace samplers from being started when ScheduledExecutorStackTraceSampler is closed. * Define StagingArea as Closeable and have AccumulatingStagingArea prevent new StackTraces from being staged when closed as well as export every staged StackTrace. * Define StackTraceExporter as Closeable and have AsyncStackTraceExporter prevent new StackTraces from being exported when closed. * Add the StackTraceSamplerProvider. * Reference the StackTraceSampler via the StackTraceSamplerProvider in SnapshotProfilingSpanProcessor. * Add the StagingAreaProvider. * Reference the StagingArea via the StagingAreaProvider in AccumulatingStagingArea. * Add the Closer class. * Add the CloserProvider. * Add the SpanProcessor to hook into the OpenTelemetry SDK shutdown process. * Add test to verify that snapshot profiling is stopped when the OpenTelemetry SDK is shut down. * Remove unused classes. * Add the generic ConfigurableSupplier class. * Add constant ConfigurableSupplier instances available on SpanTracker, StackTraceSampler, StagingArea, and StackTraceExporter. * Rename tests. * Apply spotless code formatting. * Initialize a ConfigurableSupplier value to the defaultValue. * Mark the 'value' in ConfigurableSupplier volatile. * Maintain a 'closed' field in AsyncStackTraceExporter and avoid submitting new stack trace export jobs when closed is true. * Wait for schedulers to fully shut down in ScheduledExecutorStackTraceSampler and AsyncStackTraceExporter before returning from the 'close' method. * Wait for shutdown prior to emptying staging area. * Apply spotless code formatting. * Apply spotless code formatting. * fix merge * Update tests to compare staged stack traces after ScheduledExecutorStackTraceSampler has been stopped. --------- Co-authored-by: Lauri Tulmin <[email protected]>
1 parent 569d98b commit 092eaaa

25 files changed

+526
-146
lines changed

profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/AccumulatingStagingArea.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,18 @@
2525
class AccumulatingStagingArea implements StagingArea {
2626
private final ConcurrentMap<String, Queue<StackTrace>> stackTraces = new ConcurrentHashMap<>();
2727
private final Supplier<StackTraceExporter> exporter;
28+
private volatile boolean closed = false;
2829

2930
AccumulatingStagingArea(Supplier<StackTraceExporter> exporter) {
3031
this.exporter = exporter;
3132
}
3233

3334
@Override
3435
public void stage(String traceId, StackTrace stackTrace) {
36+
if (closed) {
37+
return;
38+
}
39+
3540
stackTraces.compute(
3641
traceId,
3742
(id, stackTraces) -> {
@@ -50,4 +55,11 @@ public void empty(String traceId) {
5055
exporter.get().export(stackTraces);
5156
}
5257
}
58+
59+
@Override
60+
public void close() {
61+
closed = true;
62+
stackTraces.values().forEach(exporter.get()::export);
63+
stackTraces.clear();
64+
}
5365
}

profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/AsyncStackTraceExporter.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.opentelemetry.api.logs.Logger;
2424
import java.util.Collection;
2525
import java.util.concurrent.ExecutorService;
26+
import java.util.concurrent.TimeUnit;
2627
import java.util.logging.Level;
2728

2829
class AsyncStackTraceExporter implements StackTraceExporter {
@@ -33,6 +34,7 @@ class AsyncStackTraceExporter implements StackTraceExporter {
3334
HelpfulExecutors.newSingleThreadExecutor("async-stack-trace-exporter");
3435
private final Logger otelLogger;
3536
private final int maxDepth;
37+
private volatile boolean closed = false;
3638

3739
AsyncStackTraceExporter(Logger logger, int maxDepth) {
3840
this.otelLogger = logger;
@@ -41,9 +43,27 @@ class AsyncStackTraceExporter implements StackTraceExporter {
4143

4244
@Override
4345
public void export(Collection<StackTrace> stackTraces) {
46+
if (closed) {
47+
return;
48+
}
4449
executor.submit(pprofExporter(otelLogger, stackTraces));
4550
}
4651

52+
@Override
53+
public void close() {
54+
closed = true;
55+
56+
try {
57+
executor.shutdown();
58+
if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
59+
executor.shutdownNow();
60+
}
61+
} catch (InterruptedException e) {
62+
executor.shutdownNow();
63+
Thread.currentThread().interrupt();
64+
}
65+
}
66+
4767
private Runnable pprofExporter(Logger otelLogger, Collection<StackTrace> stackTraces) {
4868
return () -> {
4969
try {

profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/SpanTrackerProvider.java renamed to profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/ConfigurableSupplier.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,25 @@
1919
import java.util.Objects;
2020
import java.util.function.Supplier;
2121

22-
class SpanTrackerProvider implements Supplier<SpanTracker> {
23-
public static final SpanTrackerProvider INSTANCE = new SpanTrackerProvider();
22+
class ConfigurableSupplier<T> implements Supplier<T> {
23+
private final T defaultValue;
24+
private volatile T value;
2425

25-
private SpanTracker tracker = SpanTracker.NOOP;
26+
ConfigurableSupplier(T defaultValue) {
27+
this.defaultValue = Objects.requireNonNull(defaultValue);
28+
this.value = defaultValue;
29+
}
2630

2731
@Override
28-
public SpanTracker get() {
29-
return tracker;
32+
public T get() {
33+
return value;
3034
}
3135

32-
void configure(SpanTracker tracker) {
33-
this.tracker = Objects.requireNonNull(tracker);
36+
void configure(T value) {
37+
this.value = Objects.requireNonNull(value);
3438
}
3539

36-
private SpanTrackerProvider() {}
40+
void reset() {
41+
this.value = defaultValue;
42+
}
3743
}

profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/InterceptingContextStorageSpanTrackingActivator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public void activate(TraceRegistry registry) {
3939
contextStorageWrappingFunction.accept(
4040
contextStorage -> {
4141
ActiveSpanTracker tracker = new ActiveSpanTracker(contextStorage, registry);
42-
SpanTrackerProvider.INSTANCE.configure(tracker);
42+
SpanTracker.SUPPLIER.configure(tracker);
4343
return tracker;
4444
});
4545
}

profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/ScheduledExecutorStackTraceSampler.java

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,19 +38,26 @@ class ScheduledExecutorStackTraceSampler implements StackTraceSampler {
3838

3939
private final ConcurrentMap<String, ThreadSampler> samplers = new ConcurrentHashMap<>();
4040
private final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
41-
private final StagingArea stagingArea;
41+
private final Supplier<StagingArea> stagingArea;
4242
private final Supplier<SpanTracker> spanTracker;
4343
private final Duration samplingPeriod;
44+
private volatile boolean closed = false;
4445

4546
ScheduledExecutorStackTraceSampler(
46-
StagingArea stagingArea, Supplier<SpanTracker> spanTracker, Duration samplingPeriod) {
47+
Supplier<StagingArea> stagingArea,
48+
Supplier<SpanTracker> spanTracker,
49+
Duration samplingPeriod) {
4750
this.stagingArea = stagingArea;
4851
this.spanTracker = spanTracker;
4952
this.samplingPeriod = samplingPeriod;
5053
}
5154

5255
@Override
5356
public void start(SpanContext spanContext) {
57+
if (closed) {
58+
return;
59+
}
60+
5461
samplers.computeIfAbsent(
5562
spanContext.getTraceId(), id -> new ThreadSampler(spanContext, samplingPeriod));
5663
}
@@ -62,13 +69,34 @@ public void stop(SpanContext spanContext) {
6269
(traceId, sampler) -> {
6370
if (spanContext.equals(sampler.getSpanContext())) {
6471
sampler.shutdown();
65-
stagingArea.empty(spanContext.getTraceId());
72+
waitForShutdown(sampler);
73+
stagingArea.get().empty(spanContext.getTraceId());
6674
return null;
6775
}
6876
return sampler;
6977
});
7078
}
7179

80+
@Override
81+
public void close() {
82+
closed = true;
83+
84+
samplers.values().forEach(ThreadSampler::shutdown);
85+
samplers.values().forEach(this::waitForShutdown);
86+
samplers.clear();
87+
}
88+
89+
private void waitForShutdown(ThreadSampler sampler) {
90+
try {
91+
if (!sampler.awaitTermination(samplingPeriod.multipliedBy(2))) {
92+
sampler.shutdownNow();
93+
}
94+
} catch (InterruptedException e) {
95+
sampler.shutdownNow();
96+
Thread.currentThread().interrupt();
97+
}
98+
}
99+
72100
private class ThreadSampler {
73101
private final ScheduledExecutorService scheduler;
74102
private final SpanContext spanContext;
@@ -91,6 +119,14 @@ void shutdown() {
91119
gatherer.run();
92120
}
93121

122+
void shutdownNow() {
123+
scheduler.shutdownNow();
124+
}
125+
126+
boolean awaitTermination(Duration timeout) throws InterruptedException {
127+
return scheduler.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
128+
}
129+
94130
SpanContext getSpanContext() {
95131
return spanContext;
96132
}
@@ -117,7 +153,7 @@ public void run() {
117153
String spanId = retrieveActiveSpan(thread).getSpanId();
118154
StackTrace stackTrace =
119155
StackTrace.from(Instant.now(), samplingPeriod, threadInfo, traceId, spanId);
120-
stagingArea.stage(traceId, stackTrace);
156+
stagingArea.get().stage(traceId, stackTrace);
121157
} catch (Exception e) {
122158
logger.log(Level.SEVERE, e, samplerErrorMessage(traceId, thread.getId()));
123159
} finally {
profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/SdkShutdownHook.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright Splunk Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.splunk.opentelemetry.profiler.snapshot;
18+
19+
import io.opentelemetry.context.Context;
20+
import io.opentelemetry.sdk.common.CompletableResultCode;
21+
import io.opentelemetry.sdk.trace.ReadWriteSpan;
22+
import io.opentelemetry.sdk.trace.ReadableSpan;
23+
import io.opentelemetry.sdk.trace.SpanProcessor;
24+
import java.io.Closeable;
25+
import java.util.ArrayList;
26+
import java.util.List;
27+
28+
class SdkShutdownHook implements SpanProcessor {
29+
@Override
30+
public CompletableResultCode shutdown() {
31+
List<CompletableResultCode> results = new ArrayList<>();
32+
results.add(close(StackTraceSampler.SUPPLIER.get()));
33+
results.add(close(StagingArea.SUPPLIER.get()));
34+
results.add(close(StackTraceExporter.SUPPLIER.get()));
35+
return CompletableResultCode.ofAll(results);
36+
}
37+
38+
private CompletableResultCode close(Closeable closeable) {
39+
try {
40+
closeable.close();
41+
return CompletableResultCode.ofSuccess();
42+
} catch (Exception e) {
43+
return CompletableResultCode.ofExceptionalFailure(e);
44+
}
45+
}
46+
47+
@Override
48+
public void onStart(Context parentContext, ReadWriteSpan span) {}
49+
50+
@Override
51+
public boolean isStartRequired() {
52+
return false;
53+
}
54+
55+
@Override
56+
public void onEnd(ReadableSpan span) {}
57+
58+
@Override
59+
public boolean isEndRequired() {
60+
return false;
61+
}
62+
}

profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/SnapshotProfilingSdkCustomizer.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,9 @@ public SnapshotProfilingSdkCustomizer() {
4747
private static Function<ConfigProperties, StackTraceSampler> stackTraceSamplerProvider() {
4848
return properties -> {
4949
Duration samplingPeriod = Configuration.getSnapshotProfilerSamplingInterval(properties);
50-
return new ScheduledExecutorStackTraceSampler(
51-
new AccumulatingStagingArea(StackTraceExporterProvider.INSTANCE),
52-
SpanTrackerProvider.INSTANCE,
53-
samplingPeriod);
50+
ConfigurableSupplier<StagingArea> supplier = StagingArea.SUPPLIER;
51+
supplier.configure(new AccumulatingStagingArea(StackTraceExporter.SUPPLIER));
52+
return new ScheduledExecutorStackTraceSampler(supplier, SpanTracker.SUPPLIER, samplingPeriod);
5453
};
5554
}
5655

@@ -74,15 +73,28 @@ public void customize(AutoConfigurationCustomizer autoConfigurationCustomizer) {
7473
autoConfigurationCustomizer
7574
.addPropertiesCustomizer(autoConfigureSnapshotVolumePropagator())
7675
.addTracerProviderCustomizer(snapshotProfilingSpanProcessor(registry))
77-
.addPropertiesCustomizer(startTrackingActiveSpans(registry));
76+
.addPropertiesCustomizer(startTrackingActiveSpans(registry))
77+
.addTracerProviderCustomizer(addShutdownHook());
78+
}
79+
80+
private BiFunction<SdkTracerProviderBuilder, ConfigProperties, SdkTracerProviderBuilder>
81+
addShutdownHook() {
82+
return (builder, properties) -> {
83+
if (snapshotProfilingEnabled(properties)) {
84+
builder.addSpanProcessor(new SdkShutdownHook());
85+
}
86+
return builder;
87+
};
7888
}
7989

8090
private BiFunction<SdkTracerProviderBuilder, ConfigProperties, SdkTracerProviderBuilder>
8191
snapshotProfilingSpanProcessor(TraceRegistry registry) {
8292
return (builder, properties) -> {
8393
if (snapshotProfilingEnabled(properties)) {
8494
StackTraceSampler sampler = samplerProvider.apply(properties);
85-
return builder.addSpanProcessor(new SnapshotProfilingSpanProcessor(registry, sampler));
95+
ConfigurableSupplier<StackTraceSampler> supplier = StackTraceSampler.SUPPLIER;
96+
supplier.configure(sampler);
97+
return builder.addSpanProcessor(new SnapshotProfilingSpanProcessor(registry, supplier));
8698
}
8799
return builder;
88100
};

profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/SnapshotProfilingSpanProcessor.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.opentelemetry.sdk.trace.ReadWriteSpan;
2424
import io.opentelemetry.sdk.trace.ReadableSpan;
2525
import io.opentelemetry.sdk.trace.SpanProcessor;
26+
import java.util.function.Supplier;
2627

2728
/**
2829
* Custom {@link SpanProcessor} implementation that will 1. register traces for snapshot profiling
@@ -37,9 +38,9 @@
3738
*/
3839
public class SnapshotProfilingSpanProcessor implements SpanProcessor {
3940
private final TraceRegistry registry;
40-
private final StackTraceSampler sampler;
41+
private final Supplier<StackTraceSampler> sampler;
4142

42-
SnapshotProfilingSpanProcessor(TraceRegistry registry, StackTraceSampler sampler) {
43+
SnapshotProfilingSpanProcessor(TraceRegistry registry, Supplier<StackTraceSampler> sampler) {
4344
this.registry = registry;
4445
this.sampler = sampler;
4546
}
@@ -54,7 +55,7 @@ public void onStart(Context context, ReadWriteSpan span) {
5455
}
5556

5657
if (isEntry(span) && registry.isRegistered(span.getSpanContext())) {
57-
sampler.start(span.getSpanContext());
58+
sampler.get().start(span.getSpanContext());
5859
span.setAttribute(SNAPSHOT_PROFILING, true);
5960
}
6061
}
@@ -75,7 +76,7 @@ public boolean isStartRequired() {
7576
public void onEnd(ReadableSpan span) {
7677
if (isEntry(span)) {
7778
registry.unregister(span.getSpanContext());
78-
sampler.stop(span.getSpanContext());
79+
sampler.get().stop(span.getSpanContext());
7980
}
8081
}
8182

profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/SpanTracker.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
interface SpanTracker {
2323
SpanTracker NOOP = thread -> Optional.empty();
24+
ConfigurableSupplier<SpanTracker> SUPPLIER = new ConfigurableSupplier<>(SpanTracker.NOOP);
2425

2526
Optional<SpanContext> getActiveSpan(Thread thread);
2627
}

profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/StackTraceExporter.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,15 @@
1616

1717
package com.splunk.opentelemetry.profiler.snapshot;
1818

19+
import java.io.Closeable;
1920
import java.util.Collection;
2021

2122
/** Works in concert with the {@link StagingArea} to export a batch of {@link StackTrace}s */
22-
interface StackTraceExporter {
23+
interface StackTraceExporter extends Closeable {
2324
StackTraceExporter NOOP = stackTraces -> {};
25+
ConfigurableSupplier<StackTraceExporter> SUPPLIER = new ConfigurableSupplier<>(NOOP);
2426

2527
void export(Collection<StackTrace> stackTraces);
28+
29+
default void close() {}
2630
}

0 commit comments

Comments
 (0)