Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/rpp/rpp/operators.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@

#include <rpp/operators/as_blocking.hpp>
#include <rpp/operators/delay.hpp>
#include <rpp/operators/finally.hpp>
#include <rpp/operators/observe_on.hpp>
#include <rpp/operators/repeat.hpp>
#include <rpp/operators/subscribe_on.hpp>
Expand Down
61 changes: 61 additions & 0 deletions src/rpp/rpp/operators/finally.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2023 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus
//

#pragma once

#include <rpp/operators/fwd.hpp>

#include <rpp/defs.hpp>
#include <rpp/operators/details/strategy.hpp>

namespace rpp::operators::details
{
template<rpp::constraint::decayed_type LastFn>
struct finally_t
{
template<rpp::constraint::decayed_type T>
struct operator_traits
{
using result_type = T;
};

template<rpp::details::observables::constraint::disposable_strategy Prev>
using updated_disposable_strategy = typename Prev::template add<1>;

RPP_NO_UNIQUE_ADDRESS LastFn last_fn;

template<rpp::constraint::decayed_type Type, rpp::constraint::observer Observer>
auto lift(Observer&& observer) const
{
observer.set_upstream(make_callback_disposable(last_fn));
return std::forward<Observer>(observer);
}
};
} // namespace rpp::operators::details

namespace rpp::operators
{
/**
* @brief Register callback to be called when execution is done and disposable bound to observer is disposed
*
* @param last_fn action callback
* @warning #include <rpp/operators/finally.hpp>
*
* @details action callback needs to be noexcept as it is called on dispose, throwing during this time could potentially break internal disposable state.
*
* @ingroup utility_operators
* @see https://reactivex.io/documentation/operators/do.html
*/
template<std::invocable<> LastFn>
auto finally(LastFn&& last_fn)
{
return details::finally_t<std::decay_t<LastFn>>{std::forward<LastFn>(last_fn)};
}
} // namespace rpp::operators
3 changes: 3 additions & 0 deletions src/rpp/rpp/operators/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ namespace rpp::operators
requires (!utils::is_not_template_callable<Fn> || std::same_as<bool, std::invoke_result_t<Fn, rpp::utils::convertible_to_any>>)
auto filter(Fn&& predicate);

template<std::invocable<> LastFn>
auto finally(LastFn&& lastFn);

template<typename Fn>
requires (!utils::is_not_template_callable<Fn> || rpp::constraint::observable<std::invoke_result_t<Fn, rpp::utils::convertible_to_any>>)
auto flat_map(Fn&& callable);
Expand Down
115 changes: 115 additions & 0 deletions src/tests/rpp/test_finally.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2023 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus
//

#include <snitch/snitch.hpp>

#include <rpp/operators/finally.hpp>
#include <rpp/sources/create.hpp>

#include "disposable_observable.hpp"
#include "mock_observer.hpp"

TEST_CASE("finally executes only at the end")
{
auto mock = mock_observer_strategy<int>();
SECTION("observable with no emissions")
{
auto obs = rpp::source::create<int>([](const auto&) {
});
size_t invoked = 0;
SECTION("subscribe")
{
obs | rpp::operators::finally([&]() noexcept { ++invoked; })
Comment thread
AlexInLog marked this conversation as resolved.
| rpp::ops::subscribe(mock);
SECTION("observer obtains values from observable")
{
CHECK(invoked == 1);
CHECK(mock.get_total_on_next_count() == 0);
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 0);
}
}
}

SECTION("observable with on_completed emission")
{
auto obs = rpp::source::create<int>([](const auto& sub) {
sub.on_completed();
});
size_t invoked = 0;
SECTION("subscribe")
{
obs | rpp::operators::finally([&]() noexcept { ++invoked; })
| rpp::ops::subscribe(mock);
SECTION("observer obtains values from observable")
{
CHECK(invoked == 1);
CHECK(mock.get_total_on_next_count() == 0);
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
}
}
}

SECTION("observable with on_next emission")
{
auto obs = rpp::source::create<int>([](const auto& sub) {
sub.on_next(1);
sub.on_completed();
});
size_t invoked = 0;
SECTION("subscribe")
{
obs | rpp::operators::finally([&]() noexcept { ++invoked; })
| rpp::ops::subscribe(mock);
SECTION("observer obtains values from observable")
{
CHECK(invoked == 1);
CHECK(mock.get_total_on_next_count() == 1);
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
}
}
}

SECTION("observable with on_error emission")
{
auto obs = rpp::source::create<int>([](const auto& sub) {
sub.on_next(1);
sub.on_error(std::make_exception_ptr(std::runtime_error{""}));
});
size_t invoked = 0;
SECTION("subscribe")
{
obs | rpp::operators::finally([&]() noexcept { ++invoked; })
| rpp::ops::subscribe(mock);
SECTION("observer obtains values from observable")
{
CHECK(invoked == 1);
CHECK(mock.get_total_on_next_count() == 1);
CHECK(mock.get_on_error_count() == 1);
CHECK(mock.get_on_completed_count() == 0);
}
}
}
}

TEST_CASE("finally satisfies disposable contracts")
{
auto observable_disposable = rpp::composite_disposable_wrapper::make();
{
auto observable = observable_with_disposable<int>(observable_disposable);

test_operator_with_disposable<int>(
rpp::ops::finally([]() noexcept {}));
}

CHECK(observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2);
}