From 14af7e8cee6d7d04cc7a1b1fc40bfcd07a6b2a8a Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 15 Jan 2024 00:17:56 +0300 Subject: [PATCH 1/9] extend tests --- src/tests/rpp/test_window_toggle.cpp | 30 ++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/src/tests/rpp/test_window_toggle.cpp b/src/tests/rpp/test_window_toggle.cpp index 68747fe7c..9dfa4c642 100644 --- a/src/tests/rpp/test_window_toggle.cpp +++ b/src/tests/rpp/test_window_toggle.cpp @@ -188,36 +188,50 @@ TEST_CASE("window_toggle") TEST_CASE("window_toggle disposes original disposable only when everything is disposed") { - auto source_disposable = rpp::composite_disposable_wrapper::make(); - auto obs = rpp::source::create([source_disposable](auto&& obs) + + auto make_observable = [](auto d, bool emit = true) { - obs.set_upstream(source_disposable); - obs.on_next(1); - }); + return rpp::source::create([d, emit](auto&& obs) + { + obs.set_upstream(d); + if (emit) + obs.on_next(1); + }); + }; + + auto source_disposable = std::make_shared(); + auto opening_disposable = std::make_shared(); + auto closing_disposable = std::make_shared(); + auto observer_disposable = rpp::composite_disposable_wrapper::make(); auto inner_observer_disposable = rpp::composite_disposable_wrapper::make(); - obs - | rpp::ops::window_toggle(rpp::source::just(rpp::schedulers::immediate{}, 1), [](int){return rpp::source::never(); }) + make_observable(source_disposable) + | rpp::ops::window_toggle(make_observable(opening_disposable) , [&](int){return make_observable(closing_disposable, false); }) | rpp::ops::subscribe(rpp::composite_disposable_wrapper{observer_disposable}, [inner_observer_disposable](const rpp::window_toggle_observable& new_obs) { new_obs.subscribe(rpp::composite_disposable_wrapper{inner_observer_disposable}, [](int){}); }); + CHECK(!closing_disposable.is_disposed()); CHECK(!source_disposable.is_disposed()); CHECK(!observer_disposable.is_disposed()); CHECK(!inner_observer_disposable.is_disposed()); observer_disposable.dispose(); - CHECK(!source_disposable.is_disposed()); CHECK(observer_disposable.is_disposed()); + CHECK(!opening_disposable.is_disposed()); + CHECK(!closing_disposable.is_disposed()); + CHECK(!source_disposable.is_disposed()); CHECK(!inner_observer_disposable.is_disposed()); inner_observer_disposable.dispose(); CHECK(source_disposable.is_disposed()); CHECK(observer_disposable.is_disposed()); + CHECK(opening_disposable.is_disposed()); + CHECK(closing_disposable.is_disposed()); CHECK(inner_observer_disposable.is_disposed()); } From 951f41d898a03a831b988bf932da6854abeaaa56 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 15 Jan 2024 22:35:28 +0300 Subject: [PATCH 2/9] fix disposable disposing --- src/rpp/rpp/operators/details/forwarding_subject.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rpp/rpp/operators/details/forwarding_subject.hpp b/src/rpp/rpp/operators/details/forwarding_subject.hpp index 1a5cf81c7..1a35310ae 100644 --- a/src/rpp/rpp/operators/details/forwarding_subject.hpp +++ b/src/rpp/rpp/operators/details/forwarding_subject.hpp @@ -63,7 +63,7 @@ class forwarding_strategy rpp::composite_disposable_wrapper get_disposable() const { - return m_state.as_weak(); + return m_state; } private: From ef5d2582305e0cefe3adaef88f3eb8d894d2b369 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Mon, 15 Jan 2024 23:58:02 +0300 Subject: [PATCH 3/9] fix leaks --- .../rpp/operators/details/forwarding_subject.hpp | 2 +- src/rpp/rpp/operators/window_toggle.hpp | 13 ++++++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/rpp/rpp/operators/details/forwarding_subject.hpp b/src/rpp/rpp/operators/details/forwarding_subject.hpp index 1a35310ae..1a5cf81c7 100644 --- a/src/rpp/rpp/operators/details/forwarding_subject.hpp +++ b/src/rpp/rpp/operators/details/forwarding_subject.hpp @@ -63,7 +63,7 @@ class forwarding_strategy rpp::composite_disposable_wrapper get_disposable() const { - return m_state; + return m_state.as_weak(); } private: diff --git a/src/rpp/rpp/operators/window_toggle.hpp b/src/rpp/rpp/operators/window_toggle.hpp index 78ca57b10..906059130 100644 --- a/src/rpp/rpp/operators/window_toggle.hpp +++ b/src/rpp/rpp/operators/window_toggle.hpp @@ -74,7 +74,7 @@ struct window_toggle_closing_observer_strategy { std::shared_ptr disposable; std::shared_ptr state; - rpp::disposable_wrapper subject_disposable; + rpp::disposable_wrapper this_disposable; decltype(std::declval().on_new_subject(std::declval())) ptr; void on_next(const auto&) const @@ -92,7 +92,7 @@ struct window_toggle_closing_observer_strategy void on_completed() const { - disposable->remove(subject_disposable); + disposable->remove(this_disposable); ptr->on_completed(); const auto locked_state= state->get_state_under_lock(); @@ -112,9 +112,12 @@ struct window_toggle_opening_observer_strategy void on_next(T&& v) const { typename TState::Subject subject{disposable->wrapper_from_this()}; - auto ptr = state->on_new_subject(subject); - disposable->add(subject.get_disposable()); - state->get_closing(std::forward(v)).subscribe(subject.get_disposable(), window_toggle_closing_observer_strategy{disposable, state, subject.get_disposable(), std::move(ptr)}); + const auto ptr = state->on_new_subject(subject); + + auto d = rpp::composite_disposable_wrapper::make(); + d.add(subject.get_disposable()); + disposable->add(d); + state->get_closing(std::forward(v)).subscribe(d, window_toggle_closing_observer_strategy{disposable, state, d, ptr}); } void on_error(const std::exception_ptr& err) const From 2e54bf494a3cf95e1dc1fc2bd3d22646e2931edc Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 19 Jan 2024 00:10:50 +0300 Subject: [PATCH 4/9] temporarly fix --- src/rpp/rpp/operators/window_toggle.hpp | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/rpp/rpp/operators/window_toggle.hpp b/src/rpp/rpp/operators/window_toggle.hpp index 906059130..c89613119 100644 --- a/src/rpp/rpp/operators/window_toggle.hpp +++ b/src/rpp/rpp/operators/window_toggle.hpp @@ -72,9 +72,11 @@ struct window_toggle_state template struct window_toggle_closing_observer_strategy { - std::shared_ptr disposable; - std::shared_ptr state; - rpp::disposable_wrapper this_disposable; + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + + std::shared_ptr disposable; + std::shared_ptr state; + rpp::composite_disposable_wrapper this_disposable; decltype(std::declval().on_new_subject(std::declval())) ptr; void on_next(const auto&) const @@ -92,14 +94,16 @@ struct window_toggle_closing_observer_strategy void on_completed() const { - disposable->remove(this_disposable); ptr->on_completed(); + disposable->remove(this_disposable); + this_disposable.dispose(); + const auto locked_state= state->get_state_under_lock(); locked_state->observers.remove_if([ptr=ptr](const auto& obs) { return &obs == ptr; }); } - static void set_upstream(const disposable_wrapper&) { } - static bool is_disposed() { return false; } + void set_upstream(const disposable_wrapper& d) const { this_disposable.add(d); } + bool is_disposed() const { return this_disposable.is_disposed(); } }; template @@ -117,7 +121,7 @@ struct window_toggle_opening_observer_strategy auto d = rpp::composite_disposable_wrapper::make(); d.add(subject.get_disposable()); disposable->add(d); - state->get_closing(std::forward(v)).subscribe(d, window_toggle_closing_observer_strategy{disposable, state, d, ptr}); + state->get_closing(std::forward(v)).subscribe(window_toggle_closing_observer_strategy{disposable, state, d, ptr}); } void on_error(const std::exception_ptr& err) const From a9016bfcc4ca5d3b2c7f4be2ce9b517e66833619 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 19 Jan 2024 16:57:30 +0300 Subject: [PATCH 5/9] correct fix --- src/rpp/rpp/operators/window_toggle.hpp | 31 ++++++++------- src/tests/rpp/test_window_toggle.cpp | 51 ++++++++++++------------- 2 files changed, 40 insertions(+), 42 deletions(-) diff --git a/src/rpp/rpp/operators/window_toggle.hpp b/src/rpp/rpp/operators/window_toggle.hpp index c89613119..b9d774853 100644 --- a/src/rpp/rpp/operators/window_toggle.hpp +++ b/src/rpp/rpp/operators/window_toggle.hpp @@ -42,7 +42,7 @@ struct window_toggle_state struct state_t { RPP_NO_UNIQUE_ADDRESS TObserver observer; - mutable std::list().get_observer())> observers{}; + std::list().get_observer())> observers{}; }; window_toggle_state(TObserver&& observer, const TClosingsSelectorFn& closings) @@ -58,10 +58,10 @@ struct window_toggle_state auto on_new_subject(const Subject& subject) { - const auto locked_state = get_state_under_lock(); - auto ptr = &locked_state->observers.emplace_back(subject.get_observer()); + auto locked_state = get_state_under_lock(); + const auto itr = locked_state->observers.insert(locked_state->observers.cend(), subject.get_observer()); locked_state->observer.on_next(subject.get_observable()); - return ptr; + return itr; } private: @@ -77,7 +77,7 @@ struct window_toggle_closing_observer_strategy std::shared_ptr disposable; std::shared_ptr state; rpp::composite_disposable_wrapper this_disposable; - decltype(std::declval().on_new_subject(std::declval())) ptr; + decltype(std::declval().on_new_subject(std::declval())) itr; void on_next(const auto&) const { @@ -86,7 +86,7 @@ struct window_toggle_closing_observer_strategy void on_error(const std::exception_ptr& err) const { - const auto locked_state = state->get_state_under_lock(); + auto locked_state = state->get_state_under_lock(); for (const auto& obs : locked_state->observers) obs.on_error(err); locked_state->observer.on_error(err); @@ -94,14 +94,15 @@ struct window_toggle_closing_observer_strategy void on_completed() const { - ptr->on_completed(); - disposable->remove(this_disposable); + + itr->on_completed(); this_disposable.dispose(); - - const auto locked_state= state->get_state_under_lock(); - locked_state->observers.remove_if([ptr=ptr](const auto& obs) { return &obs == ptr; }); + + auto locked_state= state->get_state_under_lock(); + locked_state->observers.erase(itr); } + void set_upstream(const disposable_wrapper& d) const { this_disposable.add(d); } bool is_disposed() const { return this_disposable.is_disposed(); } }; @@ -116,12 +117,10 @@ struct window_toggle_opening_observer_strategy void on_next(T&& v) const { typename TState::Subject subject{disposable->wrapper_from_this()}; - const auto ptr = state->on_new_subject(subject); + const auto itr = state->on_new_subject(subject); - auto d = rpp::composite_disposable_wrapper::make(); - d.add(subject.get_disposable()); - disposable->add(d); - state->get_closing(std::forward(v)).subscribe(window_toggle_closing_observer_strategy{disposable, state, d, ptr}); + disposable->add(subject.get_disposable()); + state->get_closing(std::forward(v)).subscribe(window_toggle_closing_observer_strategy{disposable, state, subject.get_disposable(), itr}); } void on_error(const std::exception_ptr& err) const diff --git a/src/tests/rpp/test_window_toggle.cpp b/src/tests/rpp/test_window_toggle.cpp index 9dfa4c642..0467c8dd9 100644 --- a/src/tests/rpp/test_window_toggle.cpp +++ b/src/tests/rpp/test_window_toggle.cpp @@ -25,7 +25,7 @@ TEST_CASE("window_toggle") { mock_observer_strategy> mock{}; - std::vector> inner_mocks{}; + std::vector> inner_mocks{}; auto subscribe_mocks = [&mock, &inner_mocks](const auto& observable) { @@ -40,9 +40,9 @@ TEST_CASE("window_toggle") SECTION("opening - just(1), closing - never()") { - subscribe_mocks(rpp::source::just(1,2,3) + subscribe_mocks(rpp::source::just(1,2,3) | rpp::ops::window_toggle(rpp::source::just(1), [](int){return rpp::source::never();})); - + CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 1); REQUIRE(inner_mocks.size() == 1); @@ -55,9 +55,9 @@ TEST_CASE("window_toggle") } SECTION("opening - just(1), closing - empty()") { - subscribe_mocks(rpp::source::just(1,2,3) + subscribe_mocks(rpp::source::just(1,2,3) | rpp::ops::window_toggle(rpp::source::just(1), [](int){return rpp::source::empty();})); - + CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 1); REQUIRE(inner_mocks.size() == 1); @@ -70,9 +70,9 @@ TEST_CASE("window_toggle") } SECTION("opening - just(1,2,3), closing - empty()") { - subscribe_mocks(rpp::source::just(1,2,3) + subscribe_mocks(rpp::source::just(1,2,3) | rpp::ops::window_toggle(rpp::source::just(1,2,3), [](int){return rpp::source::empty();})); - + CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 1); REQUIRE(inner_mocks.size() == 3); @@ -85,9 +85,9 @@ TEST_CASE("window_toggle") } SECTION("opening - just(1,2,3), closing - never()") { - subscribe_mocks(rpp::source::just(1,2,3) + subscribe_mocks(rpp::source::just(1,2,3) | rpp::ops::window_toggle(rpp::source::just(1,2,3), [](int){return rpp::source::never();})); - + CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 1); REQUIRE(inner_mocks.size() == 3); @@ -102,9 +102,9 @@ TEST_CASE("window_toggle") } SECTION("opening - just(1,2,3), closing - just(1)") { - subscribe_mocks(rpp::source::just(1,2,3) + subscribe_mocks(rpp::source::just(1,2,3) | rpp::ops::window_toggle(rpp::source::just(1,2,3), [](int){return rpp::source::just(1);})); - + CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 1); REQUIRE(inner_mocks.size() == 3); @@ -119,9 +119,9 @@ TEST_CASE("window_toggle") } SECTION("opening - never(), closing - just(1)") { - subscribe_mocks(rpp::source::never() + subscribe_mocks(rpp::source::never() | rpp::ops::window_toggle(rpp::source::just(1,2,3), [](int){return rpp::source::just(1);})); - + CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 0); REQUIRE(inner_mocks.size() == 3); @@ -133,34 +133,34 @@ TEST_CASE("window_toggle") } SECTION("opening - empty(), closing - just(1)") { - subscribe_mocks(rpp::source::empty() + subscribe_mocks(rpp::source::empty() | rpp::ops::window_toggle(rpp::source::just(1,2,3), [](int){return rpp::source::just(1);})); - + CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 1); REQUIRE(inner_mocks.size() == 0); } SECTION("source - error") { - subscribe_mocks(rpp::source::error({}) + subscribe_mocks(rpp::source::error({}) | rpp::ops::window_toggle(rpp::source::just(rpp::schedulers::immediate{}, 1), [](int){return rpp::source::never(); })); - + CHECK(mock.get_on_error_count() == 1); CHECK(mock.get_on_completed_count() == 0); } SECTION("openings - error") { - subscribe_mocks(rpp::source::never() + subscribe_mocks(rpp::source::never() | rpp::ops::window_toggle(rpp::source::error({}), [](int){return rpp::source::never(); })); - + CHECK(mock.get_on_error_count() == 1); CHECK(mock.get_on_completed_count() == 0); } SECTION("openings - just(1), closings - error") { - subscribe_mocks(rpp::source::never() + subscribe_mocks(rpp::source::never() | rpp::ops::window_toggle(rpp::source::just(1), [](int){return rpp::source::error({}); })); - + CHECK(mock.get_on_error_count() == 1); CHECK(mock.get_on_completed_count() == 0); REQUIRE(inner_mocks.size() == 1); @@ -172,9 +172,9 @@ TEST_CASE("window_toggle") } SECTION("openings - just(1), closings - throw") { - subscribe_mocks(rpp::source::never() + subscribe_mocks(rpp::source::never() | rpp::ops::window_toggle(rpp::source::just(1), [](int){ throw std::runtime_error{""}; return rpp::source::error({}); })); - + CHECK(mock.get_on_error_count() == 1); CHECK(mock.get_on_completed_count() == 0); REQUIRE(inner_mocks.size() == 1); @@ -203,11 +203,10 @@ TEST_CASE("window_toggle disposes original disposable only when everything is di auto opening_disposable = std::make_shared(); auto closing_disposable = std::make_shared(); - auto observer_disposable = rpp::composite_disposable_wrapper::make(); auto inner_observer_disposable = rpp::composite_disposable_wrapper::make(); - make_observable(source_disposable) - | rpp::ops::window_toggle(make_observable(opening_disposable) , [&](int){return make_observable(closing_disposable, false); }) + make_observable(source_disposable) + | rpp::ops::window_toggle(make_observable(opening_disposable) , [&](int){return make_observable(closing_disposable, false); }) | rpp::ops::subscribe(rpp::composite_disposable_wrapper{observer_disposable}, [inner_observer_disposable](const rpp::window_toggle_observable& new_obs) { new_obs.subscribe(rpp::composite_disposable_wrapper{inner_observer_disposable}, [](int){}); From fcb6d598392eea582dadc9a7f27211f462bf852c Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 30 Jan 2024 23:59:22 +0300 Subject: [PATCH 6/9] compile fix --- src/tests/rpp/test_window_toggle.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/tests/rpp/test_window_toggle.cpp b/src/tests/rpp/test_window_toggle.cpp index 0467c8dd9..cf719435a 100644 --- a/src/tests/rpp/test_window_toggle.cpp +++ b/src/tests/rpp/test_window_toggle.cpp @@ -199,9 +199,9 @@ TEST_CASE("window_toggle disposes original disposable only when everything is di }); }; - auto source_disposable = std::make_shared(); - auto opening_disposable = std::make_shared(); - auto closing_disposable = std::make_shared(); + auto source_disposable = rpp::composite_disposable_wrapper::make(); + auto opening_disposable = rpp::composite_disposable_wrapper::make(); + auto closing_disposable = rpp::composite_disposable_wrapper::make(); auto observer_disposable = rpp::composite_disposable_wrapper::make(); auto inner_observer_disposable = rpp::composite_disposable_wrapper::make(); From 8f7197d9fdf06d35086f519c5825e28220b52ec8 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Wed, 31 Jan 2024 00:01:26 +0300 Subject: [PATCH 7/9] tests works --- src/rpp/rpp/operators/details/forwarding_subject.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rpp/rpp/operators/details/forwarding_subject.hpp b/src/rpp/rpp/operators/details/forwarding_subject.hpp index 1a5cf81c7..a7080ff3d 100644 --- a/src/rpp/rpp/operators/details/forwarding_subject.hpp +++ b/src/rpp/rpp/operators/details/forwarding_subject.hpp @@ -63,7 +63,7 @@ class forwarding_strategy rpp::composite_disposable_wrapper get_disposable() const { - return m_state.as_weak(); + return m_state;//.as_weak(); } private: From dada7613d8e901fa4b3c885cc3eb1485de92763a Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Wed, 31 Jan 2024 00:11:45 +0300 Subject: [PATCH 8/9] fix --- src/rpp/rpp/operators/details/forwarding_subject.hpp | 2 +- src/tests/rpp/test_window_toggle.cpp | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/rpp/rpp/operators/details/forwarding_subject.hpp b/src/rpp/rpp/operators/details/forwarding_subject.hpp index a7080ff3d..1a5cf81c7 100644 --- a/src/rpp/rpp/operators/details/forwarding_subject.hpp +++ b/src/rpp/rpp/operators/details/forwarding_subject.hpp @@ -63,7 +63,7 @@ class forwarding_strategy rpp::composite_disposable_wrapper get_disposable() const { - return m_state;//.as_weak(); + return m_state.as_weak(); } private: diff --git a/src/tests/rpp/test_window_toggle.cpp b/src/tests/rpp/test_window_toggle.cpp index cf719435a..a0765d11f 100644 --- a/src/tests/rpp/test_window_toggle.cpp +++ b/src/tests/rpp/test_window_toggle.cpp @@ -212,7 +212,7 @@ TEST_CASE("window_toggle disposes original disposable only when everything is di new_obs.subscribe(rpp::composite_disposable_wrapper{inner_observer_disposable}, [](int){}); }); - CHECK(!closing_disposable.is_disposed()); + CHECK(closing_disposable.is_disposed()); CHECK(!source_disposable.is_disposed()); CHECK(!observer_disposable.is_disposed()); CHECK(!inner_observer_disposable.is_disposed()); @@ -221,7 +221,7 @@ TEST_CASE("window_toggle disposes original disposable only when everything is di CHECK(observer_disposable.is_disposed()); CHECK(!opening_disposable.is_disposed()); - CHECK(!closing_disposable.is_disposed()); + CHECK(closing_disposable.is_disposed()); CHECK(!source_disposable.is_disposed()); CHECK(!inner_observer_disposable.is_disposed()); From 9fbc1d2fbac3738f70ff7684e12837e68bc2910a Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 9 Feb 2024 21:33:29 +0300 Subject: [PATCH 9/9] minor --- src/rpp/rpp/operators/window_toggle.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rpp/rpp/operators/window_toggle.hpp b/src/rpp/rpp/operators/window_toggle.hpp index b9d774853..72e64491b 100644 --- a/src/rpp/rpp/operators/window_toggle.hpp +++ b/src/rpp/rpp/operators/window_toggle.hpp @@ -41,7 +41,7 @@ struct window_toggle_state struct state_t { - RPP_NO_UNIQUE_ADDRESS TObserver observer; + RPP_NO_UNIQUE_ADDRESS TObserver observer; std::list().get_observer())> observers{}; };