Skip to content

Commit 0c2000e

Browse files
committed
Add more tests and address comments
1 parent 004733b commit 0c2000e

File tree

24 files changed

+673
-195
lines changed

24 files changed

+673
-195
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -516,11 +516,16 @@ static AsyncRequestBody empty() {
516516
*/
517517
default SdkPublisher<AsyncRequestBody> split(AsyncRequestBodySplitConfiguration splitConfiguration) {
518518
Validate.notNull(splitConfiguration, "splitConfiguration");
519-
return new SplittingPublisher(this, splitConfiguration, false).map(r -> r);
519+
return SplittingPublisher.builder()
520+
.asyncRequestBody(this)
521+
.splitConfiguration(splitConfiguration)
522+
.retryableSubAsyncRequestBodyEnabled(false)
523+
.build()
524+
.map(r -> r);
520525
}
521526

522527
/**
523-
* Converts this {@link AsyncRequestBody} to a publisher of {@link ClosableAsyncRequestBody}s, each of which publishes
528+
* Converts this {@link AsyncRequestBody} to a publisher of {@link CloseableAsyncRequestBody}s, each of which publishes
524529
* specific portion of the original data, based on the provided {@link AsyncRequestBodySplitConfiguration}. The default chunk
525530
* size is 2MB and the default buffer size is 8MB.
526531
*
@@ -529,14 +534,18 @@ default SdkPublisher<AsyncRequestBody> split(AsyncRequestBodySplitConfiguration
529534
* vary in different implementations.
530535
*
531536
* <p>
532-
* Caller is responsible for closing {@link ClosableAsyncRequestBody} when it is ready to be disposed to release any
537+
* Caller is responsible for closing {@link CloseableAsyncRequestBody} when it is ready to be disposed to release any
533538
* resources.
534539
*
535540
* @see AsyncRequestBodySplitConfiguration
536541
*/
537-
default SdkPublisher<ClosableAsyncRequestBody> splitClosable(AsyncRequestBodySplitConfiguration splitConfiguration) {
542+
default SdkPublisher<CloseableAsyncRequestBody> splitCloseable(AsyncRequestBodySplitConfiguration splitConfiguration) {
538543
Validate.notNull(splitConfiguration, "splitConfiguration");
539-
return new SplittingPublisher(this, splitConfiguration, false);
544+
return SplittingPublisher.builder()
545+
.asyncRequestBody(this)
546+
.splitConfiguration(splitConfiguration)
547+
.retryableSubAsyncRequestBodyEnabled(false)
548+
.build();
540549
}
541550

542551
/**
@@ -554,12 +563,12 @@ default SdkPublisher<AsyncRequestBody> split(Consumer<AsyncRequestBodySplitConfi
554563
* This is a convenience method that passes an instance of the {@link AsyncRequestBodySplitConfiguration} builder,
555564
* avoiding the need to create one manually via {@link AsyncRequestBodySplitConfiguration#builder()}.
556565
*
557-
* @see #splitClosable(Consumer)
566+
* @see #splitCloseable(Consumer)
558567
*/
559-
default SdkPublisher<ClosableAsyncRequestBody> splitClosable(
568+
default SdkPublisher<CloseableAsyncRequestBody> splitCloseable(
560569
Consumer<AsyncRequestBodySplitConfiguration.Builder> splitConfiguration) {
561570
Validate.notNull(splitConfiguration, "splitConfiguration");
562-
return splitClosable(AsyncRequestBodySplitConfiguration.builder().applyMutation(splitConfiguration).build());
571+
return splitCloseable(AsyncRequestBodySplitConfiguration.builder().applyMutation(splitConfiguration).build());
563572
}
564573

565574
@SdkProtectedApi

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BufferedSplittableAsyncRequestBody.java

Lines changed: 65 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,43 @@
2020
import org.reactivestreams.Subscriber;
2121
import software.amazon.awssdk.annotations.SdkPublicApi;
2222
import software.amazon.awssdk.core.internal.async.SplittingPublisher;
23+
import software.amazon.awssdk.utils.Validate;
2324

2425
/**
25-
* An {@link AsyncRequestBody} decorator that can be split into buffered sub {@link AsyncRequestBody}s. Each sub
26-
* {@link AsyncRequestBody} can be retried/resubscribed if all data has been successfully been published to first subscriber.
26+
* An {@link AsyncRequestBody} decorator that enables splitting into retryable sub-request bodies.
27+
*
28+
* <p>This wrapper allows any {@link AsyncRequestBody} to be split into multiple parts where each part
29+
* can be retried independently. When split, each sub-body buffers its portion of data, enabling
30+
* resubscription if a retry is needed (e.g., due to network failures or service errors).</p>
31+
*
32+
* <p><b>Retry Requirements:</b></p>
33+
* <p>Retry is only possible if all the data has been successfully buffered during the first subscription.
34+
* If the first subscriber fails to consume all the data (e.g., due to early cancellation or errors),
35+
* subsequent retry attempts will fail since the complete data set is not available for resubscription.</p>
36+
*
37+
* <p><b>Usage Example:</b></p>
38+
* <pre>{@code
39+
* AsyncRequestBody originalBody = AsyncRequestBody.fromString("Hello World");
40+
* BufferedSplittableAsyncRequestBody retryableBody =
41+
* BufferedSplittableAsyncRequestBody.create(originalBody);
42+
*
43+
* AsyncRequestBodySplitConfiguration config = AsyncRequestBodySplitConfiguration.builder()
44+
* .chunkSizeInBytes(1024)
45+
* .bufferSizeInBytes(2048)
46+
* .build();
47+
*
48+
* SdkPublisher<ClosableAsyncRequestBody> parts = retryableBody.splitClosable(config);
49+
* }</pre>
50+
*
51+
* <p><b>Performance Considerations:</b></p>
52+
* <p>This implementation buffers data in memory to enable retries, but memory usage is controlled by
53+
* the {@code bufferSizeInBytes} configuration. However, this buffering limits the ability to request
54+
* more data from the original AsyncRequestBody until buffered data is consumed (i.e., when subscribers
55+
* closes sub-body), which may increase latency compared to non-buffered implementations.
56+
*
57+
* @see AsyncRequestBody
58+
* @see AsyncRequestBodySplitConfiguration
59+
* @see CloseableAsyncRequestBody
2760
*/
2861
@SdkPublicApi
2962
public final class BufferedSplittableAsyncRequestBody implements AsyncRequestBody {
@@ -33,7 +66,15 @@ private BufferedSplittableAsyncRequestBody(AsyncRequestBody delegate) {
3366
this.delegate = delegate;
3467
}
3568

69+
/**
70+
* Creates a new {@link BufferedSplittableAsyncRequestBody} that wraps the provided {@link AsyncRequestBody}.
71+
*
72+
* @param delegate the {@link AsyncRequestBody} to wrap and make retryable. Must not be null.
73+
* @return a new {@link BufferedSplittableAsyncRequestBody} instance
74+
* @throws NullPointerException if delegate is null
75+
*/
3676
public static BufferedSplittableAsyncRequestBody create(AsyncRequestBody delegate) {
77+
Validate.paramNotNull(delegate, "delegate");
3778
return new BufferedSplittableAsyncRequestBody(delegate);
3879
}
3980

@@ -42,9 +83,29 @@ public Optional<Long> contentLength() {
4283
return delegate.contentLength();
4384
}
4485

86+
/**
87+
* Splits this request body into multiple retryable parts based on the provided configuration.
88+
*
89+
* <p>Each part returned by the publisher will be a {@link CloseableAsyncRequestBody} that buffers
90+
* its portion of data, enabling resubscription for retry scenarios. This is the key difference from non-buffered splitting -
91+
* each part can be safely retried without data loss.
92+
*
93+
* <p>The splitting process respects the chunk size and buffer size specified in the configuration
94+
* to optimize memory usage.
95+
*
96+
* <p>The subscriber MUST close each {@link CloseableAsyncRequestBody} to ensure resource is released
97+
*
98+
* @param splitConfiguration configuration specifying how to split the request body
99+
* @return a publisher that emits retryable closable request body parts
100+
* @see AsyncRequestBodySplitConfiguration
101+
*/
45102
@Override
46-
public SdkPublisher<ClosableAsyncRequestBody> splitClosable(AsyncRequestBodySplitConfiguration splitConfiguration) {
47-
return new SplittingPublisher(this, splitConfiguration, true);
103+
public SdkPublisher<CloseableAsyncRequestBody> splitCloseable(AsyncRequestBodySplitConfiguration splitConfiguration) {
104+
return SplittingPublisher.builder()
105+
.asyncRequestBody(this)
106+
.splitConfiguration(splitConfiguration)
107+
.retryableSubAsyncRequestBodyEnabled(true)
108+
.build();
48109
}
49110

50111
@Override

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/ClosableAsyncRequestBody.java renamed to core/sdk-core/src/main/java/software/amazon/awssdk/core/async/CloseableAsyncRequestBody.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,5 @@
2222
* An extension of {@link AsyncRequestBody} that is closable.
2323
*/
2424
@SdkPublicApi
25-
public interface ClosableAsyncRequestBody extends AsyncRequestBody, SdkAutoCloseable {
25+
public interface CloseableAsyncRequestBody extends AsyncRequestBody, SdkAutoCloseable {
2626
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/listener/AsyncRequestBodyListener.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import software.amazon.awssdk.annotations.SdkProtectedApi;
2424
import software.amazon.awssdk.core.async.AsyncRequestBody;
2525
import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration;
26-
import software.amazon.awssdk.core.async.ClosableAsyncRequestBody;
26+
import software.amazon.awssdk.core.async.CloseableAsyncRequestBody;
2727
import software.amazon.awssdk.core.async.SdkPublisher;
2828
import software.amazon.awssdk.utils.Logger;
2929
import software.amazon.awssdk.utils.Validate;
@@ -78,14 +78,14 @@ public SdkPublisher<AsyncRequestBody> split(Consumer<AsyncRequestBodySplitConfig
7878
}
7979

8080
@Override
81-
public SdkPublisher<ClosableAsyncRequestBody> splitClosable(AsyncRequestBodySplitConfiguration splitConfiguration) {
82-
return delegate.splitClosable(splitConfiguration);
81+
public SdkPublisher<CloseableAsyncRequestBody> splitCloseable(AsyncRequestBodySplitConfiguration splitConfiguration) {
82+
return delegate.splitCloseable(splitConfiguration);
8383
}
8484

8585
@Override
86-
public SdkPublisher<ClosableAsyncRequestBody> splitClosable(
86+
public SdkPublisher<CloseableAsyncRequestBody> splitCloseable(
8787
Consumer<AsyncRequestBodySplitConfiguration.Builder> splitConfiguration) {
88-
return delegate.splitClosable(splitConfiguration);
88+
return delegate.splitCloseable(splitConfiguration);
8989
}
9090

9191
@Override

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
import software.amazon.awssdk.annotations.SdkInternalApi;
3535
import software.amazon.awssdk.core.async.AsyncRequestBody;
3636
import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration;
37-
import software.amazon.awssdk.core.async.ClosableAsyncRequestBody;
37+
import software.amazon.awssdk.core.async.CloseableAsyncRequestBody;
3838
import software.amazon.awssdk.core.async.SdkPublisher;
3939
import software.amazon.awssdk.core.internal.util.Mimetype;
4040
import software.amazon.awssdk.core.internal.util.NoopSubscription;
@@ -88,7 +88,7 @@ public SdkPublisher<AsyncRequestBody> split(AsyncRequestBodySplitConfiguration s
8888
}
8989

9090
@Override
91-
public SdkPublisher<ClosableAsyncRequestBody> splitClosable(AsyncRequestBodySplitConfiguration splitConfiguration) {
91+
public SdkPublisher<CloseableAsyncRequestBody> splitCloseable(AsyncRequestBodySplitConfiguration splitConfiguration) {
9292
return split(splitConfiguration).map(body -> new ClosableAsyncRequestBodyWrapper(body));
9393
}
9494

@@ -443,7 +443,7 @@ private static AsynchronousFileChannel openInputChannel(Path path) throws IOExce
443443
return AsynchronousFileChannel.open(path, StandardOpenOption.READ);
444444
}
445445

446-
private static class ClosableAsyncRequestBodyWrapper implements ClosableAsyncRequestBody {
446+
private static class ClosableAsyncRequestBodyWrapper implements CloseableAsyncRequestBody {
447447
private final AsyncRequestBody delegate;
448448

449449
ClosableAsyncRequestBodyWrapper(AsyncRequestBody body) {

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/NonRetryableSubAsyncRequestBody.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,11 @@ public Optional<Long> contentLength() {
6262

6363
public void send(ByteBuffer data) {
6464
log.debug(() -> String.format("Sending bytebuffer %s to part %d", data, partNumber));
65-
int length = data.remaining();
65+
long length = data.remaining();
6666
bufferedLength += length;
67-
onNumBytesReceived.accept((long) length);
67+
onNumBytesReceived.accept(length);
6868
delegate.send(data).whenComplete((r, t) -> {
69-
onNumBytesConsumed.accept((long) length);
69+
onNumBytesConsumed.accept(length);
7070
if (t != null) {
7171
error(t);
7272
}
@@ -92,11 +92,6 @@ public long receivedBytesLength() {
9292
return bufferedLength;
9393
}
9494

95-
@Override
96-
public boolean contentLengthKnown() {
97-
return contentLengthKnown;
98-
}
99-
10095
@Override
10196
public int partNumber() {
10297
return partNumber;
@@ -113,8 +108,10 @@ public void subscribe(Subscriber<? super ByteBuffer> s) {
113108
} else {
114109
s.onSubscribe(new NoopSubscription(s));
115110
s.onError(NonRetryableException.create(
116-
"A retry was attempted, but the provided source AsyncRequestBody does not "
117-
+ "support splitting to retryable AsyncRequestBody. Consider using BufferedSplittableAsyncRequestBody."));
111+
"Multiple subscribers detected. This could happen due to a retry attempt. The AsyncRequestBody implementation"
112+
+ " provided does not support splitting to retryable/resubscribable AsyncRequestBody. If you need retry "
113+
+ "capability or multiple subscriptions, consider using BufferedSplittableAsyncRequestBody to wrap your "
114+
+ "AsyncRequestBody."));
118115
}
119116
}
120117

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/RetryableSubAsyncRequestBody.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,10 @@ public Optional<Long> contentLength() {
7575
@Override
7676
public void send(ByteBuffer data) {
7777
log.trace(() -> String.format("Sending bytebuffer %s to part number %d", data, partNumber));
78-
int length = data.remaining();
78+
long length = data.remaining();
7979
bufferedLength += length;
8080

81-
onNumBytesReceived.accept((long) length);
81+
onNumBytesReceived.accept(length);
8282
delegate.send(data.asReadOnlyBuffer()).whenComplete((r, t) -> {
8383
if (t != null) {
8484
delegate.error(t);
@@ -94,7 +94,9 @@ public void complete() {
9494
log.debug(() -> "Received complete() for part number: " + partNumber);
9595
// ByteBuffersAsyncRequestBody MUST be created before we complete the current
9696
// request because retry may happen right after
97-
bufferedAsyncRequestBody = ByteBuffersAsyncRequestBody.of(buffers, bufferedLength);
97+
synchronized (buffersLock) {
98+
bufferedAsyncRequestBody = ByteBuffersAsyncRequestBody.of(buffers, bufferedLength);
99+
}
98100
delegate.complete().exceptionally(e -> {
99101
delegate.error(e);
100102
return null;
@@ -121,7 +123,7 @@ public void subscribe(Subscriber<? super ByteBuffer> s) {
121123
if (bufferedAsyncRequestBody == null) {
122124
s.onSubscribe(new NoopSubscription(s));
123125
s.onError(NonRetryableException.create(
124-
"A retry was attempted, but data is not buffered successfully for retry, partNumber " + partNumber));
126+
"A retry was attempted, but data is not buffered successfully for retry for partNumber: " + partNumber));
125127
return;
126128
}
127129
bufferedAsyncRequestBody.subscribe(s);
@@ -139,19 +141,14 @@ public void close() {
139141
buffers = null;
140142
}
141143
bufferedAsyncRequestBody.close();
142-
log.debug(() -> "requesting data after closing" + partNumber);
144+
bufferedAsyncRequestBody = null;
143145
}
144146
} catch (Throwable e) {
145147
log.warn(() -> String.format("Unexpected error thrown from cleaning up AsyncRequestBody for part number %d, "
146148
+ "resource may be leaked", partNumber));
147149
}
148150
}
149151

150-
@Override
151-
public boolean contentLengthKnown() {
152-
return contentLengthKnown;
153-
}
154-
155152
@Override
156153
public int partNumber() {
157154
return partNumber;

0 commit comments

Comments
 (0)