diff --git a/src/rpp/rpp/schedulers/new_thread.hpp b/src/rpp/rpp/schedulers/new_thread.hpp index c48925071..5cefa7ed9 100644 --- a/src/rpp/rpp/schedulers/new_thread.hpp +++ b/src/rpp/rpp/schedulers/new_thread.hpp @@ -44,11 +44,7 @@ namespace rpp::schedulers m_state->is_stoping = true; } m_state->cv.notify_all(); - - if (m_thread.get_id() != std::this_thread::get_id()) - m_thread.join(); - else - m_thread.detach(); + m_thread.detach(); } template Fn> diff --git a/src/tests/rpp/test_scheduler.cpp b/src/tests/rpp/test_scheduler.cpp index d8627c990..32a8d6601 100644 --- a/src/tests/rpp/test_scheduler.cpp +++ b/src/tests/rpp/test_scheduler.cpp @@ -22,6 +22,7 @@ #include #include "rpp/disposables/fwd.hpp" +#include "rpp_trompeloil.hpp" #include #include @@ -735,14 +736,33 @@ TEST_CASE("new_thread utilized current_thread") TEST_CASE("new_thread works till end") { - auto mock = mock_observer_strategy{}; + auto mock = mock_observer{}; + trompeloeil::sequence s{}; - rpp::source::just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + const auto vals = std::array{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; + + REQUIRE_CALL(*mock, on_next_lvalue(trompeloeil::_)).TIMES(10).IN_SEQUENCE(s); + + auto done = std::make_shared(); + + const auto last = NAMED_REQUIRE_CALL(*mock, on_completed()).LR_SIDE_EFFECT({ + thread_local rpp::utils::finally_action s_a{[done] { + done->store(true); + }}; + }) + .IN_SEQUENCE(s); + + rpp::source::from_iterable(vals) | rpp::operators::subscribe_on(rpp::schedulers::new_thread{}) - | rpp::operators::as_blocking() | rpp::operators::subscribe(mock); - CHECK(mock.get_received_values().size() == 10); + CHECK(!last->is_satisfied()); + + wait(last); + + while (!done->load()) + { + }; } TEST_CASE("run_loop scheduler dispatches tasks only manually") diff --git a/src/tests/utils/rpp_trompeloil.hpp b/src/tests/utils/rpp_trompeloil.hpp index f2fb3b7f1..712cf907b 100644 --- a/src/tests/utils/rpp_trompeloil.hpp +++ b/src/tests/utils/rpp_trompeloil.hpp @@ -75,7 +75,8 @@ class mock_observer std::shared_ptr m_impl = std::make_shared(); }; -inline void wait(const std::unique_ptr& e) +template +inline void wait(const std::unique_ptr& e) { while (!e->is_satisfied()) {