Skip to content

Commit 69842ee

Browse files
committed
refactoring
1 parent 365c04e commit 69842ee

File tree

1 file changed

+63
-48
lines changed
  • java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk

1 file changed

+63
-48
lines changed

java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java

Lines changed: 63 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -363,51 +363,22 @@ public void flush() {
363363

364364
if (failedRequestsCanRetry.isEmpty() || backoffPolicy.equals(BackoffPolicy.noBackoff())) {
365365
// Total success! ...or there's no retry policy implemented. Either way, can call
366-
// listener after bulk
367-
if (listener != null) {
368-
listenerInProgressCount.incrementAndGet();
369-
scheduler.submit(() -> {
370-
try {
371-
listener.afterBulk(exec.id, exec.request, exec.contexts, resp);
372-
} finally {
373-
if (listenerInProgressCount.decrementAndGet() == 0) {
374-
closeCondition.signalIfReady();
375-
}
376-
}
377-
});
378-
}
366+
listenerAfterBulkSuccess(resp, exec);
379367
} else {
380368
// Partial success, retrying failed requests if policy allows it
381-
// Keeping list of retryables, to exclude them for calling listener later
369+
// Keeping list of retryable requests/responses, to exclude them for calling
370+
// listener later
382371
List<BulkOperationRepeatable<Context>> retryableReq = new ArrayList<>();
383372
List<BulkOperationRepeatable<Context>> refires = new ArrayList<>();
384373
List<BulkResponseItem> retryableResp = new ArrayList<>();
374+
385375
for (BulkResponseItem bulkItemResponse : failedRequestsCanRetry) {
386376
int index = resp.items().indexOf(bulkItemResponse);
387-
// Getting original failed, requests and keeping successful ones to send to the
388-
// listener
389-
BulkOperationRepeatable<Context> original = sentRequests.get(index);
390-
if (original.canRetry()) {
391-
retryableResp.add(bulkItemResponse);
392-
Iterator<Long> retries =
393-
Optional.ofNullable(original.getRetries()).orElse(backoffPolicy.iterator());
394-
BulkOperationRepeatable<Context> refire =
395-
new BulkOperationRepeatable<>(original.getOperation(),
396-
original.getContext(), retries);
397-
retryableReq.add(original);
398-
refires.add(refire);
399-
addRetry(refire);
400-
logger.warn("Added failed request back in queue, retrying in : " + refire.getCurrentRetryTimeDelay() + " ms");
401-
// TODO remove after checking
402-
assert (bulkItemResponse.operationType().toString().equals(sentRequests.get(index).getOperation()._kind().toString()));
403-
} else {
404-
logger.warn("Retries finished for request: " + original.getOperation()._kind().toString());
405-
}
377+
selectingRetries(index, bulkItemResponse, sentRequests, retryableResp,
378+
retryableReq, refires);
406379
}
407380
// Scheduling flushes for just sent out retryable requests
408381
if (!refires.isEmpty()) {
409-
// if size <= 3, all times
410-
// if size > 3, schedule just first, last and median
411382
scheduleRetries(refires);
412383
}
413384
// Retrieving list of remaining successful or not retryable requests
@@ -450,18 +421,7 @@ public void flush() {
450421
}
451422
} else {
452423
// Failure
453-
if (listener != null) {
454-
listenerInProgressCount.incrementAndGet();
455-
scheduler.submit(() -> {
456-
try {
457-
listener.afterBulk(exec.id, exec.request, exec.contexts, thr);
458-
} finally {
459-
if (listenerInProgressCount.decrementAndGet() == 0) {
460-
closeCondition.signalIfReady();
461-
}
462-
}
463-
});
464-
}
424+
listenerAfterBulkException(thr, exec);
465425
}
466426

467427
sendRequestCondition.signalIfReadyAfter(() -> {
@@ -473,6 +433,58 @@ public void flush() {
473433
}
474434
}
475435

436+
private void selectingRetries(int index, BulkResponseItem bulkItemResponse,
437+
List<BulkOperationRepeatable<Context>> sentRequests,
438+
List<BulkResponseItem> retryableResp,
439+
List<BulkOperationRepeatable<Context>> retryableReq,
440+
List<BulkOperationRepeatable<Context>> refires) {
441+
442+
// Getting original failed, requests and keeping successful ones to send to the listener
443+
BulkOperationRepeatable<Context> original = sentRequests.get(index);
444+
if (original.canRetry()) {
445+
retryableResp.add(bulkItemResponse);
446+
Iterator<Long> retries =
447+
Optional.ofNullable(original.getRetries()).orElse(backoffPolicy.iterator());
448+
BulkOperationRepeatable<Context> refire = new BulkOperationRepeatable<>(original.getOperation(), original.getContext(), retries);
449+
retryableReq.add(original);
450+
refires.add(refire);
451+
addRetry(refire);
452+
logger.warn("Added failed request back in queue, retrying in : " + refire.getCurrentRetryTimeDelay() + " ms");
453+
} else {
454+
logger.warn("Retries finished for request: " + original.getOperation()._kind().toString());
455+
}
456+
}
457+
458+
private void listenerAfterBulkException(Throwable thr, RequestExecution<Context> exec) {
459+
if (listener != null) {
460+
listenerInProgressCount.incrementAndGet();
461+
scheduler.submit(() -> {
462+
try {
463+
listener.afterBulk(exec.id, exec.request, exec.contexts, thr);
464+
} finally {
465+
if (listenerInProgressCount.decrementAndGet() == 0) {
466+
closeCondition.signalIfReady();
467+
}
468+
}
469+
});
470+
}
471+
}
472+
473+
private void listenerAfterBulkSuccess(BulkResponse resp, RequestExecution<Context> exec) {
474+
if (listener != null) {
475+
listenerInProgressCount.incrementAndGet();
476+
scheduler.submit(() -> {
477+
try {
478+
listener.afterBulk(exec.id, exec.request, exec.contexts, resp);
479+
} finally {
480+
if (listenerInProgressCount.decrementAndGet() == 0) {
481+
closeCondition.signalIfReady();
482+
}
483+
}
484+
});
485+
}
486+
}
487+
476488
private void scheduleRetries(List<BulkOperationRepeatable<Context>> retryableReq) {
477489
List<Long> sortedDelays = retryableReq.stream()
478490
.map(BulkOperationRepeatable::getCurrentRetryTimeDelay)
@@ -696,7 +708,10 @@ public Builder<Context> listener(BulkListener<Context> listener) {
696708
return this;
697709
}
698710

699-
711+
/**
712+
* Sets the backoff policy that will handle retries for error 429: too many requests.
713+
* All the times are defined in milliseconds.
714+
*/
700715
public Builder<Context> backoffPolicy(BackoffPolicy backoffPolicy) {
701716
this.backoffPolicy = backoffPolicy;
702717
return this;

0 commit comments

Comments
 (0)