Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e3706c8
Partially revert commit
AlexInLog Nov 3, 2024
b79ce00
fix
AlexInLog Nov 3, 2024
c4decc8
fix
AlexInLog Nov 3, 2024
652e362
fix
AlexInLog Nov 3, 2024
1cccfe4
fix
AlexInLog Nov 3, 2024
017bd10
update deps
AlexInLog Nov 4, 2024
c525a79
new changes
AlexInLog Nov 4, 2024
e897c6e
Merge branch 'v2' into fix_disposables_logic
AlexInLog Nov 4, 2024
e4d2545
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 4, 2024
44f0d67
Merge branch 'v2' into fix_disposables_logic
AlexInLog Nov 4, 2024
14656b6
Merge branch 'v2' into fix_disposables_logic
AlexInLog Nov 5, 2024
66a95f8
Merge branch 'v2' into fix_disposables_logic
AlexInLog Nov 5, 2024
6ff468a
Merge branch 'v2' into fix_disposables_logic
AlexInLog Nov 5, 2024
f54ed0d
try to fix jinja2
AlexInLog Nov 5, 2024
cf7d666
Update ci v2.yml
AlexInLog Nov 5, 2024
48936b9
Update ci v2.yml
AlexInLog Nov 5, 2024
db6f58c
Update ci v2.yml
AlexInLog Nov 5, 2024
a3145c7
Update ci v2.yml
AlexInLog Nov 5, 2024
793aac4
Update ci v2.yml
AlexInLog Nov 5, 2024
af429e4
Update ci v2.yml
AlexInLog Nov 5, 2024
3b02d73
Update ci v2.yml
AlexInLog Nov 5, 2024
c6fe6da
Update ci v2.yml
AlexInLog Nov 5, 2024
29c8e17
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 5, 2024
a371479
Update ci v2.yml
AlexInLog Nov 5, 2024
6095b8d
Update ci v2.yml
AlexInLog Nov 5, 2024
2006590
Merge branch 'v2' into fix_disposables_logic
AlexInLog Nov 5, 2024
be3d80d
Merge branch 'v2' into fix_disposables_logic
AlexInLog Nov 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmake/dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,13 @@ endif()
# ===================== Tests ===================
if (RPP_BUILD_TESTS)
rpp_fetch_library(doctest https://github.com/doctest/doctest.git v2.4.11)
rpp_fetch_library(trompeloeil https://github.com/rollbear/trompeloeil.git main)
rpp_fetch_library(trompeloeil https://github.com/rollbear/trompeloeil.git v48)
endif()


# ==================== Nanobench =================
if (RPP_BUILD_BENCHMARKS)
rpp_fetch_library(nanobench https://github.com/martinus/nanobench.git master)
rpp_fetch_library(nanobench https://github.com/martinus/nanobench.git v4.3.11)
endif()

# ==================== ASIO =====================
Expand Down
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class RppConan(ConanFile):

def requirements(self):
if self.options.with_tests:
self.requires("trompeloeil/47")
self.requires("trompeloeil/48")
self.requires("doctest/2.4.11")

if self.options.with_benchmarks:
Expand Down
20 changes: 13 additions & 7 deletions src/benchmarks/benchmarks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ namespace rpp
}
} // namespace rpp

#ifdef RPP_BUILD_RXCPP
namespace rxcpp
{
template<typename... Ts>
Expand All @@ -73,16 +74,17 @@ namespace rxcpp
return rxcpp::observable<>::from(rxcpp::identity_immediate(), std::forward<Ts>(vals)...);
}
} // namespace rxcpp
#endif

int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape)
{
auto bench = ankerl::nanobench::Bench{}.output(nullptr).warmup(3);
const auto args = std::span{argv, static_cast<size_t>(argc)};
const auto benchmark = find_argument("--benchmark=", args);
const auto section = find_argument("--section=", args);
const auto disable_rxcpp = find_argument("--disable_rxcpp", args).has_value();
const auto disable_rpp = find_argument("--disable_rpp", args).has_value();
const auto dump = find_argument("--dump=", args);
auto bench = ankerl::nanobench::Bench{}.output(nullptr).warmup(3);
const auto args = std::span{argv, static_cast<size_t>(argc)};
const auto benchmark = find_argument("--benchmark=", args);
const auto section = find_argument("--section=", args);
[[maybe_unused]] const auto disable_rxcpp = find_argument("--disable_rxcpp", args).has_value();
const auto disable_rpp = find_argument("--disable_rpp", args).has_value();
const auto dump = find_argument("--dump=", args);

BENCHMARK("General")
{
Expand Down Expand Up @@ -687,8 +689,10 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape)
});
}
{
#ifdef RPP_BUILD_RXCPP
rxcpp::subjects::subject<int> rxcpp_subj{};
rxcpp_subj.get_observable().subscribe(rxcpp::make_subscriber<int>([](int v) { ankerl::nanobench::doNotOptimizeAway(v); }, [] {}));
#endif
TEST_RXCPP([&]() {
rxcpp_subj.get_subscriber().on_next(1);
});
Expand Down Expand Up @@ -728,11 +732,13 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape)
}

