Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 34 additions & 83 deletions src/main/java/rx/observables/StringObservable.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,40 +55,6 @@ public static Observable<byte[]> from(final InputStream i) {
return from(i, 8 * 1024);
}

/**
* Converts an String into an Observable that emits the chars in the String.
* <p>
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/from.png" alt="">
*
* @param str
* the source String
* @return an Observable that emits each char in the source String
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#from">RxJava wiki: from</a>
*/
public final static Observable<String> from(final String str) {
return Observable.create(new OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
// Start emitting token's char
for (Character c : str.toCharArray()) {
subscriber.onNext(c.toString());
}

// Notify on completed
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
} catch (Throwable t) {
// Notify on error
if (!subscriber.isUnsubscribed()) {
subscriber.onError(t);
}
}
}
});
}

/**
* Func0 that allows throwing an {@link IOException}s commonly thrown during IO operations.
* @see StringObservable#using(UnsafeFunc0, Func1)
Expand Down Expand Up @@ -566,50 +532,6 @@ public void onNext(String t) {
});
}

public final static class Line {
private final int number;
private final String text;

public Line(int number, String text) {
this.number = number;
this.text = text;
}

public int getNumber() {
return number;
}

public String getText() {
return text;
}

@Override
public int hashCode() {
int result = 31 + number;
result = 31 * result + (text == null ? 0 : text.hashCode());
return result;
}

@Override
public boolean equals(Object obj) {
if (!(obj instanceof Line))
return false;
Line other = (Line) obj;
if (number != other.number)
return false;
if (other.text == text)
return true;
if (text == null)
return false;
return text.equals(other.text);
}

@Override
public String toString() {
return number + ":" + text;
}
}

/**
* Splits the {@link Observable} of Strings by lines and numbers them (zero based index)
* <p>
Expand All @@ -618,13 +540,42 @@ public String toString() {
* @param source
* @return the Observable conaining the split lines of the source
*/
public static Observable<Line> byLine(Observable<String> source) {
return split(source, System.getProperty("line.separator")).map(new Func1<String, Line>() {
int lineNumber = 0;
public static Observable<String> byLine(Observable<String> source) {
return split(source, System.getProperty("line.separator"));
}

/**
* Converts an String into an Observable that emits the chars in the String.
* <p>
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/from.png" alt="">
*
* @param str
* the source String
* @return an Observable that emits each char in the source String
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#from">RxJava wiki: from</a>
*/
public static Observable<String> byCharacter(Observable<String> source) {
return source.lift(new Operator<String, String>() {
@Override
public Line call(String text) {
return new Line(lineNumber++, text);
public Subscriber<? super String> call(final Subscriber<? super String> subscriber) {
return new Subscriber<String>(subscriber) {
@Override
public void onCompleted() {
subscriber.onCompleted();
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
}

@Override
public void onNext(String str) {
for (char c : str.toCharArray()) {
subscriber.onNext(Character.toString(c));
}
}
};
}
});
}
Expand Down
23 changes: 10 additions & 13 deletions src/test/java/rx/observables/StringObservableTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static rx.observables.StringObservable.byLine;
import static rx.observables.StringObservable.byCharacter;
import static rx.observables.StringObservable.decode;
import static rx.observables.StringObservable.encode;
import static rx.observables.StringObservable.from;
Expand All @@ -52,7 +53,6 @@
import rx.Observable;
import rx.Observer;
import rx.functions.Func1;
import rx.observables.StringObservable.Line;
import rx.observables.StringObservable.UnsafeFunc0;
import rx.observers.TestObserver;
import rx.observers.TestSubscriber;
Expand Down Expand Up @@ -278,15 +278,6 @@ public synchronized int read(byte[] b, int off, int len) {
assertEquals(1, numReads.get());
}

@Test
public void testFromString(){
String foo = "foo";

assertEquals("f", StringObservable.from(foo).first().toBlocking().single());
assertEquals("o", StringObservable.from(foo).skip(1).take(1).toBlocking().single());
assertEquals("o", StringObservable.from(foo).takeLast(1).toBlocking().single());
}

@Test
public void testFromReader() {
final String inStr = "test";
Expand All @@ -299,10 +290,16 @@ public void testFromReader() {
public void testByLine() {
String newLine = System.getProperty("line.separator");

List<Line> lines = byLine(Observable.from(Arrays.asList("qwer", newLine + "asdf" + newLine, "zx", "cv")))
.toList().toBlocking().single();
List<String> lines = byLine(Observable.from(Arrays.asList("qwer", newLine + "asdf" + newLine, "zx", "cv"))).toList().toBlocking().single();

assertEquals(Arrays.asList("qwer", "asdf", "zxcv"), lines);
}

@Test
public void testByCharacter() {
List<String> chars = byCharacter(Observable.from(Arrays.asList("foo", "bar"))).toList().toBlocking().single();

assertEquals(Arrays.asList(new Line(0, "qwer"), new Line(1, "asdf"), new Line(2, "zxcv")), lines);
assertEquals(Arrays.asList("f", "o", "o", "b", "a", "r"), chars);
}

@Test
Expand Down