Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 52 additions & 69 deletions src/rpp/rpp/observers/dynamic_observer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,73 +19,24 @@

namespace rpp::details::observers
{
template<typename Type>
class observer_vtable
{
public:
void set_upstream(const disposable_wrapper& d) noexcept { m_vtable.set_upstream_ptr(this, d); }
bool is_disposed() const noexcept { return m_vtable.is_disposed_ptr(this); }

void on_next(const Type& v) const noexcept { m_vtable.on_next_lvalue_ptr(this, v); }
void on_next(Type&& v) const noexcept { m_vtable.on_next_rvalue_ptr(this, std::move(v)); }
void on_error(const std::exception_ptr& err) const noexcept { m_vtable.on_error_ptr(this, err); }
void on_completed() const noexcept { m_vtable.on_completed_ptr(this); }

protected:
struct vtable_t
{
void (*const on_next_lvalue_ptr)(const observer_vtable*, const Type&){};
void (*const on_next_rvalue_ptr)(const observer_vtable*, Type&&){};
void (*const on_error_ptr)(const observer_vtable*, const std::exception_ptr&){};
void (*const on_completed_ptr)(const observer_vtable*){};

void (*const set_upstream_ptr)(observer_vtable*, const disposable_wrapper&){};
bool (*const is_disposed_ptr)(const observer_vtable*){};
};

observer_vtable(vtable_t&& vtable)
: m_vtable{std::move(vtable)}
{
}
template<auto Fn, bool NoExcept>
struct member_ptr_caller_impl;

const vtable_t m_vtable{};
template<bool NoExcept, class T, class R, class... Args, R (T::*F)(Args...) noexcept(NoExcept)>
struct member_ptr_caller_impl<F, NoExcept>
{
static R call(void* data, Args... args) noexcept(NoExcept) { return (static_cast<T*>(data)->*F)(static_cast<Args>(args)...); }
};

template<rpp::constraint::observer TObs>
class type_erased_observer : public observer_vtable<rpp::utils::extract_observer_type_t<TObs>>
template<bool NoExcept, class T, class R, class... Args, R (T::*F)(Args...) const noexcept(NoExcept)>
struct member_ptr_caller_impl<F, NoExcept>
{
using Type = rpp::utils::extract_observer_type_t<TObs>;
using Base = observer_vtable<Type>;
using Vtable = typename Base::vtable_t;

static constexpr const TObs& cast(const Base* ptr)
{
return static_cast<const type_erased_observer*>(ptr)->m_observer;
}

static constexpr TObs& cast(Base* ptr)
{
return static_cast<type_erased_observer*>(ptr)->m_observer;
}
static R call(const void* data, Args... args) noexcept(NoExcept) { return (static_cast<const T*>(data)->*F)(static_cast<Args>(args)...); }
};

public:
type_erased_observer(TObs&& observer)
: Base{Vtable{
.on_next_lvalue_ptr = +[](const Base* b, const Type& v) { cast(b).on_next(v); },
.on_next_rvalue_ptr = +[](const Base* b, Type&& v) { cast(b).on_next(std::move(v)); },
.on_error_ptr = +[](const Base* b, const std::exception_ptr& err) { cast(b).on_error(err); },
.on_completed_ptr = +[](const Base* b) { cast(b).on_completed(); },
.set_upstream_ptr = +[](Base* b, const rpp::disposable_wrapper& d) { cast(b).set_upstream(d); },
.is_disposed_ptr = +[](const Base* b) {
return cast(b).is_disposed();
}}}
, m_observer{std::move(observer)}
{
}
template<auto Fn>
using member_ptr_caller = member_ptr_caller_impl<Fn, noexcept(Fn)>;

private:
TObs m_observer;
};

template<rpp::constraint::decayed_type Type>
class dynamic_strategy final
Expand All @@ -94,20 +45,52 @@ namespace rpp::details::observers
template<rpp::constraint::observer_strategy<Type> Strategy>
requires (!rpp::constraint::decayed_same_as<Strategy, dynamic_strategy<Type>>)
explicit dynamic_strategy(observer<Type, Strategy>&& obs)
: m_observer{std::make_shared<type_erased_observer<observer<Type, Strategy>>>(std::move(obs))}
: m_forwarder{std::make_shared<observer<Type, Strategy>>(std::move(obs))}
, m_vtable{vtable::template create<observer<Type, Strategy>>()}
{
}

void set_upstream(const disposable_wrapper& d) noexcept { m_observer->set_upstream(d); }
bool is_disposed() const noexcept { return m_observer->is_disposed(); }
void set_upstream(const disposable_wrapper& d) noexcept { m_vtable->set_upstream(m_forwarder.get(), d); }

bool is_disposed() const noexcept { return m_vtable->is_disposed(m_forwarder.get()); }

void on_next(const Type& v) const noexcept { m_observer->on_next(v); }
void on_next(Type&& v) const noexcept { m_observer->on_next(std::move(v)); }
void on_error(const std::exception_ptr& err) const noexcept { m_observer->on_error(err); }
void on_completed() const noexcept { m_observer->on_completed(); }
void on_next(const Type& v) const noexcept { m_vtable->on_next_lvalue(m_forwarder.get(), v); }

void on_next(Type&& v) const noexcept { m_vtable->on_next_rvalue(m_forwarder.get(), std::move(v)); }

void on_error(const std::exception_ptr& err) const noexcept { m_vtable->on_error(m_forwarder.get(), err); }

void on_completed() const noexcept { m_vtable->on_completed(m_forwarder.get()); }

private:
struct vtable
{
void (*on_next_lvalue)(const void*, const Type&){};
void (*on_next_rvalue)(const void*, Type&&){};
void (*on_error)(const void*, const std::exception_ptr&){};
void (*on_completed)(const void*){};

void (*set_upstream)(void*, const disposable_wrapper&){};
bool (*is_disposed)(const void*){};

template<rpp::constraint::observer Strategy>
static const vtable* create() noexcept
{
static vtable s_res{
.on_next_lvalue = &member_ptr_caller<static_cast<typename Strategy::on_next_lvalue>(&Strategy::on_next)>::call,
.on_next_rvalue = &member_ptr_caller<static_cast<typename Strategy::on_next_rvalue>(&Strategy::on_next)>::call,
.on_error = &member_ptr_caller<&Strategy::on_error>::call,
.on_completed = &member_ptr_caller<&Strategy::on_completed>::call,
.set_upstream = &member_ptr_caller<&Strategy::set_upstream>::call,
.is_disposed = &member_ptr_caller<&Strategy::is_disposed>::call,
};
return &s_res;
}
};

private:
std::shared_ptr<observer_vtable<Type>> m_observer;
std::shared_ptr<void> m_forwarder;
const vtable* m_vtable;
};
} // namespace rpp::details::observers

