diff --git a/core/src/main/java/com/arangodb/ArangoDatabase.java b/core/src/main/java/com/arangodb/ArangoDatabase.java index b40722ddd..1c7653360 100644 --- a/core/src/main/java/com/arangodb/ArangoDatabase.java +++ b/core/src/main/java/com/arangodb/ArangoDatabase.java @@ -314,6 +314,19 @@ public interface ArangoDatabase extends ArangoSerdeAccessor { */ ArangoCursor cursor(String cursorId, Class type); + /** + * Return an cursor from the given cursor-ID if still existing + * + * @param cursorId The ID of the cursor + * @param type The type of the result (POJO or {@link com.arangodb.util.RawData}) + * @param options options + * @return cursor of the results + * @see API + * Documentation + */ + ArangoCursor cursor(String cursorId, Class type, AqlQueryOptions options); + /** * Return an cursor from the given cursor-ID if still existing * @@ -327,6 +340,20 @@ public interface ArangoDatabase extends ArangoSerdeAccessor { */ ArangoCursor cursor(String cursorId, Class type, String nextBatchId); + /** + * Return an cursor from the given cursor-ID if still existing + * + * @param cursorId The ID of the cursor + * @param type The type of the result (POJO or {@link com.arangodb.util.RawData}) + * @param nextBatchId The ID of the next cursor batch (set only if cursor allows retries, see + * {@link AqlQueryOptions#allowRetry(Boolean)} + * @param options options + * @return cursor of the results + * @see API Documentation + * @since ArangoDB 3.11 + */ + ArangoCursor cursor(String cursorId, Class type, String nextBatchId, AqlQueryOptions options); + /** * Explain an AQL query and return information about it * diff --git a/core/src/main/java/com/arangodb/ArangoDatabaseAsync.java b/core/src/main/java/com/arangodb/ArangoDatabaseAsync.java index e3b90b78b..6f4c2a366 100644 --- a/core/src/main/java/com/arangodb/ArangoDatabaseAsync.java +++ b/core/src/main/java/com/arangodb/ArangoDatabaseAsync.java @@ -156,8 +156,12 @@ public interface ArangoDatabaseAsync extends ArangoSerdeAccessor { CompletableFuture> cursor(String cursorId, Class type); + CompletableFuture> cursor(String cursorId, Class type, AqlQueryOptions options); + CompletableFuture> cursor(String cursorId, Class type, String nextBatchId); + CompletableFuture> cursor(String cursorId, Class type, String nextBatchId, AqlQueryOptions options); + /** * Asynchronous version of {@link ArangoDatabase#explainQuery(String, Map, AqlQueryExplainOptions)} */ diff --git a/core/src/main/java/com/arangodb/entity/CursorEntity.java b/core/src/main/java/com/arangodb/entity/CursorEntity.java index 7fbc591e0..71b34f31d 100644 --- a/core/src/main/java/com/arangodb/entity/CursorEntity.java +++ b/core/src/main/java/com/arangodb/entity/CursorEntity.java @@ -44,6 +44,10 @@ public String getId() { return id; } + public void setId(String id) { + this.id = id; + } + /** * @return the total number of result documents available (only available if the query was executed with the count * attribute set) diff --git a/core/src/main/java/com/arangodb/internal/ArangoDatabaseAsyncImpl.java b/core/src/main/java/com/arangodb/internal/ArangoDatabaseAsyncImpl.java index b02069c16..cf5ceeecc 100644 --- a/core/src/main/java/com/arangodb/internal/ArangoDatabaseAsyncImpl.java +++ b/core/src/main/java/com/arangodb/internal/ArangoDatabaseAsyncImpl.java @@ -193,15 +193,25 @@ public CompletableFuture> query(String query, Class @Override public CompletableFuture> cursor(final String cursorId, final Class type) { - return cursor(cursorId, type, null); + return cursor(cursorId, type, null, new AqlQueryOptions()); + } + + @Override + public CompletableFuture> cursor(String cursorId, Class type, AqlQueryOptions options) { + return cursor(cursorId, type, null, options); } @Override public CompletableFuture> cursor(final String cursorId, final Class type, final String nextBatchId) { - final HostHandle hostHandle = new HostHandle(); + return cursor(cursorId, type, nextBatchId, new AqlQueryOptions()); + } + + @Override + public CompletableFuture> cursor(String cursorId, Class type, String nextBatchId, AqlQueryOptions options) { + options.allowRetry(nextBatchId != null); + HostHandle hostHandle = new HostHandle(); return executorAsync() - .execute(() -> - queryNextRequest(cursorId, new AqlQueryOptions(), nextBatchId), + .execute(() -> queryNextRequest(cursorId, options, nextBatchId), cursorEntityDeserializer(type), hostHandle) .thenApply(res -> new ArangoCursorAsyncImpl<>(this, res, type, hostHandle, nextBatchId != null)); diff --git a/core/src/main/java/com/arangodb/internal/ArangoDatabaseImpl.java b/core/src/main/java/com/arangodb/internal/ArangoDatabaseImpl.java index af319c479..63fec8e66 100644 --- a/core/src/main/java/com/arangodb/internal/ArangoDatabaseImpl.java +++ b/core/src/main/java/com/arangodb/internal/ArangoDatabaseImpl.java @@ -186,17 +186,28 @@ public ArangoCursor query(final String query, final Class type) { @Override public ArangoCursor cursor(final String cursorId, final Class type) { - return cursor(cursorId, type, null); + return cursor(cursorId, type, null, new AqlQueryOptions()); + } + + @Override + public ArangoCursor cursor(final String cursorId, final Class type, final AqlQueryOptions options) { + return cursor(cursorId, type, null, options); } @Override public ArangoCursor cursor(final String cursorId, final Class type, final String nextBatchId) { - final HostHandle hostHandle = new HostHandle(); - final CursorEntity result = executorSync().execute( - queryNextRequest(cursorId, new AqlQueryOptions(), nextBatchId), + return cursor(cursorId, type, nextBatchId, new AqlQueryOptions()); + } + + @Override + public ArangoCursor cursor(final String cursorId, final Class type, final String nextBatchId, final AqlQueryOptions options) { + options.allowRetry(nextBatchId != null); + HostHandle hostHandle = new HostHandle(); + CursorEntity result = executorSync().execute( + queryNextRequest(cursorId, options, nextBatchId), cursorEntityDeserializer(type), hostHandle); - return createCursor(result, type, null, hostHandle); + return createCursor(result, type, options, hostHandle); } private ArangoCursor createCursor( diff --git a/core/src/main/java/com/arangodb/internal/cursor/ArangoCursorAsyncImpl.java b/core/src/main/java/com/arangodb/internal/cursor/ArangoCursorAsyncImpl.java index afe5b55f5..5e94adc43 100644 --- a/core/src/main/java/com/arangodb/internal/cursor/ArangoCursorAsyncImpl.java +++ b/core/src/main/java/com/arangodb/internal/cursor/ArangoCursorAsyncImpl.java @@ -17,6 +17,7 @@ public class ArangoCursorAsyncImpl extends InternalArangoCursor implements private final ArangoDatabaseAsyncImpl db; private final HostHandle hostHandle; + private final CursorEntity entity; public ArangoCursorAsyncImpl( final ArangoDatabaseAsyncImpl db, @@ -28,13 +29,18 @@ public ArangoCursorAsyncImpl( super(db, db.name(), entity, type, allowRetry); this.db = db; this.hostHandle = hostHandle; + this.entity = entity; } @Override public CompletableFuture> nextBatch() { if (Boolean.TRUE.equals(hasMore())) { return executorAsync().execute(this::queryNextRequest, db.cursorEntityDeserializer(getType()), hostHandle) - .thenApply(r -> new ArangoCursorAsyncImpl<>(db, r, getType(), hostHandle, allowRetry())); + .thenApply(r -> { + // needed because the latest batch does not return the cursor id + r.setId(entity.getId()); + return new ArangoCursorAsyncImpl<>(db, r, getType(), hostHandle, allowRetry()); + }); } else { CompletableFuture> cf = new CompletableFuture<>(); cf.completeExceptionally(new NoSuchElementException()); diff --git a/test-functional/src/test/java/com/arangodb/ArangoDatabaseAsyncTest.java b/test-functional/src/test/java/com/arangodb/ArangoDatabaseAsyncTest.java index 6f50b192d..0a9ed17f6 100644 --- a/test-functional/src/test/java/com/arangodb/ArangoDatabaseAsyncTest.java +++ b/test-functional/src/test/java/com/arangodb/ArangoDatabaseAsyncTest.java @@ -778,6 +778,26 @@ void queryCursor(ArangoDatabaseAsync db) throws ExecutionException, InterruptedE assertThat(result).containsExactly(1, 2, 3, 4); } + @ParameterizedTest + @MethodSource("asyncDbs") + void queryCursorInTx(ArangoDatabaseAsync db) throws ExecutionException, InterruptedException { + StreamTransactionEntity tx = db.beginStreamTransaction(new StreamTransactionOptions()).get(); + ArangoCursorAsync c1 = db.query("for i in 1..4 return i", Integer.class, + new AqlQueryOptions().batchSize(1).streamTransactionId(tx.getId())).get(); + List result = new ArrayList<>(); + result.addAll(c1.getResult()); + ArangoCursorAsync c2 = c1.nextBatch().get(); + result.addAll(c2.getResult()); + ArangoCursorAsync c3 = db.cursor(c2.getId(), Integer.class, + new AqlQueryOptions().streamTransactionId(tx.getId())).get(); + result.addAll(c3.getResult()); + ArangoCursorAsync c4 = c3.nextBatch().get(); + result.addAll(c4.getResult()); + assertThat(c4.hasMore()).isFalse(); + assertThat(result).containsExactly(1, 2, 3, 4); + db.abortStreamTransaction(tx.getId()).get(); + } + @ParameterizedTest @MethodSource("asyncDbs") void queryCursorRetry(ArangoDatabaseAsync db) throws ExecutionException, InterruptedException { @@ -797,6 +817,28 @@ void queryCursorRetry(ArangoDatabaseAsync db) throws ExecutionException, Interru assertThat(result).containsExactly(1, 2, 3, 4); } + @ParameterizedTest + @MethodSource("asyncDbs") + void queryCursorRetryInTx(ArangoDatabaseAsync db) throws ExecutionException, InterruptedException { + assumeTrue(isAtLeastVersion(3, 11)); + StreamTransactionEntity tx = db.beginStreamTransaction(new StreamTransactionOptions()).get(); + ArangoCursorAsync c1 = db.query("for i in 1..4 return i", Integer.class, + new AqlQueryOptions().batchSize(1).allowRetry(true).streamTransactionId(tx.getId())).get(); + List result = new ArrayList<>(); + result.addAll(c1.getResult()); + ArangoCursorAsync c2 = c1.nextBatch().get(); + result.addAll(c2.getResult()); + ArangoCursorAsync c3 = db.cursor(c2.getId(), Integer.class, c2.getNextBatchId(), + new AqlQueryOptions().streamTransactionId(tx.getId())).get(); + result.addAll(c3.getResult()); + ArangoCursorAsync c4 = c3.nextBatch().get(); + result.addAll(c4.getResult()); + c4.close(); + assertThat(c4.hasMore()).isFalse(); + assertThat(result).containsExactly(1, 2, 3, 4); + db.abortStreamTransaction(tx.getId()).get(); + } + @ParameterizedTest @MethodSource("asyncDbs") void changeQueryTrackingProperties(ArangoDatabaseAsync db) throws ExecutionException, InterruptedException { diff --git a/test-functional/src/test/java/com/arangodb/ArangoDatabaseTest.java b/test-functional/src/test/java/com/arangodb/ArangoDatabaseTest.java index f79a70144..bead8f2f5 100644 --- a/test-functional/src/test/java/com/arangodb/ArangoDatabaseTest.java +++ b/test-functional/src/test/java/com/arangodb/ArangoDatabaseTest.java @@ -848,6 +848,25 @@ void queryCursor(ArangoDatabase db) { assertThat(result).containsExactly(1, 2, 3, 4); } + @ParameterizedTest + @MethodSource("dbs") + void queryCursorInTx(ArangoDatabase db) { + StreamTransactionEntity tx = db.beginStreamTransaction(new StreamTransactionOptions()); + ArangoCursor cursor = db.query("for i in 1..4 return i", Integer.class, + new AqlQueryOptions().batchSize(1).streamTransactionId(tx.getId())); + List result = new ArrayList<>(); + result.add(cursor.next()); + result.add(cursor.next()); + ArangoCursor cursor2 = db.cursor(cursor.getId(), Integer.class, + new AqlQueryOptions().streamTransactionId(tx.getId()) + ); + result.add(cursor2.next()); + result.add(cursor2.next()); + assertThat(cursor2.hasNext()).isFalse(); + assertThat(result).containsExactly(1, 2, 3, 4); + db.abortStreamTransaction(tx.getId()); + } + @ParameterizedTest @MethodSource("dbs") void queryCursorRetry(ArangoDatabase db) throws IOException { @@ -865,6 +884,27 @@ void queryCursorRetry(ArangoDatabase db) throws IOException { assertThat(result).containsExactly(1, 2, 3, 4); } + @ParameterizedTest + @MethodSource("dbs") + void queryCursorRetryInTx(ArangoDatabase db) throws IOException { + assumeTrue(isAtLeastVersion(3, 11)); + StreamTransactionEntity tx = db.beginStreamTransaction(new StreamTransactionOptions()); + ArangoCursor cursor = db.query("for i in 1..4 return i", Integer.class, + new AqlQueryOptions().batchSize(1).allowRetry(true).streamTransactionId(tx.getId())); + List result = new ArrayList<>(); + result.add(cursor.next()); + result.add(cursor.next()); + ArangoCursor cursor2 = db.cursor(cursor.getId(), Integer.class, cursor.getNextBatchId(), + new AqlQueryOptions().streamTransactionId(tx.getId()) + ); + result.add(cursor2.next()); + result.add(cursor2.next()); + cursor2.close(); + assertThat(cursor2.hasNext()).isFalse(); + assertThat(result).containsExactly(1, 2, 3, 4); + db.abortStreamTransaction(tx.getId()); + } + @ParameterizedTest @MethodSource("dbs") void changeQueryTrackingProperties(ArangoDatabase db) {