From a096744c90a9f85875a97b62de58dace00d1bcab Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 22 Dec 2023 16:51:12 +0300 Subject: [PATCH] Extend doc with examples --- src/examples/rpp/doxygen/as_blocking.cpp | 20 ++++++++ src/examples/rpp/doxygen/connect.cpp | 49 +++++++++++++++++++ src/examples/rpp/doxygen/ref_count.cpp | 19 +++++-- .../rpp/observables/blocking_observable.hpp | 14 ++++-- .../observables/connectable_observable.hpp | 12 +++++ .../rpp/observables/dynamic_observable.hpp | 8 +++ src/rpp/rpp/observables/fwd.hpp | 18 ------- .../rpp/observables/grouped_observable.hpp | 10 ++++ src/rpp/rpp/operators/as_blocking.hpp | 3 ++ 9 files changed, 129 insertions(+), 24 deletions(-) create mode 100644 src/examples/rpp/doxygen/as_blocking.cpp create mode 100644 src/examples/rpp/doxygen/connect.cpp diff --git a/src/examples/rpp/doxygen/as_blocking.cpp b/src/examples/rpp/doxygen/as_blocking.cpp new file mode 100644 index 000000000..d21b5a006 --- /dev/null +++ b/src/examples/rpp/doxygen/as_blocking.cpp @@ -0,0 +1,20 @@ +#include "rpp/sources/fwd.hpp" +#include + +#include + +/** + * \example as_blocking.cpp + **/ + +int main() // NOLINT(bugprone-exception-escape) +{ + //! [as_blocking] + rpp::source::just(1) + | rpp::operators::delay(std::chrono::seconds{1}, rpp::schedulers::new_thread{}) // <-- emit from another thread with delay + | rpp::operators::as_blocking() + | rpp::operators::subscribe([](int){}, [](){std::cout << "COMPLETED" << std::endl; }); + std::cout << "done" << std::endl; + // output: COMPLETED done + //! [as_blocking] +} \ No newline at end of file diff --git a/src/examples/rpp/doxygen/connect.cpp b/src/examples/rpp/doxygen/connect.cpp new file mode 100644 index 000000000..2f3932f40 --- /dev/null +++ b/src/examples/rpp/doxygen/connect.cpp @@ -0,0 +1,49 @@ +#include "rpp/sources/fwd.hpp" +#include + +#include + +/** + * \example connect.cpp + **/ + +int main() // NOLINT(bugprone-exception-escape) +{ + //! [connect] + const auto observable = rpp::source::interval(std::chrono::milliseconds{50}, rpp::schedulers::new_thread{}) + | rpp::ops::map([](int v) { + std::cout << "value in map" << v << std::endl; + return v; + }) + | rpp::ops::publish(); + + std::cout << "CONNECT" << std::endl; + auto d = observable.connect(); // subscribe happens right now + + std::this_thread::sleep_for(std::chrono::milliseconds{150}); + + std::cout << "SUBSCRIBE" << std::endl; + observable.subscribe([](int v) { std::cout << "observer value " << v << std::endl; }); + + std::this_thread::sleep_for(std::chrono::milliseconds{150}); + + d.dispose(); + std::cout << "DISPOSE" << std::endl; + + std::this_thread::sleep_for(std::chrono::milliseconds{150}); + + // possible output: + // CONNECT + // value in map0 + // value in map1 + // value in map2 + // SUBSCRIBE + // value in map3 + // observer value 3 + // value in map4 + // observer value 4 + // value in map5 + // observer value 5 + // DISPOSE + //! [connect] +} \ No newline at end of file diff --git a/src/examples/rpp/doxygen/ref_count.cpp b/src/examples/rpp/doxygen/ref_count.cpp index 9165e5284..967ff6b26 100644 --- a/src/examples/rpp/doxygen/ref_count.cpp +++ b/src/examples/rpp/doxygen/ref_count.cpp @@ -8,18 +8,31 @@ int main() // NOLINT(bugprone-exception-escape) { { //! [ref_count] - auto observable = rpp::source::just(1, 2, 3) | rpp::operators::multicast(); + auto observable = rpp::source::create([](const auto& observer) + { + std::cout << "SUBSCRIBE" << std::endl; + for(int i =0; i < 3; ++i) { + observer.on_next(i); + } + observer.on_completed(); + }) | rpp::operators::multicast(); + + std::cout << "subscribe first" << std::endl; observable.subscribe([](int v) {std::cout << "#1 " << v << std::endl; }); // No Output + std::cout << "subscribe with ref_count" << std::endl; observable.ref_count().subscribe([](int v) {std::cout << "#2 " << v << std::endl; }); // Output: + // subscribe first + // subscribe with ref_count + // SUBSCRIBE + // #1 0 + // #2 0 // #1 1 // #2 1 // #1 2 // #2 2 - // #1 3 - // #2 3 //! [ref_count] } diff --git a/src/rpp/rpp/observables/blocking_observable.hpp b/src/rpp/rpp/observables/blocking_observable.hpp index 59ab3b249..619dc017c 100644 --- a/src/rpp/rpp/observables/blocking_observable.hpp +++ b/src/rpp/rpp/observables/blocking_observable.hpp @@ -22,13 +22,13 @@ namespace rpp::details::observables class blocking_disposble final : public base_disposable { public: - void wait() + void wait() { std::unique_lock lock{m_mutex}; m_cv.wait(lock, [this] { return m_completed; }); } - void dispose_impl() noexcept override + void dispose_impl() noexcept override { { std::lock_guard lock{m_mutex}; @@ -66,7 +66,7 @@ class blocking_strategy auto d = std::make_shared(); obs.set_upstream(d); m_original.subscribe(std::move(obs)); - + if (!d->is_disposed()) d->wait(); } @@ -78,6 +78,14 @@ class blocking_strategy namespace rpp { +/** + * @brief Extension over rpp::observable with set of blocking operators - it waits till completion of underlying observable + * + * @par Example: + * @snippet as_blocking.cpp as_blocking + * + * @ingroup observables + */ template Strategy> class blocking_observable : public observable> { public: diff --git a/src/rpp/rpp/observables/connectable_observable.hpp b/src/rpp/rpp/observables/connectable_observable.hpp index 7ebe6da0c..a73eada12 100644 --- a/src/rpp/rpp/observables/connectable_observable.hpp +++ b/src/rpp/rpp/observables/connectable_observable.hpp @@ -63,6 +63,11 @@ struct ref_count_on_subscribe_t class connectable_observable final : public decltype(std::declval().get_observable()) { @@ -83,6 +88,13 @@ class connectable_observable final : public decltype(std::declval().get { } + /** + * @brief Connects to underlying observable right-now making it hot-observable + * + * @par Example: + * @snippet connect.cpp connect + * + */ rpp::disposable_wrapper connect(rpp::composite_disposable_wrapper wrapper = {}) const { std::unique_lock lock(m_state->mutex); diff --git a/src/rpp/rpp/observables/dynamic_observable.hpp b/src/rpp/rpp/observables/dynamic_observable.hpp index 0e4e6f376..614c39814 100644 --- a/src/rpp/rpp/observables/dynamic_observable.hpp +++ b/src/rpp/rpp/observables/dynamic_observable.hpp @@ -77,6 +77,14 @@ class dynamic_strategy final namespace rpp { +/** + * @brief Type-erased version of the `rpp::observable`. Any observable can be converted to dynamic_observable via `rpp::observable::as_dynamic` member function. + * @details To provide type-erasure it uses `std::shared_ptr`. As a result it has worse performance. + * + * @tparam Type of value this obsevalbe can provide + * + * @ingroup observables + */ template class dynamic_observable : public observable> { public: diff --git a/src/rpp/rpp/observables/fwd.hpp b/src/rpp/rpp/observables/fwd.hpp index 25769c0a9..89d304b17 100644 --- a/src/rpp/rpp/observables/fwd.hpp +++ b/src/rpp/rpp/observables/fwd.hpp @@ -42,30 +42,12 @@ class observable_chain_strategy; template Strategy> class observable; -/** - * @brief Type-erased version of the `rpp::observable`. Any observable can be converted to dynamic_observable via `rpp::observable::as_dynamic` member function. - * @details To provide type-erasure it uses `std::shared_ptr`. As a result it has worse performance. - * - * @tparam Type of value this obsevalbe can provide - * - * @ingroup observables - */ template class dynamic_observable; -/** - * @brief `rpp::blocking_observable` blocks `subscribe` call till on_completed/on_error happens. - */ template Strategy> class blocking_observable; -/** - * @brief Extension over raw observable which also has `get_key()` member function. Used in `group_by` operator to represent grouped observable - * - * @tparam KeyType is type of key - * @tparam Type of value this obsevalbe can provide - * @tparam Strategy is observable strategy - */ template Strategy> class grouped_observable; } diff --git a/src/rpp/rpp/observables/grouped_observable.hpp b/src/rpp/rpp/observables/grouped_observable.hpp index b12b726ef..a394b5d6b 100644 --- a/src/rpp/rpp/observables/grouped_observable.hpp +++ b/src/rpp/rpp/observables/grouped_observable.hpp @@ -13,6 +13,16 @@ namespace rpp { + +/** + * @brief Extension over rpp::observable for some "subset" of values from original observable grouped by some key. It has `get_key()` member function. Used in `group_by` operator to represent grouped observable + * + * @tparam KeyType is type of key + * @tparam Type of value this obsevalbe can provide + * @tparam Strategy is observable strategy + * + * @ingroup observables + */ template Strategy> class grouped_observable final : public observable { diff --git a/src/rpp/rpp/operators/as_blocking.hpp b/src/rpp/rpp/operators/as_blocking.hpp index 8075963f3..262e72d1d 100644 --- a/src/rpp/rpp/operators/as_blocking.hpp +++ b/src/rpp/rpp/operators/as_blocking.hpp @@ -39,6 +39,9 @@ namespace rpp::operators * @brief Converts `rpp::observable` to `rpp::blocking_observable` * @details `rpp::blocking_observable` blocks `subscribe` call till on_completed/on_error happens. * + * @par Example: + * @snippet as_blocking.cpp as_blocking + * * @ingroup utility_operators */ inline auto as_blocking()