Skip to content

Add ExecutionInfo to RequestTracker methods #1640

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: 4.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,9 @@ private class NodeResponseCallback
// Coordinates concurrent accesses between the client and I/O threads
private final ReentrantLock lock = new ReentrantLock();

// The execution info for passing to the request tracker.
private ExecutionInfo executionInfo;

// The page queue, storing responses that we have received and have not been consumed by the
// client yet. We instantiate it lazily to avoid unnecessary allocation; this is also used to
// check if the callback ever tried to enqueue something.
Expand Down Expand Up @@ -1445,7 +1448,8 @@ private void trackNodeError(@NonNull Node node, @NonNull Throwable error) {
long latencyNanos = System.nanoTime() - this.messageStartTimeNanos;
context
.getRequestTracker()
.onNodeError(this.statement, error, latencyNanos, executionProfile, node, logPrefix);
.onNodeError(
this.statement, error, latencyNanos, executionProfile, node, logPrefix, null);
}
}

Expand Down Expand Up @@ -1562,18 +1566,21 @@ private void completeResultSetFuture(
if (nodeSuccessReported.compareAndSet(false, true)) {
context
.getRequestTracker()
.onNodeSuccess(statement, nodeLatencyNanos, executionProfile, node, logPrefix);
.onNodeSuccess(
statement, nodeLatencyNanos, executionProfile, node, logPrefix, executionInfo);
}
context
.getRequestTracker()
.onSuccess(statement, totalLatencyNanos, executionProfile, node, logPrefix);
.onSuccess(
statement, totalLatencyNanos, executionProfile, node, logPrefix, executionInfo);
}
} else {
Throwable error = (Throwable) pageOrError;
if (future.completeExceptionally(error)) {
context
.getRequestTracker()
.onError(statement, error, totalLatencyNanos, executionProfile, node, logPrefix);
.onError(
statement, error, totalLatencyNanos, executionProfile, node, logPrefix, null);
if (error instanceof DriverTimeoutException) {
throttler.signalTimeout(ContinuousRequestHandlerBase.this);
session
Expand All @@ -1590,18 +1597,20 @@ private void completeResultSetFuture(
private ExecutionInfo createExecutionInfo(@NonNull Result result, @Nullable Frame response) {
ByteBuffer pagingState =
result instanceof Rows ? ((Rows) result).getMetadata().pagingState : null;
return new DefaultExecutionInfo(
statement,
node,
startedSpeculativeExecutionsCount.get(),
executionIndex,
errors,
pagingState,
response,
true,
session,
context,
executionProfile);
this.executionInfo =
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added an instance variable here for the ExecutionInfo. It gets populated and later passed to the RequestTracker in completeResultSetFuture.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this will always work correctly - the executionInfo field on the NodeResponseCallback is populated when the response is processed in processResultResponse, but before being used when firing the request tracker methods in completeResultSetFuture, responses may be queued. If another request comes in before the earlier response is removed from the queue the executionInfo field will be overwritten. When the earlier response is later removed from the queue, the execution info provided to the request tracker could be the one from the newer request.

Instead of a field attached to the NodeResponseCallback, perhaps you could use the execution info stored on the resultSet object in processResultResponse? - it's a shame there isn't a common interface (perhaps a useful improvement to file for another time) but it looks like both ContinuousAsyncResultSet and AsyncGraphResultSet have getters for ExecutionInfo objects.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On further thought, since you only have an Object at this stage, I don't know how you would get an execution info for the request tracker call if you end up with a something that isn't a ContinuousAsyncResultSet or AsyncGraphResultSet so perhaps the solution I suggested above isn't going to work.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I apologize for calling out details you no doubt already know off the top of your head, it's just for my own benefit being new to the code.

This is a bit of a pickle. ResultSetT is just an Object and it looks like just sharpening up the definition of that type parameter is impossible because there's not a unified type structure for the different kinds of results in the internal API.

I could add a type check and cast for both AsyncPagingIterable and AsyncGraphResultSet which both provide a getExecutionInfo method.

If that works, then we might want to go a bit further and refactor a bit to add a marking interface or something to prevent other types from being passed in, but that seems like an extensive change.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the ExecutionInfo instance variable and added type checks and casts for both AsyncPagingIterable and AsyncGraphResultSet to get the ExecutionInfo from there.

I attempted to run the ContinuousPagingIT test with DSE to validate, but I'm having trouble with CCM. The CCM node fails to start with this error:

Cannot start node if snitch's data center (dc1) differs from previous data center (Cassandra)

I'm setting these Java options:

-Dccm.dse=true -Dccm.version=6.8.38

Any tips for running the integration test locally are appreciated.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit of work to set up. I ran all the tests against this branch, and it passed all the tests.

new DefaultExecutionInfo(
statement,
node,
startedSpeculativeExecutionsCount.get(),
executionIndex,
errors,
pagingState,
response,
true,
session,
context,
executionProfile);
return executionInfo;
}

private void logTimeoutSchedulingError(IllegalStateException timeoutError) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,19 @@ private void setFinalResult(
totalLatencyNanos = completionTimeNanos - startTimeNanos;
long nodeLatencyNanos = completionTimeNanos - callback.nodeStartTimeNanos;
requestTracker.onNodeSuccess(
callback.statement, nodeLatencyNanos, executionProfile, callback.node, logPrefix);
callback.statement,
nodeLatencyNanos,
executionProfile,
callback.node,
logPrefix,
executionInfo);
requestTracker.onSuccess(
callback.statement, totalLatencyNanos, executionProfile, callback.node, logPrefix);
callback.statement,
totalLatencyNanos,
executionProfile,
callback.node,
logPrefix,
executionInfo);
}
if (sessionMetricUpdater.isEnabled(
DseSessionMetric.GRAPH_REQUESTS, executionProfile.getName())) {
Expand Down Expand Up @@ -444,27 +454,28 @@ private void setFinalError(
GraphStatement<?> statement, Throwable error, Node node, int execution) {
DriverExecutionProfile executionProfile =
Conversions.resolveExecutionProfile(statement, context);
ExecutionInfo executionInfo =
new DefaultExecutionInfo(
statement,
node,
startedSpeculativeExecutionsCount.get(),
execution,
errors,
null,
null,
true,
session,
context,
executionProfile);
if (error instanceof DriverException) {
((DriverException) error)
.setExecutionInfo(
new DefaultExecutionInfo(
statement,
node,
startedSpeculativeExecutionsCount.get(),
execution,
errors,
null,
null,
true,
session,
context,
executionProfile));
((DriverException) error).setExecutionInfo(executionInfo);
}
if (result.completeExceptionally(error)) {
cancelScheduledTasks();
if (!(requestTracker instanceof NoopRequestTracker)) {
long latencyNanos = System.nanoTime() - startTimeNanos;
requestTracker.onError(statement, error, latencyNanos, executionProfile, node, logPrefix);
requestTracker.onError(
statement, error, latencyNanos, executionProfile, node, logPrefix, executionInfo);
}
if (error instanceof DriverTimeoutException) {
throttler.signalTimeout(this);
Expand Down Expand Up @@ -856,7 +867,8 @@ private void trackNodeError(Node node, Throwable error, long nodeResponseTimeNan
nodeResponseTimeNanos = System.nanoTime();
}
long latencyNanos = nodeResponseTimeNanos - this.nodeStartTimeNanos;
requestTracker.onNodeError(statement, error, latencyNanos, executionProfile, node, logPrefix);
requestTracker.onNodeError(
statement, error, latencyNanos, executionProfile, node, logPrefix, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.datastax.oss.driver.api.core.tracker;

import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.session.Request;
import com.datastax.oss.driver.api.core.session.Session;
Expand All @@ -35,7 +36,7 @@ public interface RequestTracker extends AutoCloseable {

/**
* @deprecated This method only exists for backward compatibility. Override {@link
* #onSuccess(Request, long, DriverExecutionProfile, Node, String)} instead.
* #onSuccess(Request, long, DriverExecutionProfile, Node, String, ExecutionInfo)} instead.
*/
@Deprecated
default void onSuccess(
Expand All @@ -44,6 +45,21 @@ default void onSuccess(
@NonNull DriverExecutionProfile executionProfile,
@NonNull Node node) {}

/**
* @deprecated This method only exists for backward compatibility. Override {@link
* #onSuccess(Request, long, DriverExecutionProfile, Node, String, ExecutionInfo)} instead.
*/
@Deprecated
default void onSuccess(
@NonNull Request request,
long latencyNanos,
@NonNull DriverExecutionProfile executionProfile,
@NonNull Node node,
@NonNull String requestLogPrefix) {
// If client doesn't override onSuccess with requestLogPrefix delegate call to the old method
onSuccess(request, latencyNanos, executionProfile, node);
}

/**
* Invoked each time a request succeeds.
*
Expand All @@ -52,20 +68,23 @@ default void onSuccess(
* @param executionProfile the execution profile of this request.
* @param node the node that returned the successful response.
* @param requestLogPrefix the dedicated log prefix for this request
* @param executionInfo the execution info containing the results of this request
*/
default void onSuccess(
@NonNull Request request,
long latencyNanos,
@NonNull DriverExecutionProfile executionProfile,
@NonNull Node node,
@NonNull String requestLogPrefix) {
// If client doesn't override onSuccess with requestLogPrefix delegate call to the old method
onSuccess(request, latencyNanos, executionProfile, node);
@NonNull String requestLogPrefix,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small request: would it be possible to keep the log prefix as the last parameter? This has been kind of a tacit convention so far.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored to make requestLogPrefix the last arg in all the methods.

@NonNull ExecutionInfo executionInfo) {
// If client doesn't override onSuccess with executionInfo delegate call to the old method
Copy link
Contributor

@SiyaoIsHiding SiyaoIsHiding Jun 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lukasz-antoniak and I thought we don't need the parameter request, executionProfile, and node here, because they are all contained in executionInfo object. So we can change it to:

  default void onSuccess(
      long latencyNanos,
      @NonNull ExecutionInfo executionInfo,
      @NonNull String requestLogPrefix) {
    onSuccess(executionInfo.getRequest(), latencyNanos, executionInfo.getExecutionProfile(), executionInfo.getCoordinator(), requestLogPrefix);
  }

If we leave it like it is, the user may be confused if the ones in the parameters and the ones in exeuctionInfo Object are not consistent.
@absurdfarce @adutra How do you think?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. As long as we retain and deprecate the old method signatures for backwards-compatability, I'm good.

Copy link
Contributor

@tolbertam tolbertam Jun 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed, I like the concise method signature as well!

Seeing onSuccess(long latencyNanos, @NonNull ExecutionInfo executionInfo, @NonNull String requestLogPrefix) makes me wonder why we couldn't just put latency on ExecutionInfo? That seems like it could be useful for those consuming ExecutionInfo on their result set.

I think it's fine as proposed though 👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may consider putting requestLogPrefix into ExecutionInfo, too.

onSuccess(request, latencyNanos, executionProfile, node, requestLogPrefix);
}

/**
* @deprecated This method only exists for backward compatibility. Override {@link
* #onError(Request, Throwable, long, DriverExecutionProfile, Node, String)} instead.
* #onError(Request, Throwable, long, DriverExecutionProfile, Node, String, ExecutionInfo)}
* instead.
*/
@Deprecated
default void onError(
Expand All @@ -75,6 +94,23 @@ default void onError(
@NonNull DriverExecutionProfile executionProfile,
@Nullable Node node) {}

/**
* @deprecated This method only exists for backward compatibility. Override {@link
* #onError(Request, Throwable, long, DriverExecutionProfile, Node, String, ExecutionInfo)}
* instead.
*/
@Deprecated
default void onError(
@NonNull Request request,
@NonNull Throwable error,
long latencyNanos,
@NonNull DriverExecutionProfile executionProfile,
@Nullable Node node,
@NonNull String requestLogPrefix) {
// If client doesn't override onError with requestLogPrefix delegate call to the old method
onError(request, error, latencyNanos, executionProfile, node);
}

/**
* Invoked each time a request fails.
*
Expand All @@ -83,21 +119,25 @@ default void onError(
* @param executionProfile the execution profile of this request.
* @param node the node that returned the error response, or {@code null} if the error occurred
* @param requestLogPrefix the dedicated log prefix for this request
* @param executionInfo the execution info being returned to the client for this request if
* available
*/
default void onError(
@NonNull Request request,
@NonNull Throwable error,
long latencyNanos,
@NonNull DriverExecutionProfile executionProfile,
@Nullable Node node,
@NonNull String requestLogPrefix) {
// If client doesn't override onError with requestLogPrefix delegate call to the old method
onError(request, error, latencyNanos, executionProfile, node);
@NonNull String requestLogPrefix,
@Nullable ExecutionInfo executionInfo) {
// delegate call to the old method
onError(request, error, latencyNanos, executionProfile, node, requestLogPrefix);
}

/**
* @deprecated This method only exists for backward compatibility. Override {@link
* #onNodeError(Request, Throwable, long, DriverExecutionProfile, Node, String)} instead.
* #onNodeError(Request, Throwable, long, DriverExecutionProfile, Node, String,
* ExecutionInfo)} instead.
*/
@Deprecated
default void onNodeError(
Expand All @@ -107,6 +147,23 @@ default void onNodeError(
@NonNull DriverExecutionProfile executionProfile,
@NonNull Node node) {}

/**
* @deprecated This method only exists for backward compatibility. Override {@link
* #onNodeError(Request, Throwable, long, DriverExecutionProfile, Node, String,
* ExecutionInfo)} instead.
*/
@Deprecated
default void onNodeError(
@NonNull Request request,
@NonNull Throwable error,
long latencyNanos,
@NonNull DriverExecutionProfile executionProfile,
@NonNull Node node,
@NonNull String requestLogPrefix) {
// If client doesn't override onNodeError with requestLogPrefix delegate call to the old method
onNodeError(request, error, latencyNanos, executionProfile, node);
}

/**
* Invoked each time a request fails at the node level. Similar to {@link #onError(Request,
* Throwable, long, DriverExecutionProfile, Node, String)} but at a per node level.
Expand All @@ -116,21 +173,24 @@ default void onNodeError(
* @param executionProfile the execution profile of this request.
* @param node the node that returned the error response.
* @param requestLogPrefix the dedicated log prefix for this request
* @param executionInfo the execution info containing the results of this request if available
*/
default void onNodeError(
@NonNull Request request,
@NonNull Throwable error,
long latencyNanos,
@NonNull DriverExecutionProfile executionProfile,
@NonNull Node node,
@NonNull String requestLogPrefix) {
@NonNull String requestLogPrefix,
@Nullable ExecutionInfo executionInfo) {
// If client doesn't override onNodeError with requestLogPrefix delegate call to the old method
onNodeError(request, error, latencyNanos, executionProfile, node);
onNodeError(request, error, latencyNanos, executionProfile, node, requestLogPrefix);
}

/**
* @deprecated This method only exists for backward compatibility. Override {@link
* #onNodeSuccess(Request, long, DriverExecutionProfile, Node, String)} instead.
* #onNodeSuccess(Request, long, DriverExecutionProfile, Node, String, ExecutionInfo)}
* instead.
*/
@Deprecated
default void onNodeSuccess(
Expand All @@ -139,6 +199,23 @@ default void onNodeSuccess(
@NonNull DriverExecutionProfile executionProfile,
@NonNull Node node) {}

/**
* @deprecated This method only exists for backward compatibility. Override {@link
* #onNodeSuccess(Request, long, DriverExecutionProfile, Node, String, ExecutionInfo)}
* instead.
*/
@Deprecated
default void onNodeSuccess(
@NonNull Request request,
long latencyNanos,
@NonNull DriverExecutionProfile executionProfile,
@NonNull Node node,
@NonNull String requestLogPrefix) {
// If client doesn't override onNodeSuccess with requestLogPrefix delegate call to the old
// method
onNodeSuccess(request, latencyNanos, executionProfile, node);
}

/**
* Invoked each time a request succeeds at the node level. Similar to {@link #onSuccess(Request,
* long, DriverExecutionProfile, Node, String)} but at per node level.
Expand All @@ -148,16 +225,17 @@ default void onNodeSuccess(
* @param executionProfile the execution profile of this request.
* @param node the node that returned the successful response.
* @param requestLogPrefix the dedicated log prefix for this request
* @param executionInfo the execution info containing the results of this request
*/
default void onNodeSuccess(
@NonNull Request request,
long latencyNanos,
@NonNull DriverExecutionProfile executionProfile,
@NonNull Node node,
@NonNull String requestLogPrefix) {
// If client doesn't override onNodeSuccess with requestLogPrefix delegate call to the old
// method
onNodeSuccess(request, latencyNanos, executionProfile, node);
@NonNull String requestLogPrefix,
@NonNull ExecutionInfo executionInfo) {
// delegate call to the old method
onNodeSuccess(request, latencyNanos, executionProfile, node, requestLogPrefix);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like onSuccess(), onError() and onNodeSuccess() are updated here. We should probably update onNodeError() as well in order to keep the API internally consistent. I don't know that it's actually used in existing driver code beyond MultiplexingRequestTracker but it's still a good idea to keep the API internally consistent.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, from what I could tell, in most error cases no ExecutionInfo is present (but I'm still new to the driver code). I've started going through it again and I think CqlRequestHandler can be refactored to pass it in for some cases. I'll add the new arg without the NonNull annotation.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CqlRequestHandler is a bit tangled - there's a NodeResponseCallback inner class with a trackNodeError method and on the containing class, there's a setFinalError method where the ExecutionInfo is created.
Ideally, setFinalError could call RequestTracker.setNodeError with the ExecutionInfo, but that's already being done in NodeResponseCallback.trackNodeError so I'm thinking it's appropriate to move the trackNodeError method to the containing class.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that CqlRequestHandler does get a little tangled; async code certainly will do that if you don't keep an eye on it :).

My comment wasn't connected to the current implementation of CqlRequestHandler, though. Individual request handlers are created by the various request processors. These processors vary widely in implementation, and that's just for the ones we've written; they're defined by an interface so users are always free to write their own! With that in mind I'd say it's important to think about RequestTracker as a general interface which may (or may not) be used by any given handler for any given processor.

With that in mind, I'd argue it's important for RequestTracker to present a coherent interface that's consistent across all the on*() handler methods unless there's a very good reason to diverge. There doesn't appear to be any such reason in this case, thus my recommendation to update onNodeError() to keep things consistent.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I totally agree on keeping the interface consistent.

I've been going through the handlers and for the most part, calls to onNodeError with a null ExecutionInfo are feasible, so that's a good first step. Should a larger refactoring effort to route the ExecutionInfo to those methods be postponed? I suppose a good argument against the refactoring work is that the node-level events in the interface can be considered "in-progress" requests and the final ExecutionInfo instance will be delivered in onSuccess.

The only place I see where we can't honor the NonNull contract for onSuccess without significant changes is ContinuousRequestHandlerBase.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I figured out a solution in ContinuousRequestHandlerBase.
https://github.com/datastax/java-driver/pull/1640/files#r1247216557

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do this in CqlRequestHandler.NodeResponseCallback.trackNodeError?

    private void trackNodeError(Node node, Throwable error, long nodeResponseTimeNanos) {
      if (requestTracker instanceof NoopRequestTracker) {
        return;
      }
      if (nodeResponseTimeNanos == NANOTIME_NOT_MEASURED_YET) {
        nodeResponseTimeNanos = System.nanoTime();
      }
      long latencyNanos = nodeResponseTimeNanos - this.nodeStartTimeNanos;
      ExecutionInfo executionInfo =
              new DefaultExecutionInfo(
                      statement,
                      node,
                      startedSpeculativeExecutionsCount.get(),
                      execution,
                      errors,
                      null,
                      null,
                      true,
                      session,
                      context,
                      executionProfile);
      requestTracker.onNodeError(
          statement, error, latencyNanos, executionProfile, node, logPrefix, executionInfo);
    }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also explored how to pass in ExecutionInfo from ContinuousRequestHandlerBase, see here. How do you think of this approach?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes to ContinuousRequestHandlerBase merged. Thank you.

}

/**
Expand Down
Loading