Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/examples/rpp/doxygen/window.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ int main()
//! [window]
rpp::source::just(1, 2, 3, 4, 5)
| rpp::operators::window(3)
| rpp::operators::subscribe([](const rpp::windowed_observable<int>& v)
| rpp::operators::subscribe([](const rpp::window_observable<int>& v)
{
std::cout << "\nNew observable " << std::endl;
v.subscribe([](int v) {std::cout << v << " "; });
Expand Down
45 changes: 45 additions & 0 deletions src/examples/rpp/doxygen/window_toggle.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#include <rpp/rpp.hpp>

#include <iostream>

/**
* @example window_toggle.cpp
**/
int main()
{

//! [window_toggle]
size_t counter{};
auto source = rpp::source::just(rpp::schedulers::current_thread{}, 1, 2, 3, 4, 5) | rpp::operators::publish() | rpp::operators::ref_count();
source
| rpp::operators::window_toggle(source, [source](int){ return source | rpp::ops::filter([](int v) { return v % 2 == 0; }); })
| rpp::operators::subscribe([&counter](const rpp::window_toggle_observable<int>& obs)
{
std::cout << "New observable " << ++counter << std::endl;
obs.subscribe([counter](int v) {std::cout << counter << ": " << v << " " << std::endl; }, [counter]() { std::cout << "closing " << counter << std::endl; });
});
// Output:
// New observable 1
// 1: 1
// New observable 2
// 1: 2
// 2: 2
// closing 1
// New observable 3
// 2: 3
// 3: 3
// New observable 4
// 2: 4
// 3: 4
// 4: 4
// closing 2
// closing 3
// New observable 5
// 4: 5
// 5: 5
// closing 4
// closing 5
//! [window_toggle]

return 0;
}
2 changes: 1 addition & 1 deletion src/rpp/rpp/observables/connectable_observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ struct ref_count_on_subscribe_t;
template<rpp::constraint::observable OriginalObservable, rpp::constraint::subject Subject>
struct ref_count_on_subscribe_t<rpp::connectable_observable<OriginalObservable, Subject>>
{
rpp::connectable_observable<OriginalObservable, Subject> original_observable{};
rpp::connectable_observable<OriginalObservable, Subject> original_observable;

struct state_t
{
Expand Down
16 changes: 15 additions & 1 deletion src/rpp/rpp/observables/observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,24 @@ class observable
[[nodiscard("Use returned disposable or use subscribe(observer) instead")]] composite_disposable_wrapper subscribe_with_disposable(observer<Type, ObserverStrategy>&& observer) const
{
if (!observer.is_disposed())
m_strategy.subscribe(rpp::composite_disposable_wrapper{std::make_shared<rpp::composite_disposable_impl<typename expected_disposable_strategy::disposable_container>>()}, std::move(observer));
return m_strategy.subscribe(rpp::composite_disposable_wrapper{std::make_shared<rpp::composite_disposable_impl<typename expected_disposable_strategy::disposable_container>>()}, std::move(observer));
return {};
}

/**
* @brief Subscribes observer strategy to emissions from this observable.
*
* @details This overloading attaches disposable to observer and return it to provide ability to dispose/disconnect observer early if needed.
* @warning This overloading has some performance penalties, use it only when you really need to use disposable
* @return composite_disposable_wrapper is disposable to be able to dispose observer when it needed
*/
template<constraint::observer_strategy<Type> ObserverStrategy>
requires (!constraint::observer<ObserverStrategy>)
[[nodiscard("Use returned disposable or use subscribe(observer) instead")]] composite_disposable_wrapper subscribe_with_disposable(ObserverStrategy&& observer_strategy) const
{
return subscribe(rpp::composite_disposable_wrapper{std::make_shared<rpp::composite_disposable_impl<typename expected_disposable_strategy::disposable_container>>()}, std::forward<ObserverStrategy>(observer_strategy));
}

/**
* @brief Subscribe passed observer to emissions from this observable.
*
Expand Down
1 change: 1 addition & 0 deletions src/rpp/rpp/operators.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <rpp/operators/scan.hpp>
#include <rpp/operators/subscribe.hpp>
#include <rpp/operators/window.hpp>
#include <rpp/operators/window_toggle.hpp>

/**
* @defgroup filtering_operators Filtering Operators
Expand Down
2 changes: 1 addition & 1 deletion src/rpp/rpp/operators/combine_latest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ struct combine_latest_t
template<rpp::constraint::observer Observer, typename... Strategies>
void subscribe(Observer&& observer, const observable_chain_strategy<Strategies...>& observable_strategy) const
{
// Need to take ownership over current_thread in case of inner-observables also uses them
// Need to take ownership over current_thread in case of inner-observables also using it
auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned();
observables.apply(&subscribe_impl<Observer, Strategies...>, std::forward<Observer>(observer), observable_strategy, selector);
}
Expand Down
4 changes: 2 additions & 2 deletions src/rpp/rpp/operators/details/forwarding_subject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ class forwarding_strategy
m_state->on_subscribe(std::forward<TObs>(observer));
}

rpp::disposable_wrapper get_disposable() const
rpp::composite_disposable_wrapper get_disposable() const
{
return rpp::disposable_wrapper::from_weak(m_state);
return rpp::composite_disposable_wrapper::from_weak(m_state);
}

private:
Expand Down
3 changes: 0 additions & 3 deletions src/rpp/rpp/operators/details/strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@
#include <rpp/utils/tuple.hpp>
#include <rpp/utils/utils.hpp>

#include <exception>
#include <variant>

namespace rpp::operators::details
{
template<typename SubscribeStrategy, rpp::constraint::decayed_type... Args>
Expand Down
4 changes: 4 additions & 0 deletions src/rpp/rpp/operators/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ template<rpp::constraint::observable TObservable, rpp::constraint::observable...
auto with_latest_from(TObservable&& observable, TObservables&&... observables);

auto window(size_t count);

template<rpp::constraint::observable TOpeningsObservable, typename TClosingsSelectorFn>
requires rpp::constraint::observable<std::invoke_result_t<TClosingsSelectorFn, rpp::utils::extract_observable_type_t<TOpeningsObservable>>>
auto window_toggle(TOpeningsObservable&& openings, TClosingsSelectorFn&& closings_selector);
} // namespace rpp::operators

namespace rpp
Expand Down
4 changes: 2 additions & 2 deletions src/rpp/rpp/operators/merge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ struct merge_t
template<rpp::constraint::observer Observer, typename... Strategies>
void subscribe(Observer&& observer, const observable_chain_strategy<Strategies...>& strategy) const
{
// Need to take ownership over current_thread in case of inner-observables also uses them
// Need to take ownership over current_thread in case of inner-observables also using it
auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned();

using InnerObservable = typename observable_chain_strategy<Strategies...>::value_type;
Expand All @@ -160,7 +160,7 @@ struct merge_with_t
{
merge_observer_strategy<std::decay_t<Observer>> strategy{std::forward<Observer>(observer)};

// Need to take ownership over current_thread in case of inner-observables also uses them
// Need to take ownership over current_thread in case of inner-observables also using it
auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned();

strategy.on_next(observable_strategy);
Expand Down
2 changes: 1 addition & 1 deletion src/rpp/rpp/operators/take_until.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ struct take_until_t
auto d = std::make_shared<take_until_disposable<std::decay_t<Observer>>>(std::forward<Observer>(observer));
d->get_observer()->set_upstream(rpp::disposable_wrapper::from_weak(d));

// Need to take ownership over current_thread in case of inner-observables also uses them
// Need to take ownership over current_thread in case of inner-observables also using it
auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned();
observable.subscribe(take_until_throttle_observer_strategy<std::decay_t<Observer>>{d});

Expand Down
6 changes: 3 additions & 3 deletions src/rpp/rpp/operators/window.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
namespace rpp
{
template<constraint::decayed_type Type>
using windowed_observable = decltype(std::declval<rpp::operators::details::forwarding_subject<Type>>().get_observable());
using window_observable = decltype(std::declval<rpp::operators::details::forwarding_subject<Type>>().get_observable());
}
namespace rpp::operators::details
{
Expand Down Expand Up @@ -105,7 +105,7 @@ class window_observer_strategy
struct window_t : public operators::details::operator_observable_strategy_different_types<window_observer_strategy, rpp::utils::types<>, size_t>
{
template<rpp::constraint::decayed_type T>
using result_value = windowed_observable<T>;
using result_value = window_observable<T>;

template<rpp::details::observables::constraint::disposable_strategy Prev>
using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>;
Expand All @@ -115,7 +115,7 @@ struct window_t : public operators::details::operator_observable_strategy_differ
namespace rpp::operators
{
/**
* @brief Subdivide original observable into sub-observables (windowed observables) and emit sub-observables of items instead of original items
* @brief Subdivide original observable into sub-observables (window observables) and emit sub-observables of items instead of original items
*
* @marble window
{
Expand Down
Loading