32
32
import co .elastic .clients .json .JsonpMapper ;
33
33
import co .elastic .clients .json .JsonpUtils ;
34
34
import co .elastic .clients .json .SimpleJsonpMapper ;
35
- import co .elastic .clients .transport .BackoffPolicy ;
36
35
import co .elastic .clients .transport .ElasticsearchTransport ;
37
36
import co .elastic .clients .transport .Endpoint ;
38
37
import co .elastic .clients .transport .TransportOptions ;
@@ -142,7 +141,7 @@ private void multiThreadTest(int maxOperations, int maxRequests, int numThreads,
142
141
for (int i = 0 ; i < numThreads ; i ++) {
143
142
new Thread (() -> {
144
143
try {
145
- Thread .sleep ((long ) (Math .random () * 100 ));
144
+ Thread .sleep ((long )(Math .random () * 100 ));
146
145
} catch (InterruptedException e ) {
147
146
throw new RuntimeException (e );
148
147
}
@@ -169,8 +168,7 @@ private void multiThreadTest(int maxOperations, int maxRequests, int numThreads,
169
168
assertEquals (expectedOperations , listener .operations .get ());
170
169
assertEquals (expectedOperations , transport .operations .get ());
171
170
172
- int expectedRequests =
173
- expectedOperations / maxOperations + ((expectedOperations % maxOperations == 0 ) ? 0 : 1 );
171
+ int expectedRequests = expectedOperations / maxOperations + ((expectedOperations % maxOperations == 0 ) ? 0 : 1 ) ;
174
172
175
173
assertEquals (expectedRequests , ingester .requestCount ());
176
174
assertEquals (expectedRequests , listener .requests .get ());
@@ -185,8 +183,7 @@ public void multiThreadStressTest() throws InterruptedException, IOException {
185
183
186
184
// DISCLAIMER: this configuration is highly inefficient and only used here to showcase an extreme
187
185
// situation where the number of adding threads greatly exceeds the number of concurrent requests
188
- // handled by the ingester. It's strongly recommended to always tweak maxConcurrentRequests
189
- // accordingly.
186
+ // handled by the ingester. It's strongly recommended to always tweak maxConcurrentRequests accordingly.
190
187
BulkIngester <?> ingester = BulkIngester .of (b -> b
191
188
.client (client )
192
189
.globalSettings (s -> s .index (index ))
@@ -212,22 +209,21 @@ public void multiThreadStressTest() throws InterruptedException, IOException {
212
209
executor .submit (thread );
213
210
}
214
211
215
- executor .awaitTermination (10 , TimeUnit .SECONDS );
212
+ executor .awaitTermination (10 ,TimeUnit .SECONDS );
216
213
ingester .close ();
217
214
218
215
client .indices ().refresh ();
219
216
220
217
IndicesStatsResponse indexStats = client .indices ().stats (g -> g .index (index ));
221
218
222
- assertTrue (indexStats .indices ().get (index ).primaries ().docs ().count () == 100000 );
219
+ assertTrue (indexStats .indices ().get (index ).primaries ().docs ().count ()== 100000 );
223
220
}
224
221
225
222
@ Test
226
223
public void sizeLimitTest () throws Exception {
227
224
TestTransport transport = new TestTransport ();
228
225
229
- long operationSize = IngesterOperation .of (new BulkOperationRepeatable <>(operation , null , null ),
230
- transport .jsonpMapper ()).size ();
226
+ long operationSize = IngesterOperation .of (new BulkOperationRepeatable <>(operation , null , null ), transport .jsonpMapper ()).size ();
231
227
232
228
BulkIngester <?> ingester = BulkIngester .of (b -> b
233
229
.client (new ElasticsearchAsyncClient (transport ))
@@ -257,7 +253,7 @@ public void periodicFlushTest() throws Exception {
257
253
// Disable other flushing limits
258
254
.maxSize (-1 )
259
255
.maxOperations (-1 )
260
- .maxConcurrentRequests (Integer .MAX_VALUE - 1 )
256
+ .maxConcurrentRequests (Integer .MAX_VALUE - 1 )
261
257
);
262
258
263
259
// Add an operation every 100 ms to give time
@@ -297,8 +293,7 @@ public void beforeBulk(long executionId, BulkRequest request, List<Void> context
297
293
}
298
294
299
295
@ Override
300
- public void afterBulk (long executionId , BulkRequest request , List <Void > contexts ,
301
- BulkResponse response ) {
296
+ public void afterBulk (long executionId , BulkRequest request , List <Void > contexts , BulkResponse response ) {
302
297
if (executionId == 2 ) {
303
298
// Fail after the request is sent
304
299
failureCount .incrementAndGet ();
@@ -307,8 +302,7 @@ public void afterBulk(long executionId, BulkRequest request, List<Void> contexts
307
302
}
308
303
309
304
@ Override
310
- public void afterBulk (long executionId , BulkRequest request , List <Void > contexts ,
311
- Throwable failure ) {
305
+ public void afterBulk (long executionId , BulkRequest request , List <Void > contexts , Throwable failure ) {
312
306
313
307
}
314
308
};
@@ -361,13 +355,11 @@ public void beforeBulk(long executionId, BulkRequest request, List<Integer> cont
361
355
}
362
356
363
357
@ Override
364
- public void afterBulk (long executionId , BulkRequest request , List <Integer > contexts ,
365
- BulkResponse response ) {
358
+ public void afterBulk (long executionId , BulkRequest request , List <Integer > contexts , BulkResponse response ) {
366
359
}
367
360
368
361
@ Override
369
- public void afterBulk (long executionId , BulkRequest request , List <Integer > contexts ,
370
- Throwable failure ) {
362
+ public void afterBulk (long executionId , BulkRequest request , List <Integer > contexts , Throwable failure ) {
371
363
}
372
364
};
373
365
@@ -381,7 +373,7 @@ public void afterBulk(long executionId, BulkRequest request, List<Integer> conte
381
373
for (int i = 0 ; i < 10 ; i ++) {
382
374
for (int j = 0 ; j < 10 ; j ++) {
383
375
// Set a context only after 5, so that we test filling with nulls.
384
- Integer context = j < 5 ? null : i * 10 + j ;
376
+ Integer context = j < 5 ? null : i * 10 + j ;
385
377
ingester .add (operation , context );
386
378
}
387
379
}
@@ -399,7 +391,7 @@ public void afterBulk(long executionId, BulkRequest request, List<Integer> conte
399
391
if (j < 5 ) {
400
392
assertNull (contexts .get (j ));
401
393
} else {
402
- assertEquals (contexts .get (j ), i * 10 + j );
394
+ assertEquals (contexts .get (j ), i * 10 + j );
403
395
}
404
396
}
405
397
}
@@ -441,8 +433,7 @@ public void beforeBulk(long executionId, BulkRequest request, List<Void> context
441
433
442
434
@ Test
443
435
public void pipelineTest () {
444
- String json = "{\" create\" :{\" _id\" :\" some_id\" ,\" _index\" :\" some_idx\" ,\" pipeline\" :\" pipe\" ," +
445
- "\" require_alias\" :true}}" ;
436
+ String json = "{\" create\" :{\" _id\" :\" some_id\" ,\" _index\" :\" some_idx\" ,\" pipeline\" :\" pipe\" ,\" require_alias\" :true}}" ;
446
437
JsonpMapper mapper = new SimpleJsonpMapper ();
447
438
448
439
BulkOperation create = BulkOperation .of (o -> o .create (c -> c
@@ -456,8 +447,7 @@ public void pipelineTest() {
456
447
String createStr = JsonpUtils .toJsonString (create , mapper );
457
448
assertEquals (json , createStr );
458
449
459
- BulkOperation create1 = IngesterOperation .of (new BulkOperationRepeatable <>(create , null , null ),
460
- mapper ).operation ().getOperation ();
450
+ BulkOperation create1 = IngesterOperation .of (new BulkOperationRepeatable <>(create , null , null ), mapper ).repeatableOperation ().getOperation ();
461
451
462
452
String create1Str = JsonpUtils .toJsonString (create1 , mapper );
463
453
assertEquals (json , create1Str );
@@ -546,15 +536,13 @@ public void testConfigValidation() {
546
536
private static class CountingListener implements BulkListener <Void > {
547
537
public final AtomicInteger operations = new AtomicInteger ();
548
538
public final AtomicInteger requests = new AtomicInteger ();
549
-
550
539
@ Override
551
540
public void beforeBulk (long executionId , BulkRequest request , List <Void > contexts ) {
552
541
553
542
}
554
543
555
544
@ Override
556
- public void afterBulk (long executionId , BulkRequest request , List <Void > contexts ,
557
- BulkResponse response ) {
545
+ public void afterBulk (long executionId , BulkRequest request , List <Void > contexts , BulkResponse response ) {
558
546
operations .addAndGet (request .operations ().size ());
559
547
requests .incrementAndGet ();
560
548
}
@@ -607,7 +595,7 @@ public <RequestT, ResponseT, ErrorT> CompletableFuture<ResponseT> performRequest
607
595
});
608
596
609
597
@ SuppressWarnings ("unchecked" )
610
- CompletableFuture <ResponseT > result = (CompletableFuture <ResponseT >) response ;
598
+ CompletableFuture <ResponseT > result = (CompletableFuture <ResponseT >)response ;
611
599
return result ;
612
600
}
613
601
0 commit comments