Expand Down
121 changes: 53 additions & 68 deletions src/rpp/rpp/subjects/details/subject_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
#include <rpp/utils/utils.hpp>

#include <algorithm>
#include <deque>
#include <memory>
#include <mutex>
#include <variant>
#include <vector>

namespace rpp::subjects::details
{
Expand All @@ -39,36 +39,7 @@ namespace rpp::subjects::details
class subject_state : public composite_disposable
, public rpp::details::enable_wrapper_from_this<subject_state<Type, Serialized>>
{
template<rpp::constraint::observer TObs>
class disposable_with_observer : public rpp::details::observers::type_erased_observer<TObs>
, public rpp::details::base_disposable
{
public:
disposable_with_observer(TObs&& observer, std::weak_ptr<subject_state> state)
: rpp::details::observers::type_erased_observer<TObs>{std::move(observer)}
, m_state{std::move(state)}
{
}

private:
void base_dispose_impl(interface_disposable::Mode) noexcept override
{
if (const auto shared = m_state.lock())
{
std::unique_lock lock{shared->m_mutex};
process_state_unsafe(shared->m_state,
[&](const shared_observers& observers) {
shared->m_state = cleanup_observers(observers, this);
});
}
}

std::weak_ptr<subject_state> m_state{};
};

using observer = std::shared_ptr<rpp::details::observers::observer_vtable<Type>>;
using observers = std::deque<observer>;
using shared_observers = std::shared_ptr<observers>;
using shared_observers = std::shared_ptr<std::vector<rpp::dynamic_observer<Type>>>;
using state_t = std::variant<shared_observers, std::exception_ptr, completed, disposed>;

public:
Expand All @@ -83,21 +54,13 @@ namespace rpp::subjects::details
process_state_unsafe(
m_state,
[&](const shared_observers& observers) {
auto d = disposable_wrapper_impl<disposable_with_observer<std::decay_t<TObs>>>::make(std::forward<TObs>(observer), this->wrapper_from_this().lock());
auto ptr = d.lock();
if (!observers)
{
auto new_observers = std::make_shared<subject_state::observers>();
new_observers->emplace_back(ptr);
m_state = std::move(new_observers);
}
else
{
observers->emplace_back(ptr);
}
auto new_observers = make_copy_of_subscribed_observers(true, observers);
auto observer_as_dynamic = std::forward<TObs>(observer).as_dynamic();
new_observers->push_back(observer_as_dynamic);
m_state = std::move(new_observers);

lock.unlock();
ptr->set_upstream(d.as_weak());
set_upstream(observer_as_dynamic);
},
[&](const std::exception_ptr& err) {
lock.unlock();
Expand All @@ -111,31 +74,17 @@ namespace rpp::subjects::details

void on_next(const Type& v)
{
std::unique_lock observers_lock{m_mutex};

if (!std::holds_alternative<shared_observers>(m_state))
return;

// we are getting copy of curent deque and obtaining CURRENT begin/end of in case of some new observer would be added during on_next call
const auto observers = std::get<shared_observers>(m_state);
if (!observers)
return;

const auto begin = observers->cbegin();
const auto end = observers->cend();

observers_lock.unlock();

std::lock_guard lock{m_serialized_mutex};
std::for_each(begin, end, [&](const observer& obs) { obs->on_next(v); });
if (const auto observers = extract_observers_under_lock_if_there())
rpp::utils::for_each(*observers, [&](const auto& sub) { sub.on_next(v); });
}

void on_error(const std::exception_ptr& err)
{
{
std::lock_guard lock{m_serialized_mutex};
if (const auto observers = exchange_observers_under_lock_if_there(err))
rpp::utils::for_each(*observers, [&](const observer& obs) { obs->on_error(err); });
rpp::utils::for_each(*observers, [&](const auto& sub) { sub.on_error(err); });
}
dispose();
}
Expand All @@ -145,7 +94,7 @@ namespace rpp::subjects::details
{
std::lock_guard lock{m_serialized_mutex};
if (const auto observers = exchange_observers_under_lock_if_there(completed{}))
rpp::utils::for_each(*observers, [](const observer& obs) { obs->on_completed(); });
rpp::utils::for_each(*observers, rpp::utils::static_mem_fn<&dynamic_observer<Type>::on_completed>{});
}
dispose();
}
Expand All @@ -156,26 +105,62 @@ namespace rpp::subjects::details
exchange_observers_under_lock_if_there(disposed{});
}

static shared_observers cleanup_observers(const shared_observers& current_subs, const rpp::details::observers::observer_vtable<Type>* to_delete)
void set_upstream(rpp::dynamic_observer<Type>& obs)
{
auto subs = std::make_shared<observers>();
obs.set_upstream(rpp::disposable_wrapper{make_callback_disposable(
[weak = this->wrapper_from_this().as_weak()]() noexcept // NOLINT(bugprone-exception-escape)
{
if (const auto shared = weak.lock())
{
std::unique_lock lock{shared->m_mutex};
process_state_unsafe(shared->m_state,
[&](const shared_observers& observers) {
shared->m_state = make_copy_of_subscribed_observers(false, observers);
});
}
})});
}

static shared_observers make_copy_of_subscribed_observers(bool add, const shared_observers& current_subs)
{
auto subs = std::make_shared<std::vector<dynamic_observer<Type>>>();
subs->reserve(deduce_new_size(add, current_subs));
if (current_subs)
{
std::copy_if(current_subs->cbegin(),
current_subs->cend(),
std::back_inserter(*subs),
[&to_delete](const observer& obs) {
return to_delete != obs.get() && !obs->is_disposed();
});
rpp::utils::static_not_mem_fn<&dynamic_observer<Type>::is_disposed>{});
}
return subs;
}

static size_t deduce_new_size(bool add, const shared_observers& current_subs)
{
if (!current_subs)
return add ? 1 : 0;

if (add)
return current_subs->size() + 1;

return std::max(current_subs->size(), size_t{1}) - 1;
}

static void process_state_unsafe(const state_t& state, const auto&... actions)
{
std::visit(rpp::utils::overloaded{actions..., rpp::utils::empty_function_any_t{}}, state);
}

shared_observers extract_observers_under_lock_if_there()
{
std::lock_guard lock{m_mutex};

if (!std::holds_alternative<shared_observers>(m_state))
return {};

return std::get<shared_observers>(m_state);
}

shared_observers exchange_observers_under_lock_if_there(state_t&& new_val)
{
std::lock_guard lock{m_mutex};
Expand All @@ -187,7 +172,7 @@ namespace rpp::subjects::details
}

private:
state_t m_state;
state_t m_state{};
std::mutex m_mutex{};
RPP_NO_UNIQUE_ADDRESS std::conditional_t<Serialized, std::mutex, rpp::utils::none_mutex> m_serialized_mutex{};
};
Expand Down
33 changes: 0 additions & 33 deletions src/tests/rpp/test_subjects.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include <rpp/subjects/replay_subject.hpp>

#include "copy_count_tracker.hpp"
#include "rpp_trompeloil.hpp"

#include <thread>

Expand Down Expand Up @@ -138,38 +137,6 @@ TEST_CASE("publish subject multicasts values")
}
}

TEST_CASE("subject can be modified from on_next call")
{
rpp::subjects::publish_subject<int> subject{};
mock_observer<int> inner_mock{};

SECTION("subscribe inside on_next")
{
subject.get_observable().subscribe([&subject, &inner_mock](int) {
subject.get_observable().subscribe(inner_mock);
});

subject.get_observer().on_next(1);

REQUIRE_CALL(*inner_mock, on_next_lvalue(2));
subject.get_observer().on_next(2);
}

SECTION("unsubscribe inside on_next")
{
auto d = rpp::composite_disposable_wrapper::make();

subject.get_observable().subscribe([d](int) {
d.clear();
});
subject.get_observable().subscribe(d, inner_mock);

REQUIRE_CALL(*inner_mock, on_next_lvalue(1));
subject.get_observer().on_next(1);
subject.get_observer().on_next(2);
}
}

TEST_CASE("publish subject caches error/completed")
{
auto mock = mock_observer_strategy<int>{};
Expand Down