diff --git a/src/rpp/rpp/operators/concat.hpp b/src/rpp/rpp/operators/concat.hpp index c4cc12823..ccab31b57 100644 --- a/src/rpp/rpp/operators/concat.hpp +++ b/src/rpp/rpp/operators/concat.hpp @@ -128,7 +128,6 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const { state->get_observer()->on_error(err); - state->get_disposable()->dispose(); } void set_upstream(const disposable_wrapper& d) const { refcounted.add(d); } diff --git a/src/rpp/rpp/operators/switch_on_next.hpp b/src/rpp/rpp/operators/switch_on_next.hpp index b4e7649fe..486ea56c8 100644 --- a/src/rpp/rpp/operators/switch_on_next.hpp +++ b/src/rpp/rpp/operators/switch_on_next.hpp @@ -63,7 +63,6 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const { m_state->get_observer()->on_error(err); - m_state->dispose(); } void on_completed() const @@ -107,7 +106,6 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const { m_state->get_observer()->on_error(err); - m_state->dispose(); } void on_completed() const diff --git a/src/rpp/rpp/operators/timeout.hpp b/src/rpp/rpp/operators/timeout.hpp index 058739e5d..be97c2a22 100644 --- a/src/rpp/rpp/operators/timeout.hpp +++ b/src/rpp/rpp/operators/timeout.hpp @@ -57,7 +57,6 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const { - disposable->dispose(); disposable->get_observer_with_timeout_under_lock()->observer.on_error(err); } }; @@ -89,26 +88,16 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const noexcept { - { - const auto obs_with_timeout = disposable->get_observer_with_timeout_under_lock(); - if (disposable->is_disposed()) - return; - + const auto obs_with_timeout = disposable->get_observer_with_timeout_under_lock(); + if (!disposable->is_disposed()) obs_with_timeout->observer.on_error(err); - } - disposable->dispose(); } void on_completed() const noexcept { - { - const auto obs_with_timeout = disposable->get_observer_with_timeout_under_lock(); - if (disposable->is_disposed()) - return; - + const auto obs_with_timeout = disposable->get_observer_with_timeout_under_lock(); + if (!disposable->is_disposed()) obs_with_timeout->observer.on_completed(); - } - disposable->dispose(); } }; diff --git a/src/rpp/rpp/sources/concat.hpp b/src/rpp/rpp/sources/concat.hpp index 85b0dac97..d09e3b2c4 100644 --- a/src/rpp/rpp/sources/concat.hpp +++ b/src/rpp/rpp/sources/concat.hpp @@ -36,7 +36,6 @@ namespace rpp::details catch (...) { this->observer.on_error(std::current_exception()); - this->disposable.dispose(); } } @@ -94,7 +93,6 @@ namespace rpp::details if (state->itr.value() == std::cend(state->container)) { state->observer.on_completed(); - state->disposable.dispose(); return; }