diff --git a/src/rpp/rpp/operators/window_toggle.hpp b/src/rpp/rpp/operators/window_toggle.hpp index 78ca57b10..72e64491b 100644 --- a/src/rpp/rpp/operators/window_toggle.hpp +++ b/src/rpp/rpp/operators/window_toggle.hpp @@ -41,8 +41,8 @@ struct window_toggle_state struct state_t { - RPP_NO_UNIQUE_ADDRESS TObserver observer; - mutable std::list().get_observer())> observers{}; + RPP_NO_UNIQUE_ADDRESS TObserver observer; + std::list().get_observer())> observers{}; }; window_toggle_state(TObserver&& observer, const TClosingsSelectorFn& closings) @@ -58,10 +58,10 @@ struct window_toggle_state auto on_new_subject(const Subject& subject) { - const auto locked_state = get_state_under_lock(); - auto ptr = &locked_state->observers.emplace_back(subject.get_observer()); + auto locked_state = get_state_under_lock(); + const auto itr = locked_state->observers.insert(locked_state->observers.cend(), subject.get_observer()); locked_state->observer.on_next(subject.get_observable()); - return ptr; + return itr; } private: @@ -72,10 +72,12 @@ struct window_toggle_state template struct window_toggle_closing_observer_strategy { - std::shared_ptr disposable; - std::shared_ptr state; - rpp::disposable_wrapper subject_disposable; - decltype(std::declval().on_new_subject(std::declval())) ptr; + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + + std::shared_ptr disposable; + std::shared_ptr state; + rpp::composite_disposable_wrapper this_disposable; + decltype(std::declval().on_new_subject(std::declval())) itr; void on_next(const auto&) const { @@ -84,7 +86,7 @@ struct window_toggle_closing_observer_strategy void on_error(const std::exception_ptr& err) const { - const auto locked_state = state->get_state_under_lock(); + auto locked_state = state->get_state_under_lock(); for (const auto& obs : locked_state->observers) obs.on_error(err); locked_state->observer.on_error(err); @@ -92,14 +94,17 @@ struct window_toggle_closing_observer_strategy void on_completed() const { - disposable->remove(subject_disposable); - ptr->on_completed(); - - const auto locked_state= state->get_state_under_lock(); - locked_state->observers.remove_if([ptr=ptr](const auto& obs) { return &obs == ptr; }); + disposable->remove(this_disposable); + + itr->on_completed(); + this_disposable.dispose(); + + auto locked_state= state->get_state_under_lock(); + locked_state->observers.erase(itr); } - static void set_upstream(const disposable_wrapper&) { } - static bool is_disposed() { return false; } + + void set_upstream(const disposable_wrapper& d) const { this_disposable.add(d); } + bool is_disposed() const { return this_disposable.is_disposed(); } }; template @@ -112,9 +117,10 @@ struct window_toggle_opening_observer_strategy void on_next(T&& v) const { typename TState::Subject subject{disposable->wrapper_from_this()}; - auto ptr = state->on_new_subject(subject); + const auto itr = state->on_new_subject(subject); + disposable->add(subject.get_disposable()); - state->get_closing(std::forward(v)).subscribe(subject.get_disposable(), window_toggle_closing_observer_strategy{disposable, state, subject.get_disposable(), std::move(ptr)}); + state->get_closing(std::forward(v)).subscribe(window_toggle_closing_observer_strategy{disposable, state, subject.get_disposable(), itr}); } void on_error(const std::exception_ptr& err) const diff --git a/src/tests/rpp/test_window_toggle.cpp b/src/tests/rpp/test_window_toggle.cpp index 68747fe7c..a0765d11f 100644 --- a/src/tests/rpp/test_window_toggle.cpp +++ b/src/tests/rpp/test_window_toggle.cpp @@ -25,7 +25,7 @@ TEST_CASE("window_toggle") { mock_observer_strategy> mock{}; - std::vector> inner_mocks{}; + std::vector> inner_mocks{}; auto subscribe_mocks = [&mock, &inner_mocks](const auto& observable) { @@ -40,9 +40,9 @@ TEST_CASE("window_toggle") SECTION("opening - just(1), closing - never()") { - subscribe_mocks(rpp::source::just(1,2,3) + subscribe_mocks(rpp::source::just(1,2,3) | rpp::ops::window_toggle(rpp::source::just(1), [](int){return rpp::source::never();})); - + CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 1); REQUIRE(inner_mocks.size() == 1); @@ -55,9 +55,9 @@ TEST_CASE("window_toggle") } SECTION("opening - just(1), closing - empty()") { - subscribe_mocks(rpp::source::just(1,2,3) + subscribe_mocks(rpp::source::just(1,2,3) | rpp::ops::window_toggle(rpp::source::just(1), [](int){return rpp::source::empty();})); - + CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 1); REQUIRE(inner_mocks.size() == 1); @@ -70,9 +70,9 @@ TEST_CASE("window_toggle") } SECTION("opening - just(1,2,3), closing - empty()") { - subscribe_mocks(rpp::source::just(1,2,3) + subscribe_mocks(rpp::source::just(1,2,3) | rpp::ops::window_toggle(rpp::source::just(1,2,3), [](int){return rpp::source::empty();})); - + CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 1); REQUIRE(inner_mocks.size() == 3); @@ -85,9 +85,9 @@ TEST_CASE("window_toggle") } SECTION("opening - just(1,2,3), closing - never()") { - subscribe_mocks(rpp::source::just(1,2,3) + subscribe_mocks(rpp::source::just(1,2,3) | rpp::ops::window_toggle(rpp::source::just(1,2,3), [](int){return rpp::source::never();})); - + CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 1); REQUIRE(inner_mocks.size() == 3); @@ -102,9 +102,9 @@ TEST_CASE("window_toggle") } SECTION("opening - just(1,2,3), closing - just(1)") { - subscribe_mocks(rpp::source::just(1,2,3) + subscribe_mocks(rpp::source::just(1,2,3) | rpp::ops::window_toggle(rpp::source::just(1,2,3), [](int){return rpp::source::just(1);})); - + CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 1); REQUIRE(inner_mocks.size() == 3); @@ -119,9 +119,9 @@ TEST_CASE("window_toggle") } SECTION("opening - never(), closing - just(1)") { - subscribe_mocks(rpp::source::never() + subscribe_mocks(rpp::source::never() | rpp::ops::window_toggle(rpp::source::just(1,2,3), [](int){return rpp::source::just(1);})); - + CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 0); REQUIRE(inner_mocks.size() == 3); @@ -133,34 +133,34 @@ TEST_CASE("window_toggle") } SECTION("opening - empty(), closing - just(1)") { - subscribe_mocks(rpp::source::empty() + subscribe_mocks(rpp::source::empty() | rpp::ops::window_toggle(rpp::source::just(1,2,3), [](int){return rpp::source::just(1);})); - + CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 1); REQUIRE(inner_mocks.size() == 0); } SECTION("source - error") { - subscribe_mocks(rpp::source::error({}) + subscribe_mocks(rpp::source::error({}) | rpp::ops::window_toggle(rpp::source::just(rpp::schedulers::immediate{}, 1), [](int){return rpp::source::never(); })); - + CHECK(mock.get_on_error_count() == 1); CHECK(mock.get_on_completed_count() == 0); } SECTION("openings - error") { - subscribe_mocks(rpp::source::never() + subscribe_mocks(rpp::source::never() | rpp::ops::window_toggle(rpp::source::error({}), [](int){return rpp::source::never(); })); - + CHECK(mock.get_on_error_count() == 1); CHECK(mock.get_on_completed_count() == 0); } SECTION("openings - just(1), closings - error") { - subscribe_mocks(rpp::source::never() + subscribe_mocks(rpp::source::never() | rpp::ops::window_toggle(rpp::source::just(1), [](int){return rpp::source::error({}); })); - + CHECK(mock.get_on_error_count() == 1); CHECK(mock.get_on_completed_count() == 0); REQUIRE(inner_mocks.size() == 1); @@ -172,9 +172,9 @@ TEST_CASE("window_toggle") } SECTION("openings - just(1), closings - throw") { - subscribe_mocks(rpp::source::never() + subscribe_mocks(rpp::source::never() | rpp::ops::window_toggle(rpp::source::just(1), [](int){ throw std::runtime_error{""}; return rpp::source::error({}); })); - + CHECK(mock.get_on_error_count() == 1); CHECK(mock.get_on_completed_count() == 0); REQUIRE(inner_mocks.size() == 1); @@ -188,36 +188,49 @@ TEST_CASE("window_toggle") TEST_CASE("window_toggle disposes original disposable only when everything is disposed") { - auto source_disposable = rpp::composite_disposable_wrapper::make(); - auto obs = rpp::source::create([source_disposable](auto&& obs) + + auto make_observable = [](auto d, bool emit = true) { - obs.set_upstream(source_disposable); - obs.on_next(1); - }); + return rpp::source::create([d, emit](auto&& obs) + { + obs.set_upstream(d); + if (emit) + obs.on_next(1); + }); + }; + + auto source_disposable = rpp::composite_disposable_wrapper::make(); + auto opening_disposable = rpp::composite_disposable_wrapper::make(); + auto closing_disposable = rpp::composite_disposable_wrapper::make(); auto observer_disposable = rpp::composite_disposable_wrapper::make(); auto inner_observer_disposable = rpp::composite_disposable_wrapper::make(); - obs - | rpp::ops::window_toggle(rpp::source::just(rpp::schedulers::immediate{}, 1), [](int){return rpp::source::never(); }) + make_observable(source_disposable) + | rpp::ops::window_toggle(make_observable(opening_disposable) , [&](int){return make_observable(closing_disposable, false); }) | rpp::ops::subscribe(rpp::composite_disposable_wrapper{observer_disposable}, [inner_observer_disposable](const rpp::window_toggle_observable& new_obs) { new_obs.subscribe(rpp::composite_disposable_wrapper{inner_observer_disposable}, [](int){}); }); + CHECK(closing_disposable.is_disposed()); CHECK(!source_disposable.is_disposed()); CHECK(!observer_disposable.is_disposed()); CHECK(!inner_observer_disposable.is_disposed()); observer_disposable.dispose(); - CHECK(!source_disposable.is_disposed()); CHECK(observer_disposable.is_disposed()); + CHECK(!opening_disposable.is_disposed()); + CHECK(closing_disposable.is_disposed()); + CHECK(!source_disposable.is_disposed()); CHECK(!inner_observer_disposable.is_disposed()); inner_observer_disposable.dispose(); CHECK(source_disposable.is_disposed()); CHECK(observer_disposable.is_disposed()); + CHECK(opening_disposable.is_disposed()); + CHECK(closing_disposable.is_disposed()); CHECK(inner_observer_disposable.is_disposed()); }