Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 8 additions & 23 deletions src/rpp/rpp/observables/specific_observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,38 +73,23 @@ class specific_observable : public interface_observable<Type, specific_observabl
template<constraint::subscriber_of_type<Type> TSub>
void actual_subscribe(const TSub& subscriber) const
{
// will be scheduled immediately -> reference can be passed
const auto safe_subscribe = [&]
{
try
{
m_on_subscribe(subscriber);
}
catch (...)
{
if (subscriber.is_subscribed())
subscriber.on_error(std::current_exception());
else
throw;
}
return schedulers::optional_duration{};
};

// take ownership over current thread as early as possible to delay all next "current_thread" schedulings. For example, scheduling of emissions from "just" to delay it till whole chain is subscribed and ready to listened emissions
// For example, if we have
// rpp::source::just(rpp::schedulers::current_thread{}, 1,2).combine_latest(rpp::source::just(rpp::schedulers::current_thread{}, 1,2))
//
// then we expect to see emissions like (1,1) (2,1) (2,2) instead of (2,1) (2,2). TO do it we need to "take ownership" over queue to prevent ANY immediate schedulings from ANY next subscriptions
if (schedulers::current_thread::is_queue_owned())
const auto drain_on_exit_if_needed = schedulers::current_thread::own_queue_and_drain_finally_if_not_owned();
try
{
safe_subscribe();
m_on_subscribe(subscriber);
}
else
catch (...)
{
// we need to submit work into queue to take ownership over it. We can submit work with time_point "zero" due to anyway queue is empty and it doesn't make sense, but we can take performance boost due to avoiding extra calls to "now"
schedulers::current_thread::create_worker(subscriber.get_subscription()).schedule(schedulers::time_point{}, safe_subscribe);
if (subscriber.is_subscribed())
subscriber.on_error(std::current_exception());
else
throw;
}

}

private:
Expand Down
13 changes: 12 additions & 1 deletion src/rpp/rpp/schedulers/trampoline_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ class trampoline final : public details::scheduler_tag

static void drain_queue()
{
if (!s_queue.has_value())
return;

auto reset_at_final = utils::finally_action{ [] { s_queue.reset(); } };
std::optional<trampoline_schedulable> function{};

Expand Down Expand Up @@ -152,7 +155,15 @@ class trampoline final : public details::scheduler_tag
inline static thread_local std::optional<std::priority_queue<current_thread_schedulable>> s_queue{};

public:
static bool is_queue_owned() { return s_queue.has_value(); }
static utils::finally_action<void (*)()> own_queue_and_drain_finally_if_not_owned()
{
const bool someone_owns_queue = s_queue.has_value();

if (!someone_owns_queue)
s_queue = std::priority_queue<current_thread_schedulable>{};

return {!someone_owns_queue ? &drain_queue : +[] {}};
}

static auto create_worker(const rpp::composite_subscription& sub = composite_subscription{})
{
Expand Down