Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/rpp/rpp/operators/concat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ namespace rpp::operators::details

void on_error(const std::exception_ptr& err) const
{
state->dispose();
state->get_observer()->on_error(err);
state->dispose();
}

void set_upstream(const disposable_wrapper& d) const { refcounted.add(d); }
Expand Down
4 changes: 2 additions & 2 deletions src/rpp/rpp/operators/details/combining_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,16 @@ namespace rpp::operators::details

void on_error(const std::exception_ptr& err) const
{
disposable->dispose();
disposable->get_observer_under_lock()->on_error(err);
disposable->dispose();
}

void on_completed() const
{
if (disposable->decrement_on_completed())
{
disposable->dispose();
disposable->get_observer_under_lock()->on_completed();
disposable->dispose();
}
}
};
Expand Down
4 changes: 2 additions & 2 deletions src/rpp/rpp/operators/merge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ namespace rpp::operators::details

void on_error(const std::exception_ptr& err) const
{
m_disposable->dispose();
m_disposable->get_observer_under_lock()->on_error(err);
m_disposable->dispose();
}

void on_completed() const
Expand All @@ -83,8 +83,8 @@ namespace rpp::operators::details
{
m_disposable->remove(v);
}
m_disposable->dispose();
m_disposable->get_observer_under_lock()->on_completed();
m_disposable->dispose();
}
}

Expand Down
5 changes: 2 additions & 3 deletions src/rpp/rpp/operators/on_error_resume_next.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ namespace rpp::operators::details

void on_error(const std::exception_ptr& err) const
{
disposable.dispose();

std::optional<std::invoke_result_t<Selector, std::exception_ptr>> selector_obs;
try
{
Expand All @@ -53,12 +51,13 @@ namespace rpp::operators::details
{
std::move(selector_obs).value().subscribe(std::move(observer));
}
disposable.dispose();
}

void on_completed() const
{
disposable.dispose();
observer.on_completed();
disposable.dispose();
}

void set_upstream(const disposable_wrapper& d)
Expand Down
4 changes: 2 additions & 2 deletions src/rpp/rpp/operators/switch_on_next.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ namespace rpp::operators::details

void on_error(const std::exception_ptr& err) const
{
m_state->dispose();
m_state->get_observer()->on_error(err);
m_state->dispose();
}

void on_completed() const
Expand Down Expand Up @@ -106,8 +106,8 @@ namespace rpp::operators::details

void on_error(const std::exception_ptr& err) const
{
m_state->dispose();
m_state->get_observer()->on_error(err);
m_state->dispose();
}

void on_completed() const
Expand Down
6 changes: 3 additions & 3 deletions src/rpp/rpp/operators/with_latest_from.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ namespace rpp::operators::details

void on_error(const std::exception_ptr& err) const
{
disposable->dispose();
disposable->get_observer_under_lock()->on_error(err);
disposable->dispose();
}

static constexpr rpp::utils::empty_function_t<> on_completed{};
Expand Down Expand Up @@ -112,14 +112,14 @@ namespace rpp::operators::details

void on_error(const std::exception_ptr& err) const
{
disposable->dispose();
disposable->get_observer_under_lock()->on_error(err);
disposable->dispose();
}

void on_completed() const
{
disposable->dispose();
disposable->get_observer_under_lock()->on_completed();
disposable->dispose();
}
};

Expand Down
1 change: 1 addition & 0 deletions src/rpp/rpp/sources/concat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ namespace rpp::details
if (state->itr.value() == std::cend(state->container))
{
state->observer.on_completed();
state->dispose();

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure if this one is really needed, same thing if we need a dispose call on line 109

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, actually half of this calls would happen inside on_error/on_completed of downstream

return;
}

Expand Down
1 change: 1 addition & 0 deletions src/rpp/rpp/utils/functors.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#pragma once

#include <exception>
#include <tuple>

namespace rpp::utils
Expand Down
4 changes: 3 additions & 1 deletion src/tests/rpp/test_combine_latest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,10 @@ TEST_CASE("combine_latest satisfies disposable contracts")
auto observable_disposable = rpp::composite_disposable_wrapper::make();
{
auto observable = observable_with_disposable<int>(observable_disposable);
auto op = rpp::ops::combine_latest(observable);

test_operator_with_disposable<int>(rpp::ops::combine_latest(observable));
test_operator_with_disposable<int>(op);
test_operator_finish_before_dispose<int>(op);
}

