Skip to content

Commit 34f9a7f

Browse files
committed
fix: allow configuring max parallel write requests
1 parent 59ee8dc commit 34f9a7f

File tree

5 files changed

+68
-18
lines changed

5 files changed

+68
-18
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Changelog
22

33
## [Unreleased](https://github.com/openfga/java-sdk/compare/v0.8.3...HEAD)
4+
- fix: allow configuring maxParallelRequests for non-transaction writes (#187)
45

56
## v0.8.3
67

README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -542,7 +542,7 @@ Convenience `WriteTuples` and `DeleteTuples` methods are also available.
542542

543543
###### Non-transaction mode
544544

545-
The SDK will split the writes into separate requests and send them sequentially to avoid violating rate limits.
545+
The SDK will split the writes into smaller transactions and send them with limited parallelization to avoid violating rate limits.
546546

547547
> Passing `ClientWriteOptions` with `.disableTransactions(true)` is required to use non-transaction mode.
548548
> All other fields of `ClientWriteOptions` are optional.
@@ -570,7 +570,8 @@ var options = new ClientWriteOptions()
570570
// You can rely on the model id set in the configuration or override it for this specific request
571571
.authorizationModelId("01GXSA8YR785C4FYS3C0RTG7B1")
572572
.disableTransactions(true)
573-
.transactionChunkSize(5); // Maximum number of requests to be sent in a transaction in a particular chunk
573+
.transactionChunkSize(5) // Maximum number of requests per transaction chunk, defaults to 1
574+
.maxParallelRequests(5); // Max number of requests to issue in parallel, defaults to 10
574575

575576
var response = fgaClient.write(request, options).get();
576577
```

src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,9 @@ private CompletableFuture<ClientWriteResponse> writeNonTransaction(
414414

415415
var options = writeOptions != null
416416
? writeOptions
417-
: new ClientWriteOptions().transactionChunkSize(DEFAULT_MAX_METHOD_PARALLEL_REQS);
417+
: new ClientWriteOptions()
418+
.transactionChunkSize(1)
419+
.maxParallelRequests(DEFAULT_MAX_METHOD_PARALLEL_REQS);
418420

419421
if (options.getAdditionalHeaders() == null) {
420422
options.additionalHeaders(new HashMap<>());
@@ -434,19 +436,50 @@ private CompletableFuture<ClientWriteResponse> writeNonTransaction(
434436
return this.writeTransactions(storeId, emptyTransaction, writeOptions);
435437
}
436438

437-
var futureResponse = this.writeTransactions(storeId, transactions.get(0), options);
438-
439-
for (int i = 1; i < transactions.size(); i++) {
440-
final int index = i; // Must be final in this scope for closure.
439+
int maxParallelRequests = options.getMaxParallelRequests() != null
440+
? options.getMaxParallelRequests()
441+
: DEFAULT_MAX_METHOD_PARALLEL_REQS;
441442

442-
// The resulting completable future of this chain will result in either:
443-
// 1. The first exception thrown in a failed completion. Other thenCompose() will not be evaluated.
444-
// 2. The final successful ClientWriteResponse.
445-
futureResponse = futureResponse.thenCompose(
446-
_response -> this.writeTransactions(storeId, transactions.get(index), options));
443+
if (maxParallelRequests <= 1) {
444+
var futureResponse = this.writeTransactions(storeId, transactions.get(0), options);
445+
for (int i = 1; i < transactions.size(); i++) {
446+
final int index = i;
447+
futureResponse = futureResponse.thenCompose(
448+
_response -> this.writeTransactions(storeId, transactions.get(index), options));
449+
}
450+
return futureResponse;
447451
}
448452

449-
return futureResponse;
453+
var executor = Executors.newScheduledThreadPool(maxParallelRequests);
454+
var latch = new CountDownLatch(transactions.size());
455+
var failure = new AtomicReference<Throwable>();
456+
var lastResponse = new AtomicReference<ClientWriteResponse>();
457+
458+
Consumer<ClientWriteRequest> singleWriteRequest =
459+
tx -> this.writeTransactions(storeId, tx, options).whenComplete((response, throwable) -> {
460+
try {
461+
if (throwable != null) {
462+
failure.compareAndSet(null, throwable);
463+
} else {
464+
lastResponse.set(response);
465+
}
466+
} finally {
467+
latch.countDown();
468+
}
469+
});
470+
471+
try {
472+
transactions.forEach(tx -> executor.execute(() -> singleWriteRequest.accept(tx)));
473+
latch.await();
474+
if (failure.get() != null) {
475+
return CompletableFuture.failedFuture(failure.get());
476+
}
477+
return CompletableFuture.completedFuture(lastResponse.get());
478+
} catch (Exception e) {
479+
return CompletableFuture.failedFuture(e);
480+
} finally {
481+
executor.shutdown();
482+
}
450483
}
451484

452485
private <T> Stream<List<T>> chunksOf(int chunkSize, List<T> list) {

src/main/java/dev/openfga/sdk/api/configuration/ClientWriteOptions.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ public class ClientWriteOptions implements AdditionalHeadersSupplier {
1919
private String authorizationModelId;
2020
private Boolean disableTransactions = false;
2121
private int transactionChunkSize;
22+
private Integer maxParallelRequests;
2223

2324
public ClientWriteOptions additionalHeaders(Map<String, String> additionalHeaders) {
2425
this.additionalHeaders = additionalHeaders;
@@ -56,4 +57,13 @@ public ClientWriteOptions transactionChunkSize(int transactionChunkSize) {
5657
public int getTransactionChunkSize() {
5758
return transactionChunkSize > 0 ? transactionChunkSize : 1;
5859
}
60+
61+
public ClientWriteOptions maxParallelRequests(Integer maxParallelRequests) {
62+
this.maxParallelRequests = maxParallelRequests;
63+
return this;
64+
}
65+
66+
public Integer getMaxParallelRequests() {
67+
return maxParallelRequests;
68+
}
5969
}

src/test/java/dev/openfga/sdk/api/client/OpenFgaClientTest.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1216,8 +1216,10 @@ public void writeTest_nonTransaction() throws Exception {
12161216
ClientWriteRequest request = new ClientWriteRequest()
12171217
.writes(List.of(writeTuple, writeTuple, writeTuple, writeTuple, writeTuple))
12181218
.deletes(List.of(tuple, tuple, tuple, tuple, tuple));
1219-
ClientWriteOptions options =
1220-
new ClientWriteOptions().disableTransactions(true).transactionChunkSize(2);
1219+
ClientWriteOptions options = new ClientWriteOptions()
1220+
.disableTransactions(true)
1221+
.transactionChunkSize(2)
1222+
.maxParallelRequests(1);
12211223

12221224
// When
12231225
var response = fga.write(request, options).get();
@@ -1284,8 +1286,10 @@ public void writeTest_nonTransactionsWithFailure() {
12841286
.user(user)
12851287
.condition(DEFAULT_CONDITION))
12861288
.collect(Collectors.toList()));
1287-
ClientWriteOptions options =
1288-
new ClientWriteOptions().disableTransactions(true).transactionChunkSize(1);
1289+
ClientWriteOptions options = new ClientWriteOptions()
1290+
.disableTransactions(true)
1291+
.transactionChunkSize(1)
1292+
.maxParallelRequests(1);
12891293

12901294
// When
12911295
var execException = assertThrows(
@@ -2005,7 +2009,8 @@ public void shouldSplitBatchesSuccessfully(WireMockRuntimeInfo wireMockRuntimeIn
20052009
.correlationId("cor-3");
20062010
ClientBatchCheckRequest request = new ClientBatchCheckRequest().checks(List.of(item1, item2, item3));
20072011

2008-
ClientBatchCheckOptions options = new ClientBatchCheckOptions().maxBatchSize(2);
2012+
ClientBatchCheckOptions options =
2013+
new ClientBatchCheckOptions().maxBatchSize(2).maxParallelRequests(1);
20092014

20102015
// When
20112016
ClientBatchCheckResponse response = fga.batchCheck(request, options).join();

0 commit comments

Comments
 (0)