Skip to content

Limit calls to Conversions.resolveExecutionProfile #1623

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -648,12 +648,13 @@ public void operationComplete(@NonNull Future<java.lang.Void> future) {
}
} else {
LOG.trace("[{}] Request sent on {}", logPrefix, channel);
if (scheduleSpeculativeExecution && Conversions.resolveIdempotence(statement, context)) {
if (scheduleSpeculativeExecution
&& Conversions.resolveIdempotence(statement, executionProfile)) {
int nextExecution = executionIndex + 1;
// Note that `node` is the first node of the execution, it might not be the "slow" one
// if there were retries, but in practice retries are rare.
long nextDelay =
Conversions.resolveSpeculativeExecutionPolicy(statement, context)
Conversions.resolveSpeculativeExecutionPolicy(context, executionProfile)
.nextExecution(node, keyspace, statement, nextExecution);
if (nextDelay >= 0) {
scheduleSpeculativeExecution(nextExecution, nextDelay);
Expand Down Expand Up @@ -787,12 +788,12 @@ public void onFailure(@NonNull Throwable error) {
cancelTimeout(pageTimeout);
LOG.trace(String.format("[%s] Request failure", logPrefix), error);
RetryVerdict verdict;
if (!Conversions.resolveIdempotence(statement, context)
if (!Conversions.resolveIdempotence(statement, executionProfile)
|| error instanceof FrameTooLongException) {
verdict = RetryVerdict.RETHROW;
} else {
try {
RetryPolicy retryPolicy = Conversions.resolveRetryPolicy(statement, context);
RetryPolicy retryPolicy = Conversions.resolveRetryPolicy(context, executionProfile);
verdict = retryPolicy.onRequestAbortedVerdict(statement, error, retryCount);
} catch (Throwable cause) {
abort(
Expand Down Expand Up @@ -945,7 +946,7 @@ private void processRecoverableError(@NonNull CoordinatorException error) {
assert lock.isHeldByCurrentThread();
NodeMetricUpdater metricUpdater = ((DefaultNode) node).getMetricUpdater();
RetryVerdict verdict;
RetryPolicy retryPolicy = Conversions.resolveRetryPolicy(statement, context);
RetryPolicy retryPolicy = Conversions.resolveRetryPolicy(context, executionProfile);
if (error instanceof ReadTimeoutException) {
ReadTimeoutException readTimeout = (ReadTimeoutException) error;
verdict =
Expand All @@ -964,7 +965,7 @@ private void processRecoverableError(@NonNull CoordinatorException error) {
DefaultNodeMetric.IGNORES_ON_READ_TIMEOUT);
} else if (error instanceof WriteTimeoutException) {
WriteTimeoutException writeTimeout = (WriteTimeoutException) error;
if (Conversions.resolveIdempotence(statement, context)) {
if (Conversions.resolveIdempotence(statement, executionProfile)) {
verdict =
retryPolicy.onWriteTimeoutVerdict(
statement,
Expand Down Expand Up @@ -999,7 +1000,7 @@ private void processRecoverableError(@NonNull CoordinatorException error) {
DefaultNodeMetric.IGNORES_ON_UNAVAILABLE);
} else {
verdict =
Conversions.resolveIdempotence(statement, context)
Conversions.resolveIdempotence(statement, executionProfile)
? retryPolicy.onErrorResponseVerdict(statement, error, retryCount)
: RetryVerdict.RETHROW;
updateErrorMetrics(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -557,12 +557,13 @@ public void operationComplete(Future<java.lang.Void> future) {
cancel();
} else {
inFlightCallbacks.add(this);
if (scheduleNextExecution && Conversions.resolveIdempotence(statement, context)) {
if (scheduleNextExecution
&& Conversions.resolveIdempotence(statement, executionProfile)) {
int nextExecution = execution + 1;
long nextDelay;
try {
nextDelay =
Conversions.resolveSpeculativeExecutionPolicy(statement, context)
Conversions.resolveSpeculativeExecutionPolicy(context, executionProfile)
.nextExecution(node, null, statement, nextExecution);
} catch (Throwable cause) {
// This is a bug in the policy, but not fatal since we have at least one other
Expand Down Expand Up @@ -678,7 +679,7 @@ private void processErrorResponse(Error errorMessage) {
trackNodeError(node, error, NANOTIME_NOT_MEASURED_YET);
setFinalError(statement, error, node, execution);
} else {
RetryPolicy retryPolicy = Conversions.resolveRetryPolicy(statement, context);
RetryPolicy retryPolicy = Conversions.resolveRetryPolicy(context, executionProfile);
RetryVerdict verdict;
if (error instanceof ReadTimeoutException) {
ReadTimeoutException readTimeout = (ReadTimeoutException) error;
Expand All @@ -699,7 +700,7 @@ private void processErrorResponse(Error errorMessage) {
} else if (error instanceof WriteTimeoutException) {
WriteTimeoutException writeTimeout = (WriteTimeoutException) error;
verdict =
Conversions.resolveIdempotence(statement, context)
Conversions.resolveIdempotence(statement, executionProfile)
? retryPolicy.onWriteTimeoutVerdict(
statement,
writeTimeout.getConsistencyLevel(),
Expand Down Expand Up @@ -731,7 +732,7 @@ private void processErrorResponse(Error errorMessage) {
DefaultNodeMetric.IGNORES_ON_UNAVAILABLE);
} else {
verdict =
Conversions.resolveIdempotence(statement, context)
Conversions.resolveIdempotence(statement, executionProfile)
? retryPolicy.onErrorResponseVerdict(statement, error, retryCount)
: RetryVerdict.RETHROW;
updateErrorMetrics(
Expand Down Expand Up @@ -810,12 +811,12 @@ public void onFailure(Throwable error) {
}
LOG.trace("[{}] Request failure, processing: {}", logPrefix, error);
RetryVerdict verdict;
if (!Conversions.resolveIdempotence(statement, context)
if (!Conversions.resolveIdempotence(statement, executionProfile)
|| error instanceof FrameTooLongException) {
verdict = RetryVerdict.RETHROW;
} else {
try {
RetryPolicy retryPolicy = Conversions.resolveRetryPolicy(statement, context);
RetryPolicy retryPolicy = Conversions.resolveRetryPolicy(context, executionProfile);
verdict = retryPolicy.onRequestAbortedVerdict(statement, error, retryCount);
} catch (Throwable cause) {
setFinalError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,29 +535,59 @@ public static CoordinatorException toThrowable(
}
}

/** Use {@link #resolveIdempotence(Request, DriverExecutionProfile)} instead. */
@Deprecated
public static boolean resolveIdempotence(Request request, InternalDriverContext context) {
  // Resolve the profile once (the expensive step), then delegate to the profile-based overload.
  DriverExecutionProfile profile = resolveExecutionProfile(request, context);
  return resolveIdempotence(request, profile);
}

/**
 * Resolves the effective idempotence of the given request.
 *
 * <p>The request's own setting wins when present; a {@code null} from the request means "not
 * set", in which case we fall back to the profile's {@code REQUEST_DEFAULT_IDEMPOTENCE} option.
 */
public static boolean resolveIdempotence(
    Request request, DriverExecutionProfile executionProfile) {
  Boolean requestIsIdempotent = request.isIdempotent();
  return (requestIsIdempotent == null)
      ? executionProfile.getBoolean(DefaultDriverOption.REQUEST_DEFAULT_IDEMPOTENCE)
      : requestIsIdempotent;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This approach provides a nice optimization for callers which have already resolved the DriverExecutionProfile but it does lead to a lot of code duplication. Seems like if we pursue this path resolveIdempotence(Request, InternalDriverContext) should be replaced with:

 public static boolean resolveIdempotence(Request request, InternalDriverContext context) {
    return resolveIdempotence(request, resolveExecutionProfile(request, context));
  }

Similar argument for resolveRequestTimeout() as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's worth noting that the implication of the change referenced above (and indeed this entire PR really) is that a user of this API will only see the benefit of reduced resolution of ExecutionProfiles if they use the new API; anyone using the old method (resolveIdempotence(Request, InternalDriverContext)) will still see just as many resolution operations as before.

Seems like at a minimum this older method should be marked as deprecated (in favor of the new one) but it doesn't seem like a huge leap from there to say if we're going to go down this path let's just remove the old method and make all callers responsible for managing DriverExecutionProfile as state. This will require some additional work; based on a quick check it looks like the DSE-specific request handlers (such as com.datastax.dse.driver.internal.core.cql.continuous.ContinuousRequestHandlerBase and com.datastax.dse.driver.internal.core.graph.GraphRequestHandler) also use resolveIdempotence(). These calls would also need to be refactored if we choose to remove the old method calls.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I proposed two fixups for implementing those suggested changes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since resolving a DriverExecutionProfile from an InternalDriverContext is expensive enough, I agree with marking the existing methods as Deprecated and maybe even removing them in the next minor version (as the project does not ensure compatibility between versions — see reference), but I think it may just be best to keep these methods deprecated.

/** Use {@link #resolveRequestTimeout(Request, DriverExecutionProfile)} instead. */
@Deprecated
public static Duration resolveRequestTimeout(Request request, InternalDriverContext context) {
DriverExecutionProfile executionProfile = resolveExecutionProfile(request, context);
return request.getTimeout() != null
? request.getTimeout()
return resolveRequestTimeout(request, resolveExecutionProfile(request, context));
}

/**
 * Resolves the effective request timeout: the request's explicit timeout when set, otherwise
 * the profile's {@code REQUEST_TIMEOUT} option.
 */
public static Duration resolveRequestTimeout(
    Request request, DriverExecutionProfile executionProfile) {
  Duration explicitTimeout = request.getTimeout();
  if (explicitTimeout != null) {
    return explicitTimeout;
  }
  return executionProfile.getDuration(DefaultDriverOption.REQUEST_TIMEOUT);
}

/** Use {@link #resolveRetryPolicy(InternalDriverContext, DriverExecutionProfile)} instead. */
@Deprecated
public static RetryPolicy resolveRetryPolicy(Request request, InternalDriverContext context) {
  // Same behavior as before: resolve the profile, then look up the policy by profile name via
  // the profile-based overload.
  return resolveRetryPolicy(context, resolveExecutionProfile(request, context));
}

/** Returns the retry policy registered in the context under the given profile's name. */
public static RetryPolicy resolveRetryPolicy(
    InternalDriverContext context, DriverExecutionProfile executionProfile) {
  String profileName = executionProfile.getName();
  return context.getRetryPolicy(profileName);
}

/**
 * Use {@link #resolveSpeculativeExecutionPolicy(InternalDriverContext, DriverExecutionProfile)}
 * instead.
 */
@Deprecated
public static SpeculativeExecutionPolicy resolveSpeculativeExecutionPolicy(
    Request request, InternalDriverContext context) {
  // Same behavior as before: resolve the profile once, then delegate to the profile-based
  // overload.
  return resolveSpeculativeExecutionPolicy(context, resolveExecutionProfile(request, context));
}

/** Returns the speculative execution policy registered under the given profile's name. */
public static SpeculativeExecutionPolicy resolveSpeculativeExecutionPolicy(
    InternalDriverContext context, DriverExecutionProfile executionProfile) {
  String profileName = executionProfile.getName();
  return context.getSpeculativeExecutionPolicy(profileName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public class CqlPrepareHandler implements Throttled {
private final Timeout scheduledTimeout;
private final RequestThrottler throttler;
private final Boolean prepareOnAllNodes;
private final DriverExecutionProfile executionProfile;
private volatile InitialPrepareCallback initialCallback;

// The errors on the nodes that were already tried (lazily initialized on the first error).
Expand All @@ -111,7 +112,7 @@ protected CqlPrepareHandler(
this.initialRequest = request;
this.session = session;
this.context = context;
DriverExecutionProfile executionProfile = Conversions.resolveExecutionProfile(request, context);
executionProfile = Conversions.resolveExecutionProfile(request, context);
this.queryPlan =
context
.getLoadBalancingPolicyWrapper()
Expand All @@ -131,7 +132,7 @@ protected CqlPrepareHandler(
});
this.timer = context.getNettyOptions().getTimer();

Duration timeout = Conversions.resolveRequestTimeout(request, context);
Duration timeout = Conversions.resolveRequestTimeout(request, executionProfile);
this.scheduledTimeout = scheduleTimeout(timeout);
this.prepareOnAllNodes = executionProfile.getBoolean(DefaultDriverOption.PREPARE_ON_ALL_NODES);

Expand Down Expand Up @@ -292,7 +293,7 @@ private CompletionStage<Void> prepareOnOtherNode(PrepareRequest request, Node no
false,
toPrepareMessage(request),
request.getCustomPayload(),
Conversions.resolveRequestTimeout(request, context),
Conversions.resolveRequestTimeout(request, executionProfile),
throttler,
session.getMetricUpdater(),
logPrefix);
Expand Down Expand Up @@ -419,7 +420,7 @@ private void processErrorResponse(Error errorMessage) {
} else {
// Because prepare requests are known to always be idempotent, we call the retry policy
// directly, without checking the flag.
RetryPolicy retryPolicy = Conversions.resolveRetryPolicy(request, context);
RetryPolicy retryPolicy = Conversions.resolveRetryPolicy(context, executionProfile);
RetryVerdict verdict = retryPolicy.onErrorResponseVerdict(request, error, retryCount);
processRetryVerdict(verdict, error);
}
Expand Down Expand Up @@ -457,7 +458,7 @@ public void onFailure(Throwable error) {
LOG.trace("[{}] Request failure, processing: {}", logPrefix, error.toString());
RetryVerdict verdict;
try {
RetryPolicy retryPolicy = Conversions.resolveRetryPolicy(request, context);
RetryPolicy retryPolicy = Conversions.resolveRetryPolicy(context, executionProfile);
verdict = retryPolicy.onRequestAbortedVerdict(request, error, retryCount);
} catch (Throwable cause) {
setFinalError(
Expand Down
Loading