diff --git a/.fossa.yml b/.fossa.yml index fb2c373fe00e..54dce8319033 100644 --- a/.fossa.yml +++ b/.fossa.yml @@ -382,6 +382,15 @@ targets: - type: gradle path: ./ target: ':instrumentation:cassandra:cassandra-4.4:library' + - type: gradle + path: ./ + target: ':instrumentation:cats-effect:cats-effect-3.6:bootstrap' + - type: gradle + path: ./ + target: ':instrumentation:cats-effect:cats-effect-3.6:javaagent' + - type: gradle + path: ./ + target: ':instrumentation:cats-effect:cats-effect-common-3.6:javaagent' - type: gradle path: ./ target: ':instrumentation:couchbase:couchbase-2-common:javaagent' diff --git a/instrumentation/cats-effect/cats-effect-3.6/bootstrap/build.gradle.kts b/instrumentation/cats-effect/cats-effect-3.6/bootstrap/build.gradle.kts new file mode 100644 index 000000000000..072a96df450f --- /dev/null +++ b/instrumentation/cats-effect/cats-effect-3.6/bootstrap/build.gradle.kts @@ -0,0 +1,3 @@ +plugins { + id("otel.javaagent-bootstrap") +} diff --git a/instrumentation/cats-effect/cats-effect-3.6/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/catseffect/v3_6/FiberLocalContextHelper.java b/instrumentation/cats-effect/cats-effect-3.6/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/catseffect/v3_6/FiberLocalContextHelper.java new file mode 100644 index 000000000000..122d8cc5fe20 --- /dev/null +++ b/instrumentation/cats-effect/cats-effect-3.6/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/catseffect/v3_6/FiberLocalContextHelper.java @@ -0,0 +1,93 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.bootstrap.catseffect.v3_6; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; +import java.util.logging.Logger; +import javax.annotation.Nullable; + +/** The helper stores a reference to the IOLocal#unsafeThreadLocal. */ +public final class FiberLocalContextHelper { + + private static final Logger logger = Logger.getLogger(FiberLocalContextHelper.class.getName()); + + private static final AtomicReference> fiberContextThreadLocal = + new AtomicReference<>(); + + private static final AtomicReference> isUnderFiberContextSupplier = + new AtomicReference<>(() -> false); + + public static void initialize( + ThreadLocal fiberThreadLocal, Supplier isUnderFiberContext) { + if (fiberContextThreadLocal.compareAndSet(null, fiberThreadLocal)) { + fiberContextThreadLocal.set(fiberThreadLocal); + isUnderFiberContextSupplier.set(isUnderFiberContext); + logger.fine("The fiberThreadLocalContext is configured"); + } else { + if (!fiberContextThreadLocal.get().equals(fiberThreadLocal)) { + logger.warning( + "The fiberThreadLocalContext is already configured. Ignoring subsequent calls."); + } + } + } + + public static Boolean isUnderFiberContext() { + return isUnderFiberContextSupplier.get().get(); + } + + @Nullable + public static Context current() { + ThreadLocal local = getFiberThreadLocal(); + return local != null ? local.get() : null; + } + + public static Scope attach(Context toAttach) { + ThreadLocal local = fiberContextThreadLocal.get(); + if (toAttach == null || local == null) { + return Scope.noop(); + } else { + Context beforeAttach = current(); + if (toAttach == beforeAttach) { + return Scope.noop(); + } else { + local.set(toAttach); + return new ScopeImpl(beforeAttach, toAttach); + } + } + } + + @Nullable + private static ThreadLocal getFiberThreadLocal() { + return fiberContextThreadLocal.get(); + } + + private static class ScopeImpl implements Scope { + @Nullable private final Context beforeAttach; + private final Context toAttach; + private boolean closed; + + private ScopeImpl(@Nullable Context beforeAttach, Context toAttach) { + this.beforeAttach = beforeAttach; + this.toAttach = toAttach; + } + + @Override + public void close() { + if (!this.closed && FiberLocalContextHelper.current() == this.toAttach) { + this.closed = true; + FiberLocalContextHelper.fiberContextThreadLocal.get().set(this.beforeAttach); + } else { + FiberLocalContextHelper.logger.fine( + "Trying to close scope which does not represent current context. Ignoring the call."); + } + } + } + + private FiberLocalContextHelper() {} +} diff --git a/instrumentation/cats-effect/cats-effect-3.6/javaagent/build.gradle.kts b/instrumentation/cats-effect/cats-effect-3.6/javaagent/build.gradle.kts new file mode 100644 index 000000000000..d6df32a38d4b --- /dev/null +++ b/instrumentation/cats-effect/cats-effect-3.6/javaagent/build.gradle.kts @@ -0,0 +1,60 @@ +plugins { + id("otel.javaagent-instrumentation") + id("otel.nullaway-conventions") + id("otel.scala-conventions") +} + +val scalaVersion = "2.13" +val catsEffectVersion = "3.6.0" + +muzzle { + pass { + group.set("org.typelevel") + module.set("cats-effect_2.12") + versions.set("[$catsEffectVersion,)") + assertInverse.set(true) + extraDependency("io.opentelemetry:opentelemetry-api:1.0.0") + excludeInstrumentationName("opentelemetry-api") + } + pass { + group.set("org.typelevel") + module.set("cats-effect_2.13") + versions.set("[$catsEffectVersion,)") + assertInverse.set(true) + extraDependency("io.opentelemetry:opentelemetry-api:1.0.0") + excludeInstrumentationName("opentelemetry-api") + } + pass { + group.set("org.typelevel") + module.set("cats-effect_3") + versions.set("[$catsEffectVersion,)") + assertInverse.set(true) + extraDependency("io.opentelemetry:opentelemetry-api:1.0.0") + excludeInstrumentationName("opentelemetry-api") + } +} + +dependencies { + bootstrap(project(":instrumentation:cats-effect:cats-effect-3.6:bootstrap")) + + // we need access to the "application.io.opentelemetry.context.Context" + // to properly bridge fiber and agent context storages + compileOnly(project(":opentelemetry-api-shaded-for-instrumenting", configuration = "shadow")) + compileOnly("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure") + implementation(project(":instrumentation:opentelemetry-api:opentelemetry-api-1.0:javaagent")) + + implementation(project(":instrumentation:cats-effect:cats-effect-common-3.6:javaagent")) + + compileOnly("org.typelevel:cats-effect_$scalaVersion:$catsEffectVersion") + + testImplementation("org.typelevel:cats-effect_$scalaVersion:$catsEffectVersion") + + latestDepTestLibrary("org.typelevel:cats-effect_$scalaVersion:latest.release") +} + +tasks { + withType().configureEach { + jvmArgs("-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false") + jvmArgs("-Dcats.effect.trackFiberContext=true") + } +} diff --git a/instrumentation/cats-effect/cats-effect-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/catseffect/v3_6/CatsEffectInstrumentationModule.java b/instrumentation/cats-effect/cats-effect-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/catseffect/v3_6/CatsEffectInstrumentationModule.java new file mode 100644 index 000000000000..211ed5237e22 --- /dev/null +++ b/instrumentation/cats-effect/cats-effect-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/catseffect/v3_6/CatsEffectInstrumentationModule.java @@ -0,0 +1,55 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.catseffect.v3_6; + +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed; + +import com.google.auto.service.AutoService; +import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule; +import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties; +import java.util.Arrays; +import java.util.List; +import net.bytebuddy.matcher.ElementMatcher; + +@AutoService(InstrumentationModule.class) +public class CatsEffectInstrumentationModule extends InstrumentationModule + implements ExperimentalInstrumentationModule { + + public CatsEffectInstrumentationModule() { + super("cats-effect", "cats-effect-3.6"); + } + + @Override + public List typeInstrumentations() { + return Arrays.asList(new IoFiberInstrumentation(), new IoRuntimeInstrumentation()); + } + + @Override + public ElementMatcher.Junction classLoaderMatcher() { + return hasClassesNamed("cats.effect.IO") + // missing before 3.6.0 + .and(hasClassesNamed("cats.effect.unsafe.metrics.IORuntimeMetrics")); + } + + @Override + public boolean defaultEnabled(ConfigProperties config) { + return super.defaultEnabled(config) + && config.getBoolean("cats.effect.trackFiberContext", false); + } + + @Override + public String getModuleGroup() { + return "opentelemetry-api-bridge"; + } + + // ensure it's the last one + @Override + public int order() { + return Integer.MAX_VALUE; + } +} diff --git a/instrumentation/cats-effect/cats-effect-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/catseffect/v3_6/FiberContextBridge.java b/instrumentation/cats-effect/cats-effect-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/catseffect/v3_6/FiberContextBridge.java new file mode 100644 index 000000000000..5cbabab8b717 --- /dev/null +++ b/instrumentation/cats-effect/cats-effect-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/catseffect/v3_6/FiberContextBridge.java @@ -0,0 +1,40 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.catseffect.v3_6; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.ContextStorage; +import io.opentelemetry.context.Scope; +import io.opentelemetry.javaagent.bootstrap.catseffect.v3_6.FiberLocalContextHelper; +import javax.annotation.Nullable; + +public class FiberContextBridge implements ContextStorage { + + private final ContextStorage agentContextStorage; + + public FiberContextBridge(ContextStorage delegate) { + this.agentContextStorage = delegate; + } + + @Override + public Scope attach(Context toAttach) { + if (FiberLocalContextHelper.isUnderFiberContext()) { + return FiberLocalContextHelper.attach(toAttach); + } else { + return agentContextStorage.attach(toAttach); + } + } + + @Nullable + @Override + public Context current() { + if (FiberLocalContextHelper.isUnderFiberContext()) { + return FiberLocalContextHelper.current(); + } else { + return agentContextStorage.current(); + } + } +} diff --git a/instrumentation/cats-effect/cats-effect-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/catseffect/v3_6/FiberContextBridgeInstaller.java b/instrumentation/cats-effect/cats-effect-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/catseffect/v3_6/FiberContextBridgeInstaller.java new file mode 100644 index 000000000000..9a29b179ee25 --- /dev/null +++ b/instrumentation/cats-effect/cats-effect-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/catseffect/v3_6/FiberContextBridgeInstaller.java @@ -0,0 +1,29 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.catseffect.v3_6; + +import com.google.auto.service.AutoService; +import io.opentelemetry.context.ContextStorage; +import io.opentelemetry.javaagent.tooling.BeforeAgentListener; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; +import java.util.logging.Logger; + +/** + * A {@link BeforeAgentListener} that installs {@link FiberContextBridge} if `cats.effect.IO` is + * present in the classpath. + */ +@AutoService(BeforeAgentListener.class) +public class FiberContextBridgeInstaller implements BeforeAgentListener { + + private static final Logger logger = + Logger.getLogger(FiberContextBridgeInstaller.class.getName()); + + @Override + public void beforeAgent(AutoConfiguredOpenTelemetrySdk autoConfiguredOpenTelemetrySdk) { + ContextStorage.addWrapper(FiberContextBridge::new); + logger.fine("Installed Cats Effect FiberContextBridge"); + } +} diff --git a/instrumentation/cats-effect/cats-effect-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/catseffect/v3_6/IoFiberInstrumentation.java b/instrumentation/cats-effect/cats-effect-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/catseffect/v3_6/IoFiberInstrumentation.java new file mode 100644 index 000000000000..0abbb7f69283 --- /dev/null +++ b/instrumentation/cats-effect/cats-effect-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/catseffect/v3_6/IoFiberInstrumentation.java @@ -0,0 +1,48 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.catseffect.v3_6; + +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import application.io.opentelemetry.context.Context; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.catseffect.common.v3_6.IoLocalContextSingleton; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +public class IoFiberInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return named("cats.effect.IOFiber"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isConstructor() + .and(takesArgument(0, named("scala.collection.immutable.Map"))) + .and(takesArgument(1, named("scala.Function1"))) + .and(takesArgument(2, named("cats.effect.IO"))) + .and(takesArgument(3, named("scala.concurrent.ExecutionContext"))) + .and(takesArgument(4, named("cats.effect.unsafe.IORuntime"))), + this.getClass().getName() + "$ConstructorAdvice"); + } + + @SuppressWarnings("unused") + public static final class ConstructorAdvice { + private ConstructorAdvice() {} + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter(@Advice.Argument(value = 2, readOnly = false) cats.effect.IO io) { + io = IoLocalContextSingleton.ioLocal.asLocal().scope(io, Context.current()); + } + } +} diff --git a/instrumentation/cats-effect/cats-effect-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/catseffect/v3_6/IoRuntimeInstrumentation.java b/instrumentation/cats-effect/cats-effect-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/catseffect/v3_6/IoRuntimeInstrumentation.java new file mode 100644 index 000000000000..4df766f72871 --- /dev/null +++ b/instrumentation/cats-effect/cats-effect-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/catseffect/v3_6/IoRuntimeInstrumentation.java @@ -0,0 +1,43 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.catseffect.v3_6; + +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.named; + +import cats.effect.unsafe.IORuntime; +import io.opentelemetry.javaagent.bootstrap.catseffect.v3_6.FiberLocalContextHelper; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.catseffect.common.v3_6.IoLocalContextSingleton; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +public class IoRuntimeInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return named("cats.effect.unsafe.IORuntime"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isConstructor(), this.getClass().getName() + "$ConstructorAdvice"); + } + + @SuppressWarnings("unused") + public static final class ConstructorAdvice { + private ConstructorAdvice() {} + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter() { + FiberLocalContextHelper.initialize( + IoLocalContextSingleton.contextThreadLocal, IORuntime::isUnderFiberContext); + } + } +} diff --git a/instrumentation/cats-effect/cats-effect-3.6/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/catseffect/v3_6/CatsEffectInstrumentationTest.scala b/instrumentation/cats-effect/cats-effect-3.6/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/catseffect/v3_6/CatsEffectInstrumentationTest.scala new file mode 100644 index 000000000000..ffc42309b0d7 --- /dev/null +++ b/instrumentation/cats-effect/cats-effect-3.6/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/catseffect/v3_6/CatsEffectInstrumentationTest.scala @@ -0,0 +1,365 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.catseffect.v3_6 + +import java.util.concurrent.Executors + +import cats.effect.{Deferred, IO} +import cats.effect.unsafe.implicits.global +import io.opentelemetry.instrumentation.testing.junit._ +import io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil +import io.opentelemetry.sdk.testing.assertj.{SpanDataAssert, TraceAssert} +import org.junit.jupiter.api.extension.RegisterExtension +import org.junit.jupiter.api.{Test, TestInstance} +import java.util.function.Consumer + +import cats.effect.std.Dispatcher +import io.opentelemetry.api.GlobalOpenTelemetry +import io.opentelemetry.api.trace.Span + +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext, Future} + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class CatsEffectInstrumentationTest { + + @RegisterExtension + val testing: InstrumentationExtension = AgentInstrumentationExtension.create() + + @Test + def respectOuterSpanWithUnsafeRunSync(): Unit = { + testing.runWithSpan[Exception]( + "main_1_span_1", + () => { + withIOSpan("fiber_1_span_1")(_ => IO.unit).unsafeRunSync() + } + ) + + testing.waitAndAssertTraces( + assertTrace { trace => + trace.hasSpansSatisfyingExactly( + assertSpan(_.hasName("main_1_span_1").hasNoParent), + assertSpan(_.hasName("fiber_1_span_1").hasParent(trace.getSpan(0))) + ) + } + ) + } + + @Test + def respectOuterSpanAndPropagateToLiftedFuture(): Unit = { + testing.runWithSpan[Exception]( + "main_1_span_1", + () => { + withIOSpan("fiber_1_span_1") { _ => + IO.fromFuture(IO.delay { + Future(testing.runWithSpan[Exception]("future_1_span_1", () => ()))( + ExecutionContext.global + ) + }) + }.unsafeRunSync() + } + ) + + testing.waitAndAssertTraces( + assertTrace { trace => + trace.hasSpansSatisfyingExactly( + assertSpan(_.hasName("main_1_span_1").hasNoParent), + assertSpan(_.hasName("fiber_1_span_1").hasParent(trace.getSpan(0))), + assertSpan(_.hasName("future_1_span_1").hasParent(trace.getSpan(1))) + ) + } + ) + } + + @Test + def respectOuterSpanWithUnsafeRunToFuture(): Unit = { + testing.runWithSpan[Exception]( + "main_1_span_1", + () => { + Await.result( + withIOSpan("fiber_1_span_1")(_ => IO.unit).unsafeToFuture(), + Duration.Inf + ) + } + ) + + testing.waitAndAssertTraces( + assertTrace { trace => + trace.hasSpansSatisfyingExactly( + assertSpan(_.hasName("main_1_span_1").hasNoParent), + assertSpan(_.hasName("fiber_1_span_1").hasParent(trace.getSpan(0))) + ) + } + ) + } + + @Test + def respectOuterSpanWithDispatcher(): Unit = { + testing.runWithSpan[Exception]( + "main_1_span_1", + () => { + withIOSpan("fiber_1_span_1") { _ => + Dispatcher.sequential[IO].use { dispatcher => + IO.blocking { + dispatcher.unsafeRunSync( + withIOSpan("dispatcher_1_span_1")(_ => IO.unit) + ) + } + } + }.unsafeRunSync() + } + ) + + testing.waitAndAssertTraces( + assertTrace { trace => + trace.hasSpansSatisfyingExactly( + assertSpan(_.hasName("main_1_span_1").hasNoParent), + assertSpan(_.hasName("fiber_1_span_1").hasParent(trace.getSpan(0))), + assertSpan( + _.hasName("dispatcher_1_span_1").hasParent(trace.getSpan(1)) + ) + ) + } + ) + } + + @Test + def traceIsPropagatedToChildFiber(): Unit = { + withIOSpan("fiber_1_span_1") { _ => + for { + child <- withIOSpan("fiber_2_span_1")(_ => IO.unit).start + _ <- child.join + } yield () + }.unsafeRunSync() + + testing.waitAndAssertTraces( + assertTrace { trace => + trace.hasSpansSatisfyingExactly( + assertSpan(_.hasName("fiber_1_span_1").hasNoParent), + assertSpan(_.hasName("fiber_2_span_1").hasParent(trace.getSpan(0))) + ) + } + ) + } + + @Test + def traceIsPropagatedToChildFiberOnExternalExecutor(): Unit = { + withIOSpan("fiber_1_span_1") { _ => + for { + child <- withIOSpan("fiber_2_span_1")(_ => IO.unit).startOnExecutor( + Executors.newSingleThreadExecutor() + ) + _ <- child.join + } yield () + }.unsafeRunSync() + + testing.waitAndAssertTraces( + assertTrace { trace => + trace.hasSpansSatisfyingExactly( + assertSpan(_.hasName("fiber_1_span_1").hasNoParent), + assertSpan(_.hasName("fiber_2_span_1").hasParent(trace.getSpan(0))) + ) + } + ) + } + + @Test + def traceIsPreservedWhenFiberIsInterrupted(): Unit = { + (for { + childStarted <- IO.deferred[Unit] + _ <- withIOSpan("fiber_1_span_1") { _ => + for { + child <- IO.defer( + withIOSpan("fiber_2_span_1")(_ => + (childStarted.complete(()) *> IO.never[Unit]).start + ) + ) + _ <- childStarted.get + _ <- child.cancel + } yield () + } + } yield ()).unsafeRunSync() + + testing.waitAndAssertTraces( + assertTrace { trace => + trace.hasSpansSatisfyingExactly( + assertSpan(_.hasName("fiber_1_span_1").hasNoParent), + assertSpan(_.hasName("fiber_2_span_1").hasParent(trace.getSpan(0))) + ) + } + ) + } + + @Test + def synchronizedFibersDoNotInterfereWithEachOthersTraces(): Unit = { + + def runFiber( + fiberNumber: Int, + onStart: IO[Unit], + onEnd: IO[Unit] + ): IO[Unit] = + withIOSpan(s"fiber_${fiberNumber}_span_1") { _ => + onStart *> withIOSpan(s"fiber_${fiberNumber}_span_2")(_ => onEnd) + } + + (for { + fiber1Started <- IO.deferred[Unit] + fiber2Done <- IO.deferred[Unit] + + fiber1 <- runFiber( + fiberNumber = 1, + onStart = fiber1Started.complete(()) *> fiber2Done.get, + onEnd = IO.unit + ).start + + fiber2 <- runFiber( + fiberNumber = 2, + onStart = fiber1Started.get, + onEnd = fiber2Done.complete(()).void + ).start + + _ <- fiber1.join *> fiber2.join + } yield ()).unsafeRunSync() + + testing.waitAndAssertSortedTraces( + TelemetryDataUtil.orderByRootSpanName( + "fiber_1_span_1", + "fiber_1_span_2", + "fiber_2_span_1", + "fiber_2_span_2" + ), + assertTrace { trace => + trace.hasSpansSatisfyingExactly( + assertSpan(_.hasName("fiber_1_span_1").hasNoParent), + assertSpan(_.hasName("fiber_1_span_2").hasParent(trace.getSpan(0))) + ) + }, + assertTrace { trace => + trace.hasSpansSatisfyingExactly( + assertSpan(_.hasName("fiber_2_span_1").hasNoParent), + assertSpan(_.hasName("fiber_2_span_2").hasParent(trace.getSpan(0))) + ) + } + ) + } + + @Test + def concurrentFibersDoNotInterfereWithEachOthersTraces(): Unit = { + + def runFiber( + fiberNumber: Int, + start: Deferred[IO, Unit] + ): IO[Unit] = { + start.get *> + withIOSpan(s"fiber_${fiberNumber}_span_1") { _ => + IO.cede *> withIOSpan(s"fiber_${fiberNumber}_span_2")(_ => IO.unit) + } + } + + (for { + start <- IO.deferred[Unit] + fiber1 <- runFiber(1, start).start + fiber2 <- runFiber(2, start).start + fiber3 <- runFiber(3, start).start + _ <- start.complete(()) + _ <- fiber1.join *> fiber2.join *> fiber3.join + } yield ()).unsafeRunSync() + + testing.waitAndAssertSortedTraces( + TelemetryDataUtil.orderByRootSpanName( + "fiber_1_span_1", + "fiber_2_span_1", + "fiber_3_span_1" + ), + assertTrace { trace => + trace.hasSpansSatisfyingExactly( + assertSpan(_.hasName("fiber_1_span_1").hasNoParent), + assertSpan(_.hasName("fiber_1_span_2").hasParent(trace.getSpan(0))) + ) + }, + assertTrace { trace => + trace.hasSpansSatisfyingExactly( + assertSpan(_.hasName("fiber_2_span_1").hasNoParent), + assertSpan(_.hasName("fiber_2_span_2").hasParent(trace.getSpan(0))) + ) + }, + assertTrace { trace => + trace.hasSpansSatisfyingExactly( + assertSpan(_.hasName("fiber_3_span_1").hasNoParent), + assertSpan(_.hasName("fiber_3_span_2").hasParent(trace.getSpan(0))) + ) + } + ) + } + + @Test + def sequentialFibersDoNotInterfereWithEachOthersTraces(): Unit = { + + def runFiber(fiberNumber: Int): IO[Unit] = + withIOSpan(s"fiber_${fiberNumber}_span_1") { _ => + withIOSpan(s"fiber_${fiberNumber}_span_2")(_ => IO.unit) + } + + (for { + fiber1 <- runFiber(1).start + _ <- fiber1.join + fiber2 <- runFiber(2).start + _ <- fiber2.join + fiber3 <- runFiber(3).start + _ <- fiber3.join + } yield ()).unsafeRunSync() + + testing.waitAndAssertSortedTraces( + TelemetryDataUtil.orderByRootSpanName( + "fiber_1_span_1", + "fiber_1_span_2", + "fiber_2_span_1", + "fiber_2_span_2", + "fiber_3_span_1", + "fiber_3_span_2" + ), + assertTrace { trace => + trace.hasSpansSatisfyingExactly( + assertSpan(_.hasName("fiber_1_span_1").hasNoParent), + assertSpan(_.hasName("fiber_1_span_2").hasParent(trace.getSpan(0))) + ) + }, + assertTrace { trace => + trace.hasSpansSatisfyingExactly( + assertSpan(_.hasName("fiber_2_span_1").hasNoParent), + assertSpan(_.hasName("fiber_2_span_2").hasParent(trace.getSpan(0))) + ) + }, + assertTrace { trace => + trace.hasSpansSatisfyingExactly( + assertSpan(_.hasName("fiber_3_span_1").hasNoParent), + assertSpan(_.hasName("fiber_3_span_2").hasParent(trace.getSpan(0))) + ) + } + ) + } + + private def assertTrace(f: TraceAssert => Any): Consumer[TraceAssert] = + (t: TraceAssert) => f(t) + + private def assertSpan(f: SpanDataAssert => Any): Consumer[SpanDataAssert] = + (t: SpanDataAssert) => f(t) + + private val tracer = GlobalOpenTelemetry.getTracer("test") + + private def withIOSpan[A](name: String)(f: Span => IO[A]): IO[A] = + IO.delay { + val span = tracer.spanBuilder(name).startSpan() + val scope = span.makeCurrent() + (span, scope) + }.bracket { case (span, _) => f(span) } { case (span, scope) => + IO.delay { + scope.close() + span.end() + } + } + +} diff --git a/instrumentation/cats-effect/cats-effect-common-3.6/javaagent/build.gradle.kts b/instrumentation/cats-effect/cats-effect-common-3.6/javaagent/build.gradle.kts new file mode 100644 index 000000000000..f08c64fc5a34 --- /dev/null +++ b/instrumentation/cats-effect/cats-effect-common-3.6/javaagent/build.gradle.kts @@ -0,0 +1,15 @@ +plugins { + id("otel.javaagent-instrumentation") +} + +val scalaVersion = "2.13" +val catsEffectVersion = "3.6.0" + +dependencies { + // we need access to the "application.io.opentelemetry.context.Context" + // to properly bridge fiber and agent context storages + compileOnly(project(":opentelemetry-api-shaded-for-instrumenting", configuration = "shadow")) + compileOnly(project(":instrumentation:opentelemetry-api:opentelemetry-api-1.0:javaagent")) + + compileOnly("org.typelevel:cats-effect_$scalaVersion:$catsEffectVersion") +} diff --git a/instrumentation/cats-effect/cats-effect-common-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/catseffect/common/v3_6/IoLocalContextSingleton.java b/instrumentation/cats-effect/cats-effect-common-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/catseffect/common/v3_6/IoLocalContextSingleton.java new file mode 100644 index 000000000000..ec583f95782f --- /dev/null +++ b/instrumentation/cats-effect/cats-effect-common-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/catseffect/common/v3_6/IoLocalContextSingleton.java @@ -0,0 +1,38 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.catseffect.common.v3_6; + +import application.io.opentelemetry.context.Context; +import cats.effect.IOLocal; +import io.opentelemetry.javaagent.instrumentation.opentelemetryapi.context.AgentContextStorage; + +public class IoLocalContextSingleton { + + private IoLocalContextSingleton() {} + + public static final IOLocal ioLocal = + IOLocal.apply(Context.root()).syncStep(100).unsafeRunSync().toOption().get(); + + private static final ThreadLocal ioLocalThreadLocal = ioLocal.unsafeThreadLocal(); + + public static final ThreadLocal contextThreadLocal = + new ThreadLocal() { + @Override + public io.opentelemetry.context.Context get() { + Context current = ioLocalThreadLocal.get(); + return current != null ? AgentContextStorage.getAgentContext(current) : null; + } + + @Override + public void set(io.opentelemetry.context.Context value) { + if (value != null) { + ioLocalThreadLocal.set(AgentContextStorage.toApplicationContext(value)); + } else { + ioLocalThreadLocal.remove(); + } + } + }; +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 22fb50e8f1f9..202f3ed1a57d 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -187,6 +187,9 @@ include(":instrumentation:cassandra:cassandra-4.4:javaagent") include(":instrumentation:cassandra:cassandra-4.4:library") include(":instrumentation:cassandra:cassandra-4.4:testing") include(":instrumentation:cassandra:cassandra-4-common:testing") +include(":instrumentation:cats-effect:cats-effect-3.6:bootstrap") +include(":instrumentation:cats-effect:cats-effect-3.6:javaagent") +include(":instrumentation:cats-effect:cats-effect-common-3.6:javaagent") include(":instrumentation:cdi-testing") include(":instrumentation:clickhouse-client-0.5:javaagent") include(":instrumentation:couchbase:couchbase-2.0:javaagent")