Skip to content

Commit c497318

Browse files
davidh44zoewangg
andauthored
Retry support for Java-based S3 multipart client for multipart GET to bytes (#6328)
* Override split method in byte array based response transformer * Rename duplicate test * Update tests and refactor * Clean up comments * Remove unused import * Refactor tests * Revert logging enabled * Revert logging enabled * Revert logging enabled * Add changelog * Update javadocs * Use maximumBufferSize in ByteArraySplittingTransformer * Revert "Use maximumBufferSize in ByteArraySplittingTransformer" This reverts commit 1d136e5. * Address comments and refactor tests * Address comments * Remove unused import * Address comments and avoid copying buffer * Address comments * Add check for total parts --------- Co-authored-by: Zoe Wang <[email protected]>
1 parent 4886655 commit c497318

File tree

14 files changed

+894
-275
lines changed

14 files changed

+894
-275
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "Amazon S3",
4+
"contributor": "",
5+
"description": "Add retry support for Java-based S3 multipart client download to byte array"
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncResponseTransformer.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.reactivestreams.Subscription;
2525
import software.amazon.awssdk.annotations.SdkInternalApi;
2626
import software.amazon.awssdk.core.ResponseBytes;
27+
import software.amazon.awssdk.core.SplittingTransformerConfiguration;
2728
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
2829
import software.amazon.awssdk.core.async.SdkPublisher;
2930
import software.amazon.awssdk.utils.BinaryUtils;
@@ -67,6 +68,17 @@ public void exceptionOccurred(Throwable throwable) {
6768
cf.completeExceptionally(throwable);
6869
}
6970

71+
@Override
72+
public SplitResult<ResponseT, ResponseBytes<ResponseT>> split(SplittingTransformerConfiguration splitConfig) {
73+
CompletableFuture<ResponseBytes<ResponseT>> future = new CompletableFuture<>();
74+
SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> transformer =
75+
new ByteArraySplittingTransformer<>(this, future);
76+
return AsyncResponseTransformer.SplitResult.<ResponseT, ResponseBytes<ResponseT>>builder()
77+
.publisher(transformer)
78+
.resultFuture(future)
79+
.build();
80+
}
81+
7082
@Override
7183
public String name() {
7284
return TransformerType.BYTES.getName();
Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.async;
17+
18+
import java.nio.ByteBuffer;
19+
import java.util.Map;
20+
import java.util.concurrent.CompletableFuture;
21+
import java.util.concurrent.ConcurrentHashMap;
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
import java.util.concurrent.atomic.AtomicInteger;
24+
import java.util.concurrent.atomic.AtomicLong;
25+
import java.util.concurrent.atomic.AtomicReference;
26+
import org.reactivestreams.Subscriber;
27+
import org.reactivestreams.Subscription;
28+
import software.amazon.awssdk.annotations.SdkInternalApi;
29+
import software.amazon.awssdk.core.ResponseBytes;
30+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
31+
import software.amazon.awssdk.core.async.SdkPublisher;
32+
import software.amazon.awssdk.core.exception.SdkClientException;
33+
import software.amazon.awssdk.utils.CompletableFutureUtils;
34+
import software.amazon.awssdk.utils.Logger;
35+
import software.amazon.awssdk.utils.async.SimplePublisher;
36+
37+
/**
38+
* A splitting transformer that creates individual {@link ByteArrayAsyncResponseTransformer} instances for each part of a
39+
* multipart download. This is necessary to support retries of individual part downloads.
40+
*
41+
* <p>
42+
* This class is created by {@link ByteArrayAsyncResponseTransformer#split} and used internally by the multipart
43+
* download logic.
44+
*/
45+
46+
@SdkInternalApi
47+
public class ByteArraySplittingTransformer<ResponseT> implements SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> {
48+
private static final Logger log = Logger.loggerFor(ByteArraySplittingTransformer.class);
49+
private final AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>> upstreamResponseTransformer;
50+
private final CompletableFuture<ResponseBytes<ResponseT>> resultFuture;
51+
private Subscriber<? super AsyncResponseTransformer<ResponseT, ResponseT>> downstreamSubscriber;
52+
private final AtomicInteger nextPartNumber = new AtomicInteger(1);
53+
private final AtomicReference<ResponseT> responseT = new AtomicReference<>();
54+
55+
private final SimplePublisher<ByteBuffer> publisherToUpstream = new SimplePublisher<>();
56+
/**
57+
* The amount requested by the downstream subscriber that is still left to fulfill. Updated when the
58+
* {@link Subscription#request(long) request} method is called on the downstream subscriber's subscription. Corresponds to the
59+
* number of {@code AsyncResponseTransformer} that will be published to the downstream subscriber.
60+
*/
61+
private final AtomicLong outstandingDemand = new AtomicLong(0);
62+
63+
/**
64+
* This flag stops the current thread from publishing transformers while another thread is already publishing.
65+
*/
66+
private final AtomicBoolean emitting = new AtomicBoolean(false);
67+
68+
/**
69+
* Synchronization lock that protects the {@code onStreamCalled} flag and cancellation
70+
* workflow from concurrent access. Ensures thread-safety of subscription cancellation.
71+
*/
72+
private final Object lock = new Object();
73+
74+
/**
75+
* Set to true once {@code .onStream()} is called on the upstreamResponseTransformer
76+
*/
77+
private boolean onStreamCalled;
78+
79+
/**
80+
* Set to true once {@code .cancel()} is called in the subscription of the downstream subscriber, or if the
81+
* {@code resultFuture} is cancelled.
82+
*/
83+
private final AtomicBoolean isCancelled = new AtomicBoolean(false);
84+
85+
private final Map<Integer, ByteBuffer> buffers;
86+
87+
public ByteArraySplittingTransformer(AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>>
88+
upstreamResponseTransformer,
89+
CompletableFuture<ResponseBytes<ResponseT>> resultFuture) {
90+
this.upstreamResponseTransformer = upstreamResponseTransformer;
91+
this.resultFuture = resultFuture;
92+
this.buffers = new ConcurrentHashMap<>();
93+
}
94+
95+
@Override
96+
public void subscribe(Subscriber<? super AsyncResponseTransformer<ResponseT, ResponseT>> subscriber) {
97+
this.downstreamSubscriber = subscriber;
98+
subscriber.onSubscribe(new DownstreamSubscription());
99+
}
100+
101+
private final class DownstreamSubscription implements Subscription {
102+
103+
@Override
104+
public void request(long n) {
105+
if (n <= 0) {
106+
downstreamSubscriber.onError(new IllegalArgumentException("Amount requested must be positive"));
107+
return;
108+
}
109+
long newDemand = outstandingDemand.updateAndGet(current -> {
110+
if (Long.MAX_VALUE - current < n) {
111+
return Long.MAX_VALUE;
112+
}
113+
return current + n;
114+
});
115+
log.trace(() -> String.format("new outstanding demand: %s", newDemand));
116+
emit();
117+
}
118+
119+
@Override
120+
public void cancel() {
121+
log.trace(() -> String.format("received cancel signal. Current cancel state is 'isCancelled=%s'", isCancelled.get()));
122+
if (isCancelled.compareAndSet(false, true)) {
123+
handleSubscriptionCancel();
124+
}
125+
}
126+
}
127+
128+
private void emit() {
129+
do {
130+
if (!emitting.compareAndSet(false, true)) {
131+
return;
132+
}
133+
try {
134+
if (doEmit()) {
135+
return;
136+
}
137+
} finally {
138+
emitting.compareAndSet(true, false);
139+
}
140+
} while (outstandingDemand.get() > 0);
141+
}
142+
143+
private boolean doEmit() {
144+
long demand = outstandingDemand.get();
145+
146+
while (demand > 0) {
147+
if (isCancelled.get()) {
148+
return true;
149+
}
150+
151+
demand = outstandingDemand.decrementAndGet();
152+
downstreamSubscriber.onNext(new IndividualTransformer(nextPartNumber.getAndIncrement()));
153+
}
154+
return false;
155+
}
156+
157+
/**
158+
* Handle the {@code .cancel()} signal received from the downstream subscription. Data that is being sent to the upstream
159+
* transformer need to finish processing before we complete. One typical use case for this is completing the multipart
160+
* download, the subscriber having reached the final part will signal that it doesn't need more
161+
* {@link AsyncResponseTransformer}s by calling {@code .cancel()} on the subscription.
162+
*/
163+
private void handleSubscriptionCancel() {
164+
synchronized (lock) {
165+
if (downstreamSubscriber == null) {
166+
log.trace(() -> "downstreamSubscriber already null, skipping downstreamSubscriber.onComplete()");
167+
return;
168+
}
169+
if (!onStreamCalled) {
170+
// we never subscribe publisherToUpstream to the upstream, it would not complete
171+
downstreamSubscriber = null;
172+
return;
173+
}
174+
175+
// if result future is already complete (likely by exception propagation), skip.
176+
if (resultFuture.isDone()) {
177+
return;
178+
}
179+
180+
try {
181+
CompletableFuture<ResponseBytes<ResponseT>> upstreamPrepareFuture = upstreamResponseTransformer.prepare();
182+
CompletableFutureUtils.forwardResultTo(upstreamPrepareFuture, resultFuture);
183+
184+
upstreamResponseTransformer.onResponse(responseT.get());
185+
186+
int totalPartCount = nextPartNumber.get() - 1;
187+
if (buffers.size() != totalPartCount) {
188+
resultFuture.completeExceptionally(
189+
SdkClientException.create(String.format("Number of parts in buffer [%d] does not match total part count"
190+
+ " [%d], some parts did not complete successfully.",
191+
buffers.size(), totalPartCount)));
192+
return;
193+
}
194+
for (int i = 1; i <= totalPartCount; ++i) {
195+
publisherToUpstream.send(buffers.get(i)).exceptionally(ex -> {
196+
resultFuture.completeExceptionally(SdkClientException.create("unexpected error occurred", ex));
197+
return null;
198+
});
199+
}
200+
201+
publisherToUpstream.complete().exceptionally(ex -> {
202+
resultFuture.completeExceptionally(SdkClientException.create("unexpected error occurred", ex));
203+
return null;
204+
});
205+
upstreamResponseTransformer.onStream(SdkPublisher.adapt(publisherToUpstream));
206+
207+
} catch (Throwable throwable) {
208+
resultFuture.completeExceptionally(SdkClientException.create("unexpected error occurred", throwable));
209+
}
210+
}
211+
}
212+
213+
private final class IndividualTransformer implements AsyncResponseTransformer<ResponseT, ResponseT> {
214+
private final int partNumber;
215+
private final ByteArrayAsyncResponseTransformer<ResponseT> delegate = new ByteArrayAsyncResponseTransformer<>();
216+
217+
private IndividualTransformer(int partNumber) {
218+
this.partNumber = partNumber;
219+
}
220+
221+
@Override
222+
public CompletableFuture<ResponseT> prepare() {
223+
CompletableFuture<ResponseBytes<ResponseT>> prepare = delegate.prepare();
224+
return prepare.thenApply(responseBytes -> {
225+
buffers.put(partNumber, responseBytes.asByteBuffer());
226+
return responseBytes.response();
227+
});
228+
}
229+
230+
@Override
231+
public void onResponse(ResponseT response) {
232+
responseT.compareAndSet(null, response);
233+
delegate.onResponse(response);
234+
}
235+
236+
@Override
237+
public void onStream(SdkPublisher<ByteBuffer> publisher) {
238+
delegate.onStream(publisher);
239+
synchronized (lock) {
240+
if (!onStreamCalled) {
241+
onStreamCalled = true;
242+
}
243+
}
244+
}
245+
246+
@Override
247+
public void exceptionOccurred(Throwable error) {
248+
delegate.exceptionOccurred(error);
249+
}
250+
}
251+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.async;
17+
18+
import java.util.concurrent.CompletableFuture;
19+
import org.reactivestreams.Publisher;
20+
import org.reactivestreams.tck.PublisherVerification;
21+
import org.reactivestreams.tck.TestEnvironment;
22+
import software.amazon.awssdk.core.ResponseBytes;
23+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
24+
import software.amazon.awssdk.core.async.SdkPublisher;
25+
26+
public class ByteArraySplittingTransformerTckTest extends PublisherVerification<AsyncResponseTransformer<Object, Object>> {
27+
28+
public ByteArraySplittingTransformerTckTest() {
29+
super(new TestEnvironment());
30+
}
31+
32+
@Override
33+
public Publisher<AsyncResponseTransformer<Object, Object>> createPublisher(long l) {
34+
CompletableFuture<ResponseBytes<Object>> future = new CompletableFuture<>();
35+
AsyncResponseTransformer<Object, ResponseBytes<Object>> upstreamTransformer = AsyncResponseTransformer.toBytes();
36+
ByteArraySplittingTransformer<Object> transformer = new ByteArraySplittingTransformer<>(upstreamTransformer, future);
37+
return SdkPublisher.adapt(transformer).limit(Math.toIntExact(l));
38+
}
39+
40+
@Override
41+
public Publisher<AsyncResponseTransformer<Object, Object>> createFailedPublisher() {
42+
return null;
43+
}
44+
45+
@Override
46+
public long maxElementsFromPublisher() {
47+
return Long.MAX_VALUE;
48+
}
49+
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/DownloadObjectHelper.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import software.amazon.awssdk.services.s3.S3AsyncClient;
2424
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
2525
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
26+
import software.amazon.awssdk.utils.CompletableFutureUtils;
2627
import software.amazon.awssdk.utils.Logger;
2728

2829
@SdkInternalApi
@@ -49,7 +50,9 @@ public <T> CompletableFuture<T> downloadObject(
4950
.build());
5051
MultipartDownloaderSubscriber subscriber = subscriber(getObjectRequest);
5152
split.publisher().subscribe(subscriber);
52-
return split.resultFuture();
53+
CompletableFuture<T> splitFuture = split.resultFuture();
54+
CompletableFutureUtils.forwardExceptionTo(subscriber.future(), splitFuture);
55+
return splitFuture;
5356
}
5457

5558
private MultipartDownloaderSubscriber subscriber(GetObjectRequest getObjectRequest) {

0 commit comments

Comments
 (0)