Skip to content

Commit 305b2c5

Browse files
authored
feat: add pekko-grpc instrumentation (#1307)
1 parent a96a353 commit 305b2c5

File tree

10 files changed

+316
-0
lines changed

10 files changed

+316
-0
lines changed

build.sbt

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ val instrumentationProjects = Seq[ProjectReference](
139139
`kamon-akka-grpc`,
140140
`kamon-pekko`,
141141
`kamon-pekko-http`,
142+
`kamon-pekko-grpc`,
142143
`kamon-play`,
143144
`kamon-okhttp`,
144145
`kamon-tapir`,
@@ -526,6 +527,31 @@ lazy val `kamon-pekko-http` = (project in file("instrumentation/kamon-pekko-http
526527
),
527528
)).dependsOn(`kamon-pekko`, `kamon-testkit` % "test")
528529

530+
lazy val `kamon-pekko-grpc` = (project in file("instrumentation/kamon-pekko-grpc"))
531+
.enablePlugins(JavaAgent, PekkoGrpcPlugin)
532+
.disablePlugins(AssemblyPlugin)
533+
.settings(instrumentationSettings)
534+
.settings(Seq(
535+
PB.additionalDependencies := Seq.empty,
536+
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`),
537+
libraryDependencies ++= Seq(
538+
kanelaAgent % "provided",
539+
540+
"org.apache.pekko" %% "pekko-http" % pekkoHttpVersion % "provided",
541+
"org.apache.pekko" %% "pekko-stream" % "1.0.1" % "provided",
542+
"org.apache.pekko" %% "pekko-discovery"% "1.0.0" % "provided",
543+
544+
"com.thesamet.scalapb" %% "scalapb-runtime" % "0.11.8" % "provided",
545+
"org.apache.pekko" %% "pekko-grpc-runtime" % "1.0.0" % "provided",
546+
"io.grpc" % "grpc-stub" % "1.43.2" % "provided",
547+
548+
549+
scalatest % "test",
550+
slf4jApi % "test",
551+
logbackClassic % "test",
552+
)
553+
)).dependsOn(`kamon-pekko-http`, `kamon-testkit` % "test")
554+
529555
lazy val `kamon-akka-grpc` = (project in file("instrumentation/kamon-akka-grpc"))
530556
.enablePlugins(JavaAgent, AkkaGrpcPlugin)
531557
.disablePlugins(AssemblyPlugin)
@@ -998,6 +1024,7 @@ lazy val `kamon-bundle-dependencies-2-12-and-up` = (project in file("bundle/kamo
9981024
`kamon-finagle`,
9991025
`kamon-pekko`,
10001026
`kamon-pekko-http`,
1027+
`kamon-pekko-grpc`,
10011028
`kamon-tapir`,
10021029
`kamon-alpakka-kafka`
10031030
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package kamon.instrumentation.pekko.grpc;
2+
3+
import org.apache.pekko.http.javadsl.model.HttpEntity;
4+
import kamon.Kamon;
5+
import kamon.context.Context;
6+
import kanela.agent.libs.net.bytebuddy.asm.Advice;
7+
8+
import java.util.concurrent.CompletableFuture;
9+
import java.util.concurrent.CompletionStage;
10+
import java.util.function.Function;
11+
12+
public class PekkoGRPCUnmarshallingContextPropagation {
13+
14+
@Advice.OnMethodExit()
15+
public static void onExit(
16+
@Advice.Return(readOnly = false) CompletionStage<?> returnValue,
17+
@Advice.Argument(0) Object firstArgument) {
18+
19+
if(firstArgument instanceof HttpEntity && returnValue instanceof CompletableFuture) {
20+
final Context currentContext = Kamon.currentContext();
21+
22+
// NOTES: The wrapper is only overriding thenCompose because it is the only function that gets called
23+
// after GrpcMarshalling.unmarshall in the auto-generated HandlerFactory for gRPC services. In
24+
// the future this might be removed if we instrument CompletionStage directly.
25+
returnValue = new ContextPropagatingCompletionStage<>((CompletableFuture) returnValue, currentContext);
26+
}
27+
}
28+
29+
30+
public static class ContextPropagatingCompletionStage<T> extends CompletableFuture<T> {
31+
private final CompletableFuture<T> wrapped;
32+
private final Context context;
33+
34+
public ContextPropagatingCompletionStage(CompletableFuture<T> wrapped, Context context) {
35+
this.wrapped = wrapped;
36+
this.context = context;
37+
}
38+
39+
@Override
40+
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
41+
Function<? super T, ? extends CompletionStage<U>> wrapperFunction = (t) -> {
42+
return Kamon.runWithContext(context, () -> fn.apply(t));
43+
};
44+
45+
return wrapped.thenCompose(wrapperFunction);
46+
}
47+
}
48+
49+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# ======================================= #
2+
# Kamon-Pekko-gRPC Reference Configuration #
3+
# ======================================= #
4+
5+
kanela.modules {
6+
pekko-grpc {
7+
name = "Pekko gRPC Instrumentation"
8+
description = "Context propagation and tracing for Pekko gRPC"
9+
enabled = yes
10+
11+
instrumentations = [
12+
"kamon.instrumentation.pekko.grpc.PekkoGrpcServerInstrumentation"
13+
]
14+
15+
within = [
16+
"^org.apache.pekko.grpc.internal..*",
17+
"^org.apache.pekko.grpc.scaladsl.GrpcMarshalling$"
18+
]
19+
}
20+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2013-2021 The Kamon Project <https://kamon.io>
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kamon.instrumentation.pekko.grpc
18+
19+
import kamon.Kamon
20+
import kanela.agent.api.instrumentation.InstrumentationBuilder
21+
import kanela.agent.libs.net.bytebuddy.asm.Advice
22+
23+
class PekkoGrpcServerInstrumentation extends InstrumentationBuilder {
24+
25+
/**
26+
* Support for Pekko gRPC servers.
27+
*
28+
* gRPC requests get their spans started by the ServerFlowWrapper in the Pekko HTTP instrumentation like any other
29+
* requests, but they never go through any instrumentation that gives a good operation name to the Span and forces
30+
* taking a sampling decision.
31+
*
32+
* This instrumentation gives a proper name and tags to the span when it matches one of the exposed services,
33+
* otherwise the span remains unchanged. Assumes no actual implementation of `pekko.grpc.internal.TelemetrySpi` is
34+
* configured.
35+
*/
36+
onType("org.apache.pekko.grpc.internal.NoOpTelemetry$")
37+
.advise(method("onRequest"), PekkoGRPCServerRequestHandler)
38+
39+
40+
onType("org.apache.pekko.grpc.scaladsl.GrpcMarshalling")
41+
.advise(method("unmarshal"), classOf[PekkoGRPCUnmarshallingContextPropagation])
42+
}
43+
44+
object PekkoGRPCServerRequestHandler {
45+
46+
@Advice.OnMethodEnter()
47+
def enter(@Advice.Argument(0) serviceName: String, @Advice.Argument(1) method: String): Unit = {
48+
val fullSpanName = serviceName + "/" + method
49+
Kamon.currentSpan()
50+
.name(fullSpanName)
51+
.tagMetrics("component", "pekko.grpc.server")
52+
.tagMetrics("rpc.system", "grpc")
53+
.tagMetrics("rpc.service", serviceName)
54+
.tagMetrics("rpc.method", method)
55+
.takeSamplingDecision()
56+
}
57+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
syntax = "proto3";
2+
3+
option java_multiple_files = true;
4+
option java_package = "kamon.instrumentation.pekko.grpc";
5+
option java_outer_classname = "HelloWorldProto";
6+
7+
package helloworld;
8+
9+
////////////////////////////////////// The greeting service definition.
10+
service GreeterService {
11+
//////////////////////
12+
// Sends a greeting //
13+
////////*****/////////
14+
// HELLO //
15+
////////*****/////////
16+
rpc SayHello (HelloRequest) returns (HelloReply) {}
17+
18+
// Comment spanning
19+
// on several lines
20+
rpc ItKeepsTalking (stream HelloRequest) returns (HelloReply) {}
21+
22+
/*
23+
* C style comments
24+
*/
25+
rpc ItKeepsReplying (HelloRequest) returns (stream HelloReply) {}
26+
27+
/* C style comments
28+
* on several lines
29+
* with non-empty heading/trailing line */
30+
rpc StreamHellos (stream HelloRequest) returns (stream HelloReply) {}
31+
}
32+
33+
// The request message containing the user's name.
34+
message HelloRequest {
35+
string name = 1;
36+
}
37+
38+
// The response message containing the greetings
39+
message HelloReply {
40+
string message = 1;
41+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pekko.http.server.preview.enable-http2 = on
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<configuration>
2+
<statusListener class="ch.qos.logback.core.status.NopStatusListener"/>
3+
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
4+
<encoder>
5+
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
6+
</encoder>
7+
</appender>
8+
9+
<root level="INFO">
10+
<appender-ref ref="STDOUT"/>
11+
</root>
12+
</configuration>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2013-2021 The Kamon Project <https://kamon.io>
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kamon.instrumentation.pekko.grpc
18+
19+
import scala.concurrent.Future
20+
import org.apache.pekko.NotUsed
21+
import org.apache.pekko.stream.Materializer
22+
import org.apache.pekko.stream.scaladsl.Sink
23+
import org.apache.pekko.stream.scaladsl.Source
24+
25+
26+
class GreeterServiceImpl(implicit mat: Materializer) extends GreeterService {
27+
import mat.executionContext
28+
29+
override def sayHello(in: HelloRequest): Future[HelloReply] = {
30+
Future.successful(HelloReply(s"Hello, ${in.name}"))
31+
}
32+
33+
override def itKeepsTalking(in: Source[HelloRequest, NotUsed]): Future[HelloReply] = {
34+
in.runWith(Sink.seq).map(elements => HelloReply(s"Hello, ${elements.map(_.name).mkString(", ")}"))
35+
}
36+
37+
override def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = {
38+
Source(s"Hello, ${in.name}".toList).map(character => HelloReply(character.toString))
39+
}
40+
41+
override def streamHellos(in: Source[HelloRequest, NotUsed]): Source[HelloReply, NotUsed] = {
42+
in.map(request => HelloReply(s"Hello, ${request.name}"))
43+
}
44+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2013-2021 The Kamon Project <https://kamon.io>
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kamon.instrumentation.pekko.grpc
18+
19+
import org.apache.pekko.actor.ActorSystem
20+
import org.apache.pekko.grpc.GrpcClientSettings
21+
import org.apache.pekko.http.scaladsl.Http
22+
import kamon.tag.Lookups.plain
23+
import kamon.testkit.{InitAndStopKamonAfterAll, TestSpanReporter}
24+
import org.scalatest.OptionValues
25+
import org.scalatest.concurrent.Eventually
26+
import org.scalatest.matchers.should.Matchers
27+
import org.scalatest.wordspec.AnyWordSpec
28+
29+
import scala.concurrent.duration._
30+
31+
class PekkoGrpcTracingSpec extends AnyWordSpec with InitAndStopKamonAfterAll with Matchers with Eventually
32+
with TestSpanReporter with OptionValues {
33+
34+
implicit val system = ActorSystem("pekko-grpc-instrumentation")
35+
implicit val ec = system.dispatcher
36+
37+
val greeterService = GreeterServiceHandler(new GreeterServiceImpl())
38+
val serverBinding = Http()
39+
.newServerAt("127.0.0.1", 8598)
40+
.bind(greeterService)
41+
42+
43+
val client = GreeterServiceClient(GrpcClientSettings.connectToServiceAt("127.0.0.1", 8598).withTls(false))
44+
45+
"the Pekko gRPC instrumentation" should {
46+
"create spans for the server-side" in {
47+
client.sayHello(HelloRequest("kamon"))
48+
49+
eventually(timeout(5 seconds)) {
50+
val span = testSpanReporter().nextSpan().value
51+
span.operationName shouldBe "helloworld.GreeterService/SayHello"
52+
span.metricTags.get(plain("component")) shouldBe "pekko.grpc.server"
53+
span.metricTags.get(plain("rpc.system")) shouldBe "grpc"
54+
span.metricTags.get(plain("rpc.service")) shouldBe "helloworld.GreeterService"
55+
span.metricTags.get(plain("rpc.method")) shouldBe "SayHello"
56+
}
57+
}
58+
}
59+
60+
override protected def beforeAll(): Unit = {
61+
super.beforeAll()
62+
enableFastSpanFlushing()
63+
}
64+
}

project/plugins.sbt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,4 @@ addSbtPlugin("com.jsuereth" % "sbt-pgp" % "2.0.1")
1414
addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.5.0")
1515

1616
addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "2.1.3")
17+
addSbtPlugin("org.apache.pekko" % "pekko-grpc-sbt-plugin" % "1.0.0")

0 commit comments

Comments
 (0)