From e7a46e64419971ba03fe2dc4160208291a09adc8 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 14 Jan 2024 00:33:39 +0300 Subject: [PATCH 1/8] Init window_toggle --- src/rpp/rpp/operators.hpp | 1 + src/rpp/rpp/operators/window_toggle.hpp | 183 ++++++++++++++++++++++++ 2 files changed, 184 insertions(+) create mode 100644 src/rpp/rpp/operators/window_toggle.hpp diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index d41d3dcd8..0b19cccd1 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -37,6 +37,7 @@ #include #include #include +#include /** * @defgroup filtering_operators Filtering Operators diff --git a/src/rpp/rpp/operators/window_toggle.hpp b/src/rpp/rpp/operators/window_toggle.hpp new file mode 100644 index 000000000..79e717a0c --- /dev/null +++ b/src/rpp/rpp/operators/window_toggle.hpp @@ -0,0 +1,183 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +#include +#include +#include +#include + +#include + +namespace rpp +{ +template +using windowed_observable = decltype(std::declval>().get_observable()); +} + +namespace rpp::operators::details +{ +template +struct window_toggle_state +{ + using Observable = rpp::utils::extract_observer_type_t; + using value_type = rpp::utils::extract_observable_type_t; + using Subject = forwarding_subject; + + static_assert(std::same_as().get_observable())>); + + RPP_NO_UNIQUE_ADDRESS TObserver observer; + RPP_NO_UNIQUE_ADDRESS TClosingsSelectorFn closings; + + std::mutex mutex{}; + mutable std::vector().get_observer())> observers{}; +}; + +template +struct window_toggle_closing_observer_strategy +{ + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + + std::shared_ptr disposble; + std::shared_ptr> state; + typename window_toggle_state::Subject subj; + + template + void on_next(T&& v) const + { + disposble->remove(subj.get_disposable()); + subj.get_observer().on_completed(); + std::lock_guard lock{state->mutex}; + state->observers.erase(state->observers.remove(subj.get_observer())); + } + + void on_error(const std::exception_ptr& err) const + { + disposble->dispose(); + + std::lock_guard lock{state->mutex}; + for (const auto& obs : state->observers) + obs.on_error(err); + state->observer.on_error(err); + } + void on_completed() const {} + + void set_upstream(const disposable_wrapper& d) const { subj.get_disposable().add(d); } + bool is_disposed() const { return subj.get_disposable().is_disposed(); } +}; + +template +struct window_toggle_opening_observer_strategy +{ + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + + std::shared_ptr disposble; + std::shared_ptr> state; + + template + void on_next(T&& v) const + { + std::lock_guard lock{state->mutex}; + typename window_toggle_state::Subject subject{disposble}; + state->subjects.emplace_back(subject.get_observer()); + state->observer.on_next(subject.get_observable()); + disposble->add(subject.get_disposable()); + state->closings(std::forward(v)).subscribe(subject.get_disposable(), ); + } + + void on_error(const std::exception_ptr& err) const + { + disposble->dispose(); + + std::lock_guard lock{state->mutex}; + for (const auto& obs : state->observers) + obs.on_error(err); + state->observer.on_error(err); + } + void on_completed() const {} + + void set_upstream(const disposable_wrapper& d) const { disposble->add(d); } + bool is_disposed() const { return disposble->is_disposed(); } +}; + +template + requires rpp::constraint::observable>> +class window_toggle_observer_strategy +{ +public: + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + + window_toggle_observer_strategy(TObserver&& observer, const TOpeningsObservable& openings, const TClosingsSelectorFn& closings) + : m_state{std::make_shared>(std::move(observer), closings)} + { + m_state->observer.set_upstream(m_disposble->add_ref()); + m_disposble->add(openings.subscribe_with_disposable()); + } + + void on_next(const auto& v) const + { + std::lock_guard lock{m_state->mutex}; + for (const auto& obs : m_state->observers) + obs.on_next(v); + } + + void on_error(const std::exception_ptr& err) const + { + m_disposble->dispose(); + + std::lock_guard lock{m_state->mutex}; + for (const auto& obs : m_state->observers) + obs.on_error(err); + m_state->observer.on_error(err); + } + + void on_completed() const + { + m_disposble->dispose(); + + std::lock_guard lock{m_state->mutex}; + for (const auto& obs : m_state->observers) + obs.on_completed(); + m_state->observer.on_completed(); + } + + void set_upstream(const disposable_wrapper& d) const { m_disposble->add(d); } + + bool is_disposed() const { return m_disposble->is_disposed(); } + +private: + std::shared_ptr m_disposble = std::make_shared(); + std::shared_ptr> m_state; +}; + +template + requires rpp::constraint::observable>> +struct window_toggle_t : public operators::details::operator_observable_strategy +{ + template + using result_value = windowed_observable; + + template + using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; +}; +} + +namespace rpp::operators +{ +template + requires rpp::constraint::observable>> +auto window_toggle(TOpeningsObservable&& openings, TClosingsSelectorFn&& closings_selector) +{ + return details::window_toggle_t{std::forward(openings), std::forward(closings_selector)}; +} +} // namespace rpp::operators \ No newline at end of file From cdc9e6acc7208276ef4417b9ee2b170f8033bc06 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 14 Jan 2024 13:43:40 +0300 Subject: [PATCH 2/8] fixes --- .../operators/details/forwarding_subject.hpp | 4 +- src/rpp/rpp/operators/window_toggle.hpp | 82 +++++++++---------- 2 files changed, 42 insertions(+), 44 deletions(-) diff --git a/src/rpp/rpp/operators/details/forwarding_subject.hpp b/src/rpp/rpp/operators/details/forwarding_subject.hpp index 5605c1b3e..6a285e918 100644 --- a/src/rpp/rpp/operators/details/forwarding_subject.hpp +++ b/src/rpp/rpp/operators/details/forwarding_subject.hpp @@ -58,9 +58,9 @@ class forwarding_strategy m_state->on_subscribe(std::forward(observer)); } - rpp::disposable_wrapper get_disposable() const + rpp::composite_disposable_wrapper get_disposable() const { - return rpp::disposable_wrapper::from_weak(m_state); + return rpp::composite_disposable_wrapper::from_weak(m_state); } private: diff --git a/src/rpp/rpp/operators/window_toggle.hpp b/src/rpp/rpp/operators/window_toggle.hpp index 79e717a0c..e242e54b9 100644 --- a/src/rpp/rpp/operators/window_toggle.hpp +++ b/src/rpp/rpp/operators/window_toggle.hpp @@ -17,8 +17,6 @@ #include #include -#include - namespace rpp { template @@ -43,85 +41,85 @@ struct window_toggle_state mutable std::vector().get_observer())> observers{}; }; -template +template struct window_toggle_closing_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + std::shared_ptr disposable; + std::shared_ptr state; + typename TState::Subject subj; - std::shared_ptr disposble; - std::shared_ptr> state; - typename window_toggle_state::Subject subj; - - template - void on_next(T&& v) const + void on_next(const auto&) const { - disposble->remove(subj.get_disposable()); - subj.get_observer().on_completed(); + disposable->remove(subj.get_disposable()); + const auto obs = subj.get_observer(); + obs.on_completed(); + std::lock_guard lock{state->mutex}; - state->observers.erase(state->observers.remove(subj.get_observer())); + state->observers.erase(state->observers.remove(obs)); } void on_error(const std::exception_ptr& err) const { - disposble->dispose(); + disposable->dispose(); std::lock_guard lock{state->mutex}; for (const auto& obs : state->observers) obs.on_error(err); state->observer.on_error(err); } - void on_completed() const {} - - void set_upstream(const disposable_wrapper& d) const { subj.get_disposable().add(d); } - bool is_disposed() const { return subj.get_disposable().is_disposed(); } + + static void on_completed() {} + static void set_upstream(const disposable_wrapper&) { } + static bool is_disposed() { return false; } }; -template +template struct window_toggle_opening_observer_strategy { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; - - std::shared_ptr disposble; - std::shared_ptr> state; + std::shared_ptr disposable; + std::shared_ptr state; template void on_next(T&& v) const { - std::lock_guard lock{state->mutex}; - typename window_toggle_state::Subject subject{disposble}; - state->subjects.emplace_back(subject.get_observer()); - state->observer.on_next(subject.get_observable()); - disposble->add(subject.get_disposable()); - state->closings(std::forward(v)).subscribe(subject.get_disposable(), ); + typename TState::Subject subject{disposable}; + { + std::lock_guard lock{state->mutex}; + state->subjects.emplace_back(subject.get_observer()); + state->observer.on_next(subject.get_observable()); + } + disposable->add(rpp::disposable_wrapper::from_weak(subject.get_disposable())); + state->closings(std::forward(v)).subscribe(subject.get_disposable(), window_toggle_closing_observer_strategy{disposable, state, subject}); } void on_error(const std::exception_ptr& err) const { - disposble->dispose(); + disposable->dispose(); std::lock_guard lock{state->mutex}; for (const auto& obs : state->observers) obs.on_error(err); state->observer.on_error(err); } - void on_completed() const {} - void set_upstream(const disposable_wrapper& d) const { disposble->add(d); } - bool is_disposed() const { return disposble->is_disposed(); } + static void on_completed() {} + static void set_upstream(const disposable_wrapper&) {} + static bool is_disposed() { return false; } }; template requires rpp::constraint::observable>> class window_toggle_observer_strategy { + using TState = window_toggle_state; public: using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; window_toggle_observer_strategy(TObserver&& observer, const TOpeningsObservable& openings, const TClosingsSelectorFn& closings) - : m_state{std::make_shared>(std::move(observer), closings)} + : m_state{std::make_shared(std::move(observer), closings)} { - m_state->observer.set_upstream(m_disposble->add_ref()); - m_disposble->add(openings.subscribe_with_disposable()); + m_state->observer.set_upstream(m_disposable->add_ref()); + m_disposable->add(openings.subscribe_with_disposable(window_toggle_opening_observer_strategy{m_disposable, m_state})); } void on_next(const auto& v) const @@ -133,7 +131,7 @@ class window_toggle_observer_strategy void on_error(const std::exception_ptr& err) const { - m_disposble->dispose(); + m_disposable->dispose(); std::lock_guard lock{m_state->mutex}; for (const auto& obs : m_state->observers) @@ -143,7 +141,7 @@ class window_toggle_observer_strategy void on_completed() const { - m_disposble->dispose(); + m_disposable->dispose(); std::lock_guard lock{m_state->mutex}; for (const auto& obs : m_state->observers) @@ -151,13 +149,13 @@ class window_toggle_observer_strategy m_state->observer.on_completed(); } - void set_upstream(const disposable_wrapper& d) const { m_disposble->add(d); } + void set_upstream(const disposable_wrapper& d) const { m_disposable->add(d); } - bool is_disposed() const { return m_disposble->is_disposed(); } + bool is_disposed() const { return m_disposable->is_disposed(); } private: - std::shared_ptr m_disposble = std::make_shared(); - std::shared_ptr> m_state; + std::shared_ptr m_disposable = std::make_shared(); + std::shared_ptr m_state; }; template From abff3a1da2cf44f72efa2328c54ce4091010c2c3 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 14 Jan 2024 14:47:19 +0300 Subject: [PATCH 3/8] tests for window_toggle --- src/rpp/rpp/observables/observable.hpp | 16 ++- src/rpp/rpp/operators/combine_latest.hpp | 2 +- src/rpp/rpp/operators/details/strategy.hpp | 3 - src/rpp/rpp/operators/fwd.hpp | 4 + src/rpp/rpp/operators/merge.hpp | 4 +- src/rpp/rpp/operators/take_until.hpp | 2 +- src/rpp/rpp/operators/window_toggle.hpp | 127 +++++++++++------- src/rpp/rpp/operators/with_latest_from.hpp | 2 +- src/rpp/rpp/subjects/details/base_subject.hpp | 2 +- src/rpp/rpp/subjects/fwd.hpp | 2 +- src/rpp/rpp/utils/constraints.hpp | 3 + src/tests/rpp/test_window_toggle.cpp | 116 ++++++++++++++++ 12 files changed, 224 insertions(+), 59 deletions(-) create mode 100644 src/tests/rpp/test_window_toggle.cpp diff --git a/src/rpp/rpp/observables/observable.hpp b/src/rpp/rpp/observables/observable.hpp index 085de06d3..eef06500d 100644 --- a/src/rpp/rpp/observables/observable.hpp +++ b/src/rpp/rpp/observables/observable.hpp @@ -142,10 +142,24 @@ class observable [[nodiscard("Use returned disposable or use subscribe(observer) instead")]] composite_disposable_wrapper subscribe_with_disposable(observer&& observer) const { if (!observer.is_disposed()) - m_strategy.subscribe(rpp::composite_disposable_wrapper{std::make_shared>()}, std::move(observer)); + return m_strategy.subscribe(rpp::composite_disposable_wrapper{std::make_shared>()}, std::move(observer)); return {}; } + /** + * @brief Subscribes observer strategy to emissions from this observable. + * + * @details This overloading attaches disposable to observer and return it to provide ability to dispose/disconnect observer early if needed. + * @warning This overloading has some performance penalties, use it only when you really need to use disposable + * @return composite_disposable_wrapper is disposable to be able to dispose observer when it needed + */ + template ObserverStrategy> + requires (!constraint::observer) + [[nodiscard("Use returned disposable or use subscribe(observer) instead")]] composite_disposable_wrapper subscribe_with_disposable(ObserverStrategy&& observer_strategy) const + { + return subscribe(rpp::composite_disposable_wrapper{std::make_shared>()}, std::forward(observer_strategy)); + } + /** * @brief Subscribe passed observer to emissions from this observable. * diff --git a/src/rpp/rpp/operators/combine_latest.hpp b/src/rpp/rpp/operators/combine_latest.hpp index 2cf6eb287..64d8e0c04 100644 --- a/src/rpp/rpp/operators/combine_latest.hpp +++ b/src/rpp/rpp/operators/combine_latest.hpp @@ -112,7 +112,7 @@ struct combine_latest_t template void subscribe(Observer&& observer, const observable_chain_strategy& observable_strategy) const { - // Need to take ownership over current_thread in case of inner-observables also uses them + // Need to take ownership over current_thread in case of inner-observables also using it auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); observables.apply(&subscribe_impl, std::forward(observer), observable_strategy, selector); } diff --git a/src/rpp/rpp/operators/details/strategy.hpp b/src/rpp/rpp/operators/details/strategy.hpp index 34a5f4a7d..ee5737d1a 100644 --- a/src/rpp/rpp/operators/details/strategy.hpp +++ b/src/rpp/rpp/operators/details/strategy.hpp @@ -21,9 +21,6 @@ #include #include -#include -#include - namespace rpp::operators::details { template diff --git a/src/rpp/rpp/operators/fwd.hpp b/src/rpp/rpp/operators/fwd.hpp index 4202864e4..9a78fe015 100644 --- a/src/rpp/rpp/operators/fwd.hpp +++ b/src/rpp/rpp/operators/fwd.hpp @@ -148,6 +148,10 @@ template + requires rpp::constraint::observable>> +auto window_toggle(TOpeningsObservable&& openings, TClosingsSelectorFn&& closings_selector); } // namespace rpp::operators namespace rpp diff --git a/src/rpp/rpp/operators/merge.hpp b/src/rpp/rpp/operators/merge.hpp index b4c9a24a8..61d7f59a3 100644 --- a/src/rpp/rpp/operators/merge.hpp +++ b/src/rpp/rpp/operators/merge.hpp @@ -134,7 +134,7 @@ struct merge_t template void subscribe(Observer&& observer, const observable_chain_strategy& strategy) const { - // Need to take ownership over current_thread in case of inner-observables also uses them + // Need to take ownership over current_thread in case of inner-observables also using it auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); using InnerObservable = typename observable_chain_strategy::value_type; @@ -160,7 +160,7 @@ struct merge_with_t { merge_observer_strategy> strategy{std::forward(observer)}; - // Need to take ownership over current_thread in case of inner-observables also uses them + // Need to take ownership over current_thread in case of inner-observables also using it auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); strategy.on_next(observable_strategy); diff --git a/src/rpp/rpp/operators/take_until.hpp b/src/rpp/rpp/operators/take_until.hpp index 5808df8eb..f7b7a90b3 100644 --- a/src/rpp/rpp/operators/take_until.hpp +++ b/src/rpp/rpp/operators/take_until.hpp @@ -101,7 +101,7 @@ struct take_until_t auto d = std::make_shared>>(std::forward(observer)); d->get_observer()->set_upstream(rpp::disposable_wrapper::from_weak(d)); - // Need to take ownership over current_thread in case of inner-observables also uses them + // Need to take ownership over current_thread in case of inner-observables also using it auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); observable.subscribe(take_until_throttle_observer_strategy>{d}); diff --git a/src/rpp/rpp/operators/window_toggle.hpp b/src/rpp/rpp/operators/window_toggle.hpp index e242e54b9..04c0b2203 100644 --- a/src/rpp/rpp/operators/window_toggle.hpp +++ b/src/rpp/rpp/operators/window_toggle.hpp @@ -14,9 +14,14 @@ #include #include +#include #include #include +#include + +#include + namespace rpp { template @@ -34,41 +39,65 @@ struct window_toggle_state static_assert(std::same_as().get_observable())>); - RPP_NO_UNIQUE_ADDRESS TObserver observer; - RPP_NO_UNIQUE_ADDRESS TClosingsSelectorFn closings; + struct state_t + { + RPP_NO_UNIQUE_ADDRESS TObserver observer; + mutable std::list().get_observer())> observers{}; + }; + + window_toggle_state(TObserver&& observer, const TClosingsSelectorFn& closings) + : m_state{state_t{std::move(observer)}} + , m_closings{closings} + { + } - std::mutex mutex{}; - mutable std::vector().get_observer())> observers{}; + rpp::operators::details::pointer_under_lock get_state_under_lock() { return rpp::operators::details::pointer_under_lock{m_state}; } + + template + auto get_closing(T&& v) const {return m_closings(std::forward(v)); } + + auto on_new_subject(const Subject& subject) + { + const auto locked_state = get_state_under_lock(); + const auto ptr = &locked_state->observers.emplace_back(subject.get_observer()); + locked_state->observer.on_next(subject.get_observable()); + return ptr; + } + +private: + rpp::operators::details::value_with_mutex m_state{}; + RPP_NO_UNIQUE_ADDRESS TClosingsSelectorFn m_closings; }; template struct window_toggle_closing_observer_strategy { - std::shared_ptr disposable; - std::shared_ptr state; - typename TState::Subject subj; + std::shared_ptr disposable; + std::shared_ptr state; + rpp::disposable_wrapper subject_disposable; + decltype(std::declval().on_new_subject(std::declval())) ptr; void on_next(const auto&) const { - disposable->remove(subj.get_disposable()); - const auto obs = subj.get_observer(); - obs.on_completed(); - - std::lock_guard lock{state->mutex}; - state->observers.erase(state->observers.remove(obs)); + on_completed(); } void on_error(const std::exception_ptr& err) const { - disposable->dispose(); - - std::lock_guard lock{state->mutex}; - for (const auto& obs : state->observers) + const auto locked_state = state->get_state_under_lock(); + for (const auto& obs : locked_state->observers) obs.on_error(err); - state->observer.on_error(err); + locked_state->observer.on_error(err); + } + + void on_completed() const + { + disposable->remove(subject_disposable); + ptr->on_completed(); + + const auto locked_state= state->get_state_under_lock(); + locked_state->observers.remove_if([ptr=ptr](const auto& obs) { return &obs == ptr; }); } - - static void on_completed() {} static void set_upstream(const disposable_wrapper&) { } static bool is_disposed() { return false; } }; @@ -83,23 +112,17 @@ struct window_toggle_opening_observer_strategy void on_next(T&& v) const { typename TState::Subject subject{disposable}; - { - std::lock_guard lock{state->mutex}; - state->subjects.emplace_back(subject.get_observer()); - state->observer.on_next(subject.get_observable()); - } - disposable->add(rpp::disposable_wrapper::from_weak(subject.get_disposable())); - state->closings(std::forward(v)).subscribe(subject.get_disposable(), window_toggle_closing_observer_strategy{disposable, state, subject}); + const auto ptr = state->on_new_subject(subject); + disposable->add(subject.get_disposable()); + state->get_closing(std::forward(v)).subscribe(subject.get_disposable(), window_toggle_closing_observer_strategy{disposable, state, subject.get_disposable(), ptr}); } void on_error(const std::exception_ptr& err) const { - disposable->dispose(); - - std::lock_guard lock{state->mutex}; - for (const auto& obs : state->observers) + const auto locked_state = state->get_state_under_lock(); + for (const auto& obs : locked_state->observers) obs.on_error(err); - state->observer.on_error(err); + locked_state->observer.on_error(err); } static void on_completed() {} @@ -118,35 +141,31 @@ class window_toggle_observer_strategy window_toggle_observer_strategy(TObserver&& observer, const TOpeningsObservable& openings, const TClosingsSelectorFn& closings) : m_state{std::make_shared(std::move(observer), closings)} { - m_state->observer.set_upstream(m_disposable->add_ref()); + m_state->get_state_under_lock()->observer.set_upstream(m_disposable->add_ref()); m_disposable->add(openings.subscribe_with_disposable(window_toggle_opening_observer_strategy{m_disposable, m_state})); } void on_next(const auto& v) const { - std::lock_guard lock{m_state->mutex}; - for (const auto& obs : m_state->observers) + const auto locked_state = m_state->get_state_under_lock(); + for (const auto& obs : locked_state->observers) obs.on_next(v); } void on_error(const std::exception_ptr& err) const { - m_disposable->dispose(); - - std::lock_guard lock{m_state->mutex}; - for (const auto& obs : m_state->observers) + const auto locked_state = m_state->get_state_under_lock(); + for (const auto& obs : locked_state->observers) obs.on_error(err); - m_state->observer.on_error(err); + locked_state->observer.on_error(err); } void on_completed() const { - m_disposable->dispose(); - - std::lock_guard lock{m_state->mutex}; - for (const auto& obs : m_state->observers) + const auto locked_state = m_state->get_state_under_lock(); + for (const auto& obs : locked_state->observers) obs.on_completed(); - m_state->observer.on_completed(); + locked_state->observer.on_completed(); } void set_upstream(const disposable_wrapper& d) const { m_disposable->add(d); } @@ -160,13 +179,25 @@ class window_toggle_observer_strategy template requires rpp::constraint::observable>> -struct window_toggle_t : public operators::details::operator_observable_strategy +struct window_toggle_t //: public operators::details::operator_observable_strategy { + RPP_NO_UNIQUE_ADDRESS TOpeningsObservable openings; + RPP_NO_UNIQUE_ADDRESS TClosingsSelectorFn closings_selector; + template - using result_value = windowed_observable; + using result_value = rpp::windowed_observable; template using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; + + template + void subscribe(Observer&& observer, const observable_chain_strategy& observable_strategy) const + { + // Need to take ownership over current_thread in case of inner-observables also using it + auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); + using expected_value = typename observable_chain_strategy::value_type; + observable_strategy.subscribe(rpp::observer, TOpeningsObservable, TClosingsSelectorFn>>{std::forward(observer), openings, closings_selector}); + } }; } @@ -176,6 +207,6 @@ template>> auto window_toggle(TOpeningsObservable&& openings, TClosingsSelectorFn&& closings_selector) { - return details::window_toggle_t{std::forward(openings), std::forward(closings_selector)}; + return details::window_toggle_t, std::decay_t>{std::forward(openings), std::forward(closings_selector)}; } } // namespace rpp::operators \ No newline at end of file diff --git a/src/rpp/rpp/operators/with_latest_from.hpp b/src/rpp/rpp/operators/with_latest_from.hpp index 441c6451b..1714cb076 100644 --- a/src/rpp/rpp/operators/with_latest_from.hpp +++ b/src/rpp/rpp/operators/with_latest_from.hpp @@ -140,7 +140,7 @@ struct with_latest_from_t template void subscribe(Observer&& observer, const observable_chain_strategy& observable_strategy) const { - // Need to take ownership over current_thread in case of inner-observables also uses them + // Need to take ownership over current_thread in case of inner-observables also using it auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); observables.apply(&subscribe_impl, std::forward(observer), observable_strategy, selector); } diff --git a/src/rpp/rpp/subjects/details/base_subject.hpp b/src/rpp/rpp/subjects/details/base_subject.hpp index 35e3b5490..5ff7d710f 100644 --- a/src/rpp/rpp/subjects/details/base_subject.hpp +++ b/src/rpp/rpp/subjects/details/base_subject.hpp @@ -52,7 +52,7 @@ class base_subject return rpp::observable{m_strategy}; } - rpp::disposable_wrapper get_disposable() const + auto get_disposable() const { return m_strategy.get_disposable(); } diff --git a/src/rpp/rpp/subjects/fwd.hpp b/src/rpp/rpp/subjects/fwd.hpp index 7cb50c0bd..93c592b2e 100644 --- a/src/rpp/rpp/subjects/fwd.hpp +++ b/src/rpp/rpp/subjects/fwd.hpp @@ -25,7 +25,7 @@ concept subject_strategy = requires(Strategy t, rpp::details::observers::fake_ob { {t.get_observer()} -> rpp::constraint::observer; t.on_subscribe(std::move(obs)); - {t.get_disposable() } -> rpp::constraint::decayed_same_as; + {t.get_disposable() } -> rpp::constraint::decayed_any_of; }; } template Strategy> diff --git a/src/rpp/rpp/utils/constraints.hpp b/src/rpp/rpp/utils/constraints.hpp index cd3b09993..22c96c8ab 100644 --- a/src/rpp/rpp/utils/constraints.hpp +++ b/src/rpp/rpp/utils/constraints.hpp @@ -24,6 +24,9 @@ concept decayed_type = std::same_as, T>; template concept any_of = (std::same_as || ...); +template +concept decayed_any_of = (decayed_same_as || ...); + template concept variadic_decayed_same_as = sizeof...(Types) == 1 && (decayed_same_as && ...); diff --git a/src/tests/rpp/test_window_toggle.cpp b/src/tests/rpp/test_window_toggle.cpp new file mode 100644 index 000000000..31c5017b3 --- /dev/null +++ b/src/tests/rpp/test_window_toggle.cpp @@ -0,0 +1,116 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#include + +#include +#include +#include +#include + +#include "mock_observer.hpp" +#include "snitch_logging.hpp" + +TEST_CASE("window_toggle") +{ + mock_observer_strategy> mock{}; + std::vector> inner_mocks{}; + + auto subscribe_mocks = [&mock, &inner_mocks](auto&& observable) + { + observable.subscribe([&mock, &inner_mocks](const rpp::windowed_observable& observable) + { + mock.on_next(observable); + observable.subscribe(inner_mocks.emplace_back()); + }, + [&mock](const std::exception_ptr& err) { mock.on_error(err); }, + [&mock]() { mock.on_completed(); }); + }; + + SECTION("opening - just(1), closing - never()") + { + subscribe_mocks(rpp::source::just(1,2,3) + | rpp::ops::window_toggle(rpp::source::just(1), [](int){return rpp::source::never();})); + + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + REQUIRE(inner_mocks.size() == 1); + for (const auto& inner : inner_mocks) + { + CHECK(inner.get_received_values() == std::vector{1,2,3}); + CHECK(inner.get_on_error_count() == 0); + CHECK(inner.get_on_completed_count() == 1); + } + } + SECTION("opening - just(1), closing - empty()") + { + subscribe_mocks(rpp::source::just(1,2,3) + | rpp::ops::window_toggle(rpp::source::just(1), [](int){return rpp::source::empty();})); + + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + REQUIRE(inner_mocks.size() == 1); + for (const auto& inner : inner_mocks) + { + CHECK(inner.get_received_values() == std::vector{}); + CHECK(inner.get_on_error_count() == 0); + CHECK(inner.get_on_completed_count() == 1); + } + } + SECTION("opening - just(1,2,3), closing - empty()") + { + subscribe_mocks(rpp::source::just(1,2,3) + | rpp::ops::window_toggle(rpp::source::just(1,2,3), [](int){return rpp::source::empty();})); + + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + REQUIRE(inner_mocks.size() == 3); + for (const auto& inner : inner_mocks) + { + CHECK(inner.get_received_values() == std::vector{}); + CHECK(inner.get_on_error_count() == 0); + CHECK(inner.get_on_completed_count() == 1); + } + } + SECTION("opening - just(1,2,3), closing - never()") + { + subscribe_mocks(rpp::source::just(1,2,3) + | rpp::ops::window_toggle(rpp::source::just(1,2,3), [](int){return rpp::source::never();})); + + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + REQUIRE(inner_mocks.size() == 3); + for (const auto& inner : inner_mocks) + { + CHECK(inner.get_on_error_count() == 0); + CHECK(inner.get_on_completed_count() == 1); + } + CHECK(inner_mocks[0].get_received_values() == std::vector{1,2,3}); + CHECK(inner_mocks[1].get_received_values() == std::vector{2,3}); + CHECK(inner_mocks[2].get_received_values() == std::vector{3}); + } + SECTION("opening - just(1,2,3), closing - just(1)") + { + subscribe_mocks(rpp::source::just(1,2,3) + | rpp::ops::window_toggle(rpp::source::just(1,2,3), [](int){return rpp::source::just(1);})); + + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + REQUIRE(inner_mocks.size() == 3); + for (const auto& inner : inner_mocks) + { + CHECK(inner.get_on_error_count() == 0); + CHECK(inner.get_on_completed_count() == 1); + } + CHECK(inner_mocks[0].get_received_values() == std::vector{1}); + CHECK(inner_mocks[1].get_received_values() == std::vector{2}); + CHECK(inner_mocks[2].get_received_values() == std::vector{3}); + } +} \ No newline at end of file From a49abd41ae1f7174721e8a5ab34f73f4dba8f443 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 14 Jan 2024 21:40:50 +0300 Subject: [PATCH 4/8] extend doc --- src/examples/rpp/doxygen/window.cpp | 2 +- src/examples/rpp/doxygen/window_toggle.cpp | 45 +++++++++++++++++++ .../observables/connectable_observable.hpp | 2 +- src/rpp/rpp/operators/window.hpp | 6 +-- src/rpp/rpp/operators/window_toggle.hpp | 33 +++++++++++++- src/tests/rpp/test_window.cpp | 16 +++---- src/tests/rpp/test_window_toggle.cpp | 4 +- 7 files changed, 91 insertions(+), 17 deletions(-) create mode 100644 src/examples/rpp/doxygen/window_toggle.cpp diff --git a/src/examples/rpp/doxygen/window.cpp b/src/examples/rpp/doxygen/window.cpp index fd66e1c76..a3d9f3d58 100644 --- a/src/examples/rpp/doxygen/window.cpp +++ b/src/examples/rpp/doxygen/window.cpp @@ -11,7 +11,7 @@ int main() //! [window] rpp::source::just(1, 2, 3, 4, 5) | rpp::operators::window(3) - | rpp::operators::subscribe([](const rpp::windowed_observable& v) + | rpp::operators::subscribe([](const rpp::window_observable& v) { std::cout << "\nNew observable " << std::endl; v.subscribe([](int v) {std::cout << v << " "; }); diff --git a/src/examples/rpp/doxygen/window_toggle.cpp b/src/examples/rpp/doxygen/window_toggle.cpp new file mode 100644 index 000000000..fd12540e5 --- /dev/null +++ b/src/examples/rpp/doxygen/window_toggle.cpp @@ -0,0 +1,45 @@ +#include + +#include + +/** + * @example window_toggle.cpp + **/ +int main() +{ + + //! [window_toggle] + size_t counter{}; + auto source = rpp::source::just(rpp::schedulers::current_thread{}, 1, 2, 3, 4, 5) | rpp::operators::publish() | rpp::operators::ref_count(); + source + | rpp::operators::window_toggle(source, [source](int){ return source | rpp::ops::filter([](int v) { return v % 2 == 0; }); }) + | rpp::operators::subscribe([&counter](const rpp::window_toggle_observable& obs) + { + std::cout << "New observable " << ++counter << std::endl; + obs.subscribe([counter](int v) {std::cout << counter << ": " << v << " " << std::endl; }, [counter]() { std::cout << "closing " << counter << std::endl; }); + }); + // Output: + // New observable 1 + // 1: 1 + // New observable 2 + // 1: 2 + // 2: 2 + // closing 1 + // New observable 3 + // 2: 3 + // 3: 3 + // New observable 4 + // 2: 4 + // 3: 4 + // 4: 4 + // closing 2 + // closing 3 + // New observable 5 + // 4: 5 + // 5: 5 + // closing 4 + // closing 5 + //! [window_toggle] + + return 0; +} diff --git a/src/rpp/rpp/observables/connectable_observable.hpp b/src/rpp/rpp/observables/connectable_observable.hpp index a73eada12..e770535c0 100644 --- a/src/rpp/rpp/observables/connectable_observable.hpp +++ b/src/rpp/rpp/observables/connectable_observable.hpp @@ -24,7 +24,7 @@ struct ref_count_on_subscribe_t; template struct ref_count_on_subscribe_t> { - rpp::connectable_observable original_observable{}; + rpp::connectable_observable original_observable; struct state_t { diff --git a/src/rpp/rpp/operators/window.hpp b/src/rpp/rpp/operators/window.hpp index 06475c76f..db1abbb92 100644 --- a/src/rpp/rpp/operators/window.hpp +++ b/src/rpp/rpp/operators/window.hpp @@ -22,7 +22,7 @@ namespace rpp { template -using windowed_observable = decltype(std::declval>().get_observable()); +using window_observable = decltype(std::declval>().get_observable()); } namespace rpp::operators::details { @@ -105,7 +105,7 @@ class window_observer_strategy struct window_t : public operators::details::operator_observable_strategy_different_types, size_t> { template - using result_value = windowed_observable; + using result_value = window_observable; template using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; @@ -115,7 +115,7 @@ struct window_t : public operators::details::operator_observable_strategy_differ namespace rpp::operators { /** - * @brief Subdivide original observable into sub-observables (windowed observables) and emit sub-observables of items instead of original items + * @brief Subdivide original observable into sub-observables (window observables) and emit sub-observables of items instead of original items * * @marble window { diff --git a/src/rpp/rpp/operators/window_toggle.hpp b/src/rpp/rpp/operators/window_toggle.hpp index 04c0b2203..440ed008b 100644 --- a/src/rpp/rpp/operators/window_toggle.hpp +++ b/src/rpp/rpp/operators/window_toggle.hpp @@ -25,7 +25,7 @@ namespace rpp { template -using windowed_observable = decltype(std::declval>().get_observable()); +using window_toggle_observable = decltype(std::declval>().get_observable()); } namespace rpp::operators::details @@ -185,7 +185,7 @@ struct window_toggle_t //: public operators::details::operator_observable_strate RPP_NO_UNIQUE_ADDRESS TClosingsSelectorFn closings_selector; template - using result_value = rpp::windowed_observable; + using result_value = rpp::window_toggle_observable; template using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; @@ -203,6 +203,35 @@ struct window_toggle_t //: public operators::details::operator_observable_strate namespace rpp::operators { +/** + * @brief Subdivide original observable into sub-observables (window observables) and emit sub-observables of items instead of original items + * @details Values from `openings` observable used to specify moment when new window will be opened. `closings_selector` is used to obtain observable to specify moment when new window will be closed. + * + * @marble window_toggle + { + source observable : +-1-2-3-4-5-| + + operator "window(2)" : + { + .+1-2| + .....+3-4| + .........+5-| + } + } + * + * @details Actually it is similar to `buffer` but it emits observable instead of container. + * + * @param openings is observable which emissions used to start new window + * @param closings_selector is function which returns observable which emission/completion means closing of opened window + * + * @warning #include + * + * @par Example + * @snippet window_toggle.cpp window_toggle + * + * @ingroup transforming_operators + * @see https://reactivex.io/documentation/operators/window.html + */ template requires rpp::constraint::observable>> auto window_toggle(TOpeningsObservable&& openings, TClosingsSelectorFn&& closings_selector) diff --git a/src/tests/rpp/test_window.cpp b/src/tests/rpp/test_window.cpp index e1e6d51ed..29f278e64 100644 --- a/src/tests/rpp/test_window.cpp +++ b/src/tests/rpp/test_window.cpp @@ -28,14 +28,14 @@ TEST_CASE("window subdivide observable into sub-observables") { SECTION("see 2 observables") { - auto mock = mock_observer_strategy>{}; + auto mock = mock_observer_strategy>{}; obs.subscribe(mock); CHECK(mock.get_total_on_next_count() == 2); CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 1); } - SECTION("first windowed observable emits first 2 values and completes") + SECTION("first window observable emits first 2 values and completes") { auto mock = mock_observer_strategy{}; size_t i = 0; @@ -49,7 +49,7 @@ TEST_CASE("window subdivide observable into sub-observables") CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 1); } - SECTION("second windowed observable emits last 1 value and completes") + SECTION("second window observable emits last 1 value and completes") { auto mock = mock_observer_strategy{}; size_t i = 0; @@ -75,7 +75,7 @@ TEST_CASE("window subdivide observable into sub-observables") auto obs = subj.get_observable() | rpp::ops::window(2); SECTION("emit first item") { - auto mock = mock_observer_strategy>{}; + auto mock = mock_observer_strategy>{}; obs.subscribe(mock); CHECK(mock.get_total_on_next_count() == 0); @@ -84,7 +84,7 @@ TEST_CASE("window subdivide observable into sub-observables") subj.get_observer().on_next(1); - SECTION("see new windowed observable") + SECTION("see new window observable") { CHECK(mock.get_total_on_next_count() == 1); CHECK(mock.get_on_error_count() == 0); @@ -94,7 +94,7 @@ TEST_CASE("window subdivide observable into sub-observables") SECTION("emit second item") { subj.get_observer().on_next(2); - SECTION("no any new windowed observable") + SECTION("no any new window observable") { CHECK(mock.get_total_on_next_count() == 1); CHECK(mock.get_on_error_count() == 0); @@ -104,7 +104,7 @@ TEST_CASE("window subdivide observable into sub-observables") SECTION("emit third item") { subj.get_observer().on_next(3); - SECTION("new windowed observable") + SECTION("new window observable") { CHECK(mock.get_total_on_next_count() == 2); CHECK(mock.get_on_error_count() == 0); @@ -192,7 +192,7 @@ TEST_CASE("window disposes original disposable only when everything is disposed" auto inner_observer_disposable = std::make_shared(); obs | rpp::ops::window(2) - | rpp::ops::subscribe(rpp::composite_disposable_wrapper{observer_disposable}, [inner_observer_disposable](const rpp::windowed_observable& new_obs) + | rpp::ops::subscribe(rpp::composite_disposable_wrapper{observer_disposable}, [inner_observer_disposable](const rpp::window_observable& new_obs) { new_obs.subscribe(rpp::composite_disposable_wrapper{inner_observer_disposable}, [](int){}); }); diff --git a/src/tests/rpp/test_window_toggle.cpp b/src/tests/rpp/test_window_toggle.cpp index 31c5017b3..29a1aaeac 100644 --- a/src/tests/rpp/test_window_toggle.cpp +++ b/src/tests/rpp/test_window_toggle.cpp @@ -20,12 +20,12 @@ TEST_CASE("window_toggle") { - mock_observer_strategy> mock{}; + mock_observer_strategy> mock{}; std::vector> inner_mocks{}; auto subscribe_mocks = [&mock, &inner_mocks](auto&& observable) { - observable.subscribe([&mock, &inner_mocks](const rpp::windowed_observable& observable) + observable.subscribe([&mock, &inner_mocks](const rpp::window_toggle_observable& observable) { mock.on_next(observable); observable.subscribe(inner_mocks.emplace_back()); From 72adeea12bf73bf1464f8146125f427fb81dddbf Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 14 Jan 2024 21:42:01 +0300 Subject: [PATCH 5/8] extend test --- src/tests/rpp/test_window_toggle.cpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/tests/rpp/test_window_toggle.cpp b/src/tests/rpp/test_window_toggle.cpp index 29a1aaeac..46b840ebf 100644 --- a/src/tests/rpp/test_window_toggle.cpp +++ b/src/tests/rpp/test_window_toggle.cpp @@ -16,8 +16,10 @@ #include #include "mock_observer.hpp" +#include "disposable_observable.hpp" #include "snitch_logging.hpp" + TEST_CASE("window_toggle") { mock_observer_strategy> mock{}; @@ -113,4 +115,9 @@ TEST_CASE("window_toggle") CHECK(inner_mocks[1].get_received_values() == std::vector{2}); CHECK(inner_mocks[2].get_received_values() == std::vector{3}); } +} + +TEST_CASE("window_toggle satisfies disposable contracts") +{ + test_operator_with_disposable(rpp::ops::window_toggle(rpp::source::just(1), [](int){return rpp::source::just(1); })); } \ No newline at end of file From d4b2c02ac9b498b58ba205554f40a1cb48bb6d9c Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 14 Jan 2024 21:44:03 +0300 Subject: [PATCH 6/8] extend tests --- src/tests/rpp/test_window_toggle.cpp | 36 ++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/src/tests/rpp/test_window_toggle.cpp b/src/tests/rpp/test_window_toggle.cpp index 46b840ebf..96358b5bc 100644 --- a/src/tests/rpp/test_window_toggle.cpp +++ b/src/tests/rpp/test_window_toggle.cpp @@ -17,6 +17,7 @@ #include "mock_observer.hpp" #include "disposable_observable.hpp" +#include "rpp/schedulers/immediate.hpp" #include "snitch_logging.hpp" @@ -117,6 +118,41 @@ TEST_CASE("window_toggle") } } +TEST_CASE("window_toggle disposes original disposable only when everything is disposed") +{ + auto source_disposable = std::make_shared(); + auto obs = rpp::source::create([source_disposable](auto&& obs) + { + obs.set_upstream(source_disposable); + obs.on_next(1); + }); + + auto observer_disposable = std::make_shared(); + auto inner_observer_disposable = std::make_shared(); + obs + | rpp::ops::window_toggle(rpp::source::just(rpp::schedulers::immediate{}, 1), [](int){return rpp::source::never(); }) + | rpp::ops::subscribe(rpp::composite_disposable_wrapper{observer_disposable}, [inner_observer_disposable](const rpp::window_toggle_observable& new_obs) + { + new_obs.subscribe(rpp::composite_disposable_wrapper{inner_observer_disposable}, [](int){}); + }); + + CHECK(!source_disposable->is_disposed()); + CHECK(!observer_disposable->is_disposed()); + CHECK(!inner_observer_disposable->is_disposed()); + + observer_disposable->dispose(); + + CHECK(!source_disposable->is_disposed()); + CHECK(observer_disposable->is_disposed()); + CHECK(!inner_observer_disposable->is_disposed()); + + inner_observer_disposable->dispose(); + + CHECK(source_disposable->is_disposed()); + CHECK(observer_disposable->is_disposed()); + CHECK(inner_observer_disposable->is_disposed()); +} + TEST_CASE("window_toggle satisfies disposable contracts") { test_operator_with_disposable(rpp::ops::window_toggle(rpp::source::just(1), [](int){return rpp::source::just(1); })); From 0932a99d49775f5a2c5738064f036435e3b53ea1 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 14 Jan 2024 21:45:08 +0300 Subject: [PATCH 7/8] extend tests --- src/tests/rpp/test_window_toggle.cpp | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/src/tests/rpp/test_window_toggle.cpp b/src/tests/rpp/test_window_toggle.cpp index 96358b5bc..b936a8208 100644 --- a/src/tests/rpp/test_window_toggle.cpp +++ b/src/tests/rpp/test_window_toggle.cpp @@ -26,7 +26,7 @@ TEST_CASE("window_toggle") mock_observer_strategy> mock{}; std::vector> inner_mocks{}; - auto subscribe_mocks = [&mock, &inner_mocks](auto&& observable) + auto subscribe_mocks = [&mock, &inner_mocks](const auto& observable) { observable.subscribe([&mock, &inner_mocks](const rpp::window_toggle_observable& observable) { @@ -116,6 +116,24 @@ TEST_CASE("window_toggle") CHECK(inner_mocks[1].get_received_values() == std::vector{2}); CHECK(inner_mocks[2].get_received_values() == std::vector{3}); } + SECTION("opening - never(), closing - just(1)") + { + subscribe_mocks(rpp::source::never() + | rpp::ops::window_toggle(rpp::source::just(1,2,3), [](int){return rpp::source::just(1);})); + + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + REQUIRE(inner_mocks.size() == 0); + } + SECTION("opening - empty(), closing - just(1)") + { + subscribe_mocks(rpp::source::empty() + | rpp::ops::window_toggle(rpp::source::just(1,2,3), [](int){return rpp::source::just(1);})); + + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + REQUIRE(inner_mocks.size() == 0); + } } TEST_CASE("window_toggle disposes original disposable only when everything is disposed") From c7aba626631c2c7119355fc3c40bcec6eff61370 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sun, 14 Jan 2024 21:52:29 +0300 Subject: [PATCH 8/8] extend tests --- src/tests/rpp/test_window_toggle.cpp | 54 ++++++++++++++++++++++++++-- 1 file changed, 52 insertions(+), 2 deletions(-) diff --git a/src/tests/rpp/test_window_toggle.cpp b/src/tests/rpp/test_window_toggle.cpp index b936a8208..106b5dfd9 100644 --- a/src/tests/rpp/test_window_toggle.cpp +++ b/src/tests/rpp/test_window_toggle.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include "mock_observer.hpp" #include "disposable_observable.hpp" @@ -122,8 +123,13 @@ TEST_CASE("window_toggle") | rpp::ops::window_toggle(rpp::source::just(1,2,3), [](int){return rpp::source::just(1);})); CHECK(mock.get_on_error_count() == 0); - CHECK(mock.get_on_completed_count() == 1); - REQUIRE(inner_mocks.size() == 0); + CHECK(mock.get_on_completed_count() == 0); + REQUIRE(inner_mocks.size() == 3); + for (const auto& inner : inner_mocks) + { + CHECK(inner.get_on_error_count() == 0); + CHECK(inner.get_on_completed_count() == 1); + } } SECTION("opening - empty(), closing - just(1)") { @@ -134,6 +140,50 @@ TEST_CASE("window_toggle") CHECK(mock.get_on_completed_count() == 1); REQUIRE(inner_mocks.size() == 0); } + SECTION("source - error") + { + subscribe_mocks(rpp::source::error({}) + | rpp::ops::window_toggle(rpp::source::just(rpp::schedulers::immediate{}, 1), [](int){return rpp::source::never(); })); + + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + } + SECTION("openings - error") + { + subscribe_mocks(rpp::source::never() + | rpp::ops::window_toggle(rpp::source::error({}), [](int){return rpp::source::never(); })); + + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + } + SECTION("openings - just(1), closings - error") + { + subscribe_mocks(rpp::source::never() + | rpp::ops::window_toggle(rpp::source::just(1), [](int){return rpp::source::error({}); })); + + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + REQUIRE(inner_mocks.size() == 1); + for (const auto& inner : inner_mocks) + { + CHECK(inner.get_on_error_count() == 1); + CHECK(inner.get_on_completed_count() == 0); + } + } + SECTION("openings - just(1), closings - throw") + { + subscribe_mocks(rpp::source::never() + | rpp::ops::window_toggle(rpp::source::just(1), [](int){ throw std::runtime_error{""}; return rpp::source::error({}); })); + + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + REQUIRE(inner_mocks.size() == 1); + for (const auto& inner : inner_mocks) + { + CHECK(inner.get_on_error_count() == 1); + CHECK(inner.get_on_completed_count() == 0); + } + } } TEST_CASE("window_toggle disposes original disposable only when everything is disposed")