diff --git a/.clang-format b/.clang-format
index 5f1d37131..0adc3b90f 100644
--- a/.clang-format
+++ b/.clang-format
@@ -63,6 +63,8 @@ DerivePointerAlignment: false
EmptyLineAfterAccessModifier: Never
EmptyLineBeforeAccessModifier: Always
ExperimentalAutoDetectBinPacking: true
+# AllowShortCompoundRequirementOnASingleLine: true
+RequiresExpressionIndentation: OuterScope
FixNamespaceComments: true
IncludeBlocks: Regroup
IncludeCategories:
diff --git a/.clang-tidy b/.clang-tidy
index c8f8f1c39..f969c9178 100644
--- a/.clang-tidy
+++ b/.clang-tidy
@@ -4,11 +4,11 @@ WarningsAsErrors: '*'
HeaderFilterRegex: './src/.*'
AnalyzeTemporaryDtors: false
FormatStyle: 'file'
-HeaderFileExtensions:
- - h
- - hpp
-ImplementationFileExtensions:
- - cpp
+# HeaderFileExtensions:
+# - h
+# - hpp
+# ImplementationFileExtensions:
+# - cpp
CheckOptions:
- { key: readability-identifier-naming.NamespaceCase, value: lower_case }
- { key: readability-identifier-naming.ClassCase, value: lower_case }
diff --git a/docs/readme.md b/docs/readme.md
index 2a5efa0a1..b9a6d53d4 100644
--- a/docs/readme.md
+++ b/docs/readme.md
@@ -143,7 +143,7 @@ RPP follows this contract and especially this part. It means, that:
2. Any user-provided callbacks (for operators or observers) can be not thread-safe due to thread-safety of observable is guaranteed.
For example: internal logic of `take` operator doesn't use mutexes or atomics due to underlying observable **MUST** emit items serially
3. When you implement your own operator via `create` be careful to **follow this contract**!
-4. It is true **EXCEPT FOR** subjects if they are used manually due to users can use subjects for its own purposes there is potentially place for breaking this concept. Be careful and use serialized_subject instead if can't guarantee serial emissions!
+4. It is true **EXCEPT FOR** subjects if they are used manually due to users can use subjects for its own purposes there is potentially place for breaking this concept. Be careful and use serialized_* instead if can't guarantee serial emissions!
It means, that for example:
diff --git a/src/rpp/rpp/observables/connectable_observable.hpp b/src/rpp/rpp/observables/connectable_observable.hpp
index 16b60c059..479790949 100644
--- a/src/rpp/rpp/observables/connectable_observable.hpp
+++ b/src/rpp/rpp/observables/connectable_observable.hpp
@@ -68,12 +68,14 @@ namespace rpp
*
* @ingroup observables
*/
- template
+ template
class connectable_observable final : public decltype(std::declval().get_observable())
{
using base = decltype(std::declval().get_observable());
public:
+ static_assert(rpp::constraint::subject);
+
connectable_observable(const OriginalObservable& original_observable, const Subject& subject = Subject{})
: base{subject.get_observable()}
, m_original_observable{original_observable}
diff --git a/src/rpp/rpp/observables/fwd.hpp b/src/rpp/rpp/observables/fwd.hpp
index f882a001b..7411fdca4 100644
--- a/src/rpp/rpp/observables/fwd.hpp
+++ b/src/rpp/rpp/observables/fwd.hpp
@@ -10,7 +10,6 @@
#pragma once
#include
-#include
#include
#include
@@ -70,7 +69,7 @@ namespace rpp::constraint
namespace rpp
{
- template
+ template
class connectable_observable;
} // namespace rpp
diff --git a/src/rpp/rpp/operators/concat.hpp b/src/rpp/rpp/operators/concat.hpp
index c8e4bfd68..89bb5ee32 100644
--- a/src/rpp/rpp/operators/concat.hpp
+++ b/src/rpp/rpp/operators/concat.hpp
@@ -90,7 +90,7 @@ namespace rpp::operators::details
{
auto queue = get_queue();
if (queue->empty())
- return {};
+ return std::nullopt;
auto observable = queue->front();
queue->pop();
return observable;
diff --git a/src/rpp/rpp/operators/details/forwarding_subject.hpp b/src/rpp/rpp/operators/details/forwarding_subject.hpp
index a9e05bee8..fdc059082 100644
--- a/src/rpp/rpp/operators/details/forwarding_subject.hpp
+++ b/src/rpp/rpp/operators/details/forwarding_subject.hpp
@@ -11,9 +11,10 @@
#include
+#include
#include
#include
-#include
+#include
#include
#include
@@ -21,13 +22,13 @@
namespace rpp::operators::details
{
template
- class forwarding_strategy
+ class forwarding_subject
{
struct observer_strategy
{
using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy;
- std::shared_ptr> state{};
+ std::shared_ptr> state{};
void set_upstream(const disposable_wrapper& d) const noexcept { state->add(d); }
@@ -41,9 +42,9 @@ namespace rpp::operators::details
};
public:
- using expected_disposable_strategy = typename rpp::details::observables::deduce_disposable_strategy_t>::template add<1>;
+ using expected_disposable_strategy = typename rpp::details::observables::deduce_disposable_strategy_t>::template add<1>;
- explicit forwarding_strategy(disposable_wrapper_impl refcount)
+ explicit forwarding_subject(disposable_wrapper_impl refcount)
: m_refcount{std::move(refcount)}
{
}
@@ -53,12 +54,16 @@ namespace rpp::operators::details
return rpp::observer{m_state.lock()};
}
- template TObs>
- void on_subscribe(TObs&& observer) const
+ auto get_observable() const
{
- if (const auto locked = m_refcount.lock())
- observer.set_upstream(locked->add_ref());
- m_state.lock()->on_subscribe(std::forward(observer));
+ return subjects::details::create_subject_on_subscribe_observable([state = m_state.as_weak(), refcount = m_refcount] TObs>(TObs&& observer) {
+ if (const auto locked_state = state.lock())
+ {
+ if (const auto locked = refcount.lock())
+ observer.set_upstream(locked->add_ref());
+ locked_state->on_subscribe(std::forward(observer));
+ }
+ });
}
rpp::composite_disposable_wrapper get_disposable() const
@@ -67,10 +72,7 @@ namespace rpp::operators::details
}
private:
- disposable_wrapper_impl m_refcount;
- disposable_wrapper_impl> m_state = disposable_wrapper_impl>::make();
+ disposable_wrapper_impl m_refcount;
+ disposable_wrapper_impl> m_state = disposable_wrapper_impl>::make();
};
-
- template
- using forwarding_subject = subjects::details::base_subject>;
} // namespace rpp::operators::details
\ No newline at end of file
diff --git a/src/rpp/rpp/operators/details/strategy.hpp b/src/rpp/rpp/operators/details/strategy.hpp
index 4b6ba35d9..c6bdb1097 100644
--- a/src/rpp/rpp/operators/details/strategy.hpp
+++ b/src/rpp/rpp/operators/details/strategy.hpp
@@ -46,7 +46,7 @@ namespace rpp::operators::details
static auto apply(Observer&& observer, const Args&... vals)
{
static_assert(rpp::constraint::observer_of_type, typename Operator::template operator_traits::result_type>);
- return rpp::observer::template observer_strategy>>{std::forward(observer), vals...};
+ return rpp::observer::template observer_strategy>>{std::forward(observer), vals...}; // NOLINT
}
private:
diff --git a/src/rpp/rpp/sources/create.hpp b/src/rpp/rpp/sources/create.hpp
index b65f15253..da1bdd27c 100644
--- a/src/rpp/rpp/sources/create.hpp
+++ b/src/rpp/rpp/sources/create.hpp
@@ -18,7 +18,8 @@ namespace rpp::details
template OnSubscribe>
struct create_strategy
{
- using value_type = Type;
+ using value_type = Type;
+ using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t;
RPP_NO_UNIQUE_ADDRESS OnSubscribe subscribe;
};
diff --git a/src/rpp/rpp/subjects.hpp b/src/rpp/rpp/subjects.hpp
index 3e5791c32..bf5ed2934 100644
--- a/src/rpp/rpp/subjects.hpp
+++ b/src/rpp/rpp/subjects.hpp
@@ -18,4 +18,4 @@
*/
#include
-#include
\ No newline at end of file
+#include
\ No newline at end of file
diff --git a/src/rpp/rpp/subjects/details/base_subject.hpp b/src/rpp/rpp/subjects/details/base_subject.hpp
deleted file mode 100644
index be2fe52ab..000000000
--- a/src/rpp/rpp/subjects/details/base_subject.hpp
+++ /dev/null
@@ -1,62 +0,0 @@
-// ReactivePlusPlus library
-//
-// Copyright Aleksey Loginov 2022 - present.
-// Distributed under the Boost Software License, Version 1.0.
-// (See accompanying file LICENSE_1_0.txt or copy at
-// https://www.boost.org/LICENSE_1_0.txt)
-//
-// Project home: https://github.com/victimsnino/ReactivePlusPlus
-
-#pragma once
-
-#include
-#include
-
-#include
-
-namespace rpp::subjects::details
-{
- template Strategy>
- class base_subject
- {
- struct on_subscribe
- {
- using value_type = T;
- using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t;
-
- Strategy strategy;
-
- template TObs>
- void subscribe(TObs&& sub) const
- {
- strategy.on_subscribe(std::forward(sub));
- }
- };
-
- public:
- template
- requires (rpp::constraint::is_constructible_from && !rpp::constraint::variadic_decayed_same_as)
- explicit base_subject(Args&&... args)
- : m_strategy{std::forward(args)...}
- {
- }
-
- auto get_observer() const
- {
- return m_strategy.get_observer();
- }
-
- auto get_observable() const
- {
- return rpp::observable{m_strategy};
- }
-
- auto get_disposable() const
- {
- return m_strategy.get_disposable();
- }
-
- private:
- Strategy m_strategy{};
- };
-} // namespace rpp::subjects::details
diff --git a/src/rpp/rpp/subjects/details/subject_on_subscribe.hpp b/src/rpp/rpp/subjects/details/subject_on_subscribe.hpp
new file mode 100644
index 000000000..06841aa02
--- /dev/null
+++ b/src/rpp/rpp/subjects/details/subject_on_subscribe.hpp
@@ -0,0 +1,32 @@
+// ReactivePlusPlus library
+//
+// Copyright Aleksey Loginov 2022 - present.
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE_1_0.txt or copy at
+// https://www.boost.org/LICENSE_1_0.txt)
+//
+// Project home: https://github.com/victimsnino/ReactivePlusPlus
+
+#pragma once
+
+#include
+
+#include
+
+namespace rpp::subjects::details
+{
+ template OnSubscribe, typename DisposableStrategy>
+ struct subject_on_subscribe_strategy
+ {
+ using value_type = Type;
+ using expected_disposable_strategy = DisposableStrategy;
+
+ RPP_NO_UNIQUE_ADDRESS OnSubscribe subscribe;
+ };
+
+ template OnSubscribe>
+ auto create_subject_on_subscribe_observable(OnSubscribe&& on_subscribe)
+ {
+ return rpp::observable, DisposableStrategy>>(std::forward(on_subscribe));
+ }
+} // namespace rpp::subjects::details
diff --git a/src/rpp/rpp/subjects/details/subject_state.hpp b/src/rpp/rpp/subjects/details/subject_state.hpp
index 742662a1c..de624fa1e 100644
--- a/src/rpp/rpp/subjects/details/subject_state.hpp
+++ b/src/rpp/rpp/subjects/details/subject_state.hpp
@@ -35,9 +35,9 @@ namespace rpp::subjects::details
{
};
- template
- class subject_state : public std::enable_shared_from_this>
- , public composite_disposable
+ template
+ class subject_state : public composite_disposable
+ , public rpp::details::enable_wrapper_from_this>
{
using shared_observers = std::shared_ptr>>;
using state_t = std::variant;
@@ -45,6 +45,8 @@ namespace rpp::subjects::details
public:
using expected_disposable_strategy = rpp::details::observables::atomic_fixed_disposable_strategy_selector<1>;
+ subject_state() = default;
+
template TObs>
void on_subscribe(TObs&& observer)
{
@@ -72,21 +74,28 @@ namespace rpp::subjects::details
void on_next(const Type& v)
{
+ std::lock_guard lock{m_serialized_mutex};
if (const auto observers = extract_observers_under_lock_if_there())
rpp::utils::for_each(*observers, [&](const auto& sub) { sub.on_next(v); });
}
void on_error(const std::exception_ptr& err)
{
- if (const auto observers = exchange_observers_under_lock_if_there(err))
- rpp::utils::for_each(*observers, [&](const auto& sub) { sub.on_error(err); });
+ {
+ std::lock_guard lock{m_serialized_mutex};
+ if (const auto observers = exchange_observers_under_lock_if_there(err))
+ rpp::utils::for_each(*observers, [&](const auto& sub) { sub.on_error(err); });
+ }
dispose();
}
void on_completed()
{
- if (const auto observers = exchange_observers_under_lock_if_there(completed{}))
- rpp::utils::for_each(*observers, rpp::utils::static_mem_fn<&dynamic_observer::on_completed>{});
+ {
+ std::lock_guard lock{m_serialized_mutex};
+ if (const auto observers = exchange_observers_under_lock_if_there(completed{}))
+ rpp::utils::for_each(*observers, rpp::utils::static_mem_fn<&dynamic_observer::on_completed>{});
+ }
dispose();
}
@@ -99,7 +108,7 @@ namespace rpp::subjects::details
void set_upstream(rpp::dynamic_observer& obs)
{
obs.set_upstream(rpp::disposable_wrapper{make_callback_disposable(
- [weak = this->weak_from_this()]() noexcept // NOLINT(bugprone-exception-escape)
+ [weak = this->wrapper_from_this().as_weak()]() noexcept // NOLINT(bugprone-exception-escape)
{
if (const auto shared = weak.lock())
{
@@ -163,7 +172,8 @@ namespace rpp::subjects::details
}
private:
- state_t m_state{};
- std::mutex m_mutex{};
+ state_t m_state{};
+ std::mutex m_mutex{};
+ RPP_NO_UNIQUE_ADDRESS std::conditional_t m_serialized_mutex{};
};
} // namespace rpp::subjects::details
\ No newline at end of file
diff --git a/src/rpp/rpp/subjects/fwd.hpp b/src/rpp/rpp/subjects/fwd.hpp
index 816bb0c13..22daa29ed 100644
--- a/src/rpp/rpp/subjects/fwd.hpp
+++ b/src/rpp/rpp/subjects/fwd.hpp
@@ -10,53 +10,48 @@
#pragma once
#include
+#include
#include
#include
#include
-namespace rpp::subjects::details
+namespace rpp::subjects
{
- namespace constraint
- {
- template
- concept subject_strategy = requires(Strategy t, rpp::details::observers::fake_observer&& obs) {
- {
- t.get_observer()
- } -> rpp::constraint::observer;
- t.on_subscribe(std::move(obs));
- {
- t.get_disposable()
- } -> rpp::constraint::decayed_any_of;
- };
- } // namespace constraint
- template Strategy>
- class base_subject;
-
template
- class publish_strategy;
+ class publish_subject;
template
- class serialized_strategy;
-} // namespace rpp::subjects::details
+ class serialized_publish_subject;
+
-namespace rpp::subjects
-{
template
- class publish_subject;
+ class replay_subject;
template
- class serialized_subject;
-} // namespace rpp::subjects
+ class serialized_replay_subject;
-namespace rpp::subjects::utils
-{
- template
- using extract_subject_type_t = typename rpp::utils::extract_base_type_params_t::template type_at_index_t<0>;
-} // namespace rpp::subjects::utils
+
+} // namespace rpp::subjects
namespace rpp::constraint
{
template
- concept subject = rpp::utils::is_base_of_v;
+ concept subject = requires(const T& subj) {
+ {
+ subj.get_observer()
+ } -> rpp::constraint::observer;
+ {
+ subj.get_observable()
+ } -> rpp::constraint::observable;
+ {
+ subj.get_disposable()
+ } -> rpp::constraint::decayed_any_of;
+ };
} // namespace rpp::constraint
+
+namespace rpp::subjects::utils
+{
+ template
+ using extract_subject_type_t = rpp::utils::extract_observer_type_t().get_observer())>;
+} // namespace rpp::subjects::utils
diff --git a/src/rpp/rpp/subjects/publish_subject.hpp b/src/rpp/rpp/subjects/publish_subject.hpp
index 2063dcfc7..2237768f8 100644
--- a/src/rpp/rpp/subjects/publish_subject.hpp
+++ b/src/rpp/rpp/subjects/publish_subject.hpp
@@ -13,21 +13,19 @@
#include
#include
-#include
+#include
#include
-#include
-
namespace rpp::subjects::details
{
- template
- class publish_strategy
+ template
+ class publish_subject_base
{
struct observer_strategy
{
using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy;
- std::shared_ptr> state{};
+ std::shared_ptr> state{};
void set_upstream(const disposable_wrapper& d) const noexcept { state->add(d); }
@@ -41,17 +39,18 @@ namespace rpp::subjects::details
};
public:
- using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t>;
+ using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t>;
+
+ publish_subject_base() = default;
auto get_observer() const
{
return rpp::observer{m_state.lock()};
}
- template TObs>
- void on_subscribe(TObs&& observer) const
+ auto get_observable() const
{
- m_state.lock()->on_subscribe(std::forward(observer));
+ return create_subject_on_subscribe_observable([state = m_state] TObs>(TObs&& observer) { state.lock()->on_subscribe(std::forward(observer)); });
}
rpp::disposable_wrapper get_disposable() const
@@ -60,10 +59,9 @@ namespace rpp::subjects::details
}
private:
- disposable_wrapper_impl> m_state = disposable_wrapper_impl>::make();
+ disposable_wrapper_impl> m_state = disposable_wrapper_impl>::make();
};
} // namespace rpp::subjects::details
-
namespace rpp::subjects
{
/**
@@ -71,7 +69,7 @@ namespace rpp::subjects
*
* @details Each observer obtains only values which emitted after corresponding subscribe. on_error/on_completer/unsubscribe cached and provided to new observers if any
*
- * @warning this subject is not synchronized/serialized! It means, that expected to call callbacks of observer in the serialized way to follow observable contract: "Observables must issue notifications to observers serially (not in parallel).". If you are not sure or need extra serialization, please, use serialized_subject.
+ * @warning this subject is not synchronized/serialized! It means, that expected to call callbacks of observer in the serialized way to follow observable contract: "Observables must issue notifications to observers serially (not in parallel).". If you are not sure or need extra serialization, please, use serialized_publish_subject.
*
* @tparam Type value provided by this subject
*
@@ -79,9 +77,23 @@ namespace rpp::subjects
* @see https://reactivex.io/documentation/subject.html
*/
template
- class publish_subject final : public details::base_subject>
+ class publish_subject final : public details::publish_subject_base
+ {
+ public:
+ using details::publish_subject_base::publish_subject_base;
+ };
+
+ /**
+ * @brief Serialized version of rpp::subjects::publish_subject
+ * @details When you are using ordinary rpp::subjects::publish_subject, then you must take care not to call its on_next method (or its other on methods) in async way.
+ *
+ * @ingroup subjects
+ * @see https://reactivex.io/documentation/subject.html
+ */
+ template
+ class serialized_publish_subject final : public details::publish_subject_base
{
public:
- using details::base_subject>::base_subject;
+ using details::publish_subject_base::publish_subject_base;
};
} // namespace rpp::subjects
\ No newline at end of file
diff --git a/src/rpp/rpp/subjects/replay_subject.hpp b/src/rpp/rpp/subjects/replay_subject.hpp
index 6d26faf8d..c86477520 100644
--- a/src/rpp/rpp/subjects/replay_subject.hpp
+++ b/src/rpp/rpp/subjects/replay_subject.hpp
@@ -12,172 +12,128 @@
#include
#include
+#include
#include
-#include
+#include
#include
-#include
-#include
+#include
#include
namespace rpp::subjects::details
{
template
- class replay_strategy
+ class replay_subject_base
{
- struct replay_state final : public subject_state
+ struct replay_state final : public subject_state
{
- replay_state(std::optional count, std::optional duration)
- : count(count)
- , duration(duration)
+ replay_state(size_t limit = std::numeric_limits::max(), rpp::schedulers::duration duration_limit = std::numeric_limits::max())
+ : m_limit(limit)
+ , m_duration_limit(duration_limit)
{
}
- auto collect_duration()
+ void add_value(const Type& v)
{
- if (duration.has_value())
- {
- auto now = rpp::schedulers::clock_type::now();
- while (!values.empty() && (now - values.front().second > duration.value()))
- {
- values.pop_front();
- }
- return now;
- }
- return rpp::schedulers::clock_type::time_point{};
+ std::unique_lock lock{m_values_mutex};
+ while (m_values.size() >= m_limit)
+ m_values.pop_front();
+
+ m_values.emplace_back(v, deduce_timepoint());
}
- void collect_bound()
+ struct value_with_time
{
- if (count.has_value())
+ value_with_time(const Type& v, rpp::schedulers::clock_type::time_point timepoint)
+ : value{v}
+ , timepoint{timepoint}
{
- if (values.size() == count.value())
- {
- values.pop_front();
- }
}
+
+ Type value;
+ rpp::schedulers::clock_type::time_point timepoint;
+ };
+
+
+ std::deque get_actual_values()
+ {
+ std::unique_lock lock{m_values_mutex};
+ deduce_timepoint();
+ return m_values;
}
- template
- void collect(T&& v)
+ private:
+ rpp::schedulers::clock_type::time_point deduce_timepoint()
{
- std::unique_lock lock{list_mutex};
- collect_bound();
- const auto time_point = collect_duration();
+ if (std::numeric_limits::max() == m_duration_limit)
+ return rpp::schedulers::clock_type::time_point{};
- values.emplace_back(std::forward(v), time_point);
+ auto now = rpp::schedulers::clock_type::now();
+ while (!m_values.empty() && (now - m_values.front().timepoint > m_duration_limit))
+ m_values.pop_front();
+ return now;
}
- std::optional count;
- std::optional duration;
-
- std::list> values{};
+ private:
+ std::mutex m_values_mutex{};
+ std::deque m_values{};
- std::mutex list_mutex{};
- std::mutex serialized_mutex{};
+ const size_t m_limit;
+ const rpp::schedulers::duration m_duration_limit;
};
struct observer_strategy
{
- std::shared_ptr state;
+ using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy;
- template
- void collect_and_on_next(T&& v) const
- requires Serialized
- {
- state->collect(std::forward(v));
-
- std::unique_lock lock{state->serialized_mutex};
- state->on_next(state->values.back().first);
- }
-
- template
- void collect_and_on_next(T&& v) const
- {
- state->collect(std::forward(v));
- state->on_next(state->values.back().first);
- }
+ std::shared_ptr state;
void set_upstream(const disposable_wrapper& d) const noexcept { state->add(d); }
- bool is_disposed() const noexcept
- {
- return state->is_disposed();
- }
-
- void on_next(Type&& v) const
- {
- collect_and_on_next(std::move(v));
- }
+ bool is_disposed() const noexcept { return state->is_disposed(); }
void on_next(const Type& v) const
{
- collect_and_on_next(v);
- }
-
- void on_error(const std::exception_ptr& err) const
- requires Serialized
- {
- std::unique_lock lock{state->serialized_mutex};
- state->on_error(err);
+ state->add_value(v);
+ state->on_next(v);
}
- void on_error(const std::exception_ptr& err) const
- {
- state->on_error(err);
- }
-
- void on_completed() const
- requires Serialized
- {
- std::unique_lock lock{state->serialized_mutex};
- state->on_completed();
- }
+ void on_error(const std::exception_ptr& err) const { state->on_error(err); }
- void on_completed() const
- {
- state->on_completed();
- }
+ void on_completed() const { state->on_completed(); }
};
public:
- replay_strategy()
- : m_state(disposable_wrapper_impl::make(std::nullopt, std::nullopt))
+ using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t>;
+
+ replay_subject_base()
+ : m_state{disposable_wrapper_impl::make()}
{
}
- replay_strategy(size_t count)
- : m_state{disposable_wrapper_impl::make(std::max(1, count), std::nullopt)}
+ replay_subject_base(size_t count)
+ : m_state{disposable_wrapper_impl::make(std::max(1, count))}
{
}
- replay_strategy(size_t count, rpp::schedulers::duration duration)
+ replay_subject_base(size_t count, rpp::schedulers::duration duration)
: m_state{disposable_wrapper_impl::make(std::max(1, count), duration)}
{
}
- using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy;
-
auto get_observer() const
{
return rpp::observer{m_state.lock()};
}
- template TObs>
- void on_subscribe(TObs&& observer) const
+ auto get_observable() const
{
- if (const auto locked = m_state.lock())
- {
- {
- std::unique_lock lock{locked->list_mutex};
- locked->collect_duration();
- for (const auto& value : locked->values)
- {
- observer.on_next(value.first);
- }
- }
+ return create_subject_on_subscribe_observable([state = m_state] TObs>(TObs&& observer) {
+ const auto locked = state.lock();
+ for (auto&& value : locked->get_actual_values())
+ observer.on_next(std::move(value.value));
locked->on_subscribe(std::forward(observer));
- }
+ });
}
rpp::disposable_wrapper get_disposable() const
@@ -204,22 +160,23 @@ namespace rpp::subjects
* @see https://reactivex.io/documentation/subject.html
*/
template
- class replay_subject final : public details::base_subject>
+ class replay_subject final : public details::replay_subject_base
{
public:
- using details::base_subject>::base_subject;
+ using details::replay_subject_base::replay_subject_base;
};
/**
* @brief Same as rpp::subjects::replay_subject but on_next/on_error/on_completed calls are serialized via mutex.
+ * @details When you are using ordinary rpp::subjects::replay_subject, then you must take care not to call its on_next method (or its other on methods) in async way.
*
* @ingroup subjects
* @see https://reactivex.io/documentation/subject.html
*/
template
- class serialized_replay_subject final : public details::base_subject>
+ class serialized_replay_subject final : public details::replay_subject_base
{
public:
- using details::base_subject>::base_subject;
+ using details::replay_subject_base::replay_subject_base;
};
} // namespace rpp::subjects
\ No newline at end of file
diff --git a/src/rpp/rpp/subjects/serialized_subject.hpp b/src/rpp/rpp/subjects/serialized_subject.hpp
deleted file mode 100644
index 32fce8914..000000000
--- a/src/rpp/rpp/subjects/serialized_subject.hpp
+++ /dev/null
@@ -1,101 +0,0 @@
-// ReactivePlusPlus library
-//
-// Copyright Aleksey Loginov 2022 - present.
-// Distributed under the Boost Software License, Version 1.0.
-// (See accompanying file LICENSE_1_0.txt or copy at
-// https://www.boost.org/LICENSE_1_0.txt)
-//
-// Project home: https://github.com/victimsnino/ReactivePlusPlus
-
-#pragma once
-
-#include
-
-#include
-#include
-#include
-#include
-
-#include
-
-namespace rpp::subjects::details
-{
- template
- class serialized_strategy
- {
- struct serialized_state final : public subject_state
- {
- std::mutex mutex{};
- };
-
- struct observer_strategy
- {
- using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy;
-
- std::shared_ptr state{};
-
- void set_upstream(const disposable_wrapper& d) const noexcept { state->add(d); }
-
- bool is_disposed() const noexcept
- {
- return state->is_disposed();
- }
-
- void on_next(const Type& v) const
- {
- std::lock_guard lock{state->mutex};
- state->on_next(v);
- }
-
- void on_error(const std::exception_ptr& err) const
- {
- std::lock_guard lock{state->mutex};
- state->on_error(err);
- }
-
- void on_completed() const
- {
- std::lock_guard lock{state->mutex};
- state->on_completed();
- }
- };
-
- public:
- using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t>;
-
- auto get_observer() const
- {
- return rpp::observer{m_state.lock()};
- }
-
- template TObs>
- void on_subscribe(TObs&& observer) const
- {
- m_state.lock()->on_subscribe(std::forward(observer));
- }
-
- rpp::composite_disposable_wrapper get_disposable() const
- {
- return m_state;
- }
-
- private:
- disposable_wrapper_impl m_state = disposable_wrapper_impl::make();
- };
-} // namespace rpp::subjects::details
-
-namespace rpp::subjects
-{
- /**
- * @brief Same as rpp::subjects::publish_subject, but on_next/on_error/on_completed calls are serialized via mutex
- *
- * @ingroup subjects
- * @see https://reactivex.io/documentation/subject.html
- */
- template
- class serialized_subject final : public details::base_subject>
- {
- public:
- using details::base_subject>::base_subject;
- };
-} // namespace rpp::subjects
\ No newline at end of file
diff --git a/src/rpp/rpp/utils/utils.hpp b/src/rpp/rpp/utils/utils.hpp
index 466cf4779..65c33865a 100644
--- a/src/rpp/rpp/utils/utils.hpp
+++ b/src/rpp/rpp/utils/utils.hpp
@@ -246,6 +246,13 @@ namespace rpp::utils
T m_value;
};
+ struct none_mutex
+ {
+ static constexpr void lock() {}
+ static constexpr void unlock() {}
+ static constexpr void try_lock() {}
+ };
+
#define RPP_CALL_DURING_CONSTRUCTION(...) RPP_NO_UNIQUE_ADDRESS rpp::utils::none _ = [&]() { \
__VA_ARGS__; \
return rpp::utils::none{}; \
diff --git a/src/tests/rpp/test_subjects.cpp b/src/tests/rpp/test_subjects.cpp
index ad989827a..a50469f66 100644
--- a/src/tests/rpp/test_subjects.cpp
+++ b/src/tests/rpp/test_subjects.cpp
@@ -15,7 +15,6 @@
#include
#include
#include
-#include
#include "copy_count_tracker.hpp"
#include "mock_observer.hpp"
@@ -232,7 +231,7 @@ TEST_CASE("publish subject caches error/completed")
}
}
-TEMPLATE_TEST_CASE("serialized subjects handles race condition", "", rpp::subjects::serialized_subject, rpp::subjects::serialized_replay_subject)
+TEMPLATE_TEST_CASE("serialized subjects handles race condition", "", rpp::subjects::serialized_publish_subject, rpp::subjects::serialized_replay_subject)
{
auto subj = TestType{};
@@ -398,16 +397,16 @@ TEMPLATE_TEST_CASE("replay subject doesn't introduce additional copies", "", rpp
{
auto sub = TestType{};
- sub.get_observable().subscribe([](const copy_count_tracker& tracker) {
- CHECK(tracker.get_copy_count() == 0);
- CHECK(tracker.get_move_count() == 1); // 1 move to internal replay buffer
+ sub.get_observable().subscribe([](copy_count_tracker tracker) { // NOLINT
+ CHECK(tracker.get_copy_count() == 2); // 1 copy to internal replay buffer + 1 copy to this observer
+ CHECK(tracker.get_move_count() == 0);
});
sub.get_observer().on_next(copy_count_tracker{});
- sub.get_observable().subscribe([](const copy_count_tracker& tracker) {
- CHECK(tracker.get_copy_count() == 0);
- CHECK(tracker.get_move_count() == 1); // 1 move to internal replay buffer
+ sub.get_observable().subscribe([](copy_count_tracker tracker) { // NOLINT
+ CHECK(tracker.get_copy_count() == 2 + 1); // + 1 copy values from buffer for this observer
+ CHECK(tracker.get_move_count() == 0 + 1); // + 1 move to this observer
});
}
@@ -416,16 +415,16 @@ TEMPLATE_TEST_CASE("replay subject doesn't introduce additional copies", "", rpp
copy_count_tracker tracker{};
auto sub = TestType{};
- sub.get_observable().subscribe([](const copy_count_tracker& tracker) {
- CHECK(tracker.get_copy_count() == 1); // 1 copy to internal replay buffer
+ sub.get_observable().subscribe([](copy_count_tracker tracker) { // NOLINT
+ CHECK(tracker.get_copy_count() == 2); // 1 copy to internal replay buffer + 1 copy to this observer
CHECK(tracker.get_move_count() == 0);
});
sub.get_observer().on_next(tracker);
- sub.get_observable().subscribe([](const copy_count_tracker& tracker) {
- CHECK(tracker.get_copy_count() == 1); // 1 copy to internal replay buffer
- CHECK(tracker.get_move_count() == 0);
+ sub.get_observable().subscribe([](copy_count_tracker tracker) { // NOLINT
+ CHECK(tracker.get_copy_count() == 2 + 1); // + 1 copy values from buffer for this observer
+ CHECK(tracker.get_move_count() == 0 + 1); // + 1 move to this observer
});
}
}
\ No newline at end of file