Skip to content

1.x: implement OperatorDoOnEmpty, with Observable.doOnEmpty() operator#3624

Closed
thomasnield wants to merge 10 commits into
ReactiveX:1.xfrom
thomasnield:1.x
Closed

1.x: implement OperatorDoOnEmpty, with Observable.doOnEmpty() operator#3624
thomasnield wants to merge 10 commits into
ReactiveX:1.xfrom
thomasnield:1.x

Conversation

@thomasnield
Copy link
Copy Markdown

See #3621

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this call get a try catch and Exceptions.throwOrReport called? If so, I'll put in a PR for OperatorDoAfterTerminate as well.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might have misunderstood Exceptions.throwOrReport. Does "reporting" pass it up the chain via onError() and throw simply throws an exception without passing it?

@thomasnield
Copy link
Copy Markdown
Author

I saw there was an OperatorDoOnEach which was being paired with an Observer, so I went ahead and scrapped OperatorDoOnEmpty in favor of using that pattern.

@thomasnield
Copy link
Copy Markdown
Author

Okay two questions:

  1. What exactly am I testing for with unsubscription? That Subscription.isUnsubscribed() is true after calling Subscription.unsubscribe()? That items are no longer emitting? Do I have to test for hot and cold sources?

  2. Can you point me to another unit test that does the backpressure test you have in mind?

Comment thread src/main/java/rx/Observable.java Outdated
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make sure the imports stay with * because it is a source of merge conflict otherwise.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do.

@akarnokd
Copy link
Copy Markdown
Member

  1. unsubscription test:

use a PublishSubject, this operator and take, push through values up to the limit and check hasObservers on the subject

  1. backpressure

use a range, this operator and subscribe a TestSubscriber which has been created with 0 as an initial value. Use requestMore(1) and assert the testsubscriber received only a single value, no error and no completion

@thomasnield
Copy link
Copy Markdown
Author

@akarnokd Done. The unsubscription test made sense to me. Not too familiar with the TestSubscriber. Regardless, I implemented what I think you meant and the tests passed.

@thomasnield
Copy link
Copy Markdown
Author

@akarnokd Might help if I actually used the operator in question. One second...

@thomasnield
Copy link
Copy Markdown
Author

Okay unit tests are complete.

@davidmoten
Copy link
Copy Markdown
Collaborator

Hey Thomas, for your convenience what you normally do is make a branch of 1.x say doOnEmpty, do your change in that branch and submit your PR from that branch rather than from 1.x. This PR doesn't have to be changed though.

@thomasnield
Copy link
Copy Markdown
Author

Okay, thanks @davidmoten. I'll branch the feature separately next time.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should use assertTrue everywhere in the unit tests rather than assert to ensure the assertions are run regardless of runtime flags.

@thomasnield
Copy link
Copy Markdown
Author

Dang it IDEA, sorry my habits have caused Observable to have irrelevant changes. One moment.

@akarnokd
Copy link
Copy Markdown
Member

There are too many, probably irrelevant, changes in Observable.java. Could you start from a clean file?

In addition, your reuse of doOnEach may not work because were sharing the empty flag across subscriptions whose source may not be always empty. For example,

Observable<Integer> o = Observable.defer(
    () -> Observable.range(1, counter.getAndIncrement() % 2))
.doOnEmpty(() -> System.out.println("Empty!"));

o.subscribe(System.out::println);
o.subscribe(System.out::println);
o.subscribe(System.out::println);
o.subscribe(System.out::println);
o.subscribe(System.out::println);

This should print 3 Empty! but only prints 1.

@thomasnield
Copy link
Copy Markdown
Author

That seems like a deliberate side effect almost... so what would the expected behavior be? Because I don't really see how it is much different from onCompleted() other than it has a condition before performing the action. I'll try to reason through what you're showing...

@thomasnield
Copy link
Copy Markdown
Author

Oh shoot, I think I see what you are saying. I was sharing state between the subscriptions. Let me figure out how to do this...

@thomasnield
Copy link
Copy Markdown
Author

Alright, I started over on the implementation and placed that Observable.defer() case as an additional test. All five unit tests passed.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use assertFalse

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You forgot one assertTrue.

@akarnokd akarnokd changed the title implement OperatorDoOnEmpty, with Observable.doOnEmpty() operator 1.x: implement OperatorDoOnEmpty, with Observable.doOnEmpty() operator Jan 22, 2016
try {
onEmpty.call();
} catch (Exception e) {
Exceptions.throwIfFatal(e);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a non-fatal error occurs in onEmpty.call() then at the moment it would complete normally. Moreover you should catch for Throwables not just Exceptions.

This should be

catch (Throwable e) {
    Exceptions.throwOrReport(e, this);
    return;
}

Tests for these scenarios would be nice too.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay I will put all these on my list today.

@thomasnield
Copy link
Copy Markdown
Author

Question. Why is it the policy that doOnXXX() operators won't emit an exception that occurs in the supplied side effect function? Is the idea that it is up to the client to deal with the exception that occurs in _their _function since it is after all a side-effect? And it is not part of the Observable?

EDIT

Let me rephrase that. Should exceptions in the side-effect function be emitted to the child's onError()? Because I don't think I'm doing that here...

try {
onEmpty.call();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the proper way to handle an error occurring in the side effect function onEmpty?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No Thomas, I've given you the way to do it in my previous comment (and the method used calls onError).

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay I thought I missed something in my notes. I'll fix it.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exceptions.throwOrReport(), that's what I was looking for (and I think that is what you said). Thanks for your patience, I'm learning a lot.

onEmpty.call();
} catch (Throwable e) {
Exceptions.throwOrReport(e,this);
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add return; after call to Exceptions.throwOrReport. Thomas I suggest you read the comments on the github site because it seems like you are reacting to truncated comments (perhaps email notifications).

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right I was reading email notifications.

}
child.onCompleted();
done = true;
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because onEmpty.call could take a non-trivial time to complete I would normally put in a isUnsubscribed check before calling child.onCompleted(). I'd suggest this rejig:

            @Override
            public void onCompleted() {
                if (done) {
                    return;
                }
                if (isEmpty) {
                    try {
                        onEmpty.call();
                    } catch (Throwable e) {
                        Exceptions.throwOrReport(e,this);
                        return;
                    }
                    if (!isUnsubscribed()) {
                        child.onCompleted();
                    }
                } else {
                    child.onCompleted();
                }
                done = true;
            }

@thomasnield
Copy link
Copy Markdown
Author

Okay it's done, @davidmoten. Please let me know if I missed anything else...

@thomasnield
Copy link
Copy Markdown
Author

It looks like a test failed. I don't think it has to do with this feature, but I'll confirm tomorrow after I update my fork and see if that rids it.

@davidmoten
Copy link
Copy Markdown
Collaborator

I think this operator was about done so I chucked it into https://github.com/davidmoten/rxjava-extras. That ok with you @thomasnield?

@thomasnield
Copy link
Copy Markdown
Author

@davidmoten That sounds great! I'm glad you did, thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants