Skip to content

Commit 86a2cb7

Browse files
committed
#56: Support async/await in Rust -> Java direction.
Implemented the invoke_async function that returns a Future, which is completed via the Receiver of a oneshot channel. In Java side, the methods that are allowed to be invoked by invoke_async, are the ones that return a Java Future. When the Java Future completes, the Java side of j4rs invokes native Rust code that completes the Rust pending Future with either success or failure, using the Sender of the oneshot channel that was created when the invoke_async was called. Please note that it is best that the Java methods that are invoked by invoke_async function return a CompletableFuture, as this improves performance. j4rs handles simple Java Futures with polling, using an internal one-threaded ScheduledExecutorService. This has apparent performance issues.
1 parent d74c8fb commit 86a2cb7

18 files changed

+603
-139
lines changed

java/src/main/java/org/astonbitecode/j4rs/api/Instance.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,19 @@ public interface Instance<T> extends ObjectValue, JsonValue {
4141

4242
/**
4343
* Invokes asynchronously a method of the instance of the class that is set for this {@link Instance}.
44-
* The result of the invocation should be provided later using the performCallback method of a {@link org.astonbitecode.j4rs.api.invocation.NativeCallbackSupport} class.
45-
* Any possible returned objects from the actual synchronous invocation of the defined method will be dropped.
44+
* The result of the invocation must be a {@link java.util.concurrent.Future}.
45+
* When the Future returned from the invocation completes, j4rs will invoke native Rust code to either send a success value or a failure.
46+
* <p>
47+
* Please note that it is best that this function returns a {@link java.util.concurrent.CompletableFuture}, as this improves performance.
48+
* j4rs handles simple {@link java.util.concurrent.Future}s with polling using an internal {@link java.util.concurrent.ScheduledExecutorService} with one thread and this has apparent performance issues.
49+
* You may have a look at {@link org.astonbitecode.j4rs.api.async.J4rsPolledFuture} for more details.
4650
*
47-
* @param functionPointerAddress The address of the function pointer that will be used later in the native side in order to actually paerform the callback.
51+
* @param functionPointerAddress The address of the function pointer that will be used when the {@link java.util.concurrent.Future} completes, in the native side, in order to actually perform the callback
52+
* and complete a Future that is created in Rust and awaits for the Java Future to complete.
4853
* @param methodName The method name
4954
* @param args The arguments to use when invoking the callback method (the functionPointer)
5055
*/
51-
void invokeAsync(long functionPointerAddress, String methodName, InvocationArg... args);
56+
void invokeAsyncToChannel(long functionPointerAddress, String methodName, InvocationArg... args);
5257

5358
/**
5459
* Invokes a method of the instance of the class that is set for this {@link Instance}.
@@ -108,7 +113,7 @@ static <T> Instance cloneInstance(Instance from) {
108113
default T getOrDeserializeJavaObject() {
109114
boolean isSerialized = false;
110115
if (InvocationArg.class.isAssignableFrom(this.getClass())) {
111-
isSerialized = ((InvocationArg)this).isSerialized();
116+
isSerialized = ((InvocationArg) this).isSerialized();
112117
}
113118
if (!isSerialized) {
114119
return (T) this.getObject();
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2023 astonbitecode
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
package org.astonbitecode.j4rs.api.async;
16+
17+
import java.util.concurrent.Executors;
18+
import java.util.concurrent.ScheduledExecutorService;
19+
import java.util.concurrent.TimeUnit;
20+
21+
class J4rsAsyncContext {
22+
private static final ScheduledExecutorService SERVICE = Executors.newSingleThreadScheduledExecutor();
23+
24+
static void schedule(Runnable r) {
25+
SERVICE.schedule(r, 10, TimeUnit.NANOSECONDS);
26+
}
27+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2023 astonbitecode
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
package org.astonbitecode.j4rs.api.async;
16+
17+
import java.util.concurrent.CompletableFuture;
18+
import java.util.concurrent.ExecutionException;
19+
import java.util.concurrent.Future;
20+
21+
/**
22+
* A {@link CompletableFuture} that completes by polling a {@link Future}.
23+
* @param <T>
24+
*/
25+
public class J4rsPolledFuture<T> extends CompletableFuture<T> {
26+
private Future<T> future;
27+
28+
public J4rsPolledFuture(Future<T> future) {
29+
this.future = future;
30+
J4rsAsyncContext.schedule(this::tryToComplete);
31+
}
32+
33+
private void tryToComplete() {
34+
if (future.isDone()) {
35+
try {
36+
complete(future.get());
37+
} catch (InterruptedException error) {
38+
completeExceptionally(error);
39+
} catch (ExecutionException error) {
40+
completeExceptionally(error.getCause());
41+
}
42+
return;
43+
}
44+
45+
if (future.isCancelled()) {
46+
cancel(true);
47+
return;
48+
}
49+
50+
J4rsAsyncContext.schedule(this::tryToComplete);
51+
}
52+
}

java/src/main/java/org/astonbitecode/j4rs/api/dtos/InvocationArg.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,9 @@ public Instance invokeStatic(String methodName, InvocationArg... args) {
126126
}
127127

128128
@Override
129-
public void invokeAsync(long functionPointerAddress, String methodName, InvocationArg... args) {
129+
public void invokeAsyncToChannel(long channelAddress, String methodName, InvocationArg... args) {
130130
if (getInstance() != null) {
131-
getInstance().invokeAsync(functionPointerAddress, methodName, args);
131+
getInstance().invokeAsyncToChannel(channelAddress, methodName, args);
132132
}
133133
}
134134

java/src/main/java/org/astonbitecode/j4rs/api/invocation/EagerJsonInvocationImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public Instance invokeStatic(String methodName, InvocationArg... arg) {
4242
}
4343

4444
@Override
45-
public void invokeAsync(long functionPointer, String methodName, InvocationArg... args) {
45+
public void invokeAsyncToChannel(long channelAddress, String methodName, InvocationArg... args) {
4646
throw new RuntimeException("Not implemented yet. Please use the JsonInvocationImpl instead");
4747
}
4848

java/src/main/java/org/astonbitecode/j4rs/api/invocation/JavaFxInvocation.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ public Instance invokeStatic(String methodName, InvocationArg... args) {
6060
}
6161

6262
@Override
63-
public void invokeAsync(long functionPointerAddress, String methodName, InvocationArg... args) {
64-
Platform.runLater(() -> jsonInvocation.invokeAsync(functionPointerAddress, methodName, args));
63+
public void invokeAsyncToChannel(long channelAddress, String methodName, InvocationArg... args) {
64+
Platform.runLater(() -> jsonInvocation.invokeAsyncToChannel(channelAddress, methodName, args));
6565
}
6666

6767
@Override

java/src/main/java/org/astonbitecode/j4rs/api/invocation/JsonInvocationImpl.java

Lines changed: 76 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import org.astonbitecode.j4rs.api.Instance;
1818
import org.astonbitecode.j4rs.api.JsonValue;
19+
import org.astonbitecode.j4rs.api.async.J4rsPolledFuture;
1920
import org.astonbitecode.j4rs.api.dtos.GeneratedArg;
2021
import org.astonbitecode.j4rs.api.dtos.InvocationArg;
2122
import org.astonbitecode.j4rs.api.dtos.InvocationArgGenerator;
@@ -34,6 +35,8 @@
3435
import java.util.HashSet;
3536
import java.util.List;
3637
import java.util.Set;
38+
import java.util.concurrent.CompletableFuture;
39+
import java.util.concurrent.Future;
3740
import java.util.stream.Collectors;
3841

3942
public class JsonInvocationImpl<T> implements Instance<T> {
@@ -65,7 +68,6 @@ public JsonInvocationImpl(T instance, Class<T> clazz, List<Type> classGenTypes)
6568

6669
@Override
6770
public Instance invoke(String methodName, InvocationArg... args) {
68-
// Invoke the instance
6971
try {
7072
CreatedInstance createdInstance = invokeMethod(methodName, gen.generateArgObjects(args));
7173
return InstanceGenerator.create(createdInstance.object, createdInstance.clazz, createdInstance.classGenTypes);
@@ -85,18 +87,32 @@ public Instance invokeStatic(String methodName, InvocationArg... args) {
8587
}
8688

8789
@Override
88-
public void invokeAsync(long functionPointerAddress, String methodName, InvocationArg... args) {
89-
// Check that the class of the invocation extends the NativeCallbackSupport
90-
if (!NativeCallbackSupport.class.isAssignableFrom(this.clazz)) {
91-
throw new InvocationException("Cannot invoke asynchronously the class " + this.clazz.getName() + ". The class does not extend the class " + NativeCallbackSupport.class.getName());
92-
} else {
93-
// Initialize the pointer
94-
((NativeCallbackSupport) object).initPointer(new RustPointer(functionPointerAddress));
95-
// Invoke (any possible returned objects will be dropped)
96-
invoke(methodName, args);
90+
public void invokeAsyncToChannel(final long channelAddress, final String methodName, final InvocationArg... args) {
91+
try {
92+
NativeCallbackToRustFutureSupport callback = newCallbackForAsyncToChannel(channelAddress);
93+
invokeAsyncMethod(methodName, gen.generateArgObjects(args)).handle((o, t) -> {
94+
if (o != null) {
95+
callback.doCallbackSuccess(o);
96+
} else if (t != null) {
97+
callback.doCallbackFailure(t);
98+
} else {
99+
String message = String.format("Error while handling the future returned by the invocation of method %s of class %s",
100+
methodName, getObjectClassName());
101+
throw new InvocationException(message);
102+
}
103+
return Void.TYPE;
104+
});
105+
} catch (Exception error) {
106+
throw new InvocationException("While invoking method " + methodName + " of Class " + getObjectClassName(), error);
97107
}
98108
}
99109

110+
NativeCallbackToRustFutureSupport newCallbackForAsyncToChannel(long channelAddress) {
111+
NativeCallbackToRustFutureSupport callback = new NativeCallbackToRustFutureSupport();
112+
callback.initPointer(new RustPointer(channelAddress));
113+
return callback;
114+
}
115+
100116
@Override
101117
public void invokeToChannel(long channelAddress, String methodName, InvocationArg... args) {
102118
initializeCallbackChannel(channelAddress);
@@ -185,6 +201,56 @@ CreatedInstance invokeMethod(String methodName, GeneratedArg[] generatedArgs) th
185201
return new CreatedInstance(invokedMethodReturnType, returnedObject, retClassGenTypes);
186202
}
187203

204+
CompletableFuture<Object> invokeAsyncMethod(String methodName, GeneratedArg[] generatedArgs) throws Exception {
205+
Class[] argTypes = Arrays.stream(generatedArgs)
206+
.map(invGeneratedArg -> {
207+
try {
208+
return invGeneratedArg.getClazz();
209+
} catch (Exception error) {
210+
throw new InvocationException("Cannot parse the parameter types while invoking async method", error);
211+
}
212+
})
213+
.toArray(size -> new Class[size]);
214+
Object[] argObjects = Arrays.stream(generatedArgs)
215+
.map(invGeneratedArg -> {
216+
try {
217+
return invGeneratedArg.getObject();
218+
} catch (Exception error) {
219+
throw new InvocationException("Cannot parse the parameter objects while invoking async method", error);
220+
}
221+
})
222+
.toArray(size -> new Object[size]);
223+
224+
Method methodToInvoke = findMethodInHierarchy(this.clazz, methodName, argTypes);
225+
List<Type> retClassGenTypes = new ArrayList<>();
226+
227+
Type returnType = methodToInvoke.getGenericReturnType();
228+
229+
if (returnType instanceof ParameterizedType) {
230+
ParameterizedType type = (ParameterizedType) returnType;
231+
retClassGenTypes = Arrays.asList(type.getActualTypeArguments());
232+
}
233+
234+
CompletableFuture<Object> future;
235+
Class<?> invokedMethodReturnType = methodToInvoke.getReturnType();
236+
if (!invokedMethodReturnType.isAssignableFrom(Future.class)) {
237+
String message = String.format("Attempted to asynchronously invoke method %s of class %s that returns %s instead of returning Future",
238+
methodName,
239+
this.clazz.getName(),
240+
returnType.getTypeName());
241+
throw new InvocationException(message);
242+
}
243+
Future<Object> invocationReturnedFuture = (Future<Object>) methodToInvoke.invoke(this.object, argObjects);
244+
245+
if (invocationReturnedFuture instanceof CompletableFuture) {
246+
future = (CompletableFuture<Object>) invocationReturnedFuture;
247+
} else {
248+
future = new J4rsPolledFuture<>(invocationReturnedFuture);
249+
}
250+
251+
return future;
252+
}
253+
188254
Method findMethodInHierarchy(Class clazz, String methodName, Class[] argTypes) throws NoSuchMethodException {
189255
// Get the declared and methods defined in the interfaces of the class.
190256
Set<Method> methods = new HashSet<>(Arrays.asList(clazz.getDeclaredMethods()));

java/src/main/java/org/astonbitecode/j4rs/api/invocation/NativeCallbackSupport.java

Lines changed: 0 additions & 51 deletions
This file was deleted.
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright 2023 astonbitecode
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
package org.astonbitecode.j4rs.api.invocation;
16+
17+
import org.astonbitecode.j4rs.api.Instance;
18+
import org.astonbitecode.j4rs.errors.InvocationException;
19+
import org.astonbitecode.j4rs.rust.RustPointer;
20+
21+
import java.io.PrintWriter;
22+
import java.io.StringWriter;
23+
import java.util.Optional;
24+
25+
/**
26+
* Performs native callbacks to Rust channels that are transformed to Rust Futures
27+
*/
28+
class NativeCallbackToRustFutureSupport {
29+
private static native int docallbacktochannel(long channelPointerAddress, Instance inv);
30+
private static native int failcallbacktochannel(long channelPointerAddress, String stacktrace);
31+
32+
private Optional<RustPointer> channelPointerOpt = Optional.empty();
33+
34+
static void initialize(String libname) {
35+
try {
36+
System.loadLibrary(libname);
37+
} catch (UnsatisfiedLinkError error) {
38+
System.err.println("The Callbacks are not initialized because the j4rs lib was not found. You may ignore this error if you don't use callbacks.");
39+
error.printStackTrace();
40+
}
41+
}
42+
43+
/**
44+
* Perform a callback to signal successful operation
45+
*
46+
* @param obj The {@link Object} to pass in the callback.
47+
*/
48+
public void doCallbackSuccess(Object obj) {
49+
if (channelPointerOpt.isPresent() && obj != null) {
50+
docallbacktochannel(channelPointerOpt.get().getAddress(), InstanceGenerator.create(obj, obj.getClass()));
51+
} else {
52+
throw new InvocationException("Cannot do callback. Please make sure that you don't try to access this method while being in the constructor of your class (that extends NativeCallbackSupport)");
53+
}
54+
}
55+
56+
/**
57+
* Perform a callback to signal failure
58+
* @param error The error
59+
*/
60+
public void doCallbackFailure(Throwable error) {
61+
if (channelPointerOpt.isPresent() && error != null) {
62+
StringWriter sw = new StringWriter();
63+
PrintWriter pw = new PrintWriter(sw);
64+
error.printStackTrace(pw);
65+
String stringStackTrace = sw.toString();
66+
failcallbacktochannel(channelPointerOpt.get().getAddress(), stringStackTrace);
67+
} else {
68+
throw new InvocationException("Cannot do callback for failure. Please make sure that you don't try to access this method while being in the constructor of your class (that extends NativeCallbackSupport). The failure was: ", error);
69+
}
70+
}
71+
72+
final void initPointer(RustPointer p) {
73+
this.channelPointerOpt = Optional.of(p);
74+
}
75+
}

0 commit comments

Comments
 (0)