diff --git a/src/examples/rpp/doxygen/window.cpp b/src/examples/rpp/doxygen/window.cpp index fd66e1c76..a3d9f3d58 100644 --- a/src/examples/rpp/doxygen/window.cpp +++ b/src/examples/rpp/doxygen/window.cpp @@ -11,7 +11,7 @@ int main() //! [window] rpp::source::just(1, 2, 3, 4, 5) | rpp::operators::window(3) - | rpp::operators::subscribe([](const rpp::windowed_observable& v) + | rpp::operators::subscribe([](const rpp::window_observable& v) { std::cout << "\nNew observable " << std::endl; v.subscribe([](int v) {std::cout << v << " "; }); diff --git a/src/examples/rpp/doxygen/window_toggle.cpp b/src/examples/rpp/doxygen/window_toggle.cpp new file mode 100644 index 000000000..fd12540e5 --- /dev/null +++ b/src/examples/rpp/doxygen/window_toggle.cpp @@ -0,0 +1,45 @@ +#include + +#include + +/** + * @example window_toggle.cpp + **/ +int main() +{ + + //! [window_toggle] + size_t counter{}; + auto source = rpp::source::just(rpp::schedulers::current_thread{}, 1, 2, 3, 4, 5) | rpp::operators::publish() | rpp::operators::ref_count(); + source + | rpp::operators::window_toggle(source, [source](int){ return source | rpp::ops::filter([](int v) { return v % 2 == 0; }); }) + | rpp::operators::subscribe([&counter](const rpp::window_toggle_observable& obs) + { + std::cout << "New observable " << ++counter << std::endl; + obs.subscribe([counter](int v) {std::cout << counter << ": " << v << " " << std::endl; }, [counter]() { std::cout << "closing " << counter << std::endl; }); + }); + // Output: + // New observable 1 + // 1: 1 + // New observable 2 + // 1: 2 + // 2: 2 + // closing 1 + // New observable 3 + // 2: 3 + // 3: 3 + // New observable 4 + // 2: 4 + // 3: 4 + // 4: 4 + // closing 2 + // closing 3 + // New observable 5 + // 4: 5 + // 5: 5 + // closing 4 + // closing 5 + //! [window_toggle] + + return 0; +} diff --git a/src/rpp/rpp/observables/connectable_observable.hpp b/src/rpp/rpp/observables/connectable_observable.hpp index a73eada12..e770535c0 100644 --- a/src/rpp/rpp/observables/connectable_observable.hpp +++ b/src/rpp/rpp/observables/connectable_observable.hpp @@ -24,7 +24,7 @@ struct ref_count_on_subscribe_t; template struct ref_count_on_subscribe_t> { - rpp::connectable_observable original_observable{}; + rpp::connectable_observable original_observable; struct state_t { diff --git a/src/rpp/rpp/observables/observable.hpp b/src/rpp/rpp/observables/observable.hpp index 085de06d3..eef06500d 100644 --- a/src/rpp/rpp/observables/observable.hpp +++ b/src/rpp/rpp/observables/observable.hpp @@ -142,10 +142,24 @@ class observable [[nodiscard("Use returned disposable or use subscribe(observer) instead")]] composite_disposable_wrapper subscribe_with_disposable(observer&& observer) const { if (!observer.is_disposed()) - m_strategy.subscribe(rpp::composite_disposable_wrapper{std::make_shared>()}, std::move(observer)); + return m_strategy.subscribe(rpp::composite_disposable_wrapper{std::make_shared>()}, std::move(observer)); return {}; } + /** + * @brief Subscribes observer strategy to emissions from this observable. + * + * @details This overloading attaches disposable to observer and return it to provide ability to dispose/disconnect observer early if needed. + * @warning This overloading has some performance penalties, use it only when you really need to use disposable + * @return composite_disposable_wrapper is disposable to be able to dispose observer when it needed + */ + template ObserverStrategy> + requires (!constraint::observer) + [[nodiscard("Use returned disposable or use subscribe(observer) instead")]] composite_disposable_wrapper subscribe_with_disposable(ObserverStrategy&& observer_strategy) const + { + return subscribe(rpp::composite_disposable_wrapper{std::make_shared>()}, std::forward(observer_strategy)); + } + /** * @brief Subscribe passed observer to emissions from this observable. * diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index d41d3dcd8..0b19cccd1 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -37,6 +37,7 @@ #include #include #include +#include /** * @defgroup filtering_operators Filtering Operators diff --git a/src/rpp/rpp/operators/combine_latest.hpp b/src/rpp/rpp/operators/combine_latest.hpp index 2cf6eb287..64d8e0c04 100644 --- a/src/rpp/rpp/operators/combine_latest.hpp +++ b/src/rpp/rpp/operators/combine_latest.hpp @@ -112,7 +112,7 @@ struct combine_latest_t template void subscribe(Observer&& observer, const observable_chain_strategy& observable_strategy) const { - // Need to take ownership over current_thread in case of inner-observables also uses them + // Need to take ownership over current_thread in case of inner-observables also using it auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); observables.apply(&subscribe_impl, std::forward(observer), observable_strategy, selector); } diff --git a/src/rpp/rpp/operators/details/forwarding_subject.hpp b/src/rpp/rpp/operators/details/forwarding_subject.hpp index 5605c1b3e..6a285e918 100644 --- a/src/rpp/rpp/operators/details/forwarding_subject.hpp +++ b/src/rpp/rpp/operators/details/forwarding_subject.hpp @@ -58,9 +58,9 @@ class forwarding_strategy m_state->on_subscribe(std::forward(observer)); } - rpp::disposable_wrapper get_disposable() const + rpp::composite_disposable_wrapper get_disposable() const { - return rpp::disposable_wrapper::from_weak(m_state); + return rpp::composite_disposable_wrapper::from_weak(m_state); } private: diff --git a/src/rpp/rpp/operators/details/strategy.hpp b/src/rpp/rpp/operators/details/strategy.hpp index 34a5f4a7d..ee5737d1a 100644 --- a/src/rpp/rpp/operators/details/strategy.hpp +++ b/src/rpp/rpp/operators/details/strategy.hpp @@ -21,9 +21,6 @@ #include #include -#include -#include - namespace rpp::operators::details { template diff --git a/src/rpp/rpp/operators/fwd.hpp b/src/rpp/rpp/operators/fwd.hpp index 4202864e4..9a78fe015 100644 --- a/src/rpp/rpp/operators/fwd.hpp +++ b/src/rpp/rpp/operators/fwd.hpp @@ -148,6 +148,10 @@ template + requires rpp::constraint::observable>> +auto window_toggle(TOpeningsObservable&& openings, TClosingsSelectorFn&& closings_selector); } // namespace rpp::operators namespace rpp diff --git a/src/rpp/rpp/operators/merge.hpp b/src/rpp/rpp/operators/merge.hpp index b4c9a24a8..61d7f59a3 100644 --- a/src/rpp/rpp/operators/merge.hpp +++ b/src/rpp/rpp/operators/merge.hpp @@ -134,7 +134,7 @@ struct merge_t template void subscribe(Observer&& observer, const observable_chain_strategy& strategy) const { - // Need to take ownership over current_thread in case of inner-observables also uses them + // Need to take ownership over current_thread in case of inner-observables also using it auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); using InnerObservable = typename observable_chain_strategy::value_type; @@ -160,7 +160,7 @@ struct merge_with_t { merge_observer_strategy> strategy{std::forward(observer)}; - // Need to take ownership over current_thread in case of inner-observables also uses them + // Need to take ownership over current_thread in case of inner-observables also using it auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); strategy.on_next(observable_strategy); diff --git a/src/rpp/rpp/operators/take_until.hpp b/src/rpp/rpp/operators/take_until.hpp index 5808df8eb..f7b7a90b3 100644 --- a/src/rpp/rpp/operators/take_until.hpp +++ b/src/rpp/rpp/operators/take_until.hpp @@ -101,7 +101,7 @@ struct take_until_t auto d = std::make_shared>>(std::forward(observer)); d->get_observer()->set_upstream(rpp::disposable_wrapper::from_weak(d)); - // Need to take ownership over current_thread in case of inner-observables also uses them + // Need to take ownership over current_thread in case of inner-observables also using it auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); observable.subscribe(take_until_throttle_observer_strategy>{d}); diff --git a/src/rpp/rpp/operators/window.hpp b/src/rpp/rpp/operators/window.hpp index 06475c76f..db1abbb92 100644 --- a/src/rpp/rpp/operators/window.hpp +++ b/src/rpp/rpp/operators/window.hpp @@ -22,7 +22,7 @@ namespace rpp { template -using windowed_observable = decltype(std::declval>().get_observable()); +using window_observable = decltype(std::declval>().get_observable()); } namespace rpp::operators::details { @@ -105,7 +105,7 @@ class window_observer_strategy struct window_t : public operators::details::operator_observable_strategy_different_types, size_t> { template - using result_value = windowed_observable; + using result_value = window_observable; template using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; @@ -115,7 +115,7 @@ struct window_t : public operators::details::operator_observable_strategy_differ namespace rpp::operators { /** - * @brief Subdivide original observable into sub-observables (windowed observables) and emit sub-observables of items instead of original items + * @brief Subdivide original observable into sub-observables (window observables) and emit sub-observables of items instead of original items * * @marble window { diff --git a/src/rpp/rpp/operators/window_toggle.hpp b/src/rpp/rpp/operators/window_toggle.hpp new file mode 100644 index 000000000..440ed008b --- /dev/null +++ b/src/rpp/rpp/operators/window_toggle.hpp @@ -0,0 +1,241 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +#include +#include +#include +#include +#include + +#include + +#include + +namespace rpp +{ +template +using window_toggle_observable = decltype(std::declval>().get_observable()); +} + +namespace rpp::operators::details +{ +template +struct window_toggle_state +{ + using Observable = rpp::utils::extract_observer_type_t; + using value_type = rpp::utils::extract_observable_type_t; + using Subject = forwarding_subject; + + static_assert(std::same_as().get_observable())>); + + struct state_t + { + RPP_NO_UNIQUE_ADDRESS TObserver observer; + mutable std::list().get_observer())> observers{}; + }; + + window_toggle_state(TObserver&& observer, const TClosingsSelectorFn& closings) + : m_state{state_t{std::move(observer)}} + , m_closings{closings} + { + } + + rpp::operators::details::pointer_under_lock get_state_under_lock() { return rpp::operators::details::pointer_under_lock{m_state}; } + + template + auto get_closing(T&& v) const {return m_closings(std::forward(v)); } + + auto on_new_subject(const Subject& subject) + { + const auto locked_state = get_state_under_lock(); + const auto ptr = &locked_state->observers.emplace_back(subject.get_observer()); + locked_state->observer.on_next(subject.get_observable()); + return ptr; + } + +private: + rpp::operators::details::value_with_mutex m_state{}; + RPP_NO_UNIQUE_ADDRESS TClosingsSelectorFn m_closings; +}; + +template +struct window_toggle_closing_observer_strategy +{ + std::shared_ptr disposable; + std::shared_ptr state; + rpp::disposable_wrapper subject_disposable; + decltype(std::declval().on_new_subject(std::declval())) ptr; + + void on_next(const auto&) const + { + on_completed(); + } + + void on_error(const std::exception_ptr& err) const + { + const auto locked_state = state->get_state_under_lock(); + for (const auto& obs : locked_state->observers) + obs.on_error(err); + locked_state->observer.on_error(err); + } + + void on_completed() const + { + disposable->remove(subject_disposable); + ptr->on_completed(); + + const auto locked_state= state->get_state_under_lock(); + locked_state->observers.remove_if([ptr=ptr](const auto& obs) { return &obs == ptr; }); + } + static void set_upstream(const disposable_wrapper&) { } + static bool is_disposed() { return false; } +}; + +template +struct window_toggle_opening_observer_strategy +{ + std::shared_ptr disposable; + std::shared_ptr state; + + template + void on_next(T&& v) const + { + typename TState::Subject subject{disposable}; + const auto ptr = state->on_new_subject(subject); + disposable->add(subject.get_disposable()); + state->get_closing(std::forward(v)).subscribe(subject.get_disposable(), window_toggle_closing_observer_strategy{disposable, state, subject.get_disposable(), ptr}); + } + + void on_error(const std::exception_ptr& err) const + { + const auto locked_state = state->get_state_under_lock(); + for (const auto& obs : locked_state->observers) + obs.on_error(err); + locked_state->observer.on_error(err); + } + + static void on_completed() {} + static void set_upstream(const disposable_wrapper&) {} + static bool is_disposed() { return false; } +}; + +template + requires rpp::constraint::observable>> +class window_toggle_observer_strategy +{ + using TState = window_toggle_state; +public: + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + + window_toggle_observer_strategy(TObserver&& observer, const TOpeningsObservable& openings, const TClosingsSelectorFn& closings) + : m_state{std::make_shared(std::move(observer), closings)} + { + m_state->get_state_under_lock()->observer.set_upstream(m_disposable->add_ref()); + m_disposable->add(openings.subscribe_with_disposable(window_toggle_opening_observer_strategy{m_disposable, m_state})); + } + + void on_next(const auto& v) const + { + const auto locked_state = m_state->get_state_under_lock(); + for (const auto& obs : locked_state->observers) + obs.on_next(v); + } + + void on_error(const std::exception_ptr& err) const + { + const auto locked_state = m_state->get_state_under_lock(); + for (const auto& obs : locked_state->observers) + obs.on_error(err); + locked_state->observer.on_error(err); + } + + void on_completed() const + { + const auto locked_state = m_state->get_state_under_lock(); + for (const auto& obs : locked_state->observers) + obs.on_completed(); + locked_state->observer.on_completed(); + } + + void set_upstream(const disposable_wrapper& d) const { m_disposable->add(d); } + + bool is_disposed() const { return m_disposable->is_disposed(); } + +private: + std::shared_ptr m_disposable = std::make_shared(); + std::shared_ptr m_state; +}; + +template + requires rpp::constraint::observable>> +struct window_toggle_t //: public operators::details::operator_observable_strategy +{ + RPP_NO_UNIQUE_ADDRESS TOpeningsObservable openings; + RPP_NO_UNIQUE_ADDRESS TClosingsSelectorFn closings_selector; + + template + using result_value = rpp::window_toggle_observable; + + template + using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; + + template + void subscribe(Observer&& observer, const observable_chain_strategy& observable_strategy) const + { + // Need to take ownership over current_thread in case of inner-observables also using it + auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); + using expected_value = typename observable_chain_strategy::value_type; + observable_strategy.subscribe(rpp::observer, TOpeningsObservable, TClosingsSelectorFn>>{std::forward(observer), openings, closings_selector}); + } +}; +} + +namespace rpp::operators +{ +/** + * @brief Subdivide original observable into sub-observables (window observables) and emit sub-observables of items instead of original items + * @details Values from `openings` observable used to specify moment when new window will be opened. `closings_selector` is used to obtain observable to specify moment when new window will be closed. + * + * @marble window_toggle + { + source observable : +-1-2-3-4-5-| + + operator "window(2)" : + { + .+1-2| + .....+3-4| + .........+5-| + } + } + * + * @details Actually it is similar to `buffer` but it emits observable instead of container. + * + * @param openings is observable which emissions used to start new window + * @param closings_selector is function which returns observable which emission/completion means closing of opened window + * + * @warning #include + * + * @par Example + * @snippet window_toggle.cpp window_toggle + * + * @ingroup transforming_operators + * @see https://reactivex.io/documentation/operators/window.html + */ +template + requires rpp::constraint::observable>> +auto window_toggle(TOpeningsObservable&& openings, TClosingsSelectorFn&& closings_selector) +{ + return details::window_toggle_t, std::decay_t>{std::forward(openings), std::forward(closings_selector)}; +} +} // namespace rpp::operators \ No newline at end of file diff --git a/src/rpp/rpp/operators/with_latest_from.hpp b/src/rpp/rpp/operators/with_latest_from.hpp index 441c6451b..1714cb076 100644 --- a/src/rpp/rpp/operators/with_latest_from.hpp +++ b/src/rpp/rpp/operators/with_latest_from.hpp @@ -140,7 +140,7 @@ struct with_latest_from_t template void subscribe(Observer&& observer, const observable_chain_strategy& observable_strategy) const { - // Need to take ownership over current_thread in case of inner-observables also uses them + // Need to take ownership over current_thread in case of inner-observables also using it auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned(); observables.apply(&subscribe_impl, std::forward(observer), observable_strategy, selector); } diff --git a/src/rpp/rpp/subjects/details/base_subject.hpp b/src/rpp/rpp/subjects/details/base_subject.hpp index 35e3b5490..5ff7d710f 100644 --- a/src/rpp/rpp/subjects/details/base_subject.hpp +++ b/src/rpp/rpp/subjects/details/base_subject.hpp @@ -52,7 +52,7 @@ class base_subject return rpp::observable{m_strategy}; } - rpp::disposable_wrapper get_disposable() const + auto get_disposable() const { return m_strategy.get_disposable(); } diff --git a/src/rpp/rpp/subjects/fwd.hpp b/src/rpp/rpp/subjects/fwd.hpp index 7cb50c0bd..93c592b2e 100644 --- a/src/rpp/rpp/subjects/fwd.hpp +++ b/src/rpp/rpp/subjects/fwd.hpp @@ -25,7 +25,7 @@ concept subject_strategy = requires(Strategy t, rpp::details::observers::fake_ob { {t.get_observer()} -> rpp::constraint::observer; t.on_subscribe(std::move(obs)); - {t.get_disposable() } -> rpp::constraint::decayed_same_as; + {t.get_disposable() } -> rpp::constraint::decayed_any_of; }; } template Strategy> diff --git a/src/rpp/rpp/utils/constraints.hpp b/src/rpp/rpp/utils/constraints.hpp index cd3b09993..22c96c8ab 100644 --- a/src/rpp/rpp/utils/constraints.hpp +++ b/src/rpp/rpp/utils/constraints.hpp @@ -24,6 +24,9 @@ concept decayed_type = std::same_as, T>; template concept any_of = (std::same_as || ...); +template +concept decayed_any_of = (decayed_same_as || ...); + template concept variadic_decayed_same_as = sizeof...(Types) == 1 && (decayed_same_as && ...); diff --git a/src/tests/rpp/test_window.cpp b/src/tests/rpp/test_window.cpp index e1e6d51ed..29f278e64 100644 --- a/src/tests/rpp/test_window.cpp +++ b/src/tests/rpp/test_window.cpp @@ -28,14 +28,14 @@ TEST_CASE("window subdivide observable into sub-observables") { SECTION("see 2 observables") { - auto mock = mock_observer_strategy>{}; + auto mock = mock_observer_strategy>{}; obs.subscribe(mock); CHECK(mock.get_total_on_next_count() == 2); CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 1); } - SECTION("first windowed observable emits first 2 values and completes") + SECTION("first window observable emits first 2 values and completes") { auto mock = mock_observer_strategy{}; size_t i = 0; @@ -49,7 +49,7 @@ TEST_CASE("window subdivide observable into sub-observables") CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 1); } - SECTION("second windowed observable emits last 1 value and completes") + SECTION("second window observable emits last 1 value and completes") { auto mock = mock_observer_strategy{}; size_t i = 0; @@ -75,7 +75,7 @@ TEST_CASE("window subdivide observable into sub-observables") auto obs = subj.get_observable() | rpp::ops::window(2); SECTION("emit first item") { - auto mock = mock_observer_strategy>{}; + auto mock = mock_observer_strategy>{}; obs.subscribe(mock); CHECK(mock.get_total_on_next_count() == 0); @@ -84,7 +84,7 @@ TEST_CASE("window subdivide observable into sub-observables") subj.get_observer().on_next(1); - SECTION("see new windowed observable") + SECTION("see new window observable") { CHECK(mock.get_total_on_next_count() == 1); CHECK(mock.get_on_error_count() == 0); @@ -94,7 +94,7 @@ TEST_CASE("window subdivide observable into sub-observables") SECTION("emit second item") { subj.get_observer().on_next(2); - SECTION("no any new windowed observable") + SECTION("no any new window observable") { CHECK(mock.get_total_on_next_count() == 1); CHECK(mock.get_on_error_count() == 0); @@ -104,7 +104,7 @@ TEST_CASE("window subdivide observable into sub-observables") SECTION("emit third item") { subj.get_observer().on_next(3); - SECTION("new windowed observable") + SECTION("new window observable") { CHECK(mock.get_total_on_next_count() == 2); CHECK(mock.get_on_error_count() == 0); @@ -192,7 +192,7 @@ TEST_CASE("window disposes original disposable only when everything is disposed" auto inner_observer_disposable = std::make_shared(); obs | rpp::ops::window(2) - | rpp::ops::subscribe(rpp::composite_disposable_wrapper{observer_disposable}, [inner_observer_disposable](const rpp::windowed_observable& new_obs) + | rpp::ops::subscribe(rpp::composite_disposable_wrapper{observer_disposable}, [inner_observer_disposable](const rpp::window_observable& new_obs) { new_obs.subscribe(rpp::composite_disposable_wrapper{inner_observer_disposable}, [](int){}); }); diff --git a/src/tests/rpp/test_window_toggle.cpp b/src/tests/rpp/test_window_toggle.cpp new file mode 100644 index 000000000..106b5dfd9 --- /dev/null +++ b/src/tests/rpp/test_window_toggle.cpp @@ -0,0 +1,227 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#include + +#include +#include +#include +#include +#include + +#include "mock_observer.hpp" +#include "disposable_observable.hpp" +#include "rpp/schedulers/immediate.hpp" +#include "snitch_logging.hpp" + + +TEST_CASE("window_toggle") +{ + mock_observer_strategy> mock{}; + std::vector> inner_mocks{}; + + auto subscribe_mocks = [&mock, &inner_mocks](const auto& observable) + { + observable.subscribe([&mock, &inner_mocks](const rpp::window_toggle_observable& observable) + { + mock.on_next(observable); + observable.subscribe(inner_mocks.emplace_back()); + }, + [&mock](const std::exception_ptr& err) { mock.on_error(err); }, + [&mock]() { mock.on_completed(); }); + }; + + SECTION("opening - just(1), closing - never()") + { + subscribe_mocks(rpp::source::just(1,2,3) + | rpp::ops::window_toggle(rpp::source::just(1), [](int){return rpp::source::never();})); + + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + REQUIRE(inner_mocks.size() == 1); + for (const auto& inner : inner_mocks) + { + CHECK(inner.get_received_values() == std::vector{1,2,3}); + CHECK(inner.get_on_error_count() == 0); + CHECK(inner.get_on_completed_count() == 1); + } + } + SECTION("opening - just(1), closing - empty()") + { + subscribe_mocks(rpp::source::just(1,2,3) + | rpp::ops::window_toggle(rpp::source::just(1), [](int){return rpp::source::empty();})); + + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + REQUIRE(inner_mocks.size() == 1); + for (const auto& inner : inner_mocks) + { + CHECK(inner.get_received_values() == std::vector{}); + CHECK(inner.get_on_error_count() == 0); + CHECK(inner.get_on_completed_count() == 1); + } + } + SECTION("opening - just(1,2,3), closing - empty()") + { + subscribe_mocks(rpp::source::just(1,2,3) + | rpp::ops::window_toggle(rpp::source::just(1,2,3), [](int){return rpp::source::empty();})); + + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + REQUIRE(inner_mocks.size() == 3); + for (const auto& inner : inner_mocks) + { + CHECK(inner.get_received_values() == std::vector{}); + CHECK(inner.get_on_error_count() == 0); + CHECK(inner.get_on_completed_count() == 1); + } + } + SECTION("opening - just(1,2,3), closing - never()") + { + subscribe_mocks(rpp::source::just(1,2,3) + | rpp::ops::window_toggle(rpp::source::just(1,2,3), [](int){return rpp::source::never();})); + + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + REQUIRE(inner_mocks.size() == 3); + for (const auto& inner : inner_mocks) + { + CHECK(inner.get_on_error_count() == 0); + CHECK(inner.get_on_completed_count() == 1); + } + CHECK(inner_mocks[0].get_received_values() == std::vector{1,2,3}); + CHECK(inner_mocks[1].get_received_values() == std::vector{2,3}); + CHECK(inner_mocks[2].get_received_values() == std::vector{3}); + } + SECTION("opening - just(1,2,3), closing - just(1)") + { + subscribe_mocks(rpp::source::just(1,2,3) + | rpp::ops::window_toggle(rpp::source::just(1,2,3), [](int){return rpp::source::just(1);})); + + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + REQUIRE(inner_mocks.size() == 3); + for (const auto& inner : inner_mocks) + { + CHECK(inner.get_on_error_count() == 0); + CHECK(inner.get_on_completed_count() == 1); + } + CHECK(inner_mocks[0].get_received_values() == std::vector{1}); + CHECK(inner_mocks[1].get_received_values() == std::vector{2}); + CHECK(inner_mocks[2].get_received_values() == std::vector{3}); + } + SECTION("opening - never(), closing - just(1)") + { + subscribe_mocks(rpp::source::never() + | rpp::ops::window_toggle(rpp::source::just(1,2,3), [](int){return rpp::source::just(1);})); + + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + REQUIRE(inner_mocks.size() == 3); + for (const auto& inner : inner_mocks) + { + CHECK(inner.get_on_error_count() == 0); + CHECK(inner.get_on_completed_count() == 1); + } + } + SECTION("opening - empty(), closing - just(1)") + { + subscribe_mocks(rpp::source::empty() + | rpp::ops::window_toggle(rpp::source::just(1,2,3), [](int){return rpp::source::just(1);})); + + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + REQUIRE(inner_mocks.size() == 0); + } + SECTION("source - error") + { + subscribe_mocks(rpp::source::error({}) + | rpp::ops::window_toggle(rpp::source::just(rpp::schedulers::immediate{}, 1), [](int){return rpp::source::never(); })); + + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + } + SECTION("openings - error") + { + subscribe_mocks(rpp::source::never() + | rpp::ops::window_toggle(rpp::source::error({}), [](int){return rpp::source::never(); })); + + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + } + SECTION("openings - just(1), closings - error") + { + subscribe_mocks(rpp::source::never() + | rpp::ops::window_toggle(rpp::source::just(1), [](int){return rpp::source::error({}); })); + + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + REQUIRE(inner_mocks.size() == 1); + for (const auto& inner : inner_mocks) + { + CHECK(inner.get_on_error_count() == 1); + CHECK(inner.get_on_completed_count() == 0); + } + } + SECTION("openings - just(1), closings - throw") + { + subscribe_mocks(rpp::source::never() + | rpp::ops::window_toggle(rpp::source::just(1), [](int){ throw std::runtime_error{""}; return rpp::source::error({}); })); + + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + REQUIRE(inner_mocks.size() == 1); + for (const auto& inner : inner_mocks) + { + CHECK(inner.get_on_error_count() == 1); + CHECK(inner.get_on_completed_count() == 0); + } + } +} + +TEST_CASE("window_toggle disposes original disposable only when everything is disposed") +{ + auto source_disposable = std::make_shared(); + auto obs = rpp::source::create([source_disposable](auto&& obs) + { + obs.set_upstream(source_disposable); + obs.on_next(1); + }); + + auto observer_disposable = std::make_shared(); + auto inner_observer_disposable = std::make_shared(); + obs + | rpp::ops::window_toggle(rpp::source::just(rpp::schedulers::immediate{}, 1), [](int){return rpp::source::never(); }) + | rpp::ops::subscribe(rpp::composite_disposable_wrapper{observer_disposable}, [inner_observer_disposable](const rpp::window_toggle_observable& new_obs) + { + new_obs.subscribe(rpp::composite_disposable_wrapper{inner_observer_disposable}, [](int){}); + }); + + CHECK(!source_disposable->is_disposed()); + CHECK(!observer_disposable->is_disposed()); + CHECK(!inner_observer_disposable->is_disposed()); + + observer_disposable->dispose(); + + CHECK(!source_disposable->is_disposed()); + CHECK(observer_disposable->is_disposed()); + CHECK(!inner_observer_disposable->is_disposed()); + + inner_observer_disposable->dispose(); + + CHECK(source_disposable->is_disposed()); + CHECK(observer_disposable->is_disposed()); + CHECK(inner_observer_disposable->is_disposed()); +} + +TEST_CASE("window_toggle satisfies disposable contracts") +{ + test_operator_with_disposable(rpp::ops::window_toggle(rpp::source::just(1), [](int){return rpp::source::just(1); })); +} \ No newline at end of file