Skip to content

make split methode workable with Pattern class #25

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 12 commits into from
127 changes: 69 additions & 58 deletions src/main/java/rx/observables/StringObservable.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright 2014 Netflix, Inc.
*
* <p/>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't change the license comment.

* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -32,12 +32,7 @@
import java.io.Reader;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.*;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.regex.Pattern;
Expand All @@ -48,7 +43,7 @@ public class StringObservable {
* {@code byte[]}s. Supports backpressure.
* <p>
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.from.png" alt="">
*
*
* @param i
* Source {@link InputStream}
* @return the Observable containing read byte arrays from the input
Expand All @@ -69,11 +64,11 @@ public static interface UnsafeFunc0<R> extends Callable<R> {

/**
* Helps in creating an Observable that automatically calls {@link Closeable#close()} on completion, error or unsubscribe.
*
*
* <pre>
* StringObservable.using(() -> new FileReader(file), (reader) -> StringObservable.from(reader))
* </pre>
*
*
* @param resourceFactory
* Generates a new {@link Closeable} resource for each new subscription to the returned Observable
* @param observableFactory
Expand All @@ -82,7 +77,7 @@ public static interface UnsafeFunc0<R> extends Callable<R> {
* An {@link Observable} that automatically closes the resource when done.
*/
public static <R, S extends Closeable> Observable<R> using(final UnsafeFunc0<S> resourceFactory,
final Func1<S, Observable<R>> observableFactory) {
final Func1<S, Observable<R>> observableFactory) {
return Observable.using(new Func0<S>() {
@Override
public S call() {
Expand All @@ -109,7 +104,7 @@ public void call(S resource) {
* {@code byte[]}s. Supports backpressure.
* <p>
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.from.png" alt="">
*
*
* @param is
* Source {@link InputStream}
* @param size
Expand All @@ -125,7 +120,7 @@ public static Observable<byte[]> from(final InputStream is, final int size) {
* {@link String}s. Supports backpressure.
* <p>
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.from.png" alt="">
*
*
* @param i
* Source {@link Reader}
* @return the Observable of Strings read from the source
Expand All @@ -139,7 +134,7 @@ public static Observable<String> from(final Reader i) {
* {@link String}s. Supports backpressure.
* <p>
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.from.png" alt="">
*
*
* @param i
* Source {@link Reader}
* @param size
Expand All @@ -155,7 +150,7 @@ public static Observable<String> from(final Reader reader, final int size) {
* and where handles when a multibyte character spans two chunks.
* <p>
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.decode.png" alt="">
*
*
* @param src
* @param charsetName
* @return the Observable returning a stream of decoded strings
Expand All @@ -169,7 +164,7 @@ public static Observable<String> decode(Observable<byte[]> src, String charsetNa
* and where handles when a multibyte character spans two chunks.
* <p>
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.decode.png" alt="">
*
*
* @param src
* @param charset
* @return the Observable returning a stream of decoded strings
Expand All @@ -184,7 +179,7 @@ public static Observable<String> decode(Observable<byte[]> src, Charset charset)
* This method allows for more control over how malformed and unmappable characters are handled.
* <p>
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.decode.png" alt="">
*
*
* @param src
* @param charsetDecoder
* @return the Observable returning a stream of decoded strings
Expand Down Expand Up @@ -225,16 +220,13 @@ public boolean process(byte[] next, ByteBuffer last, boolean endOfInput) {
bb.put(last);
bb.put(next);
bb.flip();
}
else { // next == null
} else { // next == null
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please avoid whitespace and code style changes as much as possible; it makes the review more difficult. If you fix or introduce a method, you are free to code in your own style but don't change any unaffected code style.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes you right, sorry intelliJ have make some magic foo after the commit.

bb = last;
}
}
else { // last == null
} else { // last == null
if (next != null) {
bb = ByteBuffer.wrap(next);
}
else { // next == null
} else { // next == null
return true;
}
}
Expand All @@ -246,17 +238,15 @@ public boolean process(byte[] next, ByteBuffer last, boolean endOfInput) {
if (cr.isError()) {
try {
cr.throwException();
}
catch (CharacterCodingException e) {
} catch (CharacterCodingException e) {
o.onError(e);
return false;
}
}

if (bb.remaining() > 0) {
leftOver = bb;
}
else {
} else {
leftOver = null;
}

Expand All @@ -275,7 +265,7 @@ public boolean process(byte[] next, ByteBuffer last, boolean endOfInput) {
* Encodes a possibly infinite stream of strings into an Observable of byte arrays.
* <p>
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.encode.png" alt="">
*
*
* @param src
* @param charsetName
* @return the Observable with a stream of encoded byte arrays
Expand All @@ -288,7 +278,7 @@ public static Observable<byte[]> encode(Observable<String> src, String charsetNa
* Encodes a possibly infinite stream of strings into an Observable of byte arrays.
* <p>
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.encode.png" alt="">
*
*
* @param src
* @param charset
* @return the Observable with a stream of encoded byte arrays
Expand All @@ -302,7 +292,7 @@ public static Observable<byte[]> encode(Observable<String> src, Charset charset)
* This method allows for more control over how malformed and unmappable characters are handled.
* <p>
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.encode.png" alt="">
*
*
* @param src
* @param charsetEncoder
* @return the Observable with a stream of encoded byte arrays
Expand All @@ -328,7 +318,7 @@ public byte[] call(String str) {
* this on infinite streams.
* <p>
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.stringConcat.png" alt="">
*
*
* @param src
* @return the Observable returing all strings concatenated as a single string
*/
Expand All @@ -342,7 +332,7 @@ public StringBuilder call(StringBuilder a, String b) {
}

/**
* Maps {@link Observable}&lt;{@link Object}&gt; to {@link Observable}&lt;{@link String}&gt; by using {@link String#valueOf(Object)}
* Maps {@link Observable}&lt;{@link Object}&gt; to {@link Observable}&lt;{@link String}&gt; by using {@link String#valueOf(Object)}
* @param src
* @return An {@link Observable} of only {@link String}s.
*/
Expand All @@ -357,23 +347,22 @@ public String call(Object obj) {

/**
* Rechunks the strings based on a regex pattern and works on infinite stream.
*
*
* <pre>
* split(["boo:an", "d:foo"], ":") --> ["boo", "and", "foo"]
* split(["boo:an", "d:foo"], "o") --> ["b", "", ":and:f", "", ""]
* </pre>
*
*
* See {@link Pattern}
* <p>
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.split.png" alt="">
*
*
* @param src
* @param regex
* @param pattern
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add proper javadoc comments to the parameters.

* @return the Observable streaming the split values
*/
public static Observable<String> split(final Observable<String> src, String regex) {
final Pattern pattern = Pattern.compile(regex);

public static Observable<String> split(final Observable<String> src, final Pattern pattern) {
return src.lift(new Operator<String, String>() {
@Override
public Subscriber<? super String> call(final Subscriber<? super String> o) {
Expand All @@ -382,15 +371,15 @@ public Subscriber<? super String> call(final Subscriber<? super String> o) {

@Override
public void onCompleted() {
if (leftOver!=null)
if (leftOver != null)
output(leftOver);
if (!o.isUnsubscribed())
o.onCompleted();
}

@Override
public void onError(Throwable e) {
if (leftOver!=null)
if (leftOver != null)
output(leftOver);
if (!o.isUnsubscribed())
o.onError(e);
Expand All @@ -413,14 +402,13 @@ public void onNext(String segment) {

/**
* when limit == 0 trailing empty parts are not emitted.
*
*
* @param part
*/
private void output(String part) {
if (part.isEmpty()) {
emptyPartCount++;
}
else {
} else {
for (; emptyPartCount > 0; emptyPartCount--)
if (!o.isUnsubscribed())
o.onNext("");
Expand All @@ -433,6 +421,28 @@ private void output(String part) {
});
}

/**
* Rechunks the strings based on a regex pattern and works on infinite stream.
*
* <pre>
* split(["boo:an", "d:foo"], ":") --> ["boo", "and", "foo"]
* split(["boo:an", "d:foo"], "o") --> ["b", "", ":and:f", "", ""]
* </pre>
*
* See {@link Pattern}
* <p>
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.split.png" alt="">
*
* @param src
* @param regex
* @return the Observable streaming the split values
*/

public static Observable<String> split(final Observable<String> src, String regex) {
final Pattern pattern = Pattern.compile(regex);
return StringObservable.split(src,pattern);
}

/**
* Concatenates the sequence of values by adding a separator
* between them and emitting the result once the source completes.
Expand All @@ -443,14 +453,14 @@ private void output(String part) {
* {@link java.lang.String#valueOf(java.lang.Object)} calls.
* <p>
* For example:
*
*
* <pre>
* Observable&lt;Object&gt; source = Observable.from(&quot;a&quot;, 1, &quot;c&quot;);
* Observable&lt;String&gt; result = join(source, &quot;, &quot;);
* </pre>
*
*
* will yield a single element equal to "a, 1, c".
*
*
* @param source
* the source sequence of CharSequence values
* @param separator
Expand All @@ -467,22 +477,23 @@ public Subscriber<String> call(final Subscriber<? super String> child) {
child.setProducer(new Producer() {
@Override
public void request(long n) {
if (n > 0) {
parent.requestAll();
}
}});
if (n > 0) {
parent.requestAll();
}
}
});
return parent;
}
});
}

private static final class JoinParentSubscriber extends Subscriber<String> {

private final Subscriber<? super String> child;
private final CharSequence separator;
private boolean mayAddSeparator;
private StringBuilder b = new StringBuilder();

JoinParentSubscriber(Subscriber<? super String> child, CharSequence separator) {
this.child = child;
this.separator = separator;
Expand All @@ -491,7 +502,7 @@ private static final class JoinParentSubscriber extends Subscriber<String> {
void requestAll() {
request(Long.MAX_VALUE);
}

@Override
public void onStart() {
request(0);
Expand Down Expand Up @@ -522,14 +533,14 @@ public void onNext(String t) {
mayAddSeparator = true;
b.append(t);
}

}

/**
* Splits the {@link Observable} of Strings by lines and numbers them (zero based index)
* <p>
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.byLine.png" alt="">
*
*
* @param source
* @return the Observable conaining the split lines of the source
*/
Expand Down