diff --git a/build.sbt b/build.sbt index 0243958c7..f9a440cad 100644 --- a/build.sbt +++ b/build.sbt @@ -118,6 +118,7 @@ val instrumentationProjects = Seq[ProjectReference]( `kamon-scalaz-future`, `kamon-cats-io`, `kamon-cats-io-3`, + `kamon-zio-2`, `kamon-logback`, `kamon-jdbc`, `kamon-kafka`, @@ -268,6 +269,21 @@ lazy val `kamon-cats-io-3` = (project in file("instrumentation/kamon-cats-io-3") ).dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test") +lazy val `kamon-zio-2` = (project in file("instrumentation/kamon-zio-2")) + .disablePlugins(AssemblyPlugin) + .enablePlugins(JavaAgent) + .settings(instrumentationSettings) + .settings( + crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version), + libraryDependencies ++= Seq( + kanelaAgent % "provided", + "dev.zio" %% "zio" % "2.0.21" % "provided", + scalatest % "test", + logbackClassic % "test" + ), + + ).dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test") + lazy val `kamon-logback` = (project in file("instrumentation/kamon-logback")) .disablePlugins(AssemblyPlugin) @@ -1028,6 +1044,7 @@ lazy val `kamon-bundle-dependencies-2-12-and-up` = (project in file("bundle/kamo `kamon-bundle-dependencies-all`, `kamon-akka-grpc`, `kamon-cats-io-3`, + `kamon-zio-2`, `kamon-finagle`, `kamon-pekko`, `kamon-pekko-http`, @@ -1066,6 +1083,7 @@ lazy val `kamon-bundle-dependencies-3` = (project in file("bundle/kamon-bundle-d `kamon-caffeine`, `kamon-aws-sdk`, `kamon-cats-io-3`, + `kamon-zio-2`, `kamon-pekko`, `kamon-pekko-http`, `kamon-pekko-grpc` diff --git a/instrumentation/kamon-zio-2/src/main/java/kamon/instrumentation/zio2/SupervisorAdvice.java b/instrumentation/kamon-zio-2/src/main/java/kamon/instrumentation/zio2/SupervisorAdvice.java new file mode 100644 index 000000000..ca13edd19 --- /dev/null +++ b/instrumentation/kamon-zio-2/src/main/java/kamon/instrumentation/zio2/SupervisorAdvice.java @@ -0,0 +1,14 @@ +package kamon.instrumentation.zio2; + +import kanela.agent.libs.net.bytebuddy.asm.Advice; +import zio.Supervisor; + +public class SupervisorAdvice { + + public static class OverrideDefaultSupervisor { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit(@Advice.Return(readOnly = false) Supervisor supervisor) { + supervisor = supervisor.$plus$plus(new NewSupervisor()); + } + } +} diff --git a/instrumentation/kamon-zio-2/src/main/resources/reference.conf b/instrumentation/kamon-zio-2/src/main/resources/reference.conf new file mode 100644 index 000000000..e600a9095 --- /dev/null +++ b/instrumentation/kamon-zio-2/src/main/resources/reference.conf @@ -0,0 +1,20 @@ +############################################# +# Kamon ZIO 2 Reference Configuration # +############################################# +kanela.modules { + zio-2 { + name = "ZIO 2 Instrumentation" + description = "Provides instrumentation for ZIO" + + instrumentations = [ + "kamon.instrumentation.zio2.ZIO2Instrumentation" + ] + + within = [ + "zio.internal.FiberRuntime", + "zio\\.Runtime.*", + ] + } + + +} diff --git a/instrumentation/kamon-zio-2/src/main/scala/kamon/instrumentation/zio2/ZIO2Instrumentation.scala b/instrumentation/kamon-zio-2/src/main/scala/kamon/instrumentation/zio2/ZIO2Instrumentation.scala new file mode 100644 index 000000000..4e779edb4 --- /dev/null +++ b/instrumentation/kamon-zio-2/src/main/scala/kamon/instrumentation/zio2/ZIO2Instrumentation.scala @@ -0,0 +1,81 @@ +package kamon.instrumentation.zio2 + +import kamon.Kamon +import kamon.context.Storage +import kamon.instrumentation.context.HasContext +import kanela.agent.api.instrumentation.InstrumentationBuilder +import zio.{Exit, Fiber, Supervisor, UIO, Unsafe, ZEnvironment, ZIO} + + +/** + * This works as follows. + * - patches the defaultSupervisor val from Runtime to add our own supervisor. + * - Mixes in the [[HasContext.Mixin]] class so we don't have to keep a separate map of Fiber -> Context + * - Performs context shifting based on starting/suspending of fibers. + * + */ +class ZIO2Instrumentation extends InstrumentationBuilder { + + onType("zio.internal.FiberRuntime") + .mixin(classOf[HasContext.Mixin]) + .mixin(classOf[HasStorage.Mixin]) + + onType("zio.Runtime$") + .advise(method("defaultSupervisor"), classOf[SupervisorAdvice.OverrideDefaultSupervisor]) +} + +/** + * Mixin that exposes access to the scope captured by an instrumented instance. The interface exposes means of getting and more importantly + * closing of the scope. + */ +trait HasStorage { + + /** + * Returns the [[Storage.Scope]] stored in the instrumented instance. + */ + def kamonScope: Storage.Scope + + /** + * Updates the [[Storage.Scope]] stored in the instrumented instance + */ + def setKamonScope(scope: Storage.Scope): Unit + +} + +object HasStorage { + + /** + * [[HasStorage]] implementation that keeps the scope in a mutable field. + */ + class Mixin(@transient private var _scope: Storage.Scope) extends HasStorage { + + override def kamonScope: Storage.Scope = if (_scope != null) _scope else Storage.Scope.Empty + + override def setKamonScope(scope: Storage.Scope): Unit = _scope = scope + } +} + + +class NewSupervisor extends Supervisor[Any] { + + override def value(implicit trace: zio.Trace): UIO[Any] = ZIO.unit + + override def onStart[R, E, A_](environment: ZEnvironment[R], effect: ZIO[R, E, A_], parent: Option[Fiber.Runtime[Any, Any]], fiber: Fiber.Runtime[E, A_])(implicit unsafe: Unsafe): Unit = { + fiber.asInstanceOf[HasContext].setContext(Kamon.currentContext()) + } + + override def onSuspend[E, A_](fiber: Fiber.Runtime[E, A_])(implicit unsafe: Unsafe): Unit = { + fiber.asInstanceOf[HasContext].setContext(Kamon.currentContext()) + } + + override def onResume[E, A_](fiber: Fiber.Runtime[E, A_])(implicit unsafe: Unsafe): Unit = { + val fiberInstance = fiber.asInstanceOf[HasContext with HasStorage] + val ctx = fiberInstance.context + fiberInstance.setKamonScope(Kamon.storeContext(ctx)) + } + + override def onEnd[R, E, A_](value: Exit[E, A_], fiber: Fiber.Runtime[E, A_])(implicit unsafe: Unsafe): Unit = { + val fiberInstance = fiber.asInstanceOf[HasContext with HasStorage] + fiberInstance.kamonScope.close() + } +} diff --git a/instrumentation/kamon-zio-2/src/test/scala/kamon/instrumentation/zio2/ZIO2InstrumentationSpec.scala b/instrumentation/kamon-zio-2/src/test/scala/kamon/instrumentation/zio2/ZIO2InstrumentationSpec.scala new file mode 100644 index 000000000..3d62b3ff1 --- /dev/null +++ b/instrumentation/kamon-zio-2/src/test/scala/kamon/instrumentation/zio2/ZIO2InstrumentationSpec.scala @@ -0,0 +1,190 @@ +package kamon.instrumentation.zio2 + + +import kamon.Kamon +import kamon.context.Context +import kamon.tag.Lookups.plain +import org.scalatest.{Assertion, BeforeAndAfterEach, OptionValues} +import org.scalatest.concurrent.{Eventually, PatienceConfiguration, ScalaFutures} +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +import java.util.concurrent.{Executors, TimeUnit} +import scala.concurrent.{Await, ExecutionContext, Future} +import kamon.trace.Identifier.Scheme +import kamon.trace.{Identifier, Span, Trace} +import zio._ + +import scala.concurrent.duration.FiniteDuration + +class ZIO2InstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutures with PatienceConfiguration + with OptionValues with Eventually with BeforeAndAfterEach { + + protected implicit val zioRuntime: Runtime[Any] = + Runtime.default + + protected def unsafeRunZIO[E, A](r: ZIO[Any, E, A])(implicit trace: zio.Trace): A = { + Unsafe.unsafe(implicit unsafe => zioRuntime.unsafe.run(r)) match { + case Exit.Success(value) => value + case f @ Exit.Failure(cause) => fail(cause.squashWith(_ => new Exception(f.toString))) + } + } + + java.lang.System.setProperty("kamon.context.debug", "true") + + "a ZIO created when instrumentation is active" should { + "capture the active span available when created" which { + + "must capture the current context when creating and running fibers" in { + + val context = Context.of("tool", "kamon") + + val effect = for { + contextOnAnotherThread <- ZIO.succeed(Kamon.currentContext()) + _ <- ZIO.succeed(Seq("hello", "world")) + _ <- ZIO.sleep(Duration(10, TimeUnit.MILLISECONDS)) + contextAnother <- ZIO.attemptBlocking(Kamon.currentContext()) + } yield { + val currentContext = Kamon.currentContext() + currentContext shouldBe context + contextOnAnotherThread shouldBe context + currentContext + } + + val contextInsideYield = Kamon.runWithContext(context) { + // This is what would happen at the edges of the system, when Kamon has already + // started a Span in an outer layer (usually the HTTP server instrumentation) and + // when processing gets to user-level code, the users want to run their business + // logic as an effect. We should always propagate the context that was available + // at this point to the moment when the effect runs. + + unsafeRunZIO(effect) + } + + context shouldBe contextInsideYield + } + + "must allow the context to be cleaned" in { + val anotherExecutor = Executor.fromExecutionContext(ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(10))) + val context = Context.of("key", "value") + + val test = + for { + _ <- ZIO.succeed(Kamon.storeContext(context)) + _ <- ZIO.onExecutor(anotherExecutor)(ZIO.sleep(10.millis)) + beforeCleaning <- ZIO.succeed(Kamon.currentContext()) + _ <- ZIO.succeed(Kamon.storeContext(Context.Empty)) + _ <- ZIO.onExecutor(anotherExecutor)(ZIO.sleep(10.millis)) + afterCleaning <- ZIO.succeed(Kamon.currentContext()) + } yield { + afterCleaning shouldBe Context.Empty + beforeCleaning shouldBe context + } + + unsafeRunZIO(test) + } + + "must be available across asynchronous boundaries" in { + val anotherExecutor: Executor = Executor.fromExecutionContext(ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))) //pool 7 + val context = Context.of("key", "value") + val test = + for { + scope <- ZIO.succeed(Kamon.storeContext(context)) + len <- ZIO.succeed("Hello Kamon!").map(_.length) + _ <- ZIO.succeed(len.toString) + beforeChanging <- getKey + evalOnGlobalRes <-ZIO.sleep(Duration.Zero) *> getKey + outerSpanIdBeginning <- ZIO.succeed(Kamon.currentSpan().id.string) + innerSpan <- ZIO.succeed(Kamon.clientSpanBuilder("Foo", "attempt").context(context).start()) + innerSpanId1 <- ZIO.onExecutor(anotherExecutor)(ZIO.succeed(Kamon.currentSpan())) + innerSpanId2 <- ZIO.succeed(Kamon.currentSpan()) + _ <- ZIO.succeed(innerSpan.finish()) + outerSpanIdEnd <- ZIO.succeed(Kamon.currentSpan().id.string) + evalOnAnotherEx <-ZIO.onExecutor(anotherExecutor)(ZIO.sleep(Duration.Zero).flatMap(_ => getKey)) + } yield { + scope.close() + withClue("before changing")(beforeChanging shouldBe "value") + withClue("on the global exec context")(evalOnGlobalRes shouldBe "value") + withClue("on a different exec context")(evalOnAnotherEx shouldBe "value") + withClue("final result")(evalOnAnotherEx shouldBe "value") + withClue("inner span should be the same on different exec")(innerSpanId1 shouldBe innerSpan) + withClue("inner span should be the same on same exec")(innerSpanId2 shouldBe innerSpan) + withClue("inner and outer should be different")(outerSpanIdBeginning should not equal innerSpan) + } + + unsafeRunZIO(test) + + } + + "must allow complex Span topologies to be created" in { + val parentSpan = Span.Remote( + Scheme.Single.spanIdFactory.generate(), + Identifier.Empty, + Trace.create(Scheme.Single.traceIdFactory.generate(), Trace.SamplingDecision.Sample) + ) + val context = Context.of(Span.Key, parentSpan) + implicit val ec = ExecutionContext.global + /** + * test + * - nestedLevel0 + * - nestedUpToLevel2 + * - nestedUpToLevel2._2._1 + * - fiftyInParallel + */ + val test = for { + span <- ZIO.succeed(Kamon.currentSpan()) + nestedLevel0 <- meteredWithSpanCapture("level1-A")(ZIO.sleep(100.millis)) + nestedUpToLevel2 <- meteredWithSpanCapture("level1-B")(meteredWithSpanCapture("level2-B")(ZIO.sleep(100.millis))) + fiftyInParallel <- ZIO.foreachPar((0 to 49).toList)(i => meteredWithSpanCapture(s"operation$i")(ZIO.sleep(100.millis))) + afterCede <- meteredWithSpanCapture("yieldNow")(ZIO.yieldNow.as(Kamon.currentSpan())) + afterEverything <- ZIO.succeed(Kamon.currentSpan()) + } yield { + span.id.string should not be empty + span.id.string shouldBe nestedLevel0._1.parentId.string + span.id.string shouldBe nestedUpToLevel2._1.parentId.string + nestedUpToLevel2._1.id.string shouldBe nestedUpToLevel2._2._1.parentId.string + fiftyInParallel.map(_._1.parentId.string).toSet shouldBe Set(span.id.string) + fiftyInParallel.map(_._1.id.string).toSet should have size 50 + afterCede._1.id.string shouldBe afterCede._2.id.string //A cede should not cause the span to be lost + afterEverything.id.string shouldBe span.id.string + } + + + + val result = scala.concurrent.Future.sequence( + (1 to 100).toList.map { _ => + Unsafe.unsafe { implicit unsafe => + zioRuntime.unsafe.runToFuture { + (ZIO.succeed(Kamon.init()) *> ZIO.succeed(Kamon.storeContext(context)) *> test) + }: Future[Assertion] + } + } + ) + Await.result(result, FiniteDuration(100, "seconds")) + } + } + } + + private def getKey: UIO[String] = { + ZIO.succeed(Kamon.currentContext().getTag(plain("key"))) + } + + private def meteredWithSpanCapture[A](operation: String)(io: UIO[A]): UIO[(Span, A)] = { + ZIO.scoped { + ZIO.acquireRelease { + for { + initialCtx <- ZIO.succeed(Kamon.currentContext()) + parentSpan <- ZIO.succeed(Kamon.currentSpan()) + newSpan <- ZIO.succeed(Kamon.spanBuilder(operation).context(initialCtx).asChildOf(parentSpan).start()) + _ <- ZIO.succeed(Kamon.storeContext(initialCtx.withEntry(Span.Key, newSpan))) + } yield (initialCtx, newSpan) + }{ + case (initialCtx, span) => + for { + _ <- ZIO.succeed(span.finish()) + _ <- ZIO.succeed(Kamon.storeContext(initialCtx)) + } yield () + } *> ZIO.succeed(Kamon.currentSpan()).zipPar(io) + } + } +}