diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 9287e105de..8617689b0e 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -4390,6 +4390,24 @@ public final void onNext(T args) { return lift(new OperatorDoOnEach(observer)); } + + /** + * Modifies the source Observable so that it invokes an action when it calls {@code onCompleted} and no items were emitted. + *

+ *

+ *
Scheduler:
+ *
{@code doOnEmpty} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param onEmpty + * the action to invoke when the source Observable calls {@code onCompleted}, contingent on no items were emitted + * @return the source Observable with the side-effecting behavior applied + * @see ReactiveX operators documentation: Do + */ + public final Observable doOnEmpty(final Action0 onEmpty) { + return lift(new OperatorDoOnEmpty(onEmpty)); + } + /** * Modifies the source Observable so that it invokes an action for each item it emits. *

diff --git a/src/main/java/rx/internal/operators/OperatorDoOnEmpty.java b/src/main/java/rx/internal/operators/OperatorDoOnEmpty.java new file mode 100644 index 0000000000..ae34130ccf --- /dev/null +++ b/src/main/java/rx/internal/operators/OperatorDoOnEmpty.java @@ -0,0 +1,64 @@ +package rx.internal.operators; + +import rx.Observable; +import rx.Subscriber; +import rx.exceptions.Exceptions; +import rx.functions.Action0; + +public final class OperatorDoOnEmpty implements Observable.Operator { + + private final Action0 onEmpty; + + public OperatorDoOnEmpty(Action0 onEmpty) { + this.onEmpty = onEmpty; + } + + @Override + public Subscriber call(final Subscriber child) { + + return new Subscriber(child) { + + private boolean isEmpty = true; + private boolean done = false; + + @Override + public void onCompleted() { + if (done) { + return; + } + if (isEmpty) { + try { + onEmpty.call(); + } catch (Throwable e) { + Exceptions.throwOrReport(e,this); + return; + } + if (!isUnsubscribed()) { + child.onCompleted(); + } + } else { + child.onCompleted(); + } + done = true; + } + + @Override + public void onError(Throwable e) { + if (done) { + return; + } + child.onError(e); + } + + @Override + public void onNext(T t) { + if (done) { + return; + } + isEmpty = false; + child.onNext(t); + } + }; + } + +} diff --git a/src/test/java/rx/internal/operators/OperatorDoOnEmptyTest.java b/src/test/java/rx/internal/operators/OperatorDoOnEmptyTest.java new file mode 100644 index 0000000000..918633a0ae --- /dev/null +++ b/src/test/java/rx/internal/operators/OperatorDoOnEmptyTest.java @@ -0,0 +1,149 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +package rx.internal.operators; + +import org.junit.Test; +import rx.Observable; +import rx.Subscription; +import rx.functions.Action0; +import rx.functions.Func0; +import rx.observers.TestSubscriber; +import rx.subjects.PublishSubject; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; + +public final class OperatorDoOnEmptyTest { + + @Test + public void testNonEmpty() { + Observable source = Observable.just("Chicago", "Houston", "Phoenix"); + + final AtomicBoolean wasCalled = new AtomicBoolean(false); + + source.doOnEmpty(new Action0() { + @Override + public void call() { + wasCalled.set(true); + } + }).subscribe(); + + assertFalse(wasCalled.get()); + } + + @Test + public void testEmpty() { + Observable source = Observable.empty(); + + final AtomicBoolean wasCalled = new AtomicBoolean(false); + + source.doOnEmpty(new Action0() { + @Override + public void call() { + wasCalled.set(true); + } + }).subscribe(); + + assertTrue(wasCalled.get()); + } + + @Test + public void testUnsubscription() { + final AtomicBoolean wasCalled = new AtomicBoolean(false); + + PublishSubject source = PublishSubject.create(); + + Subscription subscription = source.doOnEmpty(new Action0() { + @Override + public void call() { + wasCalled.set(true); + } + }).take(3).subscribe(); + + assertTrue(source.hasObservers()); + + source.onNext(0); + source.onNext(1); + + assertTrue(source.hasObservers()); + + source.onNext(2); + + assertFalse(source.hasObservers()); + + subscription.unsubscribe(); + + assertFalse(wasCalled.get()); + } + + @Test + public void testBackPressure() { + + final AtomicBoolean wasCalled = new AtomicBoolean(false); + + Observable source = Observable.range(0,1000).doOnEmpty(new Action0() { + @Override + public void call() { + wasCalled.set(true); + } + }); + + TestSubscriber subscriber = new TestSubscriber(0); + + source.subscribe(subscriber); + + subscriber.requestMore(1); + + assertTrue(subscriber.getOnNextEvents().size() == 1); + assertTrue(subscriber.getOnCompletedEvents().isEmpty()); + assertTrue(subscriber.getOnErrorEvents().size() == 0); + assertFalse(wasCalled.get()); + } + + @Test + public void subscriberStateTest() { + final AtomicInteger counter = new AtomicInteger(0); + + final AtomicInteger callCount = new AtomicInteger(0); + + Observable o = Observable.defer(new Func0>() { + @Override + public Observable call() { + return Observable.range(1, counter.getAndIncrement() % 2); + } + }).doOnEmpty(new Action0() { + @Override + public void call() { + callCount.incrementAndGet(); + } + }); + + o.subscribe(); + o.subscribe(); + o.subscribe(); + o.subscribe(); + o.subscribe(); + + assert(callCount.get() == 3); + } + +}