Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/examples/rpp/doxygen/as_blocking.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#include "rpp/sources/fwd.hpp"
#include <rpp/rpp.hpp>

#include <iostream>

/**
* \example as_blocking.cpp
**/

int main() // NOLINT(bugprone-exception-escape)
{
//! [as_blocking]
rpp::source::just(1)
| rpp::operators::delay(std::chrono::seconds{1}, rpp::schedulers::new_thread{}) // <-- emit from another thread with delay
| rpp::operators::as_blocking()
| rpp::operators::subscribe([](int){}, [](){std::cout << "COMPLETED" << std::endl; });
std::cout << "done" << std::endl;
// output: COMPLETED done
//! [as_blocking]
}
49 changes: 49 additions & 0 deletions src/examples/rpp/doxygen/connect.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#include "rpp/sources/fwd.hpp"
#include <rpp/rpp.hpp>

#include <iostream>

/**
* \example connect.cpp
**/

int main() // NOLINT(bugprone-exception-escape)
{
//! [connect]
const auto observable = rpp::source::interval(std::chrono::milliseconds{50}, rpp::schedulers::new_thread{})
| rpp::ops::map([](int v) {
std::cout << "value in map" << v << std::endl;
return v;
})
| rpp::ops::publish();

std::cout << "CONNECT" << std::endl;
auto d = observable.connect(); // subscribe happens right now

std::this_thread::sleep_for(std::chrono::milliseconds{150});

std::cout << "SUBSCRIBE" << std::endl;
observable.subscribe([](int v) { std::cout << "observer value " << v << std::endl; });

std::this_thread::sleep_for(std::chrono::milliseconds{150});

d.dispose();
std::cout << "DISPOSE" << std::endl;

std::this_thread::sleep_for(std::chrono::milliseconds{150});

// possible output:
// CONNECT
// value in map0
// value in map1
// value in map2
// SUBSCRIBE
// value in map3
// observer value 3
// value in map4
// observer value 4
// value in map5
// observer value 5
// DISPOSE
//! [connect]
}
19 changes: 16 additions & 3 deletions src/examples/rpp/doxygen/ref_count.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,31 @@ int main() // NOLINT(bugprone-exception-escape)
{
{
//! [ref_count]
auto observable = rpp::source::just(1, 2, 3) | rpp::operators::multicast();
auto observable = rpp::source::create<int>([](const auto& observer)
{
std::cout << "SUBSCRIBE" << std::endl;
for(int i =0; i < 3; ++i) {
observer.on_next(i);
}
observer.on_completed();
}) | rpp::operators::multicast();

std::cout << "subscribe first" << std::endl;
observable.subscribe([](int v) {std::cout << "#1 " << v << std::endl; });
// No Output

std::cout << "subscribe with ref_count" << std::endl;
observable.ref_count().subscribe([](int v) {std::cout << "#2 " << v << std::endl; });
// Output:
// subscribe first
// subscribe with ref_count
// SUBSCRIBE
// #1 0
// #2 0
// #1 1
// #2 1
// #1 2
// #2 2
// #1 3
// #2 3

//! [ref_count]
}
Expand Down
14 changes: 11 additions & 3 deletions src/rpp/rpp/observables/blocking_observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ namespace rpp::details::observables
class blocking_disposble final : public base_disposable
{
public:
void wait()
void wait()
{
std::unique_lock lock{m_mutex};
m_cv.wait(lock, [this] { return m_completed; });
}

void dispose_impl() noexcept override
void dispose_impl() noexcept override
{
{
std::lock_guard lock{m_mutex};
Expand Down Expand Up @@ -66,7 +66,7 @@ class blocking_strategy
auto d = std::make_shared<blocking_disposble>();
obs.set_upstream(d);
m_original.subscribe(std::move(obs));

if (!d->is_disposed())
d->wait();
}
Expand All @@ -78,6 +78,14 @@ class blocking_strategy

namespace rpp
{
/**
* @brief Extension over rpp::observable with set of blocking operators - it waits till completion of underlying observable
*
* @par Example:
* @snippet as_blocking.cpp as_blocking
*
* @ingroup observables
*/
template<constraint::decayed_type Type, constraint::observable_strategy<Type> Strategy>
class blocking_observable : public observable<Type, details::observables::blocking_strategy<Type, Strategy>> {
public:
Expand Down
12 changes: 12 additions & 0 deletions src/rpp/rpp/observables/connectable_observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ struct ref_count_on_subscribe_t<rpp::connectable_observable<OriginalObservable,

namespace rpp
{
/**
* @brief Extension over raw observable with ability to be manually connected at any time or ref_counting (sharing same observable between multiple observers)
*
* @ingroup observables
*/
template<rpp::constraint::observable OriginalObservable, rpp::constraint::subject Subject>
class connectable_observable final : public decltype(std::declval<Subject>().get_observable())
{
Expand All @@ -83,6 +88,13 @@ class connectable_observable final : public decltype(std::declval<Subject>().get
{
}

/**
* @brief Connects to underlying observable right-now making it hot-observable
*
* @par Example:
* @snippet connect.cpp connect
*
*/
rpp::disposable_wrapper connect(rpp::composite_disposable_wrapper wrapper = {}) const
{
std::unique_lock lock(m_state->mutex);
Expand Down
8 changes: 8 additions & 0 deletions src/rpp/rpp/observables/dynamic_observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ class dynamic_strategy final

namespace rpp
{
/**
* @brief Type-erased version of the `rpp::observable`. Any observable can be converted to dynamic_observable via `rpp::observable::as_dynamic` member function.
* @details To provide type-erasure it uses `std::shared_ptr`. As a result it has worse performance.
*
* @tparam Type of value this obsevalbe can provide
*
* @ingroup observables
*/
template<constraint::decayed_type Type>
class dynamic_observable : public observable<Type, details::observables::dynamic_strategy<Type>> {
public:
Expand Down
18 changes: 0 additions & 18 deletions src/rpp/rpp/observables/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,30 +42,12 @@ class observable_chain_strategy;
template<constraint::decayed_type Type, constraint::observable_strategy<Type> Strategy>
class observable;

/**
* @brief Type-erased version of the `rpp::observable`. Any observable can be converted to dynamic_observable via `rpp::observable::as_dynamic` member function.
* @details To provide type-erasure it uses `std::shared_ptr`. As a result it has worse performance.
*
* @tparam Type of value this obsevalbe can provide
*
* @ingroup observables
*/
template<constraint::decayed_type Type>
class dynamic_observable;

/**
* @brief `rpp::blocking_observable` blocks `subscribe` call till on_completed/on_error happens.
*/
template<constraint::decayed_type Type, constraint::observable_strategy<Type> Strategy>
class blocking_observable;

/**
* @brief Extension over raw observable which also has `get_key()` member function. Used in `group_by` operator to represent grouped observable
*
* @tparam KeyType is type of key
* @tparam Type of value this obsevalbe can provide
* @tparam Strategy is observable strategy
*/
template<constraint::decayed_type KeyType, constraint::decayed_type Type, constraint::observable_strategy<Type> Strategy>
class grouped_observable;
}
Expand Down
10 changes: 10 additions & 0 deletions src/rpp/rpp/observables/grouped_observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@

namespace rpp
{

/**
* @brief Extension over rpp::observable for some "subset" of values from original observable grouped by some key. It has `get_key()` member function. Used in `group_by` operator to represent grouped observable
*
* @tparam KeyType is type of key
* @tparam Type of value this obsevalbe can provide
* @tparam Strategy is observable strategy
*
* @ingroup observables
*/
template<constraint::decayed_type KeyType, constraint::decayed_type Type, constraint::observable_strategy<Type> Strategy>
class grouped_observable final : public observable<Type, Strategy>
{
Expand Down
3 changes: 3 additions & 0 deletions src/rpp/rpp/operators/as_blocking.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ namespace rpp::operators
* @brief Converts `rpp::observable` to `rpp::blocking_observable`
* @details `rpp::blocking_observable` blocks `subscribe` call till on_completed/on_error happens.
*
* @par Example:
* @snippet as_blocking.cpp as_blocking
*
* @ingroup utility_operators
*/
inline auto as_blocking()
Expand Down