{
#ifdef RPP_BUILD_RXCPP
rxcpp::subjects::subject<int> rxcpp_subj{};
for (size_t i = 0; i < 100; ++i)
{
rxcpp_subj.get_observable().subscribe(rxcpp::make_subscriber<int>([](int v) { ankerl::nanobench::doNotOptimizeAway(v); }));
}
#endif
TEST_RXCPP([&] {
for (size_t i = 0; i < 100; ++i)
rxcpp_subj.get_subscriber().on_next(i);
Expand Down
13 changes: 4 additions & 9 deletions src/rpp/rpp/disposables/refcount_disposable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,7 @@ namespace rpp
friend class details::refocunt_disposable_inner;
refcount_disposable() = default;

enum class Mode : bool
{
WeakRefStrongSource,
StrongRefRefSource
};
composite_disposable_wrapper add_ref(Mode mode = Mode::WeakRefStrongSource);
composite_disposable_wrapper add_ref();

private:
std::atomic<size_t> m_refcount{0};
Expand Down Expand Up @@ -96,7 +91,7 @@ namespace rpp::details

namespace rpp
{
inline composite_disposable_wrapper refcount_disposable::add_ref(refcount_disposable::Mode mode)
inline composite_disposable_wrapper refcount_disposable::add_ref()
{
auto current_value = m_refcount.load(std::memory_order::seq_cst);
while (true)
Expand All @@ -107,8 +102,8 @@ namespace rpp
// just need atomicity, not guarding anything
if (m_refcount.compare_exchange_strong(current_value, current_value + 1, std::memory_order::seq_cst))
{
auto inner = composite_disposable_wrapper::make<details::refocunt_disposable_inner>(mode == Mode::WeakRefStrongSource ? wrapper_from_this() : wrapper_from_this().as_weak());
add(mode == Mode::WeakRefStrongSource ? inner.as_weak() : inner);
auto inner = composite_disposable_wrapper::make<details::refocunt_disposable_inner>(wrapper_from_this());
add(inner.as_weak());
return inner;
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/rpp/rpp/observables/details/chain_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ namespace rpp::details::observables
else if constexpr (rpp::constraint::operator_lift<TStrategy, typename base::value_type>)
m_strategies.subscribe(m_strategy.template lift<typename base::value_type>(std::forward<Observer>(observer)));
else
{
static_assert(rpp::constraint::operator_subscribe<TStrategy, typename base::value_type>);
m_strategy.subscribe(std::forward<Observer>(observer), m_strategies);
}
}

private:
Expand Down
22 changes: 11 additions & 11 deletions src/rpp/rpp/operators/combine_latest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
namespace rpp::operators::details
{
template<rpp::constraint::observer Observer, typename TSelector, rpp::constraint::decayed_type... Args>
class combine_latest_state final : public combining_state<Observer>
class combine_latest_disposable final : public combining_disposable<Observer>
{
public:
explicit combine_latest_state(Observer&& observer, const TSelector& selector)
: combining_state<Observer>(std::move(observer), sizeof...(Args))
explicit combine_latest_disposable(Observer&& observer, const TSelector& selector)
: combining_disposable<Observer>(std::move(observer), sizeof...(Args))
, m_selector(selector)
{
}
Expand All @@ -40,31 +40,31 @@ namespace rpp::operators::details

template<size_t I, rpp::constraint::observer Observer, typename TSelector, rpp::constraint::decayed_type... Args>
struct combine_latest_observer_strategy final
: public combining_observer_strategy<combine_latest_state<Observer, TSelector, Args...>>
: public combining_observer_strategy<combine_latest_disposable<Observer, TSelector, Args...>>
{
using combining_observer_strategy<combine_latest_state<Observer, TSelector, Args...>>::state;
using combining_observer_strategy<combine_latest_disposable<Observer, TSelector, Args...>>::disposable;

template<typename T>
void on_next(T&& v) const
{
// mutex need to be locked during changing of values, generating new values and sending of new values due to we can't update value while we are sending old one
const auto observer = state->get_observer_under_lock();
state->get_values().template get<I>().emplace(std::forward<T>(v));
const auto observer = disposable->get_observer_under_lock();
disposable->get_values().template get<I>().emplace(std::forward<T>(v));

state->get_values().apply(&apply_impl<decltype(state)>, state, observer);
disposable->get_values().apply(&apply_impl<decltype(disposable)>, disposable, observer);
}

private:
template<typename TState>
static void apply_impl(const TState& disposable, const rpp::utils::pointer_under_lock<Observer>& observer, const std::optional<Args>&... vals)
template<typename TDisposable>
static void apply_impl(const TDisposable& disposable, const rpp::utils::pointer_under_lock<Observer>& observer, const std::optional<Args>&... vals)
{
if ((vals.has_value() && ...))
observer->on_next(disposable->get_selector()(vals.value()...));
}
};

template<typename TSelector, rpp::constraint::observable... TObservables>
struct combine_latest_t : public combining_operator_t<combine_latest_state, combine_latest_observer_strategy, TSelector, TObservables...>
struct combine_latest_t : public combining_operator_t<combine_latest_disposable, combine_latest_observer_strategy, TSelector, TObservables...>
{
};
} // namespace rpp::operators::details
Expand Down
66 changes: 36 additions & 30 deletions src/rpp/rpp/operators/concat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,33 +34,29 @@ namespace rpp::operators::details
};

template<rpp::constraint::observable TObservable, rpp::constraint::observer TObserver>
class concat_state_t final : public std::enable_shared_from_this<concat_state_t<TObservable, TObserver>>
class concat_disposable final : public rpp::refcount_disposable
{
public:
concat_state_t(TObserver&& observer)
concat_disposable(TObserver&& observer)
: m_observer{std::move(observer)}
{
const auto d = disposable_wrapper_impl<refcount_disposable>::make();
m_disposable = d.lock();
get_observer()->set_upstream(d);
}

rpp::utils::pointer_under_lock<TObserver> get_observer() { return m_observer; }
rpp::utils::pointer_under_lock<std::queue<TObservable>> get_queue() { return m_queue; }
const std::shared_ptr<refcount_disposable>& get_disposable() const { return m_disposable; }

std::atomic<ConcatStage>& stage() { return m_stage; }

void drain(rpp::composite_disposable_wrapper refcounted)
{
while (!m_disposable->is_disposed())
while (!is_disposed())
{
const auto observable = get_observable();
if (!observable)
{
stage().store(ConcatStage::None, std::memory_order::relaxed);
refcounted.dispose();
if (m_disposable->is_disposed())
if (is_disposed())
Comment thread
AlexInLog marked this conversation as resolved.
get_observer()->on_completed();
return;
}
Expand All @@ -78,13 +74,11 @@ namespace rpp::operators::details
drain(refcounted);
}


private:
bool handle_observable_impl(const rpp::constraint::decayed_same_as<TObservable> auto& observable, rpp::composite_disposable_wrapper refcounted)
{
stage().store(ConcatStage::Draining, std::memory_order::relaxed);
refcounted.clear();
observable.subscribe(concat_inner_observer_strategy<TObservable, TObserver>{this->shared_from_this(), std::move(refcounted)});
observable.subscribe(concat_inner_observer_strategy<TObservable, TObserver>{disposable_wrapper_impl<concat_disposable>{wrapper_from_this()}.lock(), std::move(refcounted)});

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Verify safe usage of wrapper_from_this()

Ensure that wrapper_from_this() is called only when this is managed by a shared_ptr. Using it otherwise can lead to undefined behavior. Confirm that instances of concat_disposable are always owned by a shared_ptr.


ConcatStage current = ConcatStage::Draining;
return stage().compare_exchange_strong(current, ConcatStage::Processing, std::memory_order::seq_cst);
Expand All @@ -102,7 +96,6 @@ namespace rpp::operators::details
}

private:
std::shared_ptr<refcount_disposable> m_disposable{};
rpp::utils::value_with_mutex<TObserver> m_observer;
rpp::utils::value_with_mutex<std::queue<TObservable>> m_queue;
std::atomic<ConcatStage> m_stage{};
Expand All @@ -111,23 +104,23 @@ namespace rpp::operators::details
template<rpp::constraint::observable TObservable, rpp::constraint::observer TObserver>
struct concat_observer_strategy_base
{
concat_observer_strategy_base(std::shared_ptr<concat_state_t<TObservable, TObserver>> state, rpp::composite_disposable_wrapper refcounted)
: state{std::move(state)}
concat_observer_strategy_base(std::shared_ptr<concat_disposable<TObservable, TObserver>> disposable, rpp::composite_disposable_wrapper refcounted)
: disposable{std::move(disposable)}
, refcounted{std::move(refcounted)}
{
}

concat_observer_strategy_base(std::shared_ptr<concat_state_t<TObservable, TObserver>> state)
: concat_observer_strategy_base{state, state->get_disposable()->add_ref(refcount_disposable::Mode::StrongRefRefSource)}
concat_observer_strategy_base(std::shared_ptr<concat_disposable<TObservable, TObserver>> disposable)
: concat_observer_strategy_base{disposable, disposable->add_ref()}
{
}

std::shared_ptr<concat_state_t<TObservable, TObserver>> state;
rpp::composite_disposable_wrapper refcounted;
std::shared_ptr<concat_disposable<TObservable, TObserver>> disposable;
rpp::composite_disposable_wrapper refcounted;

void on_error(const std::exception_ptr& err) const
{
state->get_observer()->on_error(err);
disposable->get_observer()->on_error(err);
}

void set_upstream(const disposable_wrapper& d) const { refcounted.add(d); }
Expand All @@ -138,55 +131,68 @@ namespace rpp::operators::details
template<rpp::constraint::observable TObservable, rpp::constraint::observer TObserver>
struct concat_inner_observer_strategy : public concat_observer_strategy_base<TObservable, TObserver>
{
static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto;
static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;

using base = concat_observer_strategy_base<TObservable, TObserver>;
using base::concat_observer_strategy_base;

template<typename T>
void on_next(T&& v) const
{
base::state->get_observer()->on_next(std::forward<T>(v));
base::disposable->get_observer()->on_next(std::forward<T>(v));
}

void on_completed() const
{
base::refcounted.clear();

ConcatStage current{ConcatStage::Draining};
if (base::state->stage().compare_exchange_strong(current, ConcatStage::CompletedWhileDraining, std::memory_order::seq_cst))
if (base::disposable->stage().compare_exchange_strong(current, ConcatStage::CompletedWhileDraining, std::memory_order::seq_cst))
return;

assert(current == ConcatStage::Processing);

base::state->drain(base::refcounted);
base::disposable->drain(base::refcounted);
}
};

template<rpp::constraint::observable TObservable, rpp::constraint::observer TObserver>
struct concat_observer_strategy : public concat_observer_strategy_base<TObservable, TObserver>
{
using base = concat_observer_strategy_base<TObservable, TObserver>;
using base = concat_observer_strategy_base<TObservable, TObserver>;

static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;

concat_observer_strategy(TObserver&& observer)
: base{std::make_shared<concat_state_t<TObservable, TObserver>>(std::move(observer))}
: base{init_state(std::move(observer))}
{
}

template<typename T>
void on_next(T&& v) const
{
ConcatStage current = ConcatStage::None;
if (base::state->stage().compare_exchange_strong(current, ConcatStage::Draining, std::memory_order::seq_cst))
base::state->handle_observable(std::forward<T>(v), base::state->get_disposable()->add_ref(refcount_disposable::Mode::StrongRefRefSource));
if (base::disposable->stage().compare_exchange_strong(current, ConcatStage::Draining, std::memory_order::seq_cst))
base::disposable->handle_observable(std::forward<T>(v), base::disposable->add_ref());
else
base::state->get_queue()->push(std::forward<T>(v));
base::disposable->get_queue()->push(std::forward<T>(v));
}

void on_completed() const
{
base::refcounted.dispose();
if (base::state->get_disposable()->is_disposed())
base::state->get_observer()->on_completed();
if (base::disposable->is_disposed())
base::disposable->get_observer()->on_completed();
}


private:
static std::shared_ptr<concat_disposable<TObservable, TObserver>> init_state(TObserver&& observer)
{
const auto d = disposable_wrapper_impl<concat_disposable<TObservable, TObserver>>::make(std::move(observer));
auto ptr = d.lock();
ptr->get_observer()->set_upstream(d.as_weak());
return ptr;
Comment on lines +190 to +195

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure TObserver supports set_upstream method

In init_state, ptr->get_observer()->set_upstream(d.as_weak()); assumes that TObserver has a set_upstream method. Verify that all possible TObserver types support this method, or consider adding a constraint to enforce this requirement.

}
};

Expand Down
Loading