Skip to content

Commit 48d67a2

Browse files
committed
Add support for RxJava 2
This commit adds support for RxJava 2 Completable, Single, Observable and Flowable types (io.reactivex package). Issue: SPR-14628
1 parent b4641b2 commit 48d67a2

File tree

8 files changed

+263
-0
lines changed

8 files changed

+263
-0
lines changed

build.gradle

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ configure(allprojects) { project ->
8080
ext.reactorNettyVersion = '0.6.0.BUILD-SNAPSHOT'
8181
ext.romeVersion = "1.6.0"
8282
ext.rxjavaVersion = '1.1.9'
83+
ext.rxjava2Version = '2.0.0-RC1'
8384
ext.rxnettyVersion = '0.5.2-rc.3'
8485
ext.servletVersion = "3.1.0"
8586
ext.slf4jVersion = "1.7.21"
@@ -389,6 +390,7 @@ project("spring-core") {
389390
optional("org.reactivestreams:reactive-streams:${reactivestreamsVersion}")
390391
optional("io.projectreactor:reactor-core:${reactorCoreVersion}")
391392
optional "io.reactivex:rxjava:${rxjavaVersion}"
393+
optional "io.reactivex.rxjava2:rxjava:${rxjava2Version}"
392394
optional("io.netty:netty-buffer:${nettyVersion}")
393395
testCompile("javax.xml.bind:jaxb-api:${jaxbVersion}")
394396
testCompile("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}")
@@ -721,6 +723,7 @@ project("spring-web") {
721723
exclude group: 'io.reactivex', module: 'rxjava'
722724
}
723725
optional("io.reactivex:rxjava:${rxjavaVersion}")
726+
optional "io.reactivex.rxjava2:rxjava:${rxjava2Version}"
724727
optional("io.undertow:undertow-core:${undertowVersion}")
725728
optional("org.jboss.xnio:xnio-api:${xnioVersion}")
726729
optional("io.netty:netty-buffer:${nettyVersion}") // Temporarily for JsonObjectDecoder
@@ -797,6 +800,7 @@ project("spring-web-reactive") {
797800
exclude group: 'io.reactivex', module: 'rxjava'
798801
}
799802
testCompile("io.reactivex:rxjava:${rxjavaVersion}")
803+
testCompile "io.reactivex.rxjava2:rxjava:${rxjava2Version}"
800804
testCompile("io.undertow:undertow-core:${undertowVersion}")
801805
testCompile("org.jboss.xnio:xnio-api:${xnioVersion}")
802806
testCompile("com.fasterxml:aalto-xml:1.0.0")

spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@
2323
import java.util.function.Function;
2424
import java.util.function.Predicate;
2525

26+
import io.reactivex.BackpressureStrategy;
27+
import io.reactivex.Flowable;
2628
import org.reactivestreams.Publisher;
2729
import reactor.adapter.RxJava1Adapter;
30+
import reactor.adapter.RxJava2Adapter;
2831
import reactor.core.publisher.Flux;
2932
import reactor.core.publisher.Mono;
3033
import rx.Completable;
@@ -48,6 +51,9 @@ public class ReactiveAdapterRegistry {
4851
private static final boolean rxJava1Present =
4952
ClassUtils.isPresent("rx.Observable", ReactiveAdapterRegistry.class.getClassLoader());
5053

54+
private static final boolean rxJava2Present =
55+
ClassUtils.isPresent("io.reactivex.Flowable", ReactiveAdapterRegistry.class.getClassLoader());
56+
5157
private final Map<Class<?>, ReactiveAdapter> adapterMap = new LinkedHashMap<>(4);
5258

5359

@@ -72,6 +78,9 @@ public ReactiveAdapterRegistry() {
7278
if (rxJava1Present) {
7379
new RxJava1AdapterRegistrar().register(this);
7480
}
81+
if (rxJava2Present) {
82+
new RxJava2AdapterRegistrar().register(this);
83+
}
7584
}
7685

7786

@@ -269,4 +278,28 @@ public void register(ReactiveAdapterRegistry registry) {
269278
}
270279
}
271280

281+
private static class RxJava2AdapterRegistrar {
282+
283+
public void register(ReactiveAdapterRegistry registry) {
284+
registry.registerFluxAdapter(Flowable.class,
285+
source -> RxJava2Adapter.flowableToFlux((Flowable<?>) source),
286+
RxJava2Adapter::fluxToFlowable
287+
);
288+
registry.registerFluxAdapter(io.reactivex.Observable.class,
289+
source -> RxJava2Adapter.observableToFlux((io.reactivex.Observable<?>) source, BackpressureStrategy.BUFFER),
290+
RxJava2Adapter::fluxToObservable
291+
);
292+
registry.registerMonoAdapter(io.reactivex.Single.class,
293+
source -> RxJava2Adapter.singleToMono((io.reactivex.Single<?>) source),
294+
RxJava2Adapter::monoToSingle,
295+
new ReactiveAdapter.Descriptor(false, false, false)
296+
);
297+
registry.registerMonoAdapter(io.reactivex.Completable.class,
298+
source -> RxJava2Adapter.completableToMono((io.reactivex.Completable) source),
299+
RxJava2Adapter::monoToCompletable,
300+
new ReactiveAdapter.Descriptor(false, true, true)
301+
);
302+
}
303+
}
304+
272305
}

spring-core/src/test/java/org/springframework/core/convert/support/ReactiveAdapterRegistryTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.util.concurrent.CompletableFuture;
1919

20+
import io.reactivex.Flowable;
2021
import org.junit.Before;
2122
import org.junit.Test;
2223
import org.reactivestreams.Publisher;
@@ -55,6 +56,10 @@ public void getDefaultAdapters() throws Exception {
5556
testFluxAdapter(Observable.class);
5657
testMonoAdapter(Single.class);
5758
testMonoAdapter(Completable.class);
59+
testFluxAdapter(Flowable.class);
60+
testFluxAdapter(io.reactivex.Observable.class);
61+
testMonoAdapter(io.reactivex.Single.class);
62+
testMonoAdapter(io.reactivex.Completable.class);
5863
}
5964

6065
private void testFluxAdapter(Class<?> adapteeType) {

spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/HttpEntityArgumentResolverTests.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,12 @@
2323
import java.util.List;
2424
import java.util.concurrent.CompletableFuture;
2525

26+
import io.reactivex.BackpressureStrategy;
27+
import io.reactivex.Flowable;
2628
import org.junit.Before;
2729
import org.junit.Test;
2830
import reactor.adapter.RxJava1Adapter;
31+
import reactor.adapter.RxJava2Adapter;
2932
import reactor.core.publisher.Flux;
3033
import reactor.core.publisher.Mono;
3134
import rx.Observable;
@@ -67,6 +70,7 @@
6770
* {@link MessageReaderArgumentResolverTests}.
6871
*
6972
* @author Rossen Stoyanchev
73+
* @author Sebastien Deleuze
7074
*/
7175
public class HttpEntityArgumentResolverTests {
7276

@@ -98,9 +102,12 @@ public void supports() throws Exception {
98102
testSupports(httpEntityType(String.class));
99103
testSupports(httpEntityType(forClassWithGenerics(Mono.class, String.class)));
100104
testSupports(httpEntityType(forClassWithGenerics(Single.class, String.class)));
105+
testSupports(httpEntityType(forClassWithGenerics(io.reactivex.Single.class, String.class)));
101106
testSupports(httpEntityType(forClassWithGenerics(CompletableFuture.class, String.class)));
102107
testSupports(httpEntityType(forClassWithGenerics(Flux.class, String.class)));
103108
testSupports(httpEntityType(forClassWithGenerics(Observable.class, String.class)));
109+
testSupports(httpEntityType(forClassWithGenerics(io.reactivex.Observable.class, String.class)));
110+
testSupports(httpEntityType(forClassWithGenerics(Flowable.class, String.class)));
104111
testSupports(forClassWithGenerics(RequestEntity.class, String.class));
105112
}
106113

@@ -153,6 +160,16 @@ public void emptyBodyWithSingle() throws Exception {
153160
.assertError(ServerWebInputException.class);
154161
}
155162

163+
@Test
164+
public void emptyBodyWithRxJava2Single() throws Exception {
165+
ResolvableType type = httpEntityType(forClassWithGenerics(io.reactivex.Single.class, String.class));
166+
HttpEntity<io.reactivex.Single<String>> entity = resolveValueWithEmptyBody(type);
167+
168+
TestSubscriber.subscribe(RxJava2Adapter.singleToMono(entity.getBody()))
169+
.assertNoValues()
170+
.assertError(ServerWebInputException.class);
171+
}
172+
156173
@Test
157174
public void emptyBodyWithObservable() throws Exception {
158175
ResolvableType type = httpEntityType(forClassWithGenerics(Observable.class, String.class));
@@ -164,6 +181,28 @@ public void emptyBodyWithObservable() throws Exception {
164181
.assertNoValues();
165182
}
166183

184+
@Test
185+
public void emptyBodyWithRxJava2Observable() throws Exception {
186+
ResolvableType type = httpEntityType(forClassWithGenerics(io.reactivex.Observable.class, String.class));
187+
HttpEntity<io.reactivex.Observable<String>> entity = resolveValueWithEmptyBody(type);
188+
189+
TestSubscriber.subscribe(RxJava2Adapter.observableToFlux(entity.getBody(), BackpressureStrategy.BUFFER))
190+
.assertNoError()
191+
.assertComplete()
192+
.assertNoValues();
193+
}
194+
195+
@Test
196+
public void emptyBodyWithFlowable() throws Exception {
197+
ResolvableType type = httpEntityType(forClassWithGenerics(Flowable.class, String.class));
198+
HttpEntity<Flowable<String>> entity = resolveValueWithEmptyBody(type);
199+
200+
TestSubscriber.subscribe(RxJava2Adapter.flowableToFlux(entity.getBody()))
201+
.assertNoError()
202+
.assertComplete()
203+
.assertNoValues();
204+
}
205+
167206
@Test
168207
public void emptyBodyWithCompletableFuture() throws Exception {
169208
ResolvableType type = httpEntityType(forClassWithGenerics(CompletableFuture.class, String.class));
@@ -205,6 +244,16 @@ public void httpEntityWithSingleBody() throws Exception {
205244
assertEquals("line1", httpEntity.getBody().toBlocking().value());
206245
}
207246

247+
@Test
248+
public void httpEntityWithRxJava2SingleBody() throws Exception {
249+
String body = "line1";
250+
ResolvableType type = httpEntityType(forClassWithGenerics(io.reactivex.Single.class, String.class));
251+
HttpEntity<io.reactivex.Single<String>> httpEntity = resolveValue(type, body);
252+
253+
assertEquals(this.request.getHeaders(), httpEntity.getHeaders());
254+
assertEquals("line1", httpEntity.getBody().blockingGet());
255+
}
256+
208257
@Test
209258
public void httpEntityWithCompletableFutureBody() throws Exception {
210259
String body = "line1";
@@ -295,7 +344,10 @@ void handle(
295344
HttpEntity<Mono<String>> monoBody,
296345
HttpEntity<Flux<String>> fluxBody,
297346
HttpEntity<Single<String>> singleBody,
347+
HttpEntity<io.reactivex.Single<String>> xJava2SingleBody,
298348
HttpEntity<Observable<String>> observableBody,
349+
HttpEntity<io.reactivex.Observable<String>> rxJava2ObservableBody,
350+
HttpEntity<Flowable<String>> flowableBody,
299351
HttpEntity<CompletableFuture<String>> completableFutureBody,
300352
RequestEntity<String> requestEntity) {}
301353

spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/MessageReaderArgumentResolverTests.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.concurrent.CompletableFuture;
3232
import javax.xml.bind.annotation.XmlRootElement;
3333

34+
import io.reactivex.Flowable;
3435
import org.junit.Before;
3536
import org.junit.Test;
3637
import reactor.core.publisher.Flux;
@@ -144,6 +145,16 @@ public void singleTestBean() throws Exception {
144145
assertEquals(new TestBean("f1", "b1"), single.toBlocking().value());
145146
}
146147

148+
@Test
149+
public void rxJava2SingleTestBean() throws Exception {
150+
String body = "{\"bar\":\"b1\",\"foo\":\"f1\"}";
151+
ResolvableType type = forClassWithGenerics(io.reactivex.Single.class, TestBean.class);
152+
MethodParameter param = this.testMethod.resolveParam(type);
153+
io.reactivex.Single<TestBean> single = resolveValue(param, body);
154+
155+
assertEquals(new TestBean("f1", "b1"), single.blockingGet());
156+
}
157+
147158
@Test
148159
public void observableTestBean() throws Exception {
149160
String body = "[{\"bar\":\"b1\",\"foo\":\"f1\"},{\"bar\":\"b2\",\"foo\":\"f2\"}]";
@@ -155,6 +166,28 @@ public void observableTestBean() throws Exception {
155166
observable.toList().toBlocking().first());
156167
}
157168

169+
@Test
170+
public void rxJava2ObservableTestBean() throws Exception {
171+
String body = "[{\"bar\":\"b1\",\"foo\":\"f1\"},{\"bar\":\"b2\",\"foo\":\"f2\"}]";
172+
ResolvableType type = forClassWithGenerics(io.reactivex.Observable.class, TestBean.class);
173+
MethodParameter param = this.testMethod.resolveParam(type);
174+
io.reactivex.Observable<?> observable = resolveValue(param, body);
175+
176+
assertEquals(Arrays.asList(new TestBean("f1", "b1"), new TestBean("f2", "b2")),
177+
observable.toList().blockingFirst());
178+
}
179+
180+
@Test
181+
public void flowableTestBean() throws Exception {
182+
String body = "[{\"bar\":\"b1\",\"foo\":\"f1\"},{\"bar\":\"b2\",\"foo\":\"f2\"}]";
183+
ResolvableType type = forClassWithGenerics(Flowable.class, TestBean.class);
184+
MethodParameter param = this.testMethod.resolveParam(type);
185+
Flowable<?> flowable = resolveValue(param, body);
186+
187+
assertEquals(Arrays.asList(new TestBean("f1", "b1"), new TestBean("f2", "b2")),
188+
flowable.toList().blockingFirst());
189+
}
190+
158191
@Test
159192
public void futureTestBean() throws Exception {
160193
String body = "{\"bar\":\"b1\",\"foo\":\"f1\"}";
@@ -288,7 +321,10 @@ private void handle(
288321
@Validated Mono<TestBean> monoTestBean,
289322
@Validated Flux<TestBean> fluxTestBean,
290323
Single<TestBean> singleTestBean,
324+
io.reactivex.Single<TestBean> rxJava2SingleTestBean,
291325
Observable<TestBean> observableTestBean,
326+
io.reactivex.Observable<TestBean> rxJava2ObservableTestBean,
327+
Flowable<TestBean> flowableTestBean,
292328
CompletableFuture<TestBean> futureTestBean,
293329
TestBean testBean,
294330
Map<String, String> map,

spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/MessageWriterResultHandlerTests.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import com.fasterxml.jackson.annotation.JsonTypeInfo;
3131
import com.fasterxml.jackson.annotation.JsonTypeName;
32+
import io.reactivex.Flowable;
3233
import org.junit.Before;
3334
import org.junit.Test;
3435
import reactor.core.publisher.Flux;
@@ -112,8 +113,11 @@ public void voidReturnType() throws Exception {
112113
testVoidReturnType(null, ResolvableType.forType(void.class));
113114
testVoidReturnType(Mono.empty(), ResolvableType.forClassWithGenerics(Mono.class, Void.class));
114115
testVoidReturnType(Completable.complete(), ResolvableType.forClass(Completable.class));
116+
testVoidReturnType(io.reactivex.Completable.complete(), ResolvableType.forClass(io.reactivex.Completable.class));
115117
testVoidReturnType(Flux.empty(), ResolvableType.forClassWithGenerics(Flux.class, Void.class));
116118
testVoidReturnType(Observable.empty(), ResolvableType.forClassWithGenerics(Observable.class, Void.class));
119+
testVoidReturnType(io.reactivex.Observable.empty(), ResolvableType.forClassWithGenerics(io.reactivex.Observable.class, Void.class));
120+
testVoidReturnType(Flowable.empty(), ResolvableType.forClassWithGenerics(Flowable.class, Void.class));
117121
}
118122

119123
private void testVoidReturnType(Object body, ResolvableType type) {
@@ -273,10 +277,16 @@ void voidReturn() { }
273277

274278
Completable completable() { return null; }
275279

280+
io.reactivex.Completable rxJava2Completable() { return null; }
281+
276282
Flux<Void> fluxVoid() { return null; }
277283

278284
Observable<Void> observableVoid() { return null; }
279285

286+
io.reactivex.Observable<Void> rxJava2ObservableVoid() { return null; }
287+
288+
Flowable<Void> flowableVoid() { return null; }
289+
280290
OutputStream outputStream() { return null; }
281291

282292
List<ParentClass> listParentClass() { return null; }

spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestBodyArgumentResolverTests.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,13 +250,17 @@ void handle(
250250
@RequestBody Mono<String> mono,
251251
@RequestBody Flux<String> flux,
252252
@RequestBody Single<String> single,
253+
@RequestBody io.reactivex.Single<String> rxJava2Single,
253254
@RequestBody Observable<String> obs,
255+
@RequestBody io.reactivex.Observable<String> rxjava2Obs,
254256
@RequestBody CompletableFuture<String> future,
255257
@RequestBody(required = false) String stringNotRequired,
256258
@RequestBody(required = false) Mono<String> monoNotRequired,
257259
@RequestBody(required = false) Flux<String> fluxNotRequired,
258260
@RequestBody(required = false) Single<String> singleNotRequired,
261+
@RequestBody(required = false) io.reactivex.Single<String> rxJava2SingleNotRequired,
259262
@RequestBody(required = false) Observable<String> obsNotRequired,
263+
@RequestBody(required = false) io.reactivex.Observable<String> rxjava2ObsNotRequired,
260264
@RequestBody(required = false) CompletableFuture<String> futureNotRequired,
261265
String notAnnotated) {}
262266

0 commit comments

Comments
 (0)