From f07de13996764f84005336fce7403a886da4317e Mon Sep 17 00:00:00 2001 From: Thomas Nield Date: Wed, 20 Jan 2016 12:35:01 -0600 Subject: [PATCH 01/10] Implement OperatorDoOnEmpty --- src/main/java/rx/Observable.java | 45 ++++-- .../internal/operators/OperatorDoOnEmpty.java | 59 +++++++ .../operators/OperatorDoOnEmptyTest.java | 148 ++++++++++++++++++ 3 files changed, 243 insertions(+), 9 deletions(-) create mode 100644 src/main/java/rx/internal/operators/OperatorDoOnEmpty.java create mode 100644 src/test/java/rx/internal/operators/OperatorDoOnEmptyTest.java diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 9287e105de..2d8aacfabe 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -12,21 +12,32 @@ */ package rx; -import java.util.*; -import java.util.concurrent.*; - -import rx.annotations.*; -import rx.exceptions.*; +import rx.annotations.Beta; +import rx.annotations.Experimental; +import rx.exceptions.Exceptions; +import rx.exceptions.OnErrorNotImplementedException; import rx.functions.*; import rx.internal.operators.*; import rx.internal.producers.SingleProducer; -import rx.internal.util.*; -import rx.observables.*; +import rx.internal.util.RxRingBuffer; +import rx.internal.util.ScalarSynchronousObservable; +import rx.internal.util.UtilityFunctions; +import rx.observables.BlockingObservable; +import rx.observables.ConnectableObservable; +import rx.observables.GroupedObservable; import rx.observers.SafeSubscriber; -import rx.plugins.*; -import rx.schedulers.*; +import rx.plugins.RxJavaObservableExecutionHook; +import rx.plugins.RxJavaPlugins; +import rx.schedulers.Schedulers; +import rx.schedulers.TimeInterval; +import rx.schedulers.Timestamped; import rx.subscriptions.Subscriptions; +import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + /** * The Observable class that implements the Reactive Pattern. *

@@ -4425,6 +4436,22 @@ public final void onNext(T v) { return lift(new OperatorDoOnEach(observer)); } + /** + * Modifies the source Observable so that it invokes an action when it calls {@code onCompleted} and no items were emitted. + *

+ *

+ *
Scheduler:
+ *
{@code doOnEmpty} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param onEmpty + * the action to invoke when the source Observable calls {@code onCompleted}, contingent on no items were emitted + * @return the source Observable with the side-effecting behavior applied + * @see ReactiveX operators documentation: Do + */ + public final Observable doOnEmpty(final Action0 onEmpty) { + return lift(new OperatorDoOnEmpty(onEmpty)); + } /** * Modifies the source Observable so that it notifies an Observer for each item it emits. diff --git a/src/main/java/rx/internal/operators/OperatorDoOnEmpty.java b/src/main/java/rx/internal/operators/OperatorDoOnEmpty.java new file mode 100644 index 0000000000..24dbc34a15 --- /dev/null +++ b/src/main/java/rx/internal/operators/OperatorDoOnEmpty.java @@ -0,0 +1,59 @@ +package rx.internal.operators; + +import rx.Observable; +import rx.Subscriber; +import rx.exceptions.Exceptions; +import rx.functions.Action0; + +public final class OperatorDoOnEmpty implements Observable.Operator { + + private final Action0 onEmpty; + + public OperatorDoOnEmpty(Action0 onEmpty) { + this.onEmpty = onEmpty; + } + + @Override + public Subscriber call(final Subscriber child) { + + return new Subscriber(child) { + + private boolean isEmpty = true; + private boolean done = false; + + @Override + public void onCompleted() { + if (done) { + return; + } + if (isEmpty) { + try { + onEmpty.call(); + } catch (Exception e) { + Exceptions.throwIfFatal(e); + } + } + child.onCompleted(); + done = true; + } + + @Override + public void onError(Throwable e) { + if (done) { + return; + } + child.onError(e); + } + + @Override + public void onNext(T t) { + if (done) { + return; + } + isEmpty = false; + child.onNext(t); + } + }; + } + +} diff --git a/src/test/java/rx/internal/operators/OperatorDoOnEmptyTest.java b/src/test/java/rx/internal/operators/OperatorDoOnEmptyTest.java new file mode 100644 index 0000000000..472887c6b3 --- /dev/null +++ b/src/test/java/rx/internal/operators/OperatorDoOnEmptyTest.java @@ -0,0 +1,148 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +package rx.internal.operators; + +import org.junit.Test; +import rx.Observable; +import rx.Subscription; +import rx.functions.Action0; +import rx.functions.Func0; +import rx.observers.TestSubscriber; +import rx.subjects.PublishSubject; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertTrue; + +public final class OperatorDoOnEmptyTest { + + @Test + public void testNonEmpty() { + Observable source = Observable.just("Chicago", "Houston", "Phoenix"); + + final AtomicBoolean wasCalled = new AtomicBoolean(false); + + source.doOnEmpty(new Action0() { + @Override + public void call() { + wasCalled.set(true); + } + }).subscribe(); + + assertTrue(!wasCalled.get()); + } + + @Test + public void testEmpty() { + Observable source = Observable.empty(); + + final AtomicBoolean wasCalled = new AtomicBoolean(false); + + source.doOnEmpty(new Action0() { + @Override + public void call() { + wasCalled.set(true); + } + }).subscribe(); + + assertTrue(wasCalled.get()); + } + + @Test + public void testUnsubscription() { + final AtomicBoolean wasCalled = new AtomicBoolean(false); + + PublishSubject source = PublishSubject.create(); + + Subscription subscription = source.doOnEmpty(new Action0() { + @Override + public void call() { + wasCalled.set(true); + } + }).take(3).subscribe(); + + assertTrue(source.hasObservers()); + + source.onNext(0); + source.onNext(1); + + assertTrue(source.hasObservers()); + + source.onNext(2); + + assertTrue(!source.hasObservers()); + + subscription.unsubscribe(); + + assertTrue(!wasCalled.get()); + } + + @Test + public void testBackPressure() { + + final AtomicBoolean wasCalled = new AtomicBoolean(false); + + Observable source = Observable.range(0,1000).doOnEmpty(new Action0() { + @Override + public void call() { + wasCalled.set(true); + } + }); + + TestSubscriber subscriber = new TestSubscriber(0); + + source.subscribe(subscriber); + + subscriber.requestMore(1); + + assertTrue(subscriber.getOnNextEvents().size() == 1); + assertTrue(subscriber.getOnCompletedEvents().isEmpty()); + assertTrue(subscriber.getOnErrorEvents().size() == 0); + assertTrue(!wasCalled.get()); + } + + @Test + public void subscriberStateTest() { + final AtomicInteger counter = new AtomicInteger(0); + + final AtomicInteger callCount = new AtomicInteger(0); + + Observable o = Observable.defer(new Func0>() { + @Override + public Observable call() { + return Observable.range(1, counter.getAndIncrement() % 2); + } + }).doOnEmpty(new Action0() { + @Override + public void call() { + callCount.incrementAndGet(); + } + }); + + o.subscribe(); + o.subscribe(); + o.subscribe(); + o.subscribe(); + o.subscribe(); + + assert(callCount.get() == 3); + } + +} From 42cce0626e7d178be3644829841fcd01ca69030d Mon Sep 17 00:00:00 2001 From: Thomas Nield Date: Wed, 20 Jan 2016 12:46:06 -0600 Subject: [PATCH 02/10] restore imports --- src/main/java/rx/Observable.java | 368 +++++++++++++++---------------- 1 file changed, 179 insertions(+), 189 deletions(-) diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 2d8aacfabe..3c26471ac6 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -12,32 +12,21 @@ */ package rx; -import rx.annotations.Beta; -import rx.annotations.Experimental; -import rx.exceptions.Exceptions; -import rx.exceptions.OnErrorNotImplementedException; +import java.util.*; +import java.util.concurrent.*; + +import rx.annotations.*; +import rx.exceptions.*; import rx.functions.*; import rx.internal.operators.*; import rx.internal.producers.SingleProducer; -import rx.internal.util.RxRingBuffer; -import rx.internal.util.ScalarSynchronousObservable; -import rx.internal.util.UtilityFunctions; -import rx.observables.BlockingObservable; -import rx.observables.ConnectableObservable; -import rx.observables.GroupedObservable; +import rx.internal.util.*; +import rx.observables.*; import rx.observers.SafeSubscriber; -import rx.plugins.RxJavaObservableExecutionHook; -import rx.plugins.RxJavaPlugins; -import rx.schedulers.Schedulers; -import rx.schedulers.TimeInterval; -import rx.schedulers.Timestamped; +import rx.plugins.*; +import rx.schedulers.*; import rx.subscriptions.Subscriptions; -import java.util.*; -import java.util.concurrent.Callable; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - /** * The Observable class that implements the Reactive Pattern. *

@@ -50,7 +39,7 @@ *

* For more information see the ReactiveX * documentation. - * + * * @param * the type of the items emitted by the Observable */ @@ -63,7 +52,7 @@ public class Observable { *

* Note: Use {@link #create(OnSubscribe)} to create an Observable, instead of this constructor, * unless you specifically have a need for inheritance. - * + * * @param f * {@link OnSubscribe} to be executed when {@link #subscribe(Subscriber)} is called */ @@ -92,7 +81,7 @@ protected Observable(OnSubscribe f) { *

Scheduler:
*
{@code create} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param * the type of the items that this Observable emits * @param f @@ -123,8 +112,8 @@ public interface Operator extends Func1, Subscriber< /** * Passes all emitted values from this Observable to the provided conversion function to be collected and * returned as a single value. Note that it is legal for a conversion function to return an Observable - * (enabling chaining). - * + * (enabling chaining). + * * @param conversion a function that converts from this {@code Observable} to an {@code R} * @return an instance of R created by the provided conversion function * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) @@ -137,7 +126,7 @@ public void call(Subscriber subscriber) { subscriber.add(Observable.subscribe(subscriber, Observable.this)); }}); } - + /** * Lifts a function to the current Observable and returns a new Observable that when subscribed to will pass * the values of the current Observable through the Operator function. @@ -155,7 +144,7 @@ public void call(Subscriber subscriber) { *
Scheduler:
*
{@code lift} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param operator the Operator that implements the Observable-operating function to be applied to the source * Observable * @return an Observable that is the result of applying the lifted Operator to the source Observable @@ -172,7 +161,7 @@ public void call(Subscriber o) { st.onStart(); onSubscribe.call(st); } catch (Throwable e) { - // localized capture of errors rather than it skipping all operators + // localized capture of errors rather than it skipping all operators // and ending up in the try/catch of the subscribe method which then // prevents onErrorResumeNext and other similar approaches to error handling Exceptions.throwIfFatal(e); @@ -187,7 +176,7 @@ public void call(Subscriber o) { } }); } - + /** * Transform an Observable by applying a particular Transformer function to it. *

@@ -201,7 +190,7 @@ public void call(Subscriber o) { *

Scheduler:
*
{@code compose} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param transformer implements the function that transforms the source Observable * @return the source Observable, transformed by the transformer function * @see RxJava wiki: Implementing Your Own Operators @@ -267,7 +256,7 @@ public Single toSingle() { public Completable toCompletable() { return Completable.fromObservable(this); } - + /* ********************************************************************************************************* * Operators Below Here @@ -283,7 +272,7 @@ public Completable toCompletable() { *
Scheduler:
*
{@code amb} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param sources * an Iterable of Observable sources competing to react first * @return an Observable that emits the same sequence as whichever of the source Observables first @@ -303,7 +292,7 @@ public final static Observable amb(IterableScheduler: *
{@code amb} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * an Observable competing to react first * @param o2 @@ -325,7 +314,7 @@ public final static Observable amb(Observable o1, Observable *
Scheduler:
*
{@code amb} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * an Observable competing to react first * @param o2 @@ -349,7 +338,7 @@ public final static Observable amb(Observable o1, Observable *
Scheduler:
*
{@code amb} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * an Observable competing to react first * @param o2 @@ -403,7 +392,7 @@ public final static Observable amb(Observable o1, Observable *
Scheduler:
*
{@code amb} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * an Observable competing to react first * @param o2 @@ -499,7 +488,7 @@ public final static Observable amb(Observable o1, Observable *
Scheduler:
*
{@code amb} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * an Observable competing to react first * @param o2 @@ -562,7 +551,7 @@ public static final Observable combineLatest(ObservableScheduler: *
{@code combineLatest} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * the first source Observable * @param o2 @@ -590,7 +579,7 @@ public static final Observable combineLatest(ObservableScheduler: *
{@code combineLatest} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * the first source Observable * @param o2 @@ -621,7 +610,7 @@ public static final Observable combineLatest(ObservableScheduler: *
{@code combineLatest} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * the first source Observable * @param o2 @@ -654,7 +643,7 @@ public static final Observable combineLatest(Observab *
Scheduler:
*
{@code combineLatest} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * the first source Observable * @param o2 @@ -689,7 +678,7 @@ public static final Observable combineLatest(Obse *
Scheduler:
*
{@code combineLatest} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * the first source Observable * @param o2 @@ -726,7 +715,7 @@ public static final Observable combineLatest( *
Scheduler:
*
{@code combineLatest} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * the first source Observable * @param o2 @@ -765,7 +754,7 @@ public static final Observable combineLat *
Scheduler:
*
{@code combineLatest} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * the first source Observable * @param o2 @@ -896,7 +885,7 @@ public final static Observable concat(Observable t1, Observa *
Scheduler:
*
{@code concat} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be concatenated * @param t2 @@ -1012,7 +1001,7 @@ public final static Observable concat(Observable t1, Observa *
Scheduler:
*
{@code concat} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be concatenated * @param t2 @@ -1046,7 +1035,7 @@ public final static Observable concat(Observable t1, Observa *
Scheduler:
*
{@code concat} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be concatenated * @param t2 @@ -1087,7 +1076,7 @@ public final static Observable concat(Observable t1, Observa *
Scheduler:
*
{@code defer} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param observableFactory * the Observable factory function to invoke for each {@link Observer} that subscribes to the * resulting Observable @@ -1110,7 +1099,7 @@ public void call(Subscriber subscriber) { } }); } - + /** * Returns an Observable that emits no items to the {@link Observer} and immediately invokes its * {@link Observer#onCompleted onCompleted} method. @@ -1141,7 +1130,7 @@ public final static Observable empty() { *
Scheduler:
*
{@code error} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param exception * the particular Throwable to pass to {@link Observer#onError onError} * @param @@ -1168,7 +1157,7 @@ public final static Observable error(Throwable exception) { *
Scheduler:
*
{@code from} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param future * the source {@link Future} * @param @@ -1195,7 +1184,7 @@ public final static Observable from(Future future) { *
Scheduler:
*
{@code from} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param future * the source {@link Future} * @param timeout @@ -1224,7 +1213,7 @@ public final static Observable from(Future future, long time *
Scheduler:
*
you specify which {@link Scheduler} this operator will use
* - * + * * @param future * the source {@link Future} * @param scheduler @@ -1249,7 +1238,7 @@ public final static Observable from(Future future, Scheduler *
Scheduler:
*
{@code from} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param iterable * the source {@link Iterable} sequence * @param @@ -1270,7 +1259,7 @@ public final static Observable from(Iterable iterable) { *
Scheduler:
*
{@code from} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param array * the source Array * @param @@ -1324,7 +1313,7 @@ public static Observable fromCallable(Callable func) { *
Scheduler:
*
{@code interval} operates by default on the {@code computation} {@link Scheduler}.
* - * + * * @param interval * interval size in time units (see below) * @param unit @@ -1345,7 +1334,7 @@ public final static Observable interval(long interval, TimeUnit unit) { *
Scheduler:
*
you specify which {@link Scheduler} this operator will use
* - * + * * @param interval * interval size in time units (see below) * @param unit @@ -1371,7 +1360,7 @@ public final static Observable interval(long interval, TimeUnit unit, Sche *
Scheduler:
*
{@code interval} operates by default on the {@code computation} {@link Scheduler}.
* - * + * * @param initialDelay * the initial delay time to wait before emitting the first value of 0L * @param period @@ -1399,7 +1388,7 @@ public final static Observable interval(long initialDelay, long period, Ti *
Scheduler:
*
you specify which {@link Scheduler} this operator will use
* - * + * * @param initialDelay * the initial delay time to wait before emitting the first value of 0L * @param period @@ -1433,7 +1422,7 @@ public final static Observable interval(long initialDelay, long period, Ti *
Scheduler:
*
{@code just} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param value * the item to emit * @param @@ -1444,7 +1433,7 @@ public final static Observable interval(long initialDelay, long period, Ti public final static Observable just(final T value) { return ScalarSynchronousObservable.create(value); } - + /** * Converts two items into an Observable that emits those items. *

@@ -1453,7 +1442,7 @@ public final static Observable just(final T value) { *

Scheduler:
*
{@code just} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * first item * @param t2 @@ -1477,7 +1466,7 @@ public final static Observable just(T t1, T t2) { *
Scheduler:
*
{@code just} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * first item * @param t2 @@ -1503,7 +1492,7 @@ public final static Observable just(T t1, T t2, T t3) { *
Scheduler:
*
{@code just} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * first item * @param t2 @@ -1561,7 +1550,7 @@ public final static Observable just(T t1, T t2, T t3, T t4, T t5) { *
Scheduler:
*
{@code just} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * first item * @param t2 @@ -1593,7 +1582,7 @@ public final static Observable just(T t1, T t2, T t3, T t4, T t5, T t6) { *
Scheduler:
*
{@code just} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * first item * @param t2 @@ -1627,7 +1616,7 @@ public final static Observable just(T t1, T t2, T t3, T t4, T t5, T t6, T *
Scheduler:
*
{@code just} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * first item * @param t2 @@ -1663,7 +1652,7 @@ public final static Observable just(T t1, T t2, T t3, T t4, T t5, T t6, T *
Scheduler:
*
{@code just} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * first item * @param t2 @@ -1701,7 +1690,7 @@ public final static Observable just(T t1, T t2, T t3, T t4, T t5, T t6, T *
Scheduler:
*
{@code just} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * first item * @param t2 @@ -1732,7 +1721,7 @@ public final static Observable just(T t1, T t2, T t3, T t4, T t5, T t6, T public final static Observable just(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8, T t9, T t10) { return from((T[])new Object[] { t1, t2, t3, t4, t5, t6, t7, t8, t9, t10 }); } - + /** * Flattens an Iterable of Observables into one Observable, without any transformation. *

@@ -1744,7 +1733,7 @@ public final static Observable just(T t1, T t2, T t3, T t4, T t5, T t6, T *

Scheduler:
*
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param sequences * the Iterable of Observables * @return an Observable that emits items that are the result of flattening the items emitted by the @@ -1767,7 +1756,7 @@ public final static Observable merge(IterableScheduler: *
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param sequences * the Iterable of Observables * @param maxConcurrent @@ -1822,7 +1811,7 @@ public final static Observable merge(ObservableScheduler: *
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param source * an Observable that emits Observables * @param maxConcurrent @@ -1853,7 +1842,7 @@ public final static Observable merge(ObservableScheduler: *
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -1877,7 +1866,7 @@ public final static Observable merge(Observable t1, Observab *
Scheduler:
*
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -1903,7 +1892,7 @@ public final static Observable merge(Observable t1, Observab *
Scheduler:
*
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -1931,7 +1920,7 @@ public final static Observable merge(Observable t1, Observab *
Scheduler:
*
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -1961,7 +1950,7 @@ public final static Observable merge(Observable t1, Observab *
Scheduler:
*
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -1993,7 +1982,7 @@ public final static Observable merge(Observable t1, Observab *
Scheduler:
*
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -2027,7 +2016,7 @@ public final static Observable merge(Observable t1, Observab *
Scheduler:
*
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -2063,7 +2052,7 @@ public final static Observable merge(Observable t1, Observab *
Scheduler:
*
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -2101,7 +2090,7 @@ public final static Observable merge(Observable t1, Observab *
Scheduler:
*
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param sequences * the Array of Observables * @return an Observable that emits all of the items emitted by the Observables in the Array @@ -2110,7 +2099,7 @@ public final static Observable merge(Observable t1, Observab public final static Observable merge(Observable[] sequences) { return merge(from(sequences)); } - + /** * Flattens an Array of Observables into one Observable, without any transformation, while limiting the * number of concurrent subscriptions to these Observables. @@ -2123,7 +2112,7 @@ public final static Observable merge(Observable[] sequences) *
Scheduler:
*
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param sequences * the Array of Observables * @param maxConcurrent @@ -2153,7 +2142,7 @@ public final static Observable merge(Observable[] sequences, *
Scheduler:
*
{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param source * an Observable that emits Observables * @return an Observable that emits all of the items emitted by the Observables emitted by the @@ -2182,7 +2171,7 @@ public final static Observable mergeDelayError(ObservableScheduler: *
{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param source * an Observable that emits Observables * @param maxConcurrent @@ -2214,7 +2203,7 @@ public final static Observable mergeDelayError(ObservableScheduler: *
{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -2244,7 +2233,7 @@ public final static Observable mergeDelayError(Observable t1 *
Scheduler:
*
{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -2276,7 +2265,7 @@ public final static Observable mergeDelayError(Observable t1 *
Scheduler:
*
{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -2310,7 +2299,7 @@ public final static Observable mergeDelayError(Observable t1 *
Scheduler:
*
{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -2346,7 +2335,7 @@ public final static Observable mergeDelayError(Observable t1 *
Scheduler:
*
{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -2385,7 +2374,7 @@ public final static Observable mergeDelayError(Observable t1 *
Scheduler:
*
{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -2425,7 +2414,7 @@ public final static Observable mergeDelayError(Observable t1 *
Scheduler:
*
{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -2468,7 +2457,7 @@ public final static Observable mergeDelayError(Observable t1 *
Scheduler:
*
{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -2503,7 +2492,7 @@ public final static Observable mergeDelayError(Observable t1 *
Scheduler:
*
{@code nest} does not operate by default on a particular {@link Scheduler}.
* - * + * * @return an Observable that emits a single item: the source Observable * @see ReactiveX operators documentation: To */ @@ -2521,7 +2510,7 @@ public final Observable> nest() { *
Scheduler:
*
{@code never} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param * the type of items (not) emitted by the Observable * @return an Observable that never emits any items or sends any notifications to an {@link Observer} @@ -2539,7 +2528,7 @@ public final static Observable never() { *
Scheduler:
*
{@code range} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param start * the value of the first Integer in the sequence * @param count @@ -2575,7 +2564,7 @@ public final static Observable range(int start, int count) { *
Scheduler:
*
you specify which {@link Scheduler} this operator will use
* - * + * * @param start * the value of the first Integer in the sequence * @param count @@ -2598,7 +2587,7 @@ public final static Observable range(int start, int count, Scheduler sc *
Scheduler:
*
{@code sequenceEqual} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param first * the first Observable to compare * @param second @@ -2630,7 +2619,7 @@ public final Boolean call(T first, T second) { *
Scheduler:
*
{@code sequenceEqual} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param first * the first Observable to compare * @param second @@ -2661,7 +2650,7 @@ public final static Observable sequenceEqual(ObservableScheduler: *
{@code switchOnNext} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param the item type * @param sequenceOfSequences * the source Observable that emits Observables @@ -2685,7 +2674,7 @@ public final static Observable switchOnNext(ObservableScheduler: *
{@code timer} operates by default on the {@code computation} {@link Scheduler}.
* - * + * * @param initialDelay * the initial delay time to wait before emitting the first value of 0L * @param period @@ -2714,7 +2703,7 @@ public final static Observable timer(long initialDelay, long period, TimeU *
Scheduler:
*
you specify which {@link Scheduler} this operator will use
* - * + * * @param initialDelay * the initial delay time to wait before emitting the first value of 0L * @param period @@ -2744,7 +2733,7 @@ public final static Observable timer(long initialDelay, long period, TimeU *
Scheduler:
*
{@code timer} operates by default on the {@code computation} {@link Scheduler}.
* - * + * * @param delay * the initial delay before emitting a single {@code 0L} * @param unit @@ -2768,7 +2757,7 @@ public final static Observable timer(long delay, TimeUnit unit) { *
Scheduler:
*
you specify which {@link Scheduler} this operator will use
* - * + * * @param delay * the initial delay before emitting a single 0L * @param unit @@ -2791,7 +2780,7 @@ public final static Observable timer(long delay, TimeUnit unit, Scheduler *
Scheduler:
*
{@code using} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param resourceFactory * the factory function to create a resource object that depends on the Observable * @param observableFactory @@ -2807,9 +2796,9 @@ public final static Observable using( final Action1 disposeAction) { return using(resourceFactory, observableFactory, disposeAction, false); } - + /** - * Constructs an Observable that creates a dependent resource object which is disposed of just before + * Constructs an Observable that creates a dependent resource object which is disposed of just before * termination if you have set {@code disposeEagerly} to {@code true} and unsubscription does not occur * before termination. Otherwise resource disposal will occur on unsubscription. Eager disposal is * particularly appropriate for a synchronous Observable that resuses resources. {@code disposeAction} will @@ -2820,7 +2809,7 @@ public final static Observable using( *
Scheduler:
*
{@code using} does not operate by default on a particular {@link Scheduler}.
* - * + * * @warn "Backpressure Support" section missing from javadoc * @param resourceFactory * the factory function to create a resource object that depends on the Observable @@ -2829,7 +2818,7 @@ public final static Observable using( * @param disposeAction * the function that will dispose of the resource * @param disposeEagerly - * if {@code true} then disposal will happen either on unsubscription or just before emission of + * if {@code true} then disposal will happen either on unsubscription or just before emission of * a terminal event ({@code onComplete} or {@code onError}). * @return the Observable whose lifetime controls the lifetime of the dependent resource object * @see ReactiveX operators documentation: Using @@ -2861,7 +2850,7 @@ public final static Observable using( *
Scheduler:
*
{@code zip} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param ws * an Iterable of source Observables * @param zipFunction @@ -2895,7 +2884,7 @@ public final static Observable zip(Iterable> ws, *
Scheduler:
*
{@code zip} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param ws * an Observable of source Observables * @param zipFunction @@ -2933,7 +2922,7 @@ public Observable[] call(List> o) { *
Scheduler:
*
{@code zip} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * the first source Observable * @param o2 @@ -2967,7 +2956,7 @@ public final static Observable zip(Observable o1, O *
Scheduler:
*
{@code zip} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * the first source Observable * @param o2 @@ -3003,7 +2992,7 @@ public final static Observable zip(Observable o *
Scheduler:
*
{@code zip} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * the first source Observable * @param o2 @@ -3041,7 +3030,7 @@ public final static Observable zip(ObservableScheduler: *
{@code zip} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * the first source Observable * @param o2 @@ -3080,7 +3069,7 @@ public final static Observable zip(ObservableScheduler: *
{@code zip} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * the first source Observable * @param o2 @@ -3122,7 +3111,7 @@ public final static Observable zip(ObservableScheduler: *
{@code zip} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * the first source Observable * @param o2 @@ -3166,7 +3155,7 @@ public final static Observable zip(Observable *
Scheduler:
*
{@code zip} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * the first source Observable * @param o2 @@ -3212,7 +3201,7 @@ public final static Observable zip(Observ *
Scheduler:
*
{@code zip} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * the first source Observable * @param o2 @@ -3251,7 +3240,7 @@ public final static Observable zip(Ob *
Scheduler:
*
{@code all} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param predicate * a function that evaluates an item and returns a Boolean * @return an Observable that emits {@code true} if all items emitted by the source Observable satisfy the @@ -3261,7 +3250,7 @@ public final static Observable zip(Ob public final Observable all(Func1 predicate) { return lift(new OperatorAll(predicate)); } - + /** * Mirrors the Observable (current or provided) that first either emits an item or sends a termination * notification. @@ -3271,7 +3260,7 @@ public final Observable all(Func1 predicate) { *
Scheduler:
*
{@code amb} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable competing to react first * @return an Observable that emits the same sequence as whichever of the source Observables first @@ -3290,7 +3279,7 @@ public final Observable ambWith(Observable t1) { *
Scheduler:
*
{@code asObservable} does not operate by default on a particular {@link Scheduler}.
* - * + * * @return an Observable that hides the identity of this Observable */ public final Observable asObservable() { @@ -3310,7 +3299,7 @@ public final Observable asObservable() { *
Scheduler:
*
This version of {@code buffer} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param bufferClosingSelector * a {@link Func0} that produces an Observable that governs the boundary between buffers. * Whenever this {@code Observable} emits an item, {@code buffer} emits the current buffer and @@ -3334,7 +3323,7 @@ public final Observable> buffer(Func0Scheduler: *
This version of {@code buffer} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param count * the maximum number of items in each buffer before it should be emitted * @return an Observable that emits connected, non-overlapping buffers, each containing at most @@ -3356,7 +3345,7 @@ public final Observable> buffer(int count) { *
Scheduler:
*
This version of {@code buffer} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param count * the maximum size of each buffer before it should be emitted * @param skip @@ -3386,7 +3375,7 @@ public final Observable> buffer(int count, int skip) { *
Scheduler:
*
This version of {@code buffer} operates by default on the {@code computation} {@link Scheduler}.
* - * + * * @param timespan * the period of time each buffer collects items before it is emitted * @param timeshift @@ -3416,7 +3405,7 @@ public final Observable> buffer(long timespan, long timeshift, TimeUnit *
Scheduler:
*
you specify which {@link Scheduler} this operator will use
* - * + * * @param timespan * the period of time each buffer collects items before it is emitted * @param timeshift @@ -3447,7 +3436,7 @@ public final Observable> buffer(long timespan, long timeshift, TimeUnit *
Scheduler:
*
This version of {@code buffer} operates by default on the {@code computation} {@link Scheduler}.
* - * + * * @param timespan * the period of time each buffer collects items before it is emitted and replaced with a new * buffer @@ -3476,7 +3465,7 @@ public final Observable> buffer(long timespan, TimeUnit unit) { *
Scheduler:
*
This version of {@code buffer} operates by default on the {@code computation} {@link Scheduler}.
* - * + * * @param timespan * the period of time each buffer collects items before it is emitted and replaced with a new * buffer @@ -3509,7 +3498,7 @@ public final Observable> buffer(long timespan, TimeUnit unit, int count) *
Scheduler:
*
you specify which {@link Scheduler} this operator will use
* - * + * * @param timespan * the period of time each buffer collects items before it is emitted and replaced with a new * buffer @@ -3543,7 +3532,7 @@ public final Observable> buffer(long timespan, TimeUnit unit, int count, *
Scheduler:
*
you specify which {@link Scheduler} this operator will use
* - * + * * @param timespan * the period of time each buffer collects items before it is emitted and replaced with a new * buffer @@ -3572,7 +3561,7 @@ public final Observable> buffer(long timespan, TimeUnit unit, Scheduler *
Scheduler:
*
This version of {@code buffer} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param bufferOpenings * the Observable that, when it emits an item, causes a new buffer to be created * @param bufferClosingSelector @@ -3602,7 +3591,7 @@ public final Observable> buffer(ObservableScheduler: *
This version of {@code buffer} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param * the boundary value type (ignored) * @param boundary @@ -3632,7 +3621,7 @@ public final Observable> buffer(Observable boundary) { *
Scheduler:
*
This version of {@code buffer} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param * the boundary value type (ignored) * @param boundary @@ -3673,7 +3662,7 @@ public final Observable> buffer(Observable boundary, int initialC *
Scheduler:
*
{@code cache} does not operate by default on a particular {@link Scheduler}.
* - * + * * @return an Observable that, when first subscribed to, caches all of its items and notifications for the * benefit of subsequent subscribers * @see ReactiveX operators documentation: Replay @@ -3719,7 +3708,7 @@ public final Observable cache(int initialCapacity) { *

* Note: The capacity hint is not an upper bound on cache size. For that, consider * {@link #replay(int)} in combination with {@link ConnectableObservable#autoConnect()} or similar. - * + * * @param initialCapacity hint for number of items to cache (for optimizing underlying data structure) * @return an Observable that, when first subscribed to, caches all of its items and notifications for the * benefit of subsequent subscribers @@ -3738,7 +3727,7 @@ public final Observable cacheWithInitialCapacity(int initialCapacity) { *

Scheduler:
*
{@code cast} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param klass * the target class type that {@code cast} will cast the items emitted by the source Observable * into before emitting them from the resulting Observable @@ -3764,7 +3753,7 @@ public final Observable cast(final Class klass) { *
Scheduler:
*
{@code collect} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param stateFactory * the mutable data structure that will collect the items * @param collector @@ -3784,11 +3773,11 @@ public final R call(R state, T value) { } }; - + /* * Discussion and confirmation of implementation at * https://github.com/ReactiveX/RxJava/issues/423#issuecomment-27642532 - * + * * It should use last() not takeLast(1) since it needs to emit an error if the sequence is empty. */ return lift(new OperatorScan(stateFactory, accumulator)).last(); @@ -3804,7 +3793,7 @@ public final R call(R state, T value) { *
Scheduler:
*
{@code concatMap} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param func * a function that, when applied to an item emitted by the source Observable, returns an * Observable @@ -3815,7 +3804,7 @@ public final R call(R state, T value) { public final Observable concatMap(Func1> func) { return concat(map(func)); } - + /** * Returns an Observable that emits the items emitted from the current Observable, then the next, one after * the other, without interleaving them. @@ -3825,7 +3814,7 @@ public final Observable concatMap(Func1Scheduler: *
{@code concat} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be concatenated after the current * @return an Observable that emits items emitted by the two source Observables, one after the other, @@ -3845,7 +3834,7 @@ public final Observable concatWith(Observable t1) { *
Scheduler:
*
{@code contains} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param element * the item to search for in the emissions from the source Observable * @return an Observable that emits {@code true} if the specified item is emitted by the source Observable, @@ -3872,7 +3861,7 @@ public final Boolean call(T t1) { *
Scheduler:
*
{@code count} does not operate by default on a particular {@link Scheduler}.
* - * + * * @return an Observable that emits a single item: the number of elements emitted by the source Observable * @see ReactiveX operators documentation: Count * @see #countLong() @@ -3880,7 +3869,7 @@ public final Boolean call(T t1) { public final Observable count() { return reduce(0, CountHolder.INSTANCE); } - + private static final class CountHolder { static final Func2 INSTANCE = new Func2() { @Override @@ -3889,7 +3878,7 @@ public final Integer call(Integer count, Object o) { } }; } - + /** * Returns an Observable that counts the total number of items emitted by the source Observable and emits * this count as a 64-bit Long. @@ -3902,7 +3891,7 @@ public final Integer call(Integer count, Object o) { *
Scheduler:
*
{@code countLong} does not operate by default on a particular {@link Scheduler}.
* - * + * * @return an Observable that emits a single item: the number of items emitted by the source Observable as a * 64-bit Long item * @see ReactiveX operators documentation: Count @@ -3920,7 +3909,7 @@ public final Long call(Long count, Object o) { } }; } - + /** * Returns an Observable that mirrors the source Observable, except that it drops items emitted by the * source Observable that are followed by another item within a computed debounce duration. @@ -3933,7 +3922,7 @@ public final Long call(Long count, Object o) { *
Scheduler:
*
This version of {@code debounce} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param * the debounce value type (ignored) * @param debounceSelector @@ -3970,7 +3959,7 @@ public final Observable debounce(Func1 *
Scheduler:
*
This version of {@code debounce} operates by default on the {@code computation} {@link Scheduler}.
* - * + * * @param timeout * the time each item has to be "the most recent" of those emitted by the source Observable to * ensure that it's not dropped @@ -4009,7 +3998,7 @@ public final Observable debounce(long timeout, TimeUnit unit) { *
Scheduler:
*
you specify which {@link Scheduler} this operator will use
* - * + * * @param timeout * the time each item has to be "the most recent" of those emitted by the source Observable to * ensure that it's not dropped @@ -4037,7 +4026,7 @@ public final Observable debounce(long timeout, TimeUnit unit, Scheduler sched *
Scheduler:
*
{@code defaultIfEmpty} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param defaultValue * the item to emit if the source Observable emits no items * @return an Observable that emits either the specified default item if the source Observable emits no @@ -4085,7 +4074,7 @@ public final Observable switchIfEmpty(Observable alternate) { *
Scheduler:
*
This version of {@code delay} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param * the subscription delay value type (ignored) * @param @@ -4119,7 +4108,7 @@ public final Observable delay( *
Scheduler:
*
This version of {@code delay} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param * the item delay value type (ignored) * @param itemDelay @@ -4143,7 +4132,7 @@ public final Observable delay(Func1> i *
Scheduler:
*
This version of {@code delay} operates by default on the {@code compuation} {@link Scheduler}.
* - * + * * @param delay * the delay to shift the source by * @param unit @@ -4164,7 +4153,7 @@ public final Observable delay(long delay, TimeUnit unit) { *
Scheduler:
*
you specify which {@link Scheduler} this operator will use
* - * + * * @param delay * the delay to shift the source by * @param unit @@ -4186,7 +4175,7 @@ public final Observable delay(long delay, TimeUnit unit, Scheduler scheduler) *
Scheduler:
*
This version of {@code delay} operates by default on the {@code compuation} {@link Scheduler}.
* - * + * * @param delay * the time to delay the subscription * @param unit @@ -4207,7 +4196,7 @@ public final Observable delaySubscription(long delay, TimeUnit unit) { *
Scheduler:
*
you specify which {@link Scheduler} this operator will use
* - * + * * @param delay * the time to delay the subscription * @param unit @@ -4221,7 +4210,7 @@ public final Observable delaySubscription(long delay, TimeUnit unit) { public final Observable delaySubscription(long delay, TimeUnit unit, Scheduler scheduler) { return create(new OnSubscribeDelaySubscription(this, delay, unit, scheduler)); } - + /** * Returns an Observable that delays the subscription to the source Observable until a second Observable * emits an item. @@ -4231,7 +4220,7 @@ public final Observable delaySubscription(long delay, TimeUnit unit, Schedule *
Scheduler:
*
This method does not operate by default on a particular {@link Scheduler}.
* - * + * * @param subscriptionDelay * a function that returns an Observable that triggers the subscription to the source Observable * once it emits any item @@ -4254,7 +4243,7 @@ public final Observable delaySubscription(Func0> *
Scheduler:
*
This method does not operate by default on a particular {@link Scheduler}.
* - * + * * @param the value type of the other Observable, irrelevant * @param other the other Observable that should trigger the subscription * to this Observable. @@ -4268,7 +4257,7 @@ public final Observable delaySubscription(Observable other) { } return create(new OnSubscribeDelaySubscriptionOther(this, other)); } - + /** * Returns an Observable that reverses the effect of {@link #materialize materialize} by transforming the * {@link Notification} objects emitted by the source Observable into the items or notifications they @@ -4279,7 +4268,7 @@ public final Observable delaySubscription(Observable other) { *
Scheduler:
*
{@code dematerialize} does not operate by default on a particular {@link Scheduler}.
* - * + * * @return an Observable that emits the items and notifications embedded in the {@link Notification} objects * emitted by the source Observable * @throws OnErrorNotImplementedException @@ -4299,7 +4288,7 @@ public final Observable dematerialize() { *
Scheduler:
*
{@code distinct} does not operate by default on a particular {@link Scheduler}.
* - * + * * @return an Observable that emits only those items emitted by the source Observable that are distinct from * each other * @see ReactiveX operators documentation: Distinct @@ -4317,7 +4306,7 @@ public final Observable distinct() { *
Scheduler:
*
{@code distinct} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param keySelector * a function that projects an emitted item to a key value that is used to decide whether an item * is distinct from another one or not @@ -4337,7 +4326,7 @@ public final Observable distinct(Func1 keySelecto *
Scheduler:
*
{@code distinctUntilChanged} does not operate by default on a particular {@link Scheduler}.
* - * + * * @return an Observable that emits those items from the source Observable that are distinct from their * immediate predecessors * @see ReactiveX operators documentation: Distinct @@ -4355,7 +4344,7 @@ public final Observable distinctUntilChanged() { *
Scheduler:
*
{@code distinctUntilChanged} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param keySelector * a function that projects an emitted item to a key value that is used to decide whether an item * is distinct from another one or not @@ -4375,7 +4364,7 @@ public final Observable distinctUntilChanged(Func1Scheduler: *
{@code doOnCompleted} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param onCompleted * the action to invoke when the source Observable calls {@code onCompleted} * @return the source Observable with the side-effecting behavior applied @@ -4400,7 +4389,24 @@ public final void onNext(T args) { return lift(new OperatorDoOnEach(observer)); } - + + /** + * Modifies the source Observable so that it invokes an action when it calls {@code onCompleted} and no items were emitted. + *

+ *

+ *
Scheduler:
+ *
{@code doOnEmpty} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param onEmpty + * the action to invoke when the source Observable calls {@code onCompleted}, contingent on no items were emitted + * @return the source Observable with the side-effecting behavior applied + * @see ReactiveX operators documentation: Do + */ + public final Observable doOnEmpty(final Action0 onEmpty) { + return lift(new OperatorDoOnEmpty(onEmpty)); + } + /** * Modifies the source Observable so that it invokes an action for each item it emits. *

@@ -4409,7 +4415,7 @@ public final void onNext(T args) { *

Scheduler:
*
{@code doOnEach} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param onNotification * the action to invoke for each item emitted by the source Observable * @return the source Observable with the side-effecting behavior applied @@ -4436,22 +4442,6 @@ public final void onNext(T v) { return lift(new OperatorDoOnEach(observer)); } - /** - * Modifies the source Observable so that it invokes an action when it calls {@code onCompleted} and no items were emitted. - *

- *

- *
Scheduler:
- *
{@code doOnEmpty} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param onEmpty - * the action to invoke when the source Observable calls {@code onCompleted}, contingent on no items were emitted - * @return the source Observable with the side-effecting behavior applied - * @see ReactiveX operators documentation: Do - */ - public final Observable doOnEmpty(final Action0 onEmpty) { - return lift(new OperatorDoOnEmpty(onEmpty)); - } /** * Modifies the source Observable so that it notifies an Observer for each item it emits. From bc5df5045c0978a45a05a366bb488a2198b11f7d Mon Sep 17 00:00:00 2001 From: Thomas Nield Date: Wed, 20 Jan 2016 13:22:37 -0600 Subject: [PATCH 03/10] remove IDEA formatting --- src/main/java/rx/Observable.java | 323 +++++++++++++++---------------- 1 file changed, 153 insertions(+), 170 deletions(-) diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 3c26471ac6..9287e105de 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -39,7 +39,7 @@ *

* For more information see the ReactiveX * documentation. - * + * * @param * the type of the items emitted by the Observable */ @@ -52,7 +52,7 @@ public class Observable { *

* Note: Use {@link #create(OnSubscribe)} to create an Observable, instead of this constructor, * unless you specifically have a need for inheritance. - * + * * @param f * {@link OnSubscribe} to be executed when {@link #subscribe(Subscriber)} is called */ @@ -81,7 +81,7 @@ protected Observable(OnSubscribe f) { *

Scheduler:
*
{@code create} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param * the type of the items that this Observable emits * @param f @@ -112,8 +112,8 @@ public interface Operator extends Func1, Subscriber< /** * Passes all emitted values from this Observable to the provided conversion function to be collected and * returned as a single value. Note that it is legal for a conversion function to return an Observable - * (enabling chaining). - * + * (enabling chaining). + * * @param conversion a function that converts from this {@code Observable} to an {@code R} * @return an instance of R created by the provided conversion function * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) @@ -126,7 +126,7 @@ public void call(Subscriber subscriber) { subscriber.add(Observable.subscribe(subscriber, Observable.this)); }}); } - + /** * Lifts a function to the current Observable and returns a new Observable that when subscribed to will pass * the values of the current Observable through the Operator function. @@ -144,7 +144,7 @@ public void call(Subscriber subscriber) { *
Scheduler:
*
{@code lift} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param operator the Operator that implements the Observable-operating function to be applied to the source * Observable * @return an Observable that is the result of applying the lifted Operator to the source Observable @@ -161,7 +161,7 @@ public void call(Subscriber o) { st.onStart(); onSubscribe.call(st); } catch (Throwable e) { - // localized capture of errors rather than it skipping all operators + // localized capture of errors rather than it skipping all operators // and ending up in the try/catch of the subscribe method which then // prevents onErrorResumeNext and other similar approaches to error handling Exceptions.throwIfFatal(e); @@ -176,7 +176,7 @@ public void call(Subscriber o) { } }); } - + /** * Transform an Observable by applying a particular Transformer function to it. *

@@ -190,7 +190,7 @@ public void call(Subscriber o) { *

Scheduler:
*
{@code compose} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param transformer implements the function that transforms the source Observable * @return the source Observable, transformed by the transformer function * @see RxJava wiki: Implementing Your Own Operators @@ -256,7 +256,7 @@ public Single toSingle() { public Completable toCompletable() { return Completable.fromObservable(this); } - + /* ********************************************************************************************************* * Operators Below Here @@ -272,7 +272,7 @@ public Completable toCompletable() { *
Scheduler:
*
{@code amb} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param sources * an Iterable of Observable sources competing to react first * @return an Observable that emits the same sequence as whichever of the source Observables first @@ -292,7 +292,7 @@ public final static Observable amb(IterableScheduler: *
{@code amb} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * an Observable competing to react first * @param o2 @@ -314,7 +314,7 @@ public final static Observable amb(Observable o1, Observable *
Scheduler:
*
{@code amb} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * an Observable competing to react first * @param o2 @@ -338,7 +338,7 @@ public final static Observable amb(Observable o1, Observable *
Scheduler:
*
{@code amb} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * an Observable competing to react first * @param o2 @@ -392,7 +392,7 @@ public final static Observable amb(Observable o1, Observable *
Scheduler:
*
{@code amb} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * an Observable competing to react first * @param o2 @@ -488,7 +488,7 @@ public final static Observable amb(Observable o1, Observable *
Scheduler:
*
{@code amb} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * an Observable competing to react first * @param o2 @@ -551,7 +551,7 @@ public static final Observable combineLatest(ObservableScheduler: *
{@code combineLatest} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * the first source Observable * @param o2 @@ -579,7 +579,7 @@ public static final Observable combineLatest(ObservableScheduler: *
{@code combineLatest} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * the first source Observable * @param o2 @@ -610,7 +610,7 @@ public static final Observable combineLatest(ObservableScheduler: *
{@code combineLatest} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * the first source Observable * @param o2 @@ -643,7 +643,7 @@ public static final Observable combineLatest(Observab *
Scheduler:
*
{@code combineLatest} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * the first source Observable * @param o2 @@ -678,7 +678,7 @@ public static final Observable combineLatest(Obse *
Scheduler:
*
{@code combineLatest} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * the first source Observable * @param o2 @@ -715,7 +715,7 @@ public static final Observable combineLatest( *
Scheduler:
*
{@code combineLatest} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * the first source Observable * @param o2 @@ -754,7 +754,7 @@ public static final Observable combineLat *
Scheduler:
*
{@code combineLatest} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * the first source Observable * @param o2 @@ -885,7 +885,7 @@ public final static Observable concat(Observable t1, Observa *
Scheduler:
*
{@code concat} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be concatenated * @param t2 @@ -1001,7 +1001,7 @@ public final static Observable concat(Observable t1, Observa *
Scheduler:
*
{@code concat} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be concatenated * @param t2 @@ -1035,7 +1035,7 @@ public final static Observable concat(Observable t1, Observa *
Scheduler:
*
{@code concat} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be concatenated * @param t2 @@ -1076,7 +1076,7 @@ public final static Observable concat(Observable t1, Observa *
Scheduler:
*
{@code defer} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param observableFactory * the Observable factory function to invoke for each {@link Observer} that subscribes to the * resulting Observable @@ -1099,7 +1099,7 @@ public void call(Subscriber subscriber) { } }); } - + /** * Returns an Observable that emits no items to the {@link Observer} and immediately invokes its * {@link Observer#onCompleted onCompleted} method. @@ -1130,7 +1130,7 @@ public final static Observable empty() { *
Scheduler:
*
{@code error} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param exception * the particular Throwable to pass to {@link Observer#onError onError} * @param @@ -1157,7 +1157,7 @@ public final static Observable error(Throwable exception) { *
Scheduler:
*
{@code from} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param future * the source {@link Future} * @param @@ -1184,7 +1184,7 @@ public final static Observable from(Future future) { *
Scheduler:
*
{@code from} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param future * the source {@link Future} * @param timeout @@ -1213,7 +1213,7 @@ public final static Observable from(Future future, long time *
Scheduler:
*
you specify which {@link Scheduler} this operator will use
* - * + * * @param future * the source {@link Future} * @param scheduler @@ -1238,7 +1238,7 @@ public final static Observable from(Future future, Scheduler *
Scheduler:
*
{@code from} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param iterable * the source {@link Iterable} sequence * @param @@ -1259,7 +1259,7 @@ public final static Observable from(Iterable iterable) { *
Scheduler:
*
{@code from} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param array * the source Array * @param @@ -1313,7 +1313,7 @@ public static Observable fromCallable(Callable func) { *
Scheduler:
*
{@code interval} operates by default on the {@code computation} {@link Scheduler}.
* - * + * * @param interval * interval size in time units (see below) * @param unit @@ -1334,7 +1334,7 @@ public final static Observable interval(long interval, TimeUnit unit) { *
Scheduler:
*
you specify which {@link Scheduler} this operator will use
* - * + * * @param interval * interval size in time units (see below) * @param unit @@ -1360,7 +1360,7 @@ public final static Observable interval(long interval, TimeUnit unit, Sche *
Scheduler:
*
{@code interval} operates by default on the {@code computation} {@link Scheduler}.
* - * + * * @param initialDelay * the initial delay time to wait before emitting the first value of 0L * @param period @@ -1388,7 +1388,7 @@ public final static Observable interval(long initialDelay, long period, Ti *
Scheduler:
*
you specify which {@link Scheduler} this operator will use
* - * + * * @param initialDelay * the initial delay time to wait before emitting the first value of 0L * @param period @@ -1422,7 +1422,7 @@ public final static Observable interval(long initialDelay, long period, Ti *
Scheduler:
*
{@code just} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param value * the item to emit * @param @@ -1433,7 +1433,7 @@ public final static Observable interval(long initialDelay, long period, Ti public final static Observable just(final T value) { return ScalarSynchronousObservable.create(value); } - + /** * Converts two items into an Observable that emits those items. *

@@ -1442,7 +1442,7 @@ public final static Observable just(final T value) { *

Scheduler:
*
{@code just} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * first item * @param t2 @@ -1466,7 +1466,7 @@ public final static Observable just(T t1, T t2) { *
Scheduler:
*
{@code just} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * first item * @param t2 @@ -1492,7 +1492,7 @@ public final static Observable just(T t1, T t2, T t3) { *
Scheduler:
*
{@code just} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * first item * @param t2 @@ -1550,7 +1550,7 @@ public final static Observable just(T t1, T t2, T t3, T t4, T t5) { *
Scheduler:
*
{@code just} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * first item * @param t2 @@ -1582,7 +1582,7 @@ public final static Observable just(T t1, T t2, T t3, T t4, T t5, T t6) { *
Scheduler:
*
{@code just} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * first item * @param t2 @@ -1616,7 +1616,7 @@ public final static Observable just(T t1, T t2, T t3, T t4, T t5, T t6, T *
Scheduler:
*
{@code just} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * first item * @param t2 @@ -1652,7 +1652,7 @@ public final static Observable just(T t1, T t2, T t3, T t4, T t5, T t6, T *
Scheduler:
*
{@code just} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * first item * @param t2 @@ -1690,7 +1690,7 @@ public final static Observable just(T t1, T t2, T t3, T t4, T t5, T t6, T *
Scheduler:
*
{@code just} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * first item * @param t2 @@ -1721,7 +1721,7 @@ public final static Observable just(T t1, T t2, T t3, T t4, T t5, T t6, T public final static Observable just(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8, T t9, T t10) { return from((T[])new Object[] { t1, t2, t3, t4, t5, t6, t7, t8, t9, t10 }); } - + /** * Flattens an Iterable of Observables into one Observable, without any transformation. *

@@ -1733,7 +1733,7 @@ public final static Observable just(T t1, T t2, T t3, T t4, T t5, T t6, T *

Scheduler:
*
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param sequences * the Iterable of Observables * @return an Observable that emits items that are the result of flattening the items emitted by the @@ -1756,7 +1756,7 @@ public final static Observable merge(IterableScheduler: *
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param sequences * the Iterable of Observables * @param maxConcurrent @@ -1811,7 +1811,7 @@ public final static Observable merge(ObservableScheduler: *
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param source * an Observable that emits Observables * @param maxConcurrent @@ -1842,7 +1842,7 @@ public final static Observable merge(ObservableScheduler: *
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -1866,7 +1866,7 @@ public final static Observable merge(Observable t1, Observab *
Scheduler:
*
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -1892,7 +1892,7 @@ public final static Observable merge(Observable t1, Observab *
Scheduler:
*
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -1920,7 +1920,7 @@ public final static Observable merge(Observable t1, Observab *
Scheduler:
*
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -1950,7 +1950,7 @@ public final static Observable merge(Observable t1, Observab *
Scheduler:
*
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -1982,7 +1982,7 @@ public final static Observable merge(Observable t1, Observab *
Scheduler:
*
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -2016,7 +2016,7 @@ public final static Observable merge(Observable t1, Observab *
Scheduler:
*
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -2052,7 +2052,7 @@ public final static Observable merge(Observable t1, Observab *
Scheduler:
*
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -2090,7 +2090,7 @@ public final static Observable merge(Observable t1, Observab *
Scheduler:
*
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param sequences * the Array of Observables * @return an Observable that emits all of the items emitted by the Observables in the Array @@ -2099,7 +2099,7 @@ public final static Observable merge(Observable t1, Observab public final static Observable merge(Observable[] sequences) { return merge(from(sequences)); } - + /** * Flattens an Array of Observables into one Observable, without any transformation, while limiting the * number of concurrent subscriptions to these Observables. @@ -2112,7 +2112,7 @@ public final static Observable merge(Observable[] sequences) *
Scheduler:
*
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param sequences * the Array of Observables * @param maxConcurrent @@ -2142,7 +2142,7 @@ public final static Observable merge(Observable[] sequences, *
Scheduler:
*
{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param source * an Observable that emits Observables * @return an Observable that emits all of the items emitted by the Observables emitted by the @@ -2171,7 +2171,7 @@ public final static Observable mergeDelayError(ObservableScheduler: *
{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param source * an Observable that emits Observables * @param maxConcurrent @@ -2203,7 +2203,7 @@ public final static Observable mergeDelayError(ObservableScheduler: *
{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -2233,7 +2233,7 @@ public final static Observable mergeDelayError(Observable t1 *
Scheduler:
*
{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -2265,7 +2265,7 @@ public final static Observable mergeDelayError(Observable t1 *
Scheduler:
*
{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -2299,7 +2299,7 @@ public final static Observable mergeDelayError(Observable t1 *
Scheduler:
*
{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -2335,7 +2335,7 @@ public final static Observable mergeDelayError(Observable t1 *
Scheduler:
*
{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -2374,7 +2374,7 @@ public final static Observable mergeDelayError(Observable t1 *
Scheduler:
*
{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -2414,7 +2414,7 @@ public final static Observable mergeDelayError(Observable t1 *
Scheduler:
*
{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -2457,7 +2457,7 @@ public final static Observable mergeDelayError(Observable t1 *
Scheduler:
*
{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be merged * @param t2 @@ -2492,7 +2492,7 @@ public final static Observable mergeDelayError(Observable t1 *
Scheduler:
*
{@code nest} does not operate by default on a particular {@link Scheduler}.
* - * + * * @return an Observable that emits a single item: the source Observable * @see ReactiveX operators documentation: To */ @@ -2510,7 +2510,7 @@ public final Observable> nest() { *
Scheduler:
*
{@code never} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param * the type of items (not) emitted by the Observable * @return an Observable that never emits any items or sends any notifications to an {@link Observer} @@ -2528,7 +2528,7 @@ public final static Observable never() { *
Scheduler:
*
{@code range} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param start * the value of the first Integer in the sequence * @param count @@ -2564,7 +2564,7 @@ public final static Observable range(int start, int count) { *
Scheduler:
*
you specify which {@link Scheduler} this operator will use
* - * + * * @param start * the value of the first Integer in the sequence * @param count @@ -2587,7 +2587,7 @@ public final static Observable range(int start, int count, Scheduler sc *
Scheduler:
*
{@code sequenceEqual} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param first * the first Observable to compare * @param second @@ -2619,7 +2619,7 @@ public final Boolean call(T first, T second) { *
Scheduler:
*
{@code sequenceEqual} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param first * the first Observable to compare * @param second @@ -2650,7 +2650,7 @@ public final static Observable sequenceEqual(ObservableScheduler: *
{@code switchOnNext} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param the item type * @param sequenceOfSequences * the source Observable that emits Observables @@ -2674,7 +2674,7 @@ public final static Observable switchOnNext(ObservableScheduler: *
{@code timer} operates by default on the {@code computation} {@link Scheduler}.
* - * + * * @param initialDelay * the initial delay time to wait before emitting the first value of 0L * @param period @@ -2703,7 +2703,7 @@ public final static Observable timer(long initialDelay, long period, TimeU *
Scheduler:
*
you specify which {@link Scheduler} this operator will use
* - * + * * @param initialDelay * the initial delay time to wait before emitting the first value of 0L * @param period @@ -2733,7 +2733,7 @@ public final static Observable timer(long initialDelay, long period, TimeU *
Scheduler:
*
{@code timer} operates by default on the {@code computation} {@link Scheduler}.
* - * + * * @param delay * the initial delay before emitting a single {@code 0L} * @param unit @@ -2757,7 +2757,7 @@ public final static Observable timer(long delay, TimeUnit unit) { *
Scheduler:
*
you specify which {@link Scheduler} this operator will use
* - * + * * @param delay * the initial delay before emitting a single 0L * @param unit @@ -2780,7 +2780,7 @@ public final static Observable timer(long delay, TimeUnit unit, Scheduler *
Scheduler:
*
{@code using} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param resourceFactory * the factory function to create a resource object that depends on the Observable * @param observableFactory @@ -2796,9 +2796,9 @@ public final static Observable using( final Action1 disposeAction) { return using(resourceFactory, observableFactory, disposeAction, false); } - + /** - * Constructs an Observable that creates a dependent resource object which is disposed of just before + * Constructs an Observable that creates a dependent resource object which is disposed of just before * termination if you have set {@code disposeEagerly} to {@code true} and unsubscription does not occur * before termination. Otherwise resource disposal will occur on unsubscription. Eager disposal is * particularly appropriate for a synchronous Observable that resuses resources. {@code disposeAction} will @@ -2809,7 +2809,7 @@ public final static Observable using( *
Scheduler:
*
{@code using} does not operate by default on a particular {@link Scheduler}.
* - * + * * @warn "Backpressure Support" section missing from javadoc * @param resourceFactory * the factory function to create a resource object that depends on the Observable @@ -2818,7 +2818,7 @@ public final static Observable using( * @param disposeAction * the function that will dispose of the resource * @param disposeEagerly - * if {@code true} then disposal will happen either on unsubscription or just before emission of + * if {@code true} then disposal will happen either on unsubscription or just before emission of * a terminal event ({@code onComplete} or {@code onError}). * @return the Observable whose lifetime controls the lifetime of the dependent resource object * @see ReactiveX operators documentation: Using @@ -2850,7 +2850,7 @@ public final static Observable using( *
Scheduler:
*
{@code zip} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param ws * an Iterable of source Observables * @param zipFunction @@ -2884,7 +2884,7 @@ public final static Observable zip(Iterable> ws, *
Scheduler:
*
{@code zip} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param ws * an Observable of source Observables * @param zipFunction @@ -2922,7 +2922,7 @@ public Observable[] call(List> o) { *
Scheduler:
*
{@code zip} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * the first source Observable * @param o2 @@ -2956,7 +2956,7 @@ public final static Observable zip(Observable o1, O *
Scheduler:
*
{@code zip} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * the first source Observable * @param o2 @@ -2992,7 +2992,7 @@ public final static Observable zip(Observable o *
Scheduler:
*
{@code zip} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * the first source Observable * @param o2 @@ -3030,7 +3030,7 @@ public final static Observable zip(ObservableScheduler: *
{@code zip} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * the first source Observable * @param o2 @@ -3069,7 +3069,7 @@ public final static Observable zip(ObservableScheduler: *
{@code zip} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * the first source Observable * @param o2 @@ -3111,7 +3111,7 @@ public final static Observable zip(ObservableScheduler: *
{@code zip} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * the first source Observable * @param o2 @@ -3155,7 +3155,7 @@ public final static Observable zip(Observable *
Scheduler:
*
{@code zip} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * the first source Observable * @param o2 @@ -3201,7 +3201,7 @@ public final static Observable zip(Observ *
Scheduler:
*
{@code zip} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param o1 * the first source Observable * @param o2 @@ -3240,7 +3240,7 @@ public final static Observable zip(Ob *
Scheduler:
*
{@code all} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param predicate * a function that evaluates an item and returns a Boolean * @return an Observable that emits {@code true} if all items emitted by the source Observable satisfy the @@ -3250,7 +3250,7 @@ public final static Observable zip(Ob public final Observable all(Func1 predicate) { return lift(new OperatorAll(predicate)); } - + /** * Mirrors the Observable (current or provided) that first either emits an item or sends a termination * notification. @@ -3260,7 +3260,7 @@ public final Observable all(Func1 predicate) { *
Scheduler:
*
{@code amb} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable competing to react first * @return an Observable that emits the same sequence as whichever of the source Observables first @@ -3279,7 +3279,7 @@ public final Observable ambWith(Observable t1) { *
Scheduler:
*
{@code asObservable} does not operate by default on a particular {@link Scheduler}.
* - * + * * @return an Observable that hides the identity of this Observable */ public final Observable asObservable() { @@ -3299,7 +3299,7 @@ public final Observable asObservable() { *
Scheduler:
*
This version of {@code buffer} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param bufferClosingSelector * a {@link Func0} that produces an Observable that governs the boundary between buffers. * Whenever this {@code Observable} emits an item, {@code buffer} emits the current buffer and @@ -3323,7 +3323,7 @@ public final Observable> buffer(Func0Scheduler: *
This version of {@code buffer} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param count * the maximum number of items in each buffer before it should be emitted * @return an Observable that emits connected, non-overlapping buffers, each containing at most @@ -3345,7 +3345,7 @@ public final Observable> buffer(int count) { *
Scheduler:
*
This version of {@code buffer} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param count * the maximum size of each buffer before it should be emitted * @param skip @@ -3375,7 +3375,7 @@ public final Observable> buffer(int count, int skip) { *
Scheduler:
*
This version of {@code buffer} operates by default on the {@code computation} {@link Scheduler}.
* - * + * * @param timespan * the period of time each buffer collects items before it is emitted * @param timeshift @@ -3405,7 +3405,7 @@ public final Observable> buffer(long timespan, long timeshift, TimeUnit *
Scheduler:
*
you specify which {@link Scheduler} this operator will use
* - * + * * @param timespan * the period of time each buffer collects items before it is emitted * @param timeshift @@ -3436,7 +3436,7 @@ public final Observable> buffer(long timespan, long timeshift, TimeUnit *
Scheduler:
*
This version of {@code buffer} operates by default on the {@code computation} {@link Scheduler}.
* - * + * * @param timespan * the period of time each buffer collects items before it is emitted and replaced with a new * buffer @@ -3465,7 +3465,7 @@ public final Observable> buffer(long timespan, TimeUnit unit) { *
Scheduler:
*
This version of {@code buffer} operates by default on the {@code computation} {@link Scheduler}.
* - * + * * @param timespan * the period of time each buffer collects items before it is emitted and replaced with a new * buffer @@ -3498,7 +3498,7 @@ public final Observable> buffer(long timespan, TimeUnit unit, int count) *
Scheduler:
*
you specify which {@link Scheduler} this operator will use
* - * + * * @param timespan * the period of time each buffer collects items before it is emitted and replaced with a new * buffer @@ -3532,7 +3532,7 @@ public final Observable> buffer(long timespan, TimeUnit unit, int count, *
Scheduler:
*
you specify which {@link Scheduler} this operator will use
* - * + * * @param timespan * the period of time each buffer collects items before it is emitted and replaced with a new * buffer @@ -3561,7 +3561,7 @@ public final Observable> buffer(long timespan, TimeUnit unit, Scheduler *
Scheduler:
*
This version of {@code buffer} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param bufferOpenings * the Observable that, when it emits an item, causes a new buffer to be created * @param bufferClosingSelector @@ -3591,7 +3591,7 @@ public final Observable> buffer(ObservableScheduler: *
This version of {@code buffer} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param * the boundary value type (ignored) * @param boundary @@ -3621,7 +3621,7 @@ public final Observable> buffer(Observable boundary) { *
Scheduler:
*
This version of {@code buffer} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param * the boundary value type (ignored) * @param boundary @@ -3662,7 +3662,7 @@ public final Observable> buffer(Observable boundary, int initialC *
Scheduler:
*
{@code cache} does not operate by default on a particular {@link Scheduler}.
* - * + * * @return an Observable that, when first subscribed to, caches all of its items and notifications for the * benefit of subsequent subscribers * @see ReactiveX operators documentation: Replay @@ -3708,7 +3708,7 @@ public final Observable cache(int initialCapacity) { *

* Note: The capacity hint is not an upper bound on cache size. For that, consider * {@link #replay(int)} in combination with {@link ConnectableObservable#autoConnect()} or similar. - * + * * @param initialCapacity hint for number of items to cache (for optimizing underlying data structure) * @return an Observable that, when first subscribed to, caches all of its items and notifications for the * benefit of subsequent subscribers @@ -3727,7 +3727,7 @@ public final Observable cacheWithInitialCapacity(int initialCapacity) { *

Scheduler:
*
{@code cast} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param klass * the target class type that {@code cast} will cast the items emitted by the source Observable * into before emitting them from the resulting Observable @@ -3753,7 +3753,7 @@ public final Observable cast(final Class klass) { *
Scheduler:
*
{@code collect} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param stateFactory * the mutable data structure that will collect the items * @param collector @@ -3773,11 +3773,11 @@ public final R call(R state, T value) { } }; - + /* * Discussion and confirmation of implementation at * https://github.com/ReactiveX/RxJava/issues/423#issuecomment-27642532 - * + * * It should use last() not takeLast(1) since it needs to emit an error if the sequence is empty. */ return lift(new OperatorScan(stateFactory, accumulator)).last(); @@ -3793,7 +3793,7 @@ public final R call(R state, T value) { *
Scheduler:
*
{@code concatMap} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param func * a function that, when applied to an item emitted by the source Observable, returns an * Observable @@ -3804,7 +3804,7 @@ public final R call(R state, T value) { public final Observable concatMap(Func1> func) { return concat(map(func)); } - + /** * Returns an Observable that emits the items emitted from the current Observable, then the next, one after * the other, without interleaving them. @@ -3814,7 +3814,7 @@ public final Observable concatMap(Func1Scheduler: *
{@code concat} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * an Observable to be concatenated after the current * @return an Observable that emits items emitted by the two source Observables, one after the other, @@ -3834,7 +3834,7 @@ public final Observable concatWith(Observable t1) { *
Scheduler:
*
{@code contains} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param element * the item to search for in the emissions from the source Observable * @return an Observable that emits {@code true} if the specified item is emitted by the source Observable, @@ -3861,7 +3861,7 @@ public final Boolean call(T t1) { *
Scheduler:
*
{@code count} does not operate by default on a particular {@link Scheduler}.
* - * + * * @return an Observable that emits a single item: the number of elements emitted by the source Observable * @see ReactiveX operators documentation: Count * @see #countLong() @@ -3869,7 +3869,7 @@ public final Boolean call(T t1) { public final Observable count() { return reduce(0, CountHolder.INSTANCE); } - + private static final class CountHolder { static final Func2 INSTANCE = new Func2() { @Override @@ -3878,7 +3878,7 @@ public final Integer call(Integer count, Object o) { } }; } - + /** * Returns an Observable that counts the total number of items emitted by the source Observable and emits * this count as a 64-bit Long. @@ -3891,7 +3891,7 @@ public final Integer call(Integer count, Object o) { *
Scheduler:
*
{@code countLong} does not operate by default on a particular {@link Scheduler}.
* - * + * * @return an Observable that emits a single item: the number of items emitted by the source Observable as a * 64-bit Long item * @see ReactiveX operators documentation: Count @@ -3909,7 +3909,7 @@ public final Long call(Long count, Object o) { } }; } - + /** * Returns an Observable that mirrors the source Observable, except that it drops items emitted by the * source Observable that are followed by another item within a computed debounce duration. @@ -3922,7 +3922,7 @@ public final Long call(Long count, Object o) { *
Scheduler:
*
This version of {@code debounce} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param * the debounce value type (ignored) * @param debounceSelector @@ -3959,7 +3959,7 @@ public final Observable debounce(Func1 *
Scheduler:
*
This version of {@code debounce} operates by default on the {@code computation} {@link Scheduler}.
* - * + * * @param timeout * the time each item has to be "the most recent" of those emitted by the source Observable to * ensure that it's not dropped @@ -3998,7 +3998,7 @@ public final Observable debounce(long timeout, TimeUnit unit) { *
Scheduler:
*
you specify which {@link Scheduler} this operator will use
* - * + * * @param timeout * the time each item has to be "the most recent" of those emitted by the source Observable to * ensure that it's not dropped @@ -4026,7 +4026,7 @@ public final Observable debounce(long timeout, TimeUnit unit, Scheduler sched *
Scheduler:
*
{@code defaultIfEmpty} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param defaultValue * the item to emit if the source Observable emits no items * @return an Observable that emits either the specified default item if the source Observable emits no @@ -4074,7 +4074,7 @@ public final Observable switchIfEmpty(Observable alternate) { *
Scheduler:
*
This version of {@code delay} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param * the subscription delay value type (ignored) * @param @@ -4108,7 +4108,7 @@ public final Observable delay( *
Scheduler:
*
This version of {@code delay} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param * the item delay value type (ignored) * @param itemDelay @@ -4132,7 +4132,7 @@ public final Observable delay(Func1> i *
Scheduler:
*
This version of {@code delay} operates by default on the {@code compuation} {@link Scheduler}.
* - * + * * @param delay * the delay to shift the source by * @param unit @@ -4153,7 +4153,7 @@ public final Observable delay(long delay, TimeUnit unit) { *
Scheduler:
*
you specify which {@link Scheduler} this operator will use
* - * + * * @param delay * the delay to shift the source by * @param unit @@ -4175,7 +4175,7 @@ public final Observable delay(long delay, TimeUnit unit, Scheduler scheduler) *
Scheduler:
*
This version of {@code delay} operates by default on the {@code compuation} {@link Scheduler}.
* - * + * * @param delay * the time to delay the subscription * @param unit @@ -4196,7 +4196,7 @@ public final Observable delaySubscription(long delay, TimeUnit unit) { *
Scheduler:
*
you specify which {@link Scheduler} this operator will use
* - * + * * @param delay * the time to delay the subscription * @param unit @@ -4210,7 +4210,7 @@ public final Observable delaySubscription(long delay, TimeUnit unit) { public final Observable delaySubscription(long delay, TimeUnit unit, Scheduler scheduler) { return create(new OnSubscribeDelaySubscription(this, delay, unit, scheduler)); } - + /** * Returns an Observable that delays the subscription to the source Observable until a second Observable * emits an item. @@ -4220,7 +4220,7 @@ public final Observable delaySubscription(long delay, TimeUnit unit, Schedule *
Scheduler:
*
This method does not operate by default on a particular {@link Scheduler}.
* - * + * * @param subscriptionDelay * a function that returns an Observable that triggers the subscription to the source Observable * once it emits any item @@ -4243,7 +4243,7 @@ public final Observable delaySubscription(Func0> *
Scheduler:
*
This method does not operate by default on a particular {@link Scheduler}.
* - * + * * @param the value type of the other Observable, irrelevant * @param other the other Observable that should trigger the subscription * to this Observable. @@ -4257,7 +4257,7 @@ public final Observable delaySubscription(Observable other) { } return create(new OnSubscribeDelaySubscriptionOther(this, other)); } - + /** * Returns an Observable that reverses the effect of {@link #materialize materialize} by transforming the * {@link Notification} objects emitted by the source Observable into the items or notifications they @@ -4268,7 +4268,7 @@ public final Observable delaySubscription(Observable other) { *
Scheduler:
*
{@code dematerialize} does not operate by default on a particular {@link Scheduler}.
* - * + * * @return an Observable that emits the items and notifications embedded in the {@link Notification} objects * emitted by the source Observable * @throws OnErrorNotImplementedException @@ -4288,7 +4288,7 @@ public final Observable dematerialize() { *
Scheduler:
*
{@code distinct} does not operate by default on a particular {@link Scheduler}.
* - * + * * @return an Observable that emits only those items emitted by the source Observable that are distinct from * each other * @see ReactiveX operators documentation: Distinct @@ -4306,7 +4306,7 @@ public final Observable distinct() { *
Scheduler:
*
{@code distinct} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param keySelector * a function that projects an emitted item to a key value that is used to decide whether an item * is distinct from another one or not @@ -4326,7 +4326,7 @@ public final Observable distinct(Func1 keySelecto *
Scheduler:
*
{@code distinctUntilChanged} does not operate by default on a particular {@link Scheduler}.
* - * + * * @return an Observable that emits those items from the source Observable that are distinct from their * immediate predecessors * @see ReactiveX operators documentation: Distinct @@ -4344,7 +4344,7 @@ public final Observable distinctUntilChanged() { *
Scheduler:
*
{@code distinctUntilChanged} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param keySelector * a function that projects an emitted item to a key value that is used to decide whether an item * is distinct from another one or not @@ -4364,7 +4364,7 @@ public final Observable distinctUntilChanged(Func1Scheduler: *
{@code doOnCompleted} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param onCompleted * the action to invoke when the source Observable calls {@code onCompleted} * @return the source Observable with the side-effecting behavior applied @@ -4389,24 +4389,7 @@ public final void onNext(T args) { return lift(new OperatorDoOnEach(observer)); } - - /** - * Modifies the source Observable so that it invokes an action when it calls {@code onCompleted} and no items were emitted. - *

- *

- *
Scheduler:
- *
{@code doOnEmpty} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param onEmpty - * the action to invoke when the source Observable calls {@code onCompleted}, contingent on no items were emitted - * @return the source Observable with the side-effecting behavior applied - * @see ReactiveX operators documentation: Do - */ - public final Observable doOnEmpty(final Action0 onEmpty) { - return lift(new OperatorDoOnEmpty(onEmpty)); - } - + /** * Modifies the source Observable so that it invokes an action for each item it emits. *

@@ -4415,7 +4398,7 @@ public final Observable doOnEmpty(final Action0 onEmpty) { *

Scheduler:
*
{@code doOnEach} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param onNotification * the action to invoke for each item emitted by the source Observable * @return the source Observable with the side-effecting behavior applied From 0b240aaa22222895d77b871b50aad172b898818d Mon Sep 17 00:00:00 2001 From: Thomas Nield Date: Wed, 20 Jan 2016 13:25:41 -0600 Subject: [PATCH 04/10] put doOnEmpty back --- src/main/java/rx/Observable.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 9287e105de..8617689b0e 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -4390,6 +4390,24 @@ public final void onNext(T args) { return lift(new OperatorDoOnEach(observer)); } + + /** + * Modifies the source Observable so that it invokes an action when it calls {@code onCompleted} and no items were emitted. + *

+ *

+ *
Scheduler:
+ *
{@code doOnEmpty} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param onEmpty + * the action to invoke when the source Observable calls {@code onCompleted}, contingent on no items were emitted + * @return the source Observable with the side-effecting behavior applied + * @see ReactiveX operators documentation: Do + */ + public final Observable doOnEmpty(final Action0 onEmpty) { + return lift(new OperatorDoOnEmpty(onEmpty)); + } + /** * Modifies the source Observable so that it invokes an action for each item it emits. *

From 605d4d1ac28e0eef7a4c74e8901079c48c89e903 Mon Sep 17 00:00:00 2001 From: Thomas Nield Date: Tue, 26 Jan 2016 06:57:54 -0600 Subject: [PATCH 05/10] use assertFalse --- .../java/rx/internal/operators/OperatorDoOnEmptyTest.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/test/java/rx/internal/operators/OperatorDoOnEmptyTest.java b/src/test/java/rx/internal/operators/OperatorDoOnEmptyTest.java index 472887c6b3..b7f5c1c487 100644 --- a/src/test/java/rx/internal/operators/OperatorDoOnEmptyTest.java +++ b/src/test/java/rx/internal/operators/OperatorDoOnEmptyTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; public final class OperatorDoOnEmptyTest { @@ -46,7 +47,7 @@ public void call() { } }).subscribe(); - assertTrue(!wasCalled.get()); + assertFalse(wasCalled.get()); } @Test @@ -91,7 +92,7 @@ public void call() { subscription.unsubscribe(); - assertTrue(!wasCalled.get()); + assertFalse(wasCalled.get()); } @Test @@ -115,7 +116,7 @@ public void call() { assertTrue(subscriber.getOnNextEvents().size() == 1); assertTrue(subscriber.getOnCompletedEvents().isEmpty()); assertTrue(subscriber.getOnErrorEvents().size() == 0); - assertTrue(!wasCalled.get()); + assertFalse(wasCalled.get()); } @Test From 4150aabf4091c194f9730c9e88dbd395420b31cd Mon Sep 17 00:00:00 2001 From: Thomas Nield Date: Tue, 9 Feb 2016 09:25:16 -0600 Subject: [PATCH 06/10] Implement Throwable, not Exception --- src/main/java/rx/internal/operators/OperatorDoOnEmpty.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/rx/internal/operators/OperatorDoOnEmpty.java b/src/main/java/rx/internal/operators/OperatorDoOnEmpty.java index 24dbc34a15..e0260b16bd 100644 --- a/src/main/java/rx/internal/operators/OperatorDoOnEmpty.java +++ b/src/main/java/rx/internal/operators/OperatorDoOnEmpty.java @@ -29,7 +29,7 @@ public void onCompleted() { if (isEmpty) { try { onEmpty.call(); - } catch (Exception e) { + } catch (Throwable e) { Exceptions.throwIfFatal(e); } } From 4f8fa97936c5f3b7c3e331d7fe04c3b051fc787c Mon Sep 17 00:00:00 2001 From: Thomas Nield Date: Tue, 9 Feb 2016 09:27:14 -0600 Subject: [PATCH 07/10] Forgot an assertFalse --- src/test/java/rx/internal/operators/OperatorDoOnEmptyTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/rx/internal/operators/OperatorDoOnEmptyTest.java b/src/test/java/rx/internal/operators/OperatorDoOnEmptyTest.java index b7f5c1c487..918633a0ae 100644 --- a/src/test/java/rx/internal/operators/OperatorDoOnEmptyTest.java +++ b/src/test/java/rx/internal/operators/OperatorDoOnEmptyTest.java @@ -88,7 +88,7 @@ public void call() { source.onNext(2); - assertTrue(!source.hasObservers()); + assertFalse(source.hasObservers()); subscription.unsubscribe(); From d0f95c2ea60cd2005bb8670b60fa66459be8b4f4 Mon Sep 17 00:00:00 2001 From: Thomas Nield Date: Tue, 9 Feb 2016 20:03:41 -0600 Subject: [PATCH 08/10] use Exceptions.throwOrReport() for OperatorDoOnEmpty --- src/main/java/rx/internal/operators/OperatorDoOnEmpty.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/rx/internal/operators/OperatorDoOnEmpty.java b/src/main/java/rx/internal/operators/OperatorDoOnEmpty.java index e0260b16bd..6fa6103e2c 100644 --- a/src/main/java/rx/internal/operators/OperatorDoOnEmpty.java +++ b/src/main/java/rx/internal/operators/OperatorDoOnEmpty.java @@ -30,7 +30,7 @@ public void onCompleted() { try { onEmpty.call(); } catch (Throwable e) { - Exceptions.throwIfFatal(e); + Exceptions.throwOrReport(e,this); } } child.onCompleted(); From 02ffb8cc2521c5dd9776cf5fc5e47beab278dfea Mon Sep 17 00:00:00 2001 From: Thomas Nield Date: Wed, 10 Feb 2016 14:30:57 -0600 Subject: [PATCH 09/10] Put return after onError() call --- src/main/java/rx/internal/operators/OperatorDoOnEmpty.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/rx/internal/operators/OperatorDoOnEmpty.java b/src/main/java/rx/internal/operators/OperatorDoOnEmpty.java index 6fa6103e2c..ac4a84511e 100644 --- a/src/main/java/rx/internal/operators/OperatorDoOnEmpty.java +++ b/src/main/java/rx/internal/operators/OperatorDoOnEmpty.java @@ -31,6 +31,7 @@ public void onCompleted() { onEmpty.call(); } catch (Throwable e) { Exceptions.throwOrReport(e,this); + return; } } child.onCompleted(); From 70372f249d0cff9a0499a7775547fbb6d4584a45 Mon Sep 17 00:00:00 2001 From: Thomas Nield Date: Thu, 3 Mar 2016 01:00:12 -0600 Subject: [PATCH 10/10] Unsubscription and fixes for doOnEmpty() --- src/main/java/rx/internal/operators/OperatorDoOnEmpty.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/main/java/rx/internal/operators/OperatorDoOnEmpty.java b/src/main/java/rx/internal/operators/OperatorDoOnEmpty.java index ac4a84511e..ae34130ccf 100644 --- a/src/main/java/rx/internal/operators/OperatorDoOnEmpty.java +++ b/src/main/java/rx/internal/operators/OperatorDoOnEmpty.java @@ -33,8 +33,12 @@ public void onCompleted() { Exceptions.throwOrReport(e,this); return; } + if (!isUnsubscribed()) { + child.onCompleted(); + } + } else { + child.onCompleted(); } - child.onCompleted(); done = true; }