Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4390,6 +4390,24 @@ public final void onNext(T args) {
return lift(new OperatorDoOnEach<T>(observer));
}


/**
* Modifies the source Observable so that it invokes an action when it calls {@code onCompleted} and no items were emitted.
* <p>
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnEmpty} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onEmpty
* the action to invoke when the source Observable calls {@code onCompleted}, contingent on no items were emitted
* @return the source Observable with the side-effecting behavior applied
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
*/
public final Observable<T> doOnEmpty(final Action0 onEmpty) {
return lift(new OperatorDoOnEmpty<T>(onEmpty));
}

/**
* Modifies the source Observable so that it invokes an action for each item it emits.
* <p>
Expand Down
64 changes: 64 additions & 0 deletions src/main/java/rx/internal/operators/OperatorDoOnEmpty.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package rx.internal.operators;

import rx.Observable;
import rx.Subscriber;
import rx.exceptions.Exceptions;
import rx.functions.Action0;

public final class OperatorDoOnEmpty<T> implements Observable.Operator<T, T> {

private final Action0 onEmpty;

public OperatorDoOnEmpty(Action0 onEmpty) {
this.onEmpty = onEmpty;
}

@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {

return new Subscriber<T>(child) {

private boolean isEmpty = true;
private boolean done = false;

@Override
public void onCompleted() {
if (done) {
return;
}
if (isEmpty) {
try {
onEmpty.call();
} catch (Throwable e) {
Exceptions.throwOrReport(e,this);
return;
}

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add return; after call to Exceptions.throwOrReport. Thomas I suggest you read the comments on the github site because it seems like you are reacting to truncated comments (perhaps email notifications).

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right I was reading email notifications.

if (!isUnsubscribed()) {
child.onCompleted();
}
} else {
child.onCompleted();
}
done = true;
}

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because onEmpty.call could take a non-trivial time to complete I would normally put in a isUnsubscribed check before calling child.onCompleted(). I'd suggest this rejig:

            @Override
            public void onCompleted() {
                if (done) {
                    return;
                }
                if (isEmpty) {
                    try {
                        onEmpty.call();
                    } catch (Throwable e) {
                        Exceptions.throwOrReport(e,this);
                        return;
                    }
                    if (!isUnsubscribed()) {
                        child.onCompleted();
                    }
                } else {
                    child.onCompleted();
                }
                done = true;
            }


@Override
public void onError(Throwable e) {
if (done) {
return;
}
child.onError(e);
}

@Override
public void onNext(T t) {
if (done) {
return;
}
isEmpty = false;
child.onNext(t);
}
};
}

}
149 changes: 149 additions & 0 deletions src/test/java/rx/internal/operators/OperatorDoOnEmptyTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/



package rx.internal.operators;

import org.junit.Test;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Func0;
import rx.observers.TestSubscriber;
import rx.subjects.PublishSubject;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;

public final class OperatorDoOnEmptyTest {

@Test
public void testNonEmpty() {
Observable<String> source = Observable.just("Chicago", "Houston", "Phoenix");

final AtomicBoolean wasCalled = new AtomicBoolean(false);

source.doOnEmpty(new Action0() {
@Override
public void call() {
wasCalled.set(true);
}
}).subscribe();

assertFalse(wasCalled.get());
}

@Test
public void testEmpty() {
Observable<String> source = Observable.empty();

final AtomicBoolean wasCalled = new AtomicBoolean(false);

source.doOnEmpty(new Action0() {
@Override
public void call() {
wasCalled.set(true);
}
}).subscribe();

assertTrue(wasCalled.get());
}

@Test
public void testUnsubscription() {
final AtomicBoolean wasCalled = new AtomicBoolean(false);

PublishSubject<Integer> source = PublishSubject.create();

Subscription subscription = source.doOnEmpty(new Action0() {
@Override
public void call() {
wasCalled.set(true);
}
}).take(3).subscribe();

assertTrue(source.hasObservers());

source.onNext(0);
source.onNext(1);

assertTrue(source.hasObservers());

source.onNext(2);

assertFalse(source.hasObservers());

subscription.unsubscribe();

assertFalse(wasCalled.get());
}

@Test
public void testBackPressure() {

final AtomicBoolean wasCalled = new AtomicBoolean(false);

Observable<Integer> source = Observable.range(0,1000).doOnEmpty(new Action0() {
@Override
public void call() {
wasCalled.set(true);
}
});

TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>(0);

source.subscribe(subscriber);

subscriber.requestMore(1);

assertTrue(subscriber.getOnNextEvents().size() == 1);
assertTrue(subscriber.getOnCompletedEvents().isEmpty());
assertTrue(subscriber.getOnErrorEvents().size() == 0);
assertFalse(wasCalled.get());
}

@Test
public void subscriberStateTest() {
final AtomicInteger counter = new AtomicInteger(0);

final AtomicInteger callCount = new AtomicInteger(0);

Observable<Integer> o = Observable.defer(new Func0<Observable<Integer>>() {
@Override
public Observable<Integer> call() {
return Observable.range(1, counter.getAndIncrement() % 2);
}
}).doOnEmpty(new Action0() {
@Override
public void call() {
callCount.incrementAndGet();
}
});

o.subscribe();
o.subscribe();
o.subscribe();
o.subscribe();
o.subscribe();

assert(callCount.get() == 3);
}

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a unit test for unsubscription would be good

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add unit test that verifies backpressure composes through.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay will do.


}