Skip to content

Commit 738ab89

Browse files
psnepivantopo
authored andcommitted
Fix akka-http instrumentation for HTTP2
1 parent f4898d9 commit 738ab89

File tree

5 files changed

+68
-77
lines changed

5 files changed

+68
-77
lines changed
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright 2013-2024 The Kamon Project <https://kamon.io>
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kamon.instrumentation.akka.http;
18+
19+
import akka.NotUsed;
20+
import akka.http.scaladsl.model.HttpRequest;
21+
import akka.http.scaladsl.model.HttpResponse;
22+
import akka.stream.scaladsl.Flow;
23+
import kanela.agent.libs.net.bytebuddy.asm.Advice;
24+
25+
public class Http2BlueprintAsyncAdvice {
26+
27+
public static class EndpointInfo {
28+
public final String listenInterface;
29+
public final int listenPort;
30+
31+
public EndpointInfo(String listenInterface, int listenPort) {
32+
this.listenInterface = listenInterface;
33+
this.listenPort = listenPort;
34+
}
35+
}
36+
37+
public static ThreadLocal<EndpointInfo> currentEndpoint = new ThreadLocal<>();
38+
39+
@Advice.OnMethodExit
40+
public static void onExit(@Advice.Return(readOnly = false) Flow<HttpRequest, HttpResponse, NotUsed> returnedFlow) {
41+
EndpointInfo bindAndHandlerEndpoint = currentEndpoint.get();
42+
43+
if(bindAndHandlerEndpoint != null) {
44+
returnedFlow = ServerFlowWrapper.apply(
45+
returnedFlow,
46+
bindAndHandlerEndpoint.listenInterface,
47+
bindAndHandlerEndpoint.listenPort
48+
);
49+
}
50+
}
51+
}

instrumentation/kamon-akka-http/src/main/java/kamon/instrumentation/akka/http/Http2ExtBindAndHandleAdvice.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,20 @@
1616

1717
package kamon.instrumentation.akka.http;
1818

19-
import akka.http.scaladsl.model.HttpRequest;
20-
import akka.http.scaladsl.model.HttpResponse;
2119
import kanela.agent.libs.net.bytebuddy.asm.Advice;
22-
import scala.Function1;
23-
import scala.concurrent.Future;
2420

