diff --git a/CMakeLists.txt b/CMakeLists.txt index 5dbec6c2f..73a34961b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -54,7 +54,7 @@ if (RPP_BUILD_TESTS OR RPP_BUILD_BENCHMARKS) if (RPP_USE_LLVM_COV) add_test(NAME ${TARGET} COMMAND ${CMAKE_COMMAND} -E env LLVM_PROFILE_FILE=${RPP_TEST_RESULTS_DIR}/${TARGET}.profraw $ -v high) else() - add_test(NAME ${TARGET} COMMAND $ -v high) + add_test(NAME ${TARGET} COMMAND $ -v high -s) endif() endmacro() endif() diff --git a/cmake/dependencies.cmake b/cmake/dependencies.cmake index c7269919b..6430e41b4 100644 --- a/cmake/dependencies.cmake +++ b/cmake/dependencies.cmake @@ -6,12 +6,12 @@ macro(rpp_handle_3rdparty TARGET_NAME) target_compile_options(${TARGET_NAME} INTERFACE "-w") else() target_compile_options(${TARGET_NAME} PRIVATE "-w") + set_target_properties(${TARGET_NAME} PROPERTIES CXX_CLANG_TIDY "") + set_target_properties(${TARGET_NAME} PROPERTIES CXX_CPPCHECK "") + set_target_properties(${TARGET_NAME} PROPERTIES FOLDER 3rdparty) endif() set_target_properties(${TARGET_NAME} PROPERTIES INTERFACE_SYSTEM_INCLUDE_DIRECTORIES $) - set_target_properties(${TARGET_NAME} PROPERTIES CXX_CLANG_TIDY "") - set_target_properties(${TARGET_NAME} PROPERTIES CXX_CPPCHECK "") - set_target_properties(${TARGET_NAME} PROPERTIES FOLDER 3rdparty) endmacro() # ===================== SFML ======================= diff --git a/src/rpp/rpp/operators/take_until.hpp b/src/rpp/rpp/operators/take_until.hpp index 1cf382a11..ed2aca2c2 100644 --- a/src/rpp/rpp/operators/take_until.hpp +++ b/src/rpp/rpp/operators/take_until.hpp @@ -33,10 +33,15 @@ namespace rpp::operators::details { } + void stop() { m_stopped = true; } + bool is_stopped() const { return m_stopped; } + bool stop_return_was_stopped() { return m_stopped.exchange(true); } + rpp::utils::pointer_under_lock get_observer() { return m_observer_with_mutex; } private: rpp::utils::value_with_mutex m_observer_with_mutex{}; + std::atomic_bool m_stopped{}; }; template @@ -48,14 +53,14 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const { - state->dispose(); - state->get_observer()->on_error(err); + if (!state->stop_return_was_stopped()) + state->get_observer()->on_error(err); } void on_completed() const { - state->dispose(); - state->get_observer()->on_completed(); + if (!state->stop_return_was_stopped()) + state->get_observer()->on_completed(); } void set_upstream(const disposable_wrapper& d) { state->add(d); } @@ -69,8 +74,8 @@ namespace rpp::operators::details template void on_next(const T&) const { - take_until_observer_strategy_base::state->dispose(); - take_until_observer_strategy_base::state->get_observer()->on_completed(); + if (!take_until_observer_strategy_base::state->stop_return_was_stopped()) + take_until_observer_strategy_base::state->get_observer()->on_completed(); } }; @@ -80,7 +85,8 @@ namespace rpp::operators::details template void on_next(T&& v) const { - take_until_observer_strategy_base::state->get_observer()->on_next(std::forward(v)); + if (!take_until_observer_strategy_base::state->is_stopped()) + take_until_observer_strategy_base::state->get_observer()->on_next(std::forward(v)); } }; diff --git a/src/rpp/rpp/operators/timeout.hpp b/src/rpp/rpp/operators/timeout.hpp index 5143a2ce0..9ad1a1800 100644 --- a/src/rpp/rpp/operators/timeout.hpp +++ b/src/rpp/rpp/operators/timeout.hpp @@ -89,22 +89,26 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const noexcept { - const auto obs_with_timeout = disposable->get_observer_with_timeout_under_lock(); - if (disposable->is_disposed()) - return; + { + const auto obs_with_timeout = disposable->get_observer_with_timeout_under_lock(); + if (disposable->is_disposed()) + return; + obs_with_timeout->observer.on_error(err); + } disposable->dispose(); - obs_with_timeout->observer.on_error(err); } void on_completed() const noexcept { - const auto obs_with_timeout = disposable->get_observer_with_timeout_under_lock(); - if (disposable->is_disposed()) - return; + { + const auto obs_with_timeout = disposable->get_observer_with_timeout_under_lock(); + if (disposable->is_disposed()) + return; + obs_with_timeout->observer.on_completed(); + } disposable->dispose(); - obs_with_timeout->observer.on_completed(); } }; diff --git a/src/tests/rpp/test_take_until.cpp b/src/tests/rpp/test_take_until.cpp index f149c3d95..7d65a96a8 100644 --- a/src/tests/rpp/test_take_until.cpp +++ b/src/tests/rpp/test_take_until.cpp @@ -13,6 +13,7 @@ #include #include +#include #include #include #include @@ -223,4 +224,18 @@ TEST_CASE("take_until satisfies disposable contracts") { test_operator_with_disposable(rpp::ops::take_until(rpp::source::never())); test_operator_with_disposable(rpp::ops::take_until(rpp::source::empty())); + test_operator_finish_before_dispose(rpp::ops::take_until(rpp::source::empty())); + test_operator_over_observable_finish_before_dispose([](const auto& ob) { + return rpp::source::never() | rpp::ops::take_until(ob); + }); +} + +TEST_CASE("take_until dispose after completion") +{ + std::vector log{}; + rpp::source::empty() + | rpp::ops::finally([&]() noexcept { log.push_back("finally"); }) + | rpp::ops::subscribe([](int) {}, [&log]() { log.push_back("completed"); }); + + CHECK(log == std::vector{"completed", "finally"}); }