Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ if (RPP_BUILD_TESTS OR RPP_BUILD_BENCHMARKS)
if (RPP_USE_LLVM_COV)
add_test(NAME ${TARGET} COMMAND ${CMAKE_COMMAND} -E env LLVM_PROFILE_FILE=${RPP_TEST_RESULTS_DIR}/${TARGET}.profraw $<TARGET_FILE:${TARGET}> -v high)
else()
add_test(NAME ${TARGET} COMMAND $<TARGET_FILE:${TARGET}> -v high)
add_test(NAME ${TARGET} COMMAND $<TARGET_FILE:${TARGET}> -v high -s)
endif()
endmacro()
endif()
Expand Down
6 changes: 3 additions & 3 deletions cmake/dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ macro(rpp_handle_3rdparty TARGET_NAME)
target_compile_options(${TARGET_NAME} INTERFACE "-w")
else()
target_compile_options(${TARGET_NAME} PRIVATE "-w")
set_target_properties(${TARGET_NAME} PROPERTIES CXX_CLANG_TIDY "")
set_target_properties(${TARGET_NAME} PROPERTIES CXX_CPPCHECK "")
set_target_properties(${TARGET_NAME} PROPERTIES FOLDER 3rdparty)
endif()

set_target_properties(${TARGET_NAME} PROPERTIES INTERFACE_SYSTEM_INCLUDE_DIRECTORIES $<TARGET_PROPERTY:${TARGET_NAME},INTERFACE_INCLUDE_DIRECTORIES>)
set_target_properties(${TARGET_NAME} PROPERTIES CXX_CLANG_TIDY "")
set_target_properties(${TARGET_NAME} PROPERTIES CXX_CPPCHECK "")
set_target_properties(${TARGET_NAME} PROPERTIES FOLDER 3rdparty)
endmacro()

# ===================== SFML =======================
Expand Down
20 changes: 13 additions & 7 deletions src/rpp/rpp/operators/take_until.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,15 @@ namespace rpp::operators::details
{
}

void stop() { m_stopped = true; }
bool is_stopped() const { return m_stopped; }
bool stop_return_was_stopped() { return m_stopped.exchange(true); }

rpp::utils::pointer_under_lock<TObserver> get_observer() { return m_observer_with_mutex; }

private:
rpp::utils::value_with_mutex<TObserver> m_observer_with_mutex{};
std::atomic_bool m_stopped{};
};

template<rpp::constraint::observer TObserver>
Expand All @@ -48,14 +53,14 @@ namespace rpp::operators::details

void on_error(const std::exception_ptr& err) const
{
state->dispose();
state->get_observer()->on_error(err);
if (!state->stop_return_was_stopped())
state->get_observer()->on_error(err);
}

void on_completed() const
{
state->dispose();
state->get_observer()->on_completed();
if (!state->stop_return_was_stopped())
state->get_observer()->on_completed();
}

void set_upstream(const disposable_wrapper& d) { state->add(d); }
Expand All @@ -69,8 +74,8 @@ namespace rpp::operators::details
template<typename T>
void on_next(const T&) const
{
take_until_observer_strategy_base<TObserver>::state->dispose();
take_until_observer_strategy_base<TObserver>::state->get_observer()->on_completed();
if (!take_until_observer_strategy_base<TObserver>::state->stop_return_was_stopped())
take_until_observer_strategy_base<TObserver>::state->get_observer()->on_completed();
}
};

Expand All @@ -80,7 +85,8 @@ namespace rpp::operators::details
template<typename T>
void on_next(T&& v) const
{
take_until_observer_strategy_base<TObserver>::state->get_observer()->on_next(std::forward<T>(v));
if (!take_until_observer_strategy_base<TObserver>::state->is_stopped())
take_until_observer_strategy_base<TObserver>::state->get_observer()->on_next(std::forward<T>(v));
}
};

Expand Down
20 changes: 12 additions & 8 deletions src/rpp/rpp/operators/timeout.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,22 +89,26 @@ namespace rpp::operators::details

void on_error(const std::exception_ptr& err) const noexcept
{
const auto obs_with_timeout = disposable->get_observer_with_timeout_under_lock();
if (disposable->is_disposed())
return;
{
const auto obs_with_timeout = disposable->get_observer_with_timeout_under_lock();
if (disposable->is_disposed())
return;

obs_with_timeout->observer.on_error(err);
}
disposable->dispose();
obs_with_timeout->observer.on_error(err);
}

void on_completed() const noexcept
{
const auto obs_with_timeout = disposable->get_observer_with_timeout_under_lock();
if (disposable->is_disposed())
return;
{
const auto obs_with_timeout = disposable->get_observer_with_timeout_under_lock();
if (disposable->is_disposed())
return;

obs_with_timeout->observer.on_completed();
}
disposable->dispose();
obs_with_timeout->observer.on_completed();
}
};

Expand Down
15 changes: 15 additions & 0 deletions src/tests/rpp/test_take_until.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include <rpp/observers/mock_observer.hpp>
#include <rpp/operators/as_blocking.hpp>
#include <rpp/operators/finally.hpp>
#include <rpp/operators/take_until.hpp>
#include <rpp/schedulers/current_thread.hpp>
#include <rpp/sources/empty.hpp>
Expand Down Expand Up @@ -223,4 +224,18 @@ TEST_CASE("take_until satisfies disposable contracts")
{
test_operator_with_disposable<int>(rpp::ops::take_until(rpp::source::never<int>()));
test_operator_with_disposable<int>(rpp::ops::take_until(rpp::source::empty<int>()));
test_operator_finish_before_dispose<int>(rpp::ops::take_until(rpp::source::empty<int>()));
test_operator_over_observable_finish_before_dispose<int>([](const auto& ob) {
return rpp::source::never<int>() | rpp::ops::take_until(ob);
});
}

TEST_CASE("take_until dispose after completion")
{
std::vector<std::string> log{};
rpp::source::empty<int>()
| rpp::ops::finally([&]() noexcept { log.push_back("finally"); })
| rpp::ops::subscribe([](int) {}, [&log]() { log.push_back("completed"); });

CHECK(log == std::vector<std::string>{"completed", "finally"});
}