2521
public class Http2ExtBindAndHandleAdvice {
2622

2723
@Advice.OnMethodEnter(suppress = Throwable.class)
28-
public static void onEnter(@Advice.Argument(value = 0, readOnly = false) Function1<HttpRequest, Future<HttpResponse>> handler,
29-
@Advice.Argument(1) String iface,
30-
@Advice.Argument(2) Integer port) {
24+
public static void onEnter(@Advice.Argument(1) String iface, @Advice.Argument(2) Integer port) {
3125

3226
FlowOpsMapAsyncAdvice.currentEndpoint.set(new FlowOpsMapAsyncAdvice.EndpointInfo(iface, port));
33-
handler = new Http2BlueprintInterceptor.HandlerWithEndpoint(iface, port, handler);
27+
Http2BlueprintAsyncAdvice.currentEndpoint.set(new Http2BlueprintAsyncAdvice.EndpointInfo(iface, port));
3428
}
3529

3630
@Advice.OnMethodExit
3731
public static void onExit() {
3832
FlowOpsMapAsyncAdvice.currentEndpoint.remove();
33+
Http2BlueprintAsyncAdvice.currentEndpoint.remove();
3934
}
4035
}

instrumentation/kamon-akka-http/src/main/scala-2.11/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,14 @@ class AkkaHttpServerInstrumentation extends InstrumentationBuilder {
4545

4646
/**
4747
* For the HTTP/2 instrumentation, since the parts where we can capture the interface/port and the actual flow
48-
* creation happen at different times we are wrapping the handler with the interface/port data and reading that
49-
* information when turning the handler function into a flow and wrapping it the same way we would for HTTP/1.
48+
* creation happen at different times we are advising the handleWithStreamIdHeader method with the interface/port
49+
* data and reading that information on method exit to wrap it the same way we would for HTTP/1.
5050
*/
5151
onType("akka.http.scaladsl.Http2Ext")
5252
.advise(method("bindAndHandleAsync") and isPublic(), classOf[Http2ExtBindAndHandleAdvice])
5353

5454
onType("akka.http.impl.engine.http2.Http2Blueprint$")
55-
.intercept(method("handleWithStreamIdHeader"), Http2BlueprintInterceptor)
55+
.advise(method("handleWithStreamIdHeader"), classOf[Http2BlueprintAsyncAdvice])
5656

5757
/**
5858
* The rest of these sections are just about making sure that we can generate an appropriate operation name (i.e. free
@@ -306,6 +306,7 @@ object PathDirectivesRawPathPrefixInterceptor {
306306
}
307307
}
308308

309+
309310
object Http2BlueprintInterceptor {
310311

311312
case class HandlerWithEndpoint(interface: String, port: Int, handler: HttpRequest => Future[HttpResponse])
@@ -315,10 +316,8 @@ object Http2BlueprintInterceptor {
315316
}
316317

317318
@RuntimeType
318-
def handleWithStreamIdHeader(
319-
@Argument(1) handler: HttpRequest => Future[HttpResponse],
320-
@SuperCall zuper: Callable[Flow[HttpRequest, HttpResponse, NotUsed]]
321-
): Flow[HttpRequest, HttpResponse, NotUsed] = {
319+
def handleWithStreamIdHeader(@Argument(1) handler: HttpRequest => Future[HttpResponse],
320+
@SuperCall zuper: Callable[Flow[HttpRequest, HttpResponse, NotUsed]]): Flow[HttpRequest, HttpResponse, NotUsed] = {
322321

323322
handler match {
324323
case HandlerWithEndpoint(interface, port, _) =>

instrumentation/kamon-akka-http/src/main/scala-2.12/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala

Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,15 @@ class AkkaHttpServerInstrumentation extends InstrumentationBuilder {
6262

6363
/**
6464
* For the HTTP/2 instrumentation, since the parts where we can capture the interface/port and the actual flow
65-
* creation happen at different times we are wrapping the handler with the interface/port data and reading that
66-
* information when turning the handler function into a flow and wrapping it the same way we would for HTTP/1.
65+
* creation happen at different times we are advising the handleWithStreamIdHeader method with the interface/port
66+
* data and reading that information on method exit to wrap it the same way we would for HTTP/1.
6767
*/
6868

6969
onType("akka.http.impl.engine.http2.Http2Ext")
7070
.advise(method("bindAndHandleAsync") and isPublic(), classOf[Http2ExtBindAndHandleAdvice])
7171

7272
onType("akka.http.impl.engine.http2.Http2Blueprint$")
73-
.intercept(method("handleWithStreamIdHeader"), Http2BlueprintInterceptor)
73+
.advise(method("handleWithStreamIdHeader"), classOf[Http2BlueprintAsyncAdvice])
7474

7575
/**
7676
* The rest of these sections are just about making sure that we can generate an appropriate operation name (i.e. free
@@ -329,27 +329,3 @@ object PathDirectivesRawPathPrefixInterceptor {
329329
}
330330
}
331331
}
332-
333-
object Http2BlueprintInterceptor {
334-
335-
case class HandlerWithEndpoint(interface: String, port: Int, handler: HttpRequest => Future[HttpResponse])
336-
extends (HttpRequest => Future[HttpResponse]) {
337-
338-
override def apply(request: HttpRequest): Future[HttpResponse] = handler(request)
339-
}
340-
341-
@RuntimeType
342-
def handleWithStreamIdHeader(
343-
@Argument(1) handler: HttpRequest => Future[HttpResponse],
344-
@SuperCall zuper: Callable[Flow[HttpRequest, HttpResponse, NotUsed]]
345-
): Flow[HttpRequest, HttpResponse, NotUsed] = {
346-
347-
handler match {
348-
case HandlerWithEndpoint(interface, port, _) =>
349-
ServerFlowWrapper(zuper.call(), interface, port)
350-
351-
case _ =>
352-
zuper.call()
353-
}
354-
}
355-
}

instrumentation/kamon-akka-http/src/main/scala-2.13+/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala

Lines changed: 5 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3,27 +3,22 @@ package kamon.instrumentation.akka.http
33
import java.util.concurrent.Callable
44
import akka.http.scaladsl.marshalling.{ToEntityMarshaller, ToResponseMarshallable, ToResponseMarshaller}
55
import akka.http.scaladsl.model.StatusCodes.Redirection
6-
import akka.http.scaladsl.model.{HttpHeader, HttpRequest, HttpResponse, StatusCode, Uri}
6+
import akka.http.scaladsl.model.{HttpHeader, StatusCode, Uri}
77
import akka.http.scaladsl.server.PathMatcher.{Matched, Unmatched}
88
import akka.http.scaladsl.server.directives.{BasicDirectives, CompleteOrRecoverWithMagnet, OnSuccessMagnet}
99
import akka.http.scaladsl.server.directives.RouteDirectives.reject
1010
import akka.http.scaladsl.server._
1111
import akka.http.scaladsl.server.util.{Tuple, Tupler}
12-
import akka.http.scaladsl.util.FastFuture
1312
import kamon.Kamon
1413
import kamon.instrumentation.akka.http.HasMatchingContext.PathMatchingContext
15-
import kamon.instrumentation.context.{HasContext, InvokeWithCapturedContext}
1614
import kanela.agent.api.instrumentation.InstrumentationBuilder
1715
import kanela.agent.api.instrumentation.mixin.Initializer
1816
import kanela.agent.libs.net.bytebuddy.implementation.bind.annotation._
1917

20-
import scala.concurrent.{Batchable, ExecutionContext, Future, Promise}
21-
import scala.util.control.NonFatal
18+
import scala.concurrent.Future
2219
import scala.util.{Failure, Success, Try}
2320
import java.util.regex.Pattern
24-
import akka.NotUsed
2521
import akka.http.scaladsl.server.RouteResult.Rejected
26-
import akka.stream.scaladsl.Flow
2722
import kamon.context.Context
2823
import kanela.agent.libs.net.bytebuddy.matcher.ElementMatchers.isPublic
2924

@@ -46,15 +41,15 @@ class AkkaHttpServerInstrumentation extends InstrumentationBuilder {
4641

4742
/**
4843
* For the HTTP/2 instrumentation, since the parts where we can capture the interface/port and the actual flow
49-
* creation happen at different times we are wrapping the handler with the interface/port data and reading that
50-
* information when turning the handler function into a flow and wrapping it the same way we would for HTTP/1.
44+
* creation happen at different times we are advising the handleWithStreamIdHeader method with the interface/port
45+
* data and reading that information on method exit to wrap it the same way we would for HTTP/1.
5146
*
5247
*/
5348
onType("akka.http.impl.engine.http2.Http2Ext")
5449
.advise(method("bindAndHandleAsync") and isPublic(), classOf[Http2ExtBindAndHandleAdvice])
5550

5651
onType("akka.http.impl.engine.http2.Http2Blueprint$")
57-
.intercept(method("handleWithStreamIdHeader"), classOf[Http2BlueprintInterceptor])
52+
.advise(method("handleWithStreamIdHeader"), classOf[Http2BlueprintAsyncAdvice])
5853

5954
/**
6055
* The rest of these sections are just about making sure that we can generate an appropriate operation name (i.e. free
@@ -314,28 +309,3 @@ object PathDirectivesRawPathPrefixInterceptor {
314309
}
315310
}
316311
}
317-
318-
class Http2BlueprintInterceptor
319-
object Http2BlueprintInterceptor {
320-
321-
case class HandlerWithEndpoint(interface: String, port: Int, handler: HttpRequest => Future[HttpResponse])
322-
extends (HttpRequest => Future[HttpResponse]) {
323-
324-
override def apply(request: HttpRequest): Future[HttpResponse] = handler(request)
325-
}
326-
327-
@RuntimeType
328-
@static def handleWithStreamIdHeader(
329-
@Argument(1) handler: HttpRequest => Future[HttpResponse],
330-
@SuperCall zuper: Callable[Flow[HttpRequest, HttpResponse, NotUsed]]
331-
): Flow[HttpRequest, HttpResponse, NotUsed] = {
332-
333-
handler match {
334-
case HandlerWithEndpoint(interface, port, _) =>
335-
ServerFlowWrapper(zuper.call(), interface, port)
336-
337-
case _ =>
338-
zuper.call()
339-
}
340-
}
341-
}

0 commit comments

Comments
 (0)