Skip to content

Commit 8fa088c

Browse files
committed
Refactoring based on API surface are review
1 parent 3c91cc9 commit 8fa088c

19 files changed

+674
-412
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import software.amazon.awssdk.annotations.SdkPublicApi;
3434
import software.amazon.awssdk.core.FileRequestBodyConfiguration;
3535
import software.amazon.awssdk.core.internal.async.ByteBuffersAsyncRequestBody;
36-
import software.amazon.awssdk.core.internal.async.ClosableAsyncRequestBodyAdaptor;
3736
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
3837
import software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody;
3938
import software.amazon.awssdk.core.internal.async.SplittingPublisher;
@@ -501,22 +500,23 @@ static AsyncRequestBody empty() {
501500
return fromBytes(new byte[0]);
502501
}
503502

504-
505503
/**
506504
* Converts this {@link AsyncRequestBody} to a publisher of {@link AsyncRequestBody}s, each of which publishes a specific
507505
* portion of the original data, based on the provided {@link AsyncRequestBodySplitConfiguration}. The default chunk size
508506
* is 2MB and the default buffer size is 8MB.
509507
*
510508
* <p>
511-
* Each divided {@link AsyncRequestBody} is sent after the entire content for that chunk is buffered.
509+
* By default, if content length of this {@link AsyncRequestBody} is present, each divided {@link AsyncRequestBody} is
510+
* delivered to the subscriber right after it's initialized. On the other hand, if content length is null, it is sent after
511+
* the entire content for that chunk is buffered. In this case, the configured {@code maxMemoryUsageInBytes} must be larger
512+
* than or equal to {@code chunkSizeInBytes}. Note that this behavior may be different if a specific implementation of this
513+
* interface overrides this method.
512514
*
513515
* @see AsyncRequestBodySplitConfiguration
514-
* @deprecated Use {@link #splitV2(AsyncRequestBodySplitConfiguration)} instead.
515516
*/
516-
@Deprecated
517517
default SdkPublisher<AsyncRequestBody> split(AsyncRequestBodySplitConfiguration splitConfiguration) {
518518
Validate.notNull(splitConfiguration, "splitConfiguration");
519-
return splitV2(splitConfiguration).map(body -> new ClosableAsyncRequestBodyAdaptor(body));
519+
return new SplittingPublisher(this, splitConfiguration, false).map(r -> r);
520520
}
521521

522522
/**
@@ -525,27 +525,26 @@ default SdkPublisher<AsyncRequestBody> split(AsyncRequestBodySplitConfiguration
525525
* size is 2MB and the default buffer size is 8MB.
526526
*
527527
* <p>
528-
* Each divided {@link ClosableAsyncRequestBody} is sent after the entire content for that chunk is buffered. This behavior
529-
* may be different if a specific implementation of this interface overrides this method.
528+
* The default implementation behaves the same as {@link #split(AsyncRequestBodySplitConfiguration)}. This behavior may
529+
* vary in different implementations.
530530
*
531531
* <p>
532-
* Each {@link ClosableAsyncRequestBody} MUST be closed by the user when it is ready to be disposed.
532+
* Caller is responsible for closing {@link ClosableAsyncRequestBody} when it is ready to be disposed to release any
533+
* resources.
533534
*
534535
* @see AsyncRequestBodySplitConfiguration
535536
*/
536-
default SdkPublisher<ClosableAsyncRequestBody> splitV2(AsyncRequestBodySplitConfiguration splitConfiguration) {
537+
default SdkPublisher<ClosableAsyncRequestBody> splitClosable(AsyncRequestBodySplitConfiguration splitConfiguration) {
537538
Validate.notNull(splitConfiguration, "splitConfiguration");
538-
return new SplittingPublisher(this, splitConfiguration);
539+
return new SplittingPublisher(this, splitConfiguration, false);
539540
}
540541

541542
/**
542543
* This is a convenience method that passes an instance of the {@link AsyncRequestBodySplitConfiguration} builder,
543544
* avoiding the need to create one manually via {@link AsyncRequestBodySplitConfiguration#builder()}.
544545
*
545546
* @see #split(AsyncRequestBodySplitConfiguration)
546-
* @deprecated Use {@link #splitV2(Consumer)} instead.
547547
*/
548-
@Deprecated
549548
default SdkPublisher<AsyncRequestBody> split(Consumer<AsyncRequestBodySplitConfiguration.Builder> splitConfiguration) {
550549
Validate.notNull(splitConfiguration, "splitConfiguration");
551550
return split(AsyncRequestBodySplitConfiguration.builder().applyMutation(splitConfiguration).build());
@@ -555,12 +554,12 @@ default SdkPublisher<AsyncRequestBody> split(Consumer<AsyncRequestBodySplitConfi
555554
* This is a convenience method that passes an instance of the {@link AsyncRequestBodySplitConfiguration} builder,
556555
* avoiding the need to create one manually via {@link AsyncRequestBodySplitConfiguration#builder()}.
557556
*
558-
* @see #splitV2(Consumer)
557+
* @see #splitClosable(Consumer)
559558
*/
560-
default SdkPublisher<ClosableAsyncRequestBody> splitV2(
559+
default SdkPublisher<ClosableAsyncRequestBody> splitClosable(
561560
Consumer<AsyncRequestBodySplitConfiguration.Builder> splitConfiguration) {
562561
Validate.notNull(splitConfiguration, "splitConfiguration");
563-
return splitV2(AsyncRequestBodySplitConfiguration.builder().applyMutation(splitConfiguration).build());
562+
return splitClosable(AsyncRequestBodySplitConfiguration.builder().applyMutation(splitConfiguration).build());
564563
}
565564

566565
@SdkProtectedApi
@@ -569,6 +568,8 @@ enum BodyType {
569568
BYTES("Bytes", "b"),
570569
STREAM("Stream", "s"),
571570
PUBLISHER("Publisher", "p"),
571+
RETRYABLE("RetryableSub", "rs"),
572+
NON_RETRYABLE("NonRetryableSub", "nrs"),
572573
UNKNOWN("Unknown", "u");
573574

574575
private static final Map<String, BodyType> VALUE_MAP =
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.async;
17+
18+
import java.nio.ByteBuffer;
19+
import java.util.Optional;
20+
import org.reactivestreams.Subscriber;
21+
import software.amazon.awssdk.annotations.SdkPublicApi;
22+
import software.amazon.awssdk.core.internal.async.SplittingPublisher;
23+
24+
/**
25+
* An {@link AsyncRequestBody} decorator that can be split into buffered sub {@link AsyncRequestBody}s. Each sub
26+
* {@link AsyncRequestBody} can be retried/resubscribed.
27+
*/
28+
@SdkPublicApi
29+
public final class BufferedSplittableAsyncRequestBody implements AsyncRequestBody {
30+
private final AsyncRequestBody delegate;
31+
32+
private BufferedSplittableAsyncRequestBody(AsyncRequestBody delegate) {
33+
this.delegate = delegate;
34+
}
35+
36+
public static BufferedSplittableAsyncRequestBody create(AsyncRequestBody delegate) {
37+
return new BufferedSplittableAsyncRequestBody(delegate);
38+
}
39+
40+
@Override
41+
public Optional<Long> contentLength() {
42+
return delegate.contentLength();
43+
}
44+
45+
@Override
46+
public SdkPublisher<ClosableAsyncRequestBody> splitClosable(AsyncRequestBodySplitConfiguration splitConfiguration) {
47+
return new SplittingPublisher(this, splitConfiguration, true);
48+
}
49+
50+
@Override
51+
public void subscribe(Subscriber<? super ByteBuffer> s) {
52+
delegate.subscribe(s);
53+
}
54+
55+
@Override
56+
public String body() {
57+
return delegate.body();
58+
}
59+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/listener/AsyncRequestBodyListener.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,14 +78,14 @@ public SdkPublisher<AsyncRequestBody> split(Consumer<AsyncRequestBodySplitConfig
7878
}
7979

8080
@Override
81-
public SdkPublisher<ClosableAsyncRequestBody> splitV2(AsyncRequestBodySplitConfiguration splitConfiguration) {
82-
return delegate.splitV2(splitConfiguration);
81+
public SdkPublisher<ClosableAsyncRequestBody> splitClosable(AsyncRequestBodySplitConfiguration splitConfiguration) {
82+
return delegate.splitClosable(splitConfiguration);
8383
}
8484

8585
@Override
86-
public SdkPublisher<ClosableAsyncRequestBody> splitV2(
86+
public SdkPublisher<ClosableAsyncRequestBody> splitClosable(
8787
Consumer<AsyncRequestBodySplitConfiguration.Builder> splitConfiguration) {
88-
return delegate.splitV2(splitConfiguration);
88+
return delegate.splitClosable(splitConfiguration);
8989
}
9090

9191
@Override

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBody.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,10 @@ public String body() {
123123
return BodyType.BYTES.getName();
124124
}
125125

126+
public static ByteBuffersAsyncRequestBody of(List<ByteBuffer> buffers, long length) {
127+
return new ByteBuffersAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, length, buffers);
128+
}
129+
126130
public static ByteBuffersAsyncRequestBody of(List<ByteBuffer> buffers) {
127131
long length = buffers.stream()
128132
.mapToLong(ByteBuffer::remaining)

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ClosableAsyncRequestBodyAdaptor.java

Lines changed: 0 additions & 67 deletions
This file was deleted.

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public SdkPublisher<AsyncRequestBody> split(AsyncRequestBodySplitConfiguration s
8888
}
8989

9090
@Override
91-
public SdkPublisher<ClosableAsyncRequestBody> splitV2(AsyncRequestBodySplitConfiguration splitConfiguration) {
91+
public SdkPublisher<ClosableAsyncRequestBody> splitClosable(AsyncRequestBodySplitConfiguration splitConfiguration) {
9292
return split(splitConfiguration).map(body -> new ClosableAsyncRequestBodyWrapper(body));
9393
}
9494

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.async;
17+
18+
import java.nio.ByteBuffer;
19+
import java.util.Optional;
20+
import java.util.concurrent.atomic.AtomicBoolean;
21+
import java.util.function.Consumer;
22+
import org.reactivestreams.Subscriber;
23+
import software.amazon.awssdk.annotations.SdkInternalApi;
24+
import software.amazon.awssdk.core.exception.NonRetryableException;
25+
import software.amazon.awssdk.core.internal.util.NoopSubscription;
26+
import software.amazon.awssdk.utils.Logger;
27+
import software.amazon.awssdk.utils.async.SimplePublisher;
28+
29+
/**
30+
* A {@link SubAsyncRequestBody} implementation that doesn't resubscribe/retry
31+
*/
32+
@SdkInternalApi
33+
public final class NonRetryableSubAsyncRequestBody implements SubAsyncRequestBody {
34+
private static final Logger log = Logger.loggerFor(NonRetryableSubAsyncRequestBody.class);
35+
private final long maxLength;
36+
private final Long totalLength;
37+
private final SimplePublisher<ByteBuffer> delegate = new SimplePublisher<>();
38+
private final int partNumber;
39+
private final AtomicBoolean subscribeCalled = new AtomicBoolean(false);
40+
private final Consumer<Long> onNumBytesReceived;
41+
private final Consumer<Long> onNumBytesConsumed;
42+
private final boolean contentLengthKnown;
43+
private final String sourceBodyName;
44+
private volatile long bufferedLength = 0;
45+
46+
// TODO: builder pattern
47+
public NonRetryableSubAsyncRequestBody(boolean contentLengthKnown,
48+
long maxLength,
49+
int partNumber,
50+
Consumer<Long> onNumBytesReceived,
51+
Consumer<Long> onNumBytesConsumed, String sourceBodyName) {
52+
this.contentLengthKnown = contentLengthKnown;
53+
this.totalLength = contentLengthKnown ? maxLength : null;
54+
this.maxLength = maxLength;
55+
this.partNumber = partNumber;
56+
this.onNumBytesReceived = onNumBytesReceived;
57+
this.onNumBytesConsumed = onNumBytesConsumed;
58+
this.sourceBodyName = sourceBodyName;
59+
}
60+
61+
@Override
62+
public Optional<Long> contentLength() {
63+
return totalLength != null ? Optional.of(totalLength) : Optional.of(bufferedLength);
64+
}
65+
66+
public void send(ByteBuffer data) {
67+
log.debug(() -> String.format("Sending bytebuffer %s to part %d", data, partNumber));
68+
int length = data.remaining();
69+
bufferedLength += length;
70+
onNumBytesReceived.accept((long) length);
71+
delegate.send(data).whenComplete((r, t) -> {
72+
onNumBytesConsumed.accept((long) length);
73+
if (t != null) {
74+
error(t);
75+
}
76+
});
77+
}
78+
79+
public void complete() {
80+
log.debug(() -> "Received complete() for part number: " + partNumber + " length " + bufferedLength);
81+
delegate.complete().whenComplete((r, t) -> {
82+
if (t != null) {
83+
error(t);
84+
}
85+
});
86+
}
87+
88+
@Override
89+
public long maxLength() {
90+
return maxLength;
91+
}
92+
93+
@Override
94+
public long receivedBytesLength() {
95+
return bufferedLength;
96+
}
97+
98+
@Override
99+
public boolean contentLengthKnown() {
100+
return contentLengthKnown;
101+
}
102+
103+
@Override
104+
public int partNumber() {
105+
return partNumber;
106+
}
107+
108+
public void error(Throwable error) {
109+
delegate.error(error);
110+
}
111+
112+
@Override
113+
public void subscribe(Subscriber<? super ByteBuffer> s) {
114+
if (subscribeCalled.compareAndSet(false, true)) {
115+
delegate.subscribe(s);
116+
} else {
117+
s.onSubscribe(new NoopSubscription(s));
118+
s.onError(NonRetryableException.create(
119+
"A retry was attempted, but the provided source AsyncRequestBody does not "
120+
+ "support splitting to retryable AsyncRequestBody. Consider using BufferedSplittableAsyncRequestBody. "));
121+
}
122+
}
123+
124+
@Override
125+
public String body() {
126+
return sourceBodyName;
127+
}
128+
}

0 commit comments

Comments
 (0)