diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index d95860c95..1c9c1dfb0 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -92,6 +92,7 @@ #include #include +#include #include #include #include diff --git a/src/rpp/rpp/operators/finally.hpp b/src/rpp/rpp/operators/finally.hpp new file mode 100644 index 000000000..18b2d5037 --- /dev/null +++ b/src/rpp/rpp/operators/finally.hpp @@ -0,0 +1,61 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +#include +#include + +namespace rpp::operators::details +{ + template + struct finally_t + { + template + struct operator_traits + { + using result_type = T; + }; + + template + using updated_disposable_strategy = typename Prev::template add<1>; + + RPP_NO_UNIQUE_ADDRESS LastFn last_fn; + + template + auto lift(Observer&& observer) const + { + observer.set_upstream(make_callback_disposable(last_fn)); + return std::forward(observer); + } + }; +} // namespace rpp::operators::details + +namespace rpp::operators +{ + /** + * @brief Register callback to be called when execution is done and disposable bound to observer is disposed + * + * @param last_fn action callback + * @warning #include + * + * @details action callback needs to be noexcept as it is called on dispose, throwing during this time could potentially break internal disposable state. + * + * @ingroup utility_operators + * @see https://reactivex.io/documentation/operators/do.html + */ + template LastFn> + auto finally(LastFn&& last_fn) + { + return details::finally_t>{std::forward(last_fn)}; + } +} // namespace rpp::operators \ No newline at end of file diff --git a/src/rpp/rpp/operators/fwd.hpp b/src/rpp/rpp/operators/fwd.hpp index 663bf948d..fb5e40b74 100644 --- a/src/rpp/rpp/operators/fwd.hpp +++ b/src/rpp/rpp/operators/fwd.hpp @@ -51,6 +51,9 @@ namespace rpp::operators requires (!utils::is_not_template_callable || std::same_as>) auto filter(Fn&& predicate); + template LastFn> + auto finally(LastFn&& lastFn); + template requires (!utils::is_not_template_callable || rpp::constraint::observable>) auto flat_map(Fn&& callable); diff --git a/src/tests/rpp/test_finally.cpp b/src/tests/rpp/test_finally.cpp new file mode 100644 index 000000000..2214528b2 --- /dev/null +++ b/src/tests/rpp/test_finally.cpp @@ -0,0 +1,115 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#include + +#include +#include + +#include "disposable_observable.hpp" +#include "mock_observer.hpp" + +TEST_CASE("finally executes only at the end") +{ + auto mock = mock_observer_strategy(); + SECTION("observable with no emissions") + { + auto obs = rpp::source::create([](const auto&) { + }); + size_t invoked = 0; + SECTION("subscribe") + { + obs | rpp::operators::finally([&]() noexcept { ++invoked; }) + | rpp::ops::subscribe(mock); + SECTION("observer obtains values from observable") + { + CHECK(invoked == 1); + CHECK(mock.get_total_on_next_count() == 0); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + } + } + } + + SECTION("observable with on_completed emission") + { + auto obs = rpp::source::create([](const auto& sub) { + sub.on_completed(); + }); + size_t invoked = 0; + SECTION("subscribe") + { + obs | rpp::operators::finally([&]() noexcept { ++invoked; }) + | rpp::ops::subscribe(mock); + SECTION("observer obtains values from observable") + { + CHECK(invoked == 1); + CHECK(mock.get_total_on_next_count() == 0); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + } + + SECTION("observable with on_next emission") + { + auto obs = rpp::source::create([](const auto& sub) { + sub.on_next(1); + sub.on_completed(); + }); + size_t invoked = 0; + SECTION("subscribe") + { + obs | rpp::operators::finally([&]() noexcept { ++invoked; }) + | rpp::ops::subscribe(mock); + SECTION("observer obtains values from observable") + { + CHECK(invoked == 1); + CHECK(mock.get_total_on_next_count() == 1); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + } + + SECTION("observable with on_error emission") + { + auto obs = rpp::source::create([](const auto& sub) { + sub.on_next(1); + sub.on_error(std::make_exception_ptr(std::runtime_error{""})); + }); + size_t invoked = 0; + SECTION("subscribe") + { + obs | rpp::operators::finally([&]() noexcept { ++invoked; }) + | rpp::ops::subscribe(mock); + SECTION("observer obtains values from observable") + { + CHECK(invoked == 1); + CHECK(mock.get_total_on_next_count() == 1); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + } + } + } +} + +TEST_CASE("finally satisfies disposable contracts") +{ + auto observable_disposable = rpp::composite_disposable_wrapper::make(); + { + auto observable = observable_with_disposable(observable_disposable); + + test_operator_with_disposable( + rpp::ops::finally([]() noexcept {})); + } + + CHECK(observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2); +} \ No newline at end of file