diff --git a/src/extensions/rppqt/rppqt/schedulers/main_thread.hpp b/src/extensions/rppqt/rppqt/schedulers/main_thread.hpp index f7cd503ad..9a379e1b3 100644 --- a/src/extensions/rppqt/rppqt/schedulers/main_thread.hpp +++ b/src/extensions/rppqt/rppqt/schedulers/main_thread.hpp @@ -50,8 +50,6 @@ namespace rppqt::schedulers }); } - static constexpr rpp::schedulers::details::none_disposable get_disposable() { return {}; } - static rpp::schedulers::time_point now() { return rpp::schedulers::clock_type::now(); } private: diff --git a/src/rpp/rpp/operators/debounce.hpp b/src/rpp/rpp/operators/debounce.hpp index 4e2a5d1b7..1803c6db3 100644 --- a/src/rpp/rpp/operators/debounce.hpp +++ b/src/rpp/rpp/operators/debounce.hpp @@ -43,11 +43,6 @@ namespace rpp::operators::details , m_worker{std::move(in_worker)} , m_period{period} { - if constexpr (!Worker::is_none_disposable) - { - if (auto d = m_worker.get_disposable(); !d.is_disposed()) - rpp::composite_disposable_impl::add(std::move(d)); - } } template @@ -172,7 +167,7 @@ namespace rpp::operators::details auto lift_with_disposable_strategy(Observer&& observer) const { using worker_t = rpp::schedulers::utils::get_worker_t; - using container = typename DisposableStrategy::template add::disposable_container; + using container = typename DisposableStrategy::disposable_container; const auto disposable = disposable_wrapper_impl, worker_t, container>>::make(std::forward(observer), scheduler.create_worker(), duration); auto ptr = disposable.lock(); diff --git a/src/rpp/rpp/operators/delay.hpp b/src/rpp/rpp/operators/delay.hpp index 8e180e5b4..649844ca0 100644 --- a/src/rpp/rpp/operators/delay.hpp +++ b/src/rpp/rpp/operators/delay.hpp @@ -45,11 +45,6 @@ namespace rpp::operators::details , worker{std::move(in_worker)} , delay{delay} { - if constexpr (!Worker::is_none_disposable) - { - if (auto d = worker.get_disposable(); !d.is_disposed()) - rpp::composite_disposable_impl::add(std::move(d)); - } } RPP_NO_UNIQUE_ADDRESS Observer observer; @@ -186,7 +181,7 @@ namespace rpp::operators::details auto lift_with_disposable_strategy(Observer&& observer) const { using worker_t = rpp::schedulers::utils::get_worker_t; - using container = typename DisposableStrategy::template add::disposable_container; + using container = typename DisposableStrategy::disposable_container; const auto disposable = disposable_wrapper_impl, worker_t, container>>::make(std::forward(observer), scheduler.create_worker(), duration); auto ptr = disposable.lock(); diff --git a/src/rpp/rpp/operators/subscribe_on.hpp b/src/rpp/rpp/operators/subscribe_on.hpp index 9cbf0d12c..e7f314296 100644 --- a/src/rpp/rpp/operators/subscribe_on.hpp +++ b/src/rpp/rpp/operators/subscribe_on.hpp @@ -42,7 +42,7 @@ namespace rpp::operators::details }; template - using updated_disposable_strategy = typename Prev::template add::is_none_disposable ? 0 : 1>; + using updated_disposable_strategy = Prev; RPP_NO_UNIQUE_ADDRESS TScheduler scheduler; @@ -50,11 +50,6 @@ namespace rpp::operators::details void subscribe(Observer&& observer, const rpp::details::observables::chain& observable_strategy) const { const auto worker = scheduler.create_worker(); - if constexpr (!rpp::schedulers::utils::get_worker_t::is_none_disposable) - { - if (auto d = worker.get_disposable(); !d.is_disposed()) - observer.set_upstream(std::move(d)); - } worker.schedule(subscribe_on_schedulable>{observable_strategy}, std::forward(observer)); } }; diff --git a/src/rpp/rpp/operators/timeout.hpp b/src/rpp/rpp/operators/timeout.hpp index 9d8efda02..058739e5d 100644 --- a/src/rpp/rpp/operators/timeout.hpp +++ b/src/rpp/rpp/operators/timeout.hpp @@ -133,22 +133,16 @@ namespace rpp::operators::details auto lift_with_disposable_strategy(Observer&& observer) const { using worker_t = rpp::schedulers::utils::get_worker_t; - using container = typename DisposableStrategy::template add::disposable_container; + using container = typename DisposableStrategy::disposable_container; - const auto timeout = rpp::schedulers::utils::get_worker_t::now() + period; + const auto timeout = worker_t::now() + period; const auto disposable = disposable_wrapper_impl, TFallbackObservable, container>>::make(std::forward(observer), period, fallback, timeout); auto ptr = disposable.lock(); ptr->get_observer_with_timeout_under_lock()->observer.set_upstream(disposable.as_weak()); const auto worker = scheduler.create_worker(); - if constexpr (!rpp::schedulers::utils::get_worker_t::is_none_disposable) - { - if (auto d = worker.get_disposable(); !d.is_disposed()) - disposable.add(std::move(d)); - } - - using wrapper = timeout_disposable_wrapper, TFallbackObservable, container>; + using wrapper = timeout_disposable_wrapper, TFallbackObservable, container>; worker.schedule( timeout, [](wrapper& handler) -> rpp::schedulers::optional_delay_to { diff --git a/src/rpp/rpp/schedulers/current_thread.hpp b/src/rpp/rpp/schedulers/current_thread.hpp index 94b0a2cbc..b0d60c43e 100644 --- a/src/rpp/rpp/schedulers/current_thread.hpp +++ b/src/rpp/rpp/schedulers/current_thread.hpp @@ -181,8 +181,6 @@ namespace rpp::schedulers } } - static constexpr rpp::schedulers::details::none_disposable get_disposable() { return {}; } - static rpp::schedulers::time_point now() { return details::now(); } }; diff --git a/src/rpp/rpp/schedulers/details/worker.hpp b/src/rpp/rpp/schedulers/details/worker.hpp index a3cf2e681..218c919aa 100644 --- a/src/rpp/rpp/schedulers/details/worker.hpp +++ b/src/rpp/rpp/schedulers/details/worker.hpp @@ -56,18 +56,8 @@ namespace rpp::schedulers schedule(tp - now(), std::forward(fn), std::forward(handler), std::forward(args)...); } - rpp::disposable_wrapper get_disposable() const - { - if constexpr (is_none_disposable) - return disposable_wrapper::empty(); - else - return m_strategy.get_disposable(); - } - static rpp::schedulers::time_point now() { return Strategy::now(); } - static constexpr bool is_none_disposable = std::same_as().get_disposable()), rpp::schedulers::details::none_disposable>; - private: RPP_NO_UNIQUE_ADDRESS Strategy m_strategy; }; diff --git a/src/rpp/rpp/schedulers/fwd.hpp b/src/rpp/rpp/schedulers/fwd.hpp index a4896ffa4..c8e7f7744 100644 --- a/src/rpp/rpp/schedulers/fwd.hpp +++ b/src/rpp/rpp/schedulers/fwd.hpp @@ -89,10 +89,6 @@ namespace rpp::schedulers::details static void on_error(const std::exception_ptr&) {} }; - - struct none_disposable - { - }; } // namespace rpp::schedulers::details namespace rpp::schedulers::constraint @@ -148,10 +144,9 @@ namespace rpp::schedulers::constraint }; template - concept strategy = (defer_for_strategy || defer_to_strategy) && requires(const S& s, const details::fake_schedulable_handler& handler) { - { - s.get_disposable() - } -> rpp::constraint::any_of; + concept strategy = (defer_for_strategy || defer_to_strategy)&& + requires () + { { S::now() } -> std::same_as; diff --git a/src/rpp/rpp/schedulers/immediate.hpp b/src/rpp/rpp/schedulers/immediate.hpp index 3d42bb236..2c539610f 100644 --- a/src/rpp/rpp/schedulers/immediate.hpp +++ b/src/rpp/rpp/schedulers/immediate.hpp @@ -73,8 +73,6 @@ namespace rpp::schedulers details::immediate_scheduling_while_condition(duration, rpp::utils::return_true{}, std::forward(fn), std::forward(handler), std::forward(args)...); } - static constexpr rpp::schedulers::details::none_disposable get_disposable() { return {}; } - static rpp::schedulers::time_point now() { return rpp::schedulers::clock_type::now(); } }; diff --git a/src/rpp/rpp/schedulers/new_thread.hpp b/src/rpp/rpp/schedulers/new_thread.hpp index 3ea3ca90d..9fa379bb8 100644 --- a/src/rpp/rpp/schedulers/new_thread.hpp +++ b/src/rpp/rpp/schedulers/new_thread.hpp @@ -29,27 +29,19 @@ namespace rpp::schedulers */ class new_thread { - class disposable final : public rpp::details::base_disposable + class state_t final { public: - disposable() = default; + state_t() = default; - template Fn> - void defer_to(time_point time_point, Fn&& fn, Handler&& handler, Args&&... args) - { - m_state->queue.emplace(time_point, std::forward(fn), std::forward(handler), std::forward(args)...); - m_state->has_fresh_data.store(true); - } - - private: - void base_dispose_impl(interface_disposable::Mode) noexcept override + ~state_t() noexcept { if (!m_thread.joinable()) return; { std::lock_guard lock{m_state->mutex}; - m_state->is_disposed = true; + m_state->is_stoping = true; } m_state->cv.notify_all(); @@ -59,24 +51,32 @@ namespace rpp::schedulers m_thread.detach(); } - struct state_t : public details::shared_queue_data + template Fn> + void defer_to(time_point time_point, Fn&& fn, Handler&& handler, Args&&... args) + { + m_state->queue.emplace(time_point, std::forward(fn), std::forward(handler), std::forward(args)...); + m_state->has_fresh_data.store(true); + } + + private: + struct queue_data : public details::shared_queue_data { details::schedulables_queue queue{}; - bool is_disposed{}; + bool is_stoping{}; std::atomic_bool has_fresh_data{false}; }; - static void data_thread(std::shared_ptr state) + static void data_thread(std::shared_ptr state) { current_thread::get_queue() = &state->queue; while (true) { std::unique_lock lock{state->mutex}; - if (state->queue.is_empty() && state->is_disposed) + if (state->queue.is_empty() && state->is_stoping) break; - state->cv.wait(lock, [&] { return !state->queue.is_empty() || state->is_disposed; }); + state->cv.wait(lock, [&] { return !state->queue.is_empty() || state->is_stoping; }); if (state->queue.is_empty()) break; @@ -120,7 +120,7 @@ namespace rpp::schedulers } private: - std::shared_ptr m_state = std::make_shared(); + std::shared_ptr m_state = std::make_shared(); RPP_CALL_DURING_CONSTRUCTION(m_state->queue = details::schedulables_queue(m_state)); @@ -136,15 +136,13 @@ namespace rpp::schedulers template Fn> void defer_to(time_point tp, Fn&& fn, Handler&& handler, Args&&... args) const { - m_state.lock()->defer_to(tp, std::forward(fn), std::forward(handler), std::forward(args)...); + m_state->defer_to(tp, std::forward(fn), std::forward(handler), std::forward(args)...); } - rpp::disposable_wrapper get_disposable() const { return m_state; } - static rpp::schedulers::time_point now() { return details::now(); } private: - disposable_wrapper_impl m_state = disposable_wrapper_impl::make(); + std::shared_ptr m_state = std::make_shared(); }; static rpp::schedulers::worker create_worker() diff --git a/src/rpp/rpp/schedulers/run_loop.hpp b/src/rpp/rpp/schedulers/run_loop.hpp index 8233800ae..b8fa1ffb6 100644 --- a/src/rpp/rpp/schedulers/run_loop.hpp +++ b/src/rpp/rpp/schedulers/run_loop.hpp @@ -120,8 +120,6 @@ namespace rpp::schedulers shared->emplace_and_notify(tp, std::forward(fn), std::forward(handler), std::forward(args)...); } - static constexpr rpp::schedulers::details::none_disposable get_disposable() { return {}; } - static rpp::schedulers::time_point now() { return details::now(); } private: diff --git a/src/rpp/rpp/schedulers/test_scheduler.hpp b/src/rpp/rpp/schedulers/test_scheduler.hpp index 6f29117d3..0cb52cbd9 100644 --- a/src/rpp/rpp/schedulers/test_scheduler.hpp +++ b/src/rpp/rpp/schedulers/test_scheduler.hpp @@ -9,7 +9,6 @@ #pragma once -#include #include namespace rpp::schedulers @@ -23,16 +22,13 @@ namespace rpp::schedulers class worker_strategy; - struct state : public rpp::details::base_disposable + struct state { state() = default; template Fn> void schedule(rpp::schedulers::time_point time_point, Fn&& fn, Handler&& handler, Args&&... args) { - if (is_disposed()) - return; - schedulings.push_back(time_point); queue.emplace(time_point, std::forward(fn), std::forward(handler), std::forward(args)...); } @@ -53,17 +49,15 @@ namespace rpp::schedulers executions.push_back(s_current_time); if (auto new_timepoint = (*fn)()) { - if (!is_disposed()) - { - schedulings.push_back(std::max(s_current_time, new_timepoint.value())); - queue.emplace(schedulings.back(), std::move(fn)); - } + if (fn->is_disposed()) + continue; + + schedulings.push_back(std::max(s_current_time, new_timepoint.value())); + queue.emplace(schedulings.back(), std::move(fn)); } } } - void base_dispose_impl(interface_disposable::Mode) noexcept override {} - std::vector schedulings{}; std::vector executions{}; rpp::schedulers::details::schedulables_queue queue{}; @@ -72,7 +66,7 @@ namespace rpp::schedulers class worker_strategy { public: - worker_strategy(rpp::disposable_wrapper_impl state) + worker_strategy(std::weak_ptr state) : m_state{std::move(state)} { } @@ -82,41 +76,36 @@ namespace rpp::schedulers { if (auto locked = m_state.lock()) { - if (!locked->is_disposed()) - { - locked->schedule(now() + duration, std::forward(fn), std::forward(handler), std::forward(args)...); - locked->drain(); - } + locked->schedule(now() + duration, std::forward(fn), std::forward(handler), std::forward(args)...); + locked->drain(); } } static rpp::schedulers::time_point now() { return s_current_time; } - rpp::disposable_wrapper get_disposable() const { return m_state.as_weak(); } - private: - rpp::disposable_wrapper_impl m_state; + std::weak_ptr m_state; }; test_scheduler() = default; rpp::schedulers::worker create_worker() const { - return rpp::schedulers::worker{m_state.as_weak()}; + return rpp::schedulers::worker{m_state}; } - const auto& get_schedulings() const { return m_state.lock()->schedulings; } - const auto& get_executions() const { return m_state.lock()->executions; } + const auto& get_schedulings() const { return m_state->schedulings; } + const auto& get_executions() const { return m_state->executions; } static rpp::schedulers::time_point now() { return s_current_time; } void time_advance(rpp::schedulers::duration dur) const { s_current_time += dur; - m_state.lock()->drain(); + m_state->drain(); } private: - rpp::disposable_wrapper_impl m_state = rpp::disposable_wrapper_impl::make(); + std::shared_ptr m_state = std::make_shared(); }; } // namespace rpp::schedulers diff --git a/src/rpp/rpp/schedulers/thread_pool.hpp b/src/rpp/rpp/schedulers/thread_pool.hpp index 6c4275813..906733d29 100644 --- a/src/rpp/rpp/schedulers/thread_pool.hpp +++ b/src/rpp/rpp/schedulers/thread_pool.hpp @@ -51,8 +51,7 @@ namespace rpp::schedulers m_original_worker.schedule(tp, std::forward(fn), std::forward(handler), std::forward(args)...); } - static constexpr rpp::schedulers::details::none_disposable get_disposable() { return {}; } - static rpp::schedulers::time_point now() { return original_worker::now(); } + static rpp::schedulers::time_point now() { return original_worker::now(); } private: original_worker m_original_worker; diff --git a/src/rpp/rpp/sources/from.hpp b/src/rpp/rpp/sources/from.hpp index 114c83631..af447bb8e 100644 --- a/src/rpp/rpp/sources/from.hpp +++ b/src/rpp/rpp/sources/from.hpp @@ -84,7 +84,7 @@ namespace rpp::details { public: using value_type = rpp::utils::iterable_value_t; - using expected_disposable_strategy = std::conditional_t::is_none_disposable, rpp::details::observables::bool_disposable_strategy_selector, rpp::details::observables::fixed_disposable_strategy_selector<1>>; + using expected_disposable_strategy = rpp::details::observables::bool_disposable_strategy_selector; template from_iterable_strategy(const TScheduler& scheduler, Args&&... args) @@ -122,11 +122,6 @@ namespace rpp::details else { const auto worker = scheduler.create_worker(); - if constexpr (!rpp::schedulers::utils::get_worker_t::is_none_disposable) - { - if (auto d = worker.get_disposable(); !d.is_disposed()) - obs.set_upstream(std::move(d)); - } worker.schedule(from_iterable_schedulable{}, std::move(obs), container, size_t{}); } } diff --git a/src/rpp/rpp/sources/interval.hpp b/src/rpp/rpp/sources/interval.hpp index d79e8864f..42f3b9233 100644 --- a/src/rpp/rpp/sources/interval.hpp +++ b/src/rpp/rpp/sources/interval.hpp @@ -29,7 +29,7 @@ namespace rpp::details struct interval_strategy { using value_type = size_t; - using expected_disposable_strategy = std::conditional_t::is_none_disposable, rpp::details::observables::bool_disposable_strategy_selector, rpp::details::observables::fixed_disposable_strategy_selector<1>>; + using expected_disposable_strategy = rpp::details::observables::bool_disposable_strategy_selector; RPP_NO_UNIQUE_ADDRESS TScheduler scheduler; TimePointOrDuration initial; @@ -39,12 +39,6 @@ namespace rpp::details void subscribe(TObs&& observer) const { const auto worker = scheduler.create_worker(); - if constexpr (!rpp::schedulers::utils::get_worker_t::is_none_disposable) - { - if (auto d = worker.get_disposable(); !d.is_disposed()) - observer.set_upstream(std::move(d)); - } - worker.schedule(initial, interval_schedulable{}, std::forward(observer), period, size_t{}); } }; diff --git a/src/tests/rpp/test_scheduler.cpp b/src/tests/rpp/test_scheduler.cpp index 386dd964f..d8627c990 100644 --- a/src/tests/rpp/test_scheduler.cpp +++ b/src/tests/rpp/test_scheduler.cpp @@ -172,8 +172,6 @@ TEST_CASE("Immediate scheduler") auto worker = scheduler.create_worker(); - CHECK(worker.get_disposable().is_disposed()); - size_t call_count{}; SECTION("immediate scheduler schedules and re-schedules action immediately") @@ -355,11 +353,7 @@ TEMPLATE_TEST_CASE("queue_based scheduler", "", rpp::schedulers::current_thread, auto mock_obs = mock_observer_strategy{}; auto obs = std::optional{mock_obs.get_observer(d).as_dynamic()}; - auto worker = std::optional{TestType{}.create_worker()}; - if constexpr (std::same_as) - CHECK(worker->get_disposable().is_disposed()); - - obs->set_upstream(worker->get_disposable()); + auto worker = std::optional{TestType{}.create_worker()}; size_t call_count{}; std::promise thread_of_schedule_promise{}; @@ -717,7 +711,6 @@ TEST_CASE("new_thread utilized current_thread") { auto worker = rpp::schedulers::new_thread::create_worker(); auto obs = mock.get_observer().as_dynamic(); - obs.set_upstream(worker.get_disposable()); worker.schedule([&inner_schedule_executed](const auto& obs) { rpp::schedulers::current_thread::create_worker().schedule([&inner_schedule_executed](const auto&) { inner_schedule_executed = true; @@ -916,8 +909,6 @@ TEST_CASE("current_thread inside new_thread") auto current_thread_invoked = std::make_shared(); worker->schedule([&](const auto& obs) { - worker->get_disposable().dispose(); - rpp::schedulers::current_thread{}.create_worker().schedule([current_thread_invoked](const auto&) { current_thread_invoked->store(true); return rpp::schedulers::optional_delay_from_now{};