Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Release Notes.
* Add plugin to support ClickHouse JDBC driver (0.3.2.*).
* Refactor kotlin coroutine plugin with CoroutineContext.
* Fix OracleURLParser ignoring actual port when :SID is absent.
* Change gRPC instrumentation point to fix plugin not working for server side.

#### Documentation
* Update docs of Tracing APIs, reorganize the API docs into six parts.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,14 @@
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
import static net.bytebuddy.matcher.ElementMatchers.takesNoArguments;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;

public class AbstractServerImplBuilderInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

public static final String ENHANCE_CLASS = "io.grpc.internal.AbstractServerImplBuilder";
public static final String ENHANCE_METHOD = "addService";
public static final String ENHANCE_METHOD = "build";
public static final String INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.grpc.v1.server.AbstractServerImplBuilderInterceptor";
public static final String ARGUMENT_TYPE = "io.grpc.ServerServiceDefinition";

@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
Expand All @@ -47,7 +46,7 @@ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_METHOD).and(takesArgumentWithType(0, ARGUMENT_TYPE));
return named(ENHANCE_METHOD).and(takesNoArguments());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@

package org.apache.skywalking.apm.plugin.grpc.v1.server;

import io.grpc.ServerInterceptors;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServerBuilder;

import java.lang.reflect.Method;

Expand All @@ -34,7 +33,12 @@ public class AbstractServerImplBuilderInterceptor implements InstanceMethodsArou
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) {
allArguments[0] = ServerInterceptors.intercept((ServerServiceDefinition) allArguments[0], new ServerInterceptor());
if (objInst.getSkyWalkingDynamicField() == null) {
ServerBuilder<?> builder = (ServerBuilder) objInst;
ServerInterceptor interceptor = new ServerInterceptor();
builder.intercept(interceptor);
objInst.setSkyWalkingDynamicField(interceptor);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,27 @@

package org.apache.skywalking.apm.plugin.grpc.v1.server;

import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.util.StringUtil;

import static org.apache.skywalking.apm.plugin.grpc.v1.OperationNameFormatUtil.formatOperationName;

public class ServerInterceptor implements io.grpc.ServerInterceptor {

static final Context.Key<ContextSnapshot> CONTEXT_SNAPSHOT_KEY = Context.key("skywalking-grpc-context-snapshot");
static final Context.Key<AbstractSpan> ACTIVE_SPAN_KEY = Context.key("skywalking-grpc-active-span");

@Override
public <REQUEST, RESPONSE> ServerCall.Listener<REQUEST> interceptCall(ServerCall<REQUEST, RESPONSE> call,
Metadata headers, ServerCallHandler<REQUEST, RESPONSE> handler) {
Expand All @@ -38,7 +51,26 @@ public <REQUEST, RESPONSE> ServerCall.Listener<REQUEST> interceptCall(ServerCall
next.setHeadValue(contextValue);
}
}
return new TracingServerCallListener<>(handler.startCall(new TracingServerCall<>(call), headers), call
.getMethodDescriptor(), contextCarrier);

final AbstractSpan span = ContextManager
.createEntrySpan(formatOperationName(call.getMethodDescriptor()), contextCarrier);
span.setComponent(ComponentsDefine.GRPC);
span.setLayer(SpanLayer.RPC_FRAMEWORK);
ContextSnapshot contextSnapshot = ContextManager.capture();
AbstractSpan asyncSpan = span.prepareForAsync();

Context context = Context.current().withValues(CONTEXT_SNAPSHOT_KEY, contextSnapshot, ACTIVE_SPAN_KEY, asyncSpan);

ServerCall.Listener<REQUEST> listener = Contexts.interceptCall(
context,
new TracingServerCall<>(call),
headers,
(serverCall, metadata) -> new TracingServerCallListener<>(
handler.startCall(serverCall, metadata),
serverCall.getMethodDescriptor()
)
);
ContextManager.stopSpan(asyncSpan);
return listener;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void sendMessage(RESPONSE message) {
final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + RESPONSE_ON_MESSAGE_OPERATION_NAME);
span.setComponent(ComponentsDefine.GRPC);
span.setLayer(SpanLayer.RPC_FRAMEWORK);

ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get());
try {
super.sendMessage(message);
} catch (Throwable t) {
Expand All @@ -68,6 +68,7 @@ public void close(Status status, Metadata trailers) {
final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + RESPONSE_ON_CLOSE_OPERATION_NAME);
span.setComponent(ComponentsDefine.GRPC);
span.setLayer(SpanLayer.RPC_FRAMEWORK);
ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get());
switch (status.getCode()) {
case OK:
break;
Expand All @@ -93,6 +94,7 @@ public void close(Status status, Metadata trailers) {
break;
}
Tags.RPC_RESPONSE_STATUS_CODE.set(span, status.getCode().name());
ServerInterceptor.ACTIVE_SPAN_KEY.get().asyncFinish();

try {
super.close(status, trailers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
import io.grpc.ForwardingServerCallListener;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
Expand All @@ -36,18 +34,11 @@
public class TracingServerCallListener<REQUEST> extends ForwardingServerCallListener.SimpleForwardingServerCallListener<REQUEST> {
private final MethodDescriptor.MethodType methodType;
private final String operationPrefix;
private final String operation;
private final ContextCarrier contextCarrier;

private AbstractSpan asyncSpan;
private ContextSnapshot contextSnapshot;

protected TracingServerCallListener(ServerCall.Listener<REQUEST> delegate, MethodDescriptor<REQUEST, ?> descriptor, ContextCarrier contextCarrier) {
protected TracingServerCallListener(ServerCall.Listener<REQUEST> delegate, MethodDescriptor<REQUEST, ?> descriptor) {
super(delegate);
this.methodType = descriptor.getType();
this.operationPrefix = OperationNameFormatUtil.formatOperationName(descriptor) + SERVER;
this.operation = OperationNameFormatUtil.formatOperationName(descriptor);
this.contextCarrier = contextCarrier;
}

@Override
Expand All @@ -57,7 +48,7 @@ public void onMessage(REQUEST message) {
final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + REQUEST_ON_MESSAGE_OPERATION_NAME);
span.setComponent(ComponentsDefine.GRPC);
span.setLayer(SpanLayer.RPC_FRAMEWORK);
ContextManager.continued(contextSnapshot);
ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get());
try {
super.onMessage(message);
} catch (Throwable t) {
Expand All @@ -73,21 +64,18 @@ public void onMessage(REQUEST message) {

@Override
public void onCancel() {
if (contextSnapshot == null) {
return;
}
final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + REQUEST_ON_CANCEL_OPERATION_NAME);
span.setComponent(ComponentsDefine.GRPC);
span.setLayer(SpanLayer.RPC_FRAMEWORK);
ContextManager.continued(contextSnapshot);
ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get());
try {
super.onCancel();
} catch (Throwable t) {
ContextManager.activeSpan().log(t);
throw t;
} finally {
ContextManager.stopSpan();
asyncSpan.asyncFinish();
ServerInterceptor.ACTIVE_SPAN_KEY.get().asyncFinish();
}
}

Expand All @@ -96,7 +84,7 @@ public void onHalfClose() {
final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + REQUEST_ON_HALF_CLOSE_OPERATION_NAME);
span.setComponent(ComponentsDefine.GRPC);
span.setLayer(SpanLayer.RPC_FRAMEWORK);
ContextManager.continued(contextSnapshot);
ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get());
try {
super.onHalfClose();
} catch (Throwable t) {
Expand All @@ -110,18 +98,10 @@ public void onHalfClose() {
@Override
public void onComplete() {
super.onComplete();
asyncSpan.asyncFinish();
}

@Override
public void onReady() {
final AbstractSpan span = ContextManager.createEntrySpan(operation, contextCarrier);
span.setComponent(ComponentsDefine.GRPC);
span.setLayer(SpanLayer.RPC_FRAMEWORK);
contextSnapshot = ContextManager.capture();
asyncSpan = span.prepareForAsync();
ContextManager.stopSpan(asyncSpan);

super.onReady();
}
}