CHECK(observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2);
Expand Down
4 changes: 3 additions & 1 deletion src/tests/rpp/test_merge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,10 @@ TEST_CASE("merge satisfies disposable contracts")
auto observable_disposable = rpp::composite_disposable_wrapper::make();
{
auto observable = observable_with_disposable<int>(observable_disposable);
auto op = rpp::ops::merge_with(observable);

test_operator_with_disposable<int>(rpp::ops::merge_with(observable));
test_operator_with_disposable<int>(op);
test_operator_finish_before_dispose<int>(op);
}
CHECK(observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2);
}
5 changes: 3 additions & 2 deletions src/tests/rpp/test_on_error_resume_next.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,10 @@ TEST_CASE("on_error_resume_next satisfies disposable contracts")
auto observable_disposable = rpp::composite_disposable_wrapper::make();
{
auto observable = observable_with_disposable<int>(observable_disposable);
auto op = rpp::ops::on_error_resume_next([](const std::exception_ptr&) { return rpp::source::empty<int>(); });

test_operator_with_disposable<int>(
rpp::ops::on_error_resume_next([](const std::exception_ptr&) { return rpp::source::empty<int>(); }));
test_operator_with_disposable<int>(op);
test_operator_finish_before_dispose<int>(op);
}

CHECK(observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,10 @@ TEST_CASE("with_latest_from satisfies disposable contracts")
auto observable_disposable = rpp::composite_disposable_wrapper::make();
{
auto observable = observable_with_disposable<int>(observable_disposable);
auto op = rpp::ops::with_latest_from(observable);

test_operator_with_disposable<int>(rpp::ops::with_latest_from(observable));
test_operator_with_disposable<int>(op);
test_operator_finish_before_dispose<int>(op);
}

CHECK(observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2);
Expand Down
4 changes: 3 additions & 1 deletion src/tests/rpp/test_zip.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,10 @@ TEST_CASE("zip satisfies disposable contracts")
auto observable_disposable = rpp::composite_disposable_wrapper::make();
{
auto observable = observable_with_disposable<int>(observable_disposable);
auto op = rpp::ops::zip(observable);

test_operator_with_disposable<int>(rpp::ops::zip(observable));
test_operator_with_disposable<int>(op);
test_operator_finish_before_dispose<int>(op);
}

CHECK(observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2);
Expand Down
38 changes: 38 additions & 0 deletions src/tests/utils/disposable_observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,38 @@ struct wrapped_observable_strategy_no_set_upstream
auto subscribe(auto&&) const {}
};

template<typename T>
void test_operator_over_observable_finish_before_dispose(auto&& op)
{
SECTION("operator calls on_error before dispose")
{
bool callback_called = false;
auto observable_disposable = rpp::make_callback_disposable([&callback_called]() noexcept {
callback_called = true;
});
auto observable = rpp::source::create<T>([&observable_disposable](auto&& obs) {
obs.set_upstream(observable_disposable);
obs.on_error(std::make_exception_ptr(std::runtime_error{""}));
});

op(observable) | rpp::ops::subscribe([](const auto&) {}, [&callback_called](const std::exception_ptr&) { CHECK(!callback_called); });

@AlexInLog AlexInLog Apr 11, 2024

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we also need to add check over CHECK(callback_called) after subscribe

}

SECTION("operator calls on_completed before dispose")
{
bool callback_called = false;
auto observable_disposable = rpp::make_callback_disposable([&callback_called]() noexcept {
callback_called = true;
});
auto observable = rpp::source::create<T>([&observable_disposable](auto&& obs) {
obs.set_upstream(observable_disposable);
obs.on_completed();
});

op(observable) | rpp::ops::subscribe([](const auto&) {}, [&callback_called]() { CHECK(!callback_called); });
}
}

template<typename T>
void test_operator_over_observable_with_disposable(auto&& op)
{
Expand Down Expand Up @@ -112,3 +144,9 @@ void test_operator_with_disposable(auto&& op)
{
test_operator_over_observable_with_disposable<T>([op](auto&& observable) { return observable | op; });
}

template<typename T>
void test_operator_finish_before_dispose(auto&& op)
{
test_operator_over_observable_finish_before_dispose<T>([op](auto&& observable) { return observable | op; });
}