Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/rpp/rpp/operators/concat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include <rpp/operators/fwd.hpp>

#include <rpp/disposables/composite_disposable.hpp>
#include <rpp/operators/details/strategy.hpp>
#include <rpp/utils/utils.hpp>

Expand All @@ -34,7 +35,7 @@ namespace rpp::operators::details
};

template<rpp::constraint::observable TObservable, rpp::constraint::observer TObserver>
class concat_disposable final : public rpp::composite_disposable
class concat_disposable final : public rpp::details::base_disposable
, public rpp::details::enable_wrapper_from_this<concat_disposable<TObservable, TObserver>>
{
public:
Expand Down Expand Up @@ -87,7 +88,7 @@ namespace rpp::operators::details
return stage().compare_exchange_strong(current, ConcatStage::Processing, std::memory_order::seq_cst);
}

void composite_dispose_impl(interface_disposable::Mode) noexcept override
void base_dispose_impl(interface_disposable::Mode) noexcept override
{
for (auto& d : m_child_disposables)
d.dispose();
Expand Down
50 changes: 32 additions & 18 deletions src/rpp/rpp/operators/switch_on_next.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@
#include <rpp/operators/fwd.hpp>

#include <rpp/defs.hpp>
#include <rpp/disposables/refcount_disposable.hpp>
#include <rpp/disposables/composite_disposable.hpp>
#include <rpp/operators/details/strategy.hpp>
#include <rpp/utils/utils.hpp>

#include <array>

namespace rpp::operators::details
{
template<rpp::constraint::observer TObserver>
class switch_on_next_state_t final : public refcount_disposable
class switch_on_next_state_t final : public rpp::details::base_disposable
{
public:
template<rpp::constraint::decayed_same_as<TObserver> TObs>
Expand All @@ -33,13 +35,21 @@ namespace rpp::operators::details
switch_on_next_state_t(const switch_on_next_state_t&) = delete;
switch_on_next_state_t(switch_on_next_state_t&&) noexcept = delete;

rpp::utils::pointer_under_lock<TObserver> get_observer()
rpp::utils::pointer_under_lock<TObserver> get_observer() { return m_observer_with_mutex; }
rpp::composite_disposable& get_base_child_disposable() { return m_base_child_disposable; }
rpp::utils::pointer_under_lock<rpp::composite_disposable_wrapper> get_inner_child_disposable() { return m_inner_child_disposable; }

private:
void base_dispose_impl(interface_disposable::Mode) noexcept override
{
return m_observer_with_mutex;
get_base_child_disposable().dispose();
get_inner_child_disposable()->dispose();
}

private:
rpp::utils::value_with_mutex<TObserver> m_observer_with_mutex{};
rpp::utils::value_with_mutex<TObserver> m_observer_with_mutex{};
rpp::composite_disposable m_base_child_disposable{};
rpp::utils::value_with_mutex<rpp::composite_disposable_wrapper> m_inner_child_disposable{composite_disposable_wrapper::empty()};
};

template<rpp::constraint::observer TObserver>
Expand All @@ -48,9 +58,9 @@ namespace rpp::operators::details
public:
static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;

switch_on_next_inner_observer_strategy(const std::shared_ptr<switch_on_next_state_t<TObserver>>& state, const composite_disposable_wrapper& refcounted)
switch_on_next_inner_observer_strategy(const std::shared_ptr<switch_on_next_state_t<TObserver>>& state, composite_disposable_wrapper&& refcounted)
: m_state{state}
, m_refcounted{refcounted}
, m_refcounted{std::move(refcounted)}
{
}

Expand All @@ -68,12 +78,11 @@ namespace rpp::operators::details
void on_completed() const
{
m_refcounted.dispose();
if (m_state->is_disposed())
if (m_state->get_base_child_disposable().is_disposed())
m_state->get_observer()->on_completed();
}

void set_upstream(const disposable_wrapper& d) const { m_refcounted.add(d); }

bool is_disposed() const { return m_refcounted.is_disposed(); }

private:
Expand All @@ -98,9 +107,16 @@ namespace rpp::operators::details
template<typename T>
void on_next(T&& v) const
{
m_last_refcount.dispose();
m_last_refcount = m_state->add_ref();
std::forward<T>(v).subscribe(switch_on_next_inner_observer_strategy<TObserver>{m_state, m_last_refcount});
auto new_inner = rpp::composite_disposable_wrapper::make();
{
auto inner = m_state->get_inner_child_disposable();
inner->dispose();
if (m_state->is_disposed())
return;

*inner = new_inner;
}
std::forward<T>(v).subscribe(switch_on_next_inner_observer_strategy<TObserver>{m_state, std::move(new_inner)});
}

void on_error(const std::exception_ptr& err) const
Expand All @@ -110,13 +126,13 @@ namespace rpp::operators::details

void on_completed() const
{
m_this_refcount.dispose();
if (m_state->is_disposed())
m_state->get_base_child_disposable().dispose();
if (m_state->get_inner_child_disposable()->is_disposed())
Comment on lines +129 to +130

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure proper disposal order in 'on_completed'

In the on_completed method, disposing of m_state->get_base_child_disposable() before checking if m_inner_child_disposable is disposed might lead to race conditions or unintended behavior. Ensure that disposing the base disposable doesn't interfere with the inner disposable's state.

Consider reviewing the disposal logic to guarantee that all disposables are properly managed and that completion is signaled correctly.

m_state->get_observer()->on_completed();
}

void set_upstream(const disposable_wrapper& d) const { m_this_refcount.add(d); }
bool is_disposed() const { return m_this_refcount.is_disposed(); }
void set_upstream(const disposable_wrapper& d) const { m_state->get_base_child_disposable().add(d); }
bool is_disposed() const { return m_state->get_base_child_disposable().is_disposed(); }

private:
static std::shared_ptr<switch_on_next_state_t<TObserver>> init_state(TObserver&& observer)
Expand All @@ -129,8 +145,6 @@ namespace rpp::operators::details

private:
std::shared_ptr<switch_on_next_state_t<TObserver>> m_state;
rpp::composite_disposable_wrapper m_this_refcount = m_state->add_ref();
mutable rpp::composite_disposable_wrapper m_last_refcount = composite_disposable_wrapper::empty();
};

struct switch_on_next_t : lift_operator<switch_on_next_t>
Expand Down
119 changes: 64 additions & 55 deletions src/tests/rpp/test_switch_on_next.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include <doctest/doctest.h>

#include <rpp/observables/dynamic_observable.hpp>
#include <rpp/observers/mock_observer.hpp>
#include <rpp/operators/switch_on_next.hpp>
#include <rpp/sources/empty.hpp>
#include <rpp/sources/error.hpp>
Expand All @@ -21,86 +20,76 @@

#include "copy_count_tracker.hpp"
#include "disposable_observable.hpp"
#include "rpp_trompeloil.hpp"

TEST_CASE("switch_on_next switches observable after obtaining new one")
{
auto mock = mock_observer_strategy<int>();
mock_observer<int> mock{};
trompeloeil::sequence s{};

SUBCASE("just observable of just observables")
{
auto observable = rpp::source::just(rpp::source::just(1), rpp::source::just(2), rpp::source::just(3));
SUBCASE("subscribe on it via switch_on_next")
SUBCASE("subscribe on it via switch_on_next - obtains values as from concat")
{
REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(s);
REQUIRE_CALL(*mock, on_next_lvalue(2)).IN_SEQUENCE(s);
REQUIRE_CALL(*mock, on_next_lvalue(3)).IN_SEQUENCE(s);
REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(s);
observable | rpp::ops::switch_on_next() | rpp::ops::subscribe(mock);
SUBCASE("obtains values as from concat")
{
CHECK(mock.get_received_values() == std::vector{1, 2, 3});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
}
}
}
SUBCASE("just observable of just observables where second is error")
{
auto observable = rpp::source::just(rpp::source::just(1).as_dynamic(),
rpp::source::error<int>(std::make_exception_ptr(std::runtime_error{""})).as_dynamic(),
rpp::source::just(3).as_dynamic());
SUBCASE("subscribe on it via switch_on_next")
SUBCASE("subscribe on it via switch_on_next - obtains values as from concat but stops on error")
{
REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(s);
REQUIRE_CALL(*mock, on_error(trompeloeil::_)).IN_SEQUENCE(s);

observable | rpp::ops::switch_on_next() | rpp::ops::subscribe(mock);
SUBCASE("obtains values as from concat but stops on error")
{
CHECK(mock.get_received_values() == std::vector{1});
CHECK(mock.get_on_error_count() == 1);
CHECK(mock.get_on_completed_count() == 0);
}
}
}
SUBCASE("just observable of just observables where second is completed")
{
auto observable = rpp::source::just(rpp::source::just(1).as_dynamic(),
rpp::source::empty<int>().as_dynamic(),
rpp::source::just(3).as_dynamic());
SUBCASE("subscribe on it via switch_on_next")
SUBCASE("subscribe on it via switch_on_next - obtains values as from concat")
{
REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(s);
REQUIRE_CALL(*mock, on_next_lvalue(3)).IN_SEQUENCE(s);
REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(s);

observable | rpp::ops::switch_on_next() | rpp::ops::subscribe(mock);
SUBCASE("obtains values as from concat")
{
CHECK(mock.get_received_values() == std::vector{1, 3});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
}
}
}
SUBCASE("just observable of just observables where second is never")
{
auto observable = rpp::source::just(rpp::source::just(1).as_dynamic(),
rpp::source::never<int>().as_dynamic(),
rpp::source::just(3).as_dynamic());
SUBCASE("subscribe on it via switch_on_next")
SUBCASE("subscribe on it via switch_on_next - obtains values as from concat")
{
REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(s);
REQUIRE_CALL(*mock, on_next_lvalue(3)).IN_SEQUENCE(s);
REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(s);
observable | rpp::ops::switch_on_next() | rpp::ops::subscribe(mock);
SUBCASE("obtains values as from concat")
{
CHECK(mock.get_received_values() == std::vector{1, 3});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
}
}
}
SUBCASE("just observable of just observables where last is never")
{
auto observable = rpp::source::just(rpp::source::just(1).as_dynamic(),
rpp::source::just(3).as_dynamic(),
rpp::source::never<int>().as_dynamic());
SUBCASE("subscribe on it via switch_on_next")
SUBCASE("subscribe on it via switch_on_next - obtains values as from concat but no complete")
{
REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(s);
REQUIRE_CALL(*mock, on_next_lvalue(3)).IN_SEQUENCE(s);

observable | rpp::ops::switch_on_next() | rpp::ops::subscribe(mock);
SUBCASE("obtains values as from concat but no complete")
{
CHECK(mock.get_received_values() == std::vector{1, 3});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 0);
}
}
}
SUBCASE("subject of just subjects")
Expand All @@ -120,12 +109,9 @@ TEST_CASE("switch_on_next switches observable after obtaining new one")

SUBCASE("Only value from first subject obtained")
{
REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(s);
subj_1.get_observer().on_next(1);
subj_2.get_observer().on_next(2);

CHECK(mock.get_received_values() == std::vector{1});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 0);
}
SUBCASE("send second subject and send values for all subjects")
{
Expand All @@ -134,36 +120,59 @@ TEST_CASE("switch_on_next switches observable after obtaining new one")
SUBCASE("Only value from second subject obtained")
{
subj_1.get_observer().on_next(1);
subj_2.get_observer().on_next(2);

CHECK(mock.get_received_values() == std::vector{2});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 0);
REQUIRE_CALL(*mock, on_next_lvalue(2)).IN_SEQUENCE(s);
subj_2.get_observer().on_next(2);
}
}
SUBCASE("original subject completes but provided send value")
{
subj_of_subjects.get_observer().on_completed();

REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(s);
subj_1.get_observer().on_next(1);
subj_2.get_observer().on_next(2);
SUBCASE("value obtained")
{
CHECK(mock.get_received_values() == std::vector{1});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 0);
}
SUBCASE("subject sends on_completed")
{
REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(s);
subj_1.get_observer().on_completed();
SUBCASE("subsriber completed")
{
CHECK(mock.get_on_completed_count() == 1);
}
}
}
}
}
}
SUBCASE("switch_on_next completes right")
{
rpp::subjects::publish_subject<rpp::dynamic_observable<int>> subj{};

subj.get_observable() | rpp::ops::switch_on_next() | rpp::ops::subscribe(mock);
SUBCASE("on_completed from base")
{
REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(s);
subj.get_observer().on_completed();
}

SUBCASE("on_completed from inner + then from base")
{
subj.get_observer().on_next(rpp::source::empty<int>());

REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(s);
subj.get_observer().on_completed();
}

SUBCASE("on_completed from base + then from inner")
{
subj.get_observer().on_next(rpp::source::empty<int>());
subj.get_observer().on_next(rpp::source::never<int>());

rpp::subjects::publish_subject<int> inner{};
subj.get_observer().on_next(inner.get_observable());
subj.get_observer().on_completed();

REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(s);
inner.get_observer().on_completed();
}
}
}

TEST_CASE("switch_on_next doesn't produce extra copies")
Expand Down