Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions src/examples/doxygen/delay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,28 @@ int main()
{
//! [delay]

auto start = rpp::schedulers::clock_type::now();

rpp::source::just(1, 2, 3)
.do_on_next([](auto&& v)
.do_on_next([&](auto&& v)
{
auto emitting_time = rpp::schedulers::clock_type::now();
std::cout << "emit " << v << " in thread{" << std::this_thread::get_id() << "} at epoch time " << emitting_time.time_since_epoch().count() << std::endl;
std::cout << "emit " << v << " in thread{" << std::this_thread::get_id() << "} duration since start " << std::chrono::duration_cast<std::chrono::seconds>(emitting_time - start).count() << "s"<< std::endl;
})
.delay(std::chrono::seconds{3}, rpp::schedulers::new_thread{})
.as_blocking()
.subscribe([](int v)
.subscribe([&](int v)
{
auto observing_time = rpp::schedulers::clock_type::now();
std::cout << "observe " << v << " in thread{" << std::this_thread::get_id() << "} at epoch time " << observing_time.time_since_epoch().count() << std::endl;
std::cout << "observe " << v << " in thread{" << std::this_thread::get_id() << "} duration since start " << std::chrono::duration_cast<std::chrono::seconds>(observing_time - start).count() <<"s" << std::endl;
});
// Template for output:
// emit 1 in thread{281472967355984} at epoch time 9302615113068
// emit 2 in thread{281472967355984} at epoch time 9302615155151
// emit 3 in thread{281472967355984} at epoch time 9302615157860
// observe 1 in thread{281472962380144} at epoch time 9305618428153
// observe 2 in thread{281472962380144} at epoch time 9305618551778
// observe 3 in thread{281472962380144} at epoch time 9305618558236
// emit 1 in thread{11772} duration since start 0s
// emit 2 in thread{11772} duration since start 0s
// emit 3 in thread{11772} duration since start 0s
// observe 1 in thread{15516} duration since start 3s
// observe 2 in thread{15516} duration since start 3s
// observe 3 in thread{15516} duration since start 3s
//! [delay]
return 0;
}
43 changes: 16 additions & 27 deletions src/rpp/rpp/operators/delay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,28 @@
#pragma once

#include <rpp/defs.hpp>
#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
#include <rpp/operators/fwd/delay.hpp>
#include <rpp/subscribers/constraints.hpp>
#include <rpp/utils/functors.hpp>

#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state

IMPLEMENTATION_FILE(delay_tag);

namespace rpp::details
{

/**
* Functor (type-erasure) of "delay" for on_next operator.
*/
struct delay_on_next
{
rpp::schedulers::duration delay;
schedulers::duration delay;

void operator()(auto &&value,
const auto &subscriber,
const auto &worker) const {
void operator()(auto&& value, const auto& subscriber, const auto& worker) const
{
worker.schedule(delay,
[value = std::forward<decltype(value)>(value), subscriber]() -> rpp::schedulers::optional_duration
[value = std::forward<decltype(value)>(value), subscriber]()
{
subscriber.on_next(std::move(value));
return std::nullopt;
return schedulers::optional_duration{};
});
}
};
Expand All @@ -47,17 +43,13 @@ struct delay_on_next
*/
struct delay_on_error
{
rpp::schedulers::duration delay;

void operator()(const std::exception_ptr &err,
const auto &subscriber,
const auto& worker) const
void operator()(const std::exception_ptr& err, const auto& subscriber, const auto& worker) const
{
worker.schedule(delay,
[err, subscriber]() -> rpp::schedulers::optional_duration
// on-error must be delivered as soon as possible
worker.schedule([err, subscriber]()
{
subscriber.on_error(err);
return std::nullopt;
return schedulers::optional_duration{};
});
}
};
Expand All @@ -67,13 +59,12 @@ struct delay_on_error
*/
struct delay_on_completed
{
rpp::schedulers::duration delay;
schedulers::duration delay;

void operator()(const auto& subscriber,
const auto& worker) const
void operator()(const auto& subscriber, const auto& worker) const
{
worker.schedule(delay,
[subscriber]() -> rpp::schedulers::optional_duration
[subscriber]()
{
subscriber.on_completed();
return schedulers::optional_duration{};
Expand All @@ -88,24 +79,22 @@ template<constraint::decayed_type Type, schedulers::constraint::scheduler TSched
struct delay_impl
{
RPP_NO_UNIQUE_ADDRESS TScheduler scheduler;
rpp::schedulers::duration delay;
schedulers::duration delay;

template<constraint::subscriber_of_type<Type> TSub>
auto operator()(TSub&& subscriber) const
{
// convert it to dynamic due to expected amount of copies == amount of items
auto dynamic_subscriber = std::forward<TSub>(subscriber).as_dynamic();
auto subscription = dynamic_subscriber.get_subscription().make_child();

auto worker = scheduler.create_worker(dynamic_subscriber.get_subscription());

auto subscription = dynamic_subscriber.get_subscription().make_child();
return create_subscriber_with_state<Type>(std::move(subscription),
delay_on_next{delay},
delay_on_error{delay},
delay_on_error{},
delay_on_completed{delay},
std::move(dynamic_subscriber),
std::move(worker));
}
};

} // namespace rpp::details
5 changes: 2 additions & 3 deletions src/rpp/rpp/operators/fwd/observe_on.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include <rpp/observables/details/member_overload.hpp>
#include <rpp/schedulers/constraints.hpp>


namespace rpp::details
{
struct observe_on_tag;
Expand Down Expand Up @@ -43,13 +42,13 @@ struct member_overload<Type, SpecificObservable, observe_on_tag>
template<schedulers::constraint::scheduler TScheduler>
auto observe_on(TScheduler&& scheduler) const& requires is_header_included<observe_on_tag, TScheduler>
{
return cast_this()->template lift<Type>(observe_on_impl<Type, std::decay_t<TScheduler>>{std::forward<TScheduler>(scheduler)});
return cast_this()->delay(schedulers::duration{0}, std::forward<TScheduler>(scheduler));
}

template<schedulers::constraint::scheduler TScheduler>
auto observe_on(TScheduler&& scheduler) && requires is_header_included<observe_on_tag, TScheduler>
{
return move_this().template lift<Type>(observe_on_impl<Type, std::decay_t<TScheduler>>{std::forward<TScheduler>(scheduler)});
return move_this().delay(schedulers::duration{0}, std::forward<TScheduler>(scheduler));
}

private:
Expand Down
35 changes: 2 additions & 33 deletions src/rpp/rpp/operators/observe_on.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,40 +10,9 @@

#pragma once

#include <rpp/defs.hpp>
#include <rpp/operators/delay.hpp>
#include <rpp/operators/fwd/observe_on.hpp>
#include <rpp/subscribers/constraints.hpp>

#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state

IMPLEMENTATION_FILE(observe_on_tag);

namespace rpp::details
{
using observe_on_on_next = delay_on_next;
using observe_on_on_error = delay_on_error;
using observe_on_on_completed = delay_on_completed;

template<constraint::decayed_type Type, schedulers::constraint::scheduler TScheduler>
struct observe_on_impl
{
RPP_NO_UNIQUE_ADDRESS TScheduler scheduler;

template<constraint::subscriber_of_type<Type> TSub>
auto operator()(TSub&& subscriber) const
{
// convert it to dynamic due to expected amount of copies == amount of items
auto dynamic_subscriber = std::forward<TSub>(subscriber).as_dynamic();

auto worker = scheduler.create_worker(dynamic_subscriber.get_subscription());

return create_subscriber_with_state<Type>(dynamic_subscriber.get_subscription().make_child(),
observe_on_on_next{rpp::schedulers::duration{0}},
observe_on_on_error{rpp::schedulers::duration{0}},
observe_on_on_completed{rpp::schedulers::duration{0}},
std::move(dynamic_subscriber),
std::move(worker));
}
};
} // namespace rpp::details
// we just need delay to completed this one
#include <rpp/operators/delay.hpp>
96 changes: 48 additions & 48 deletions src/tests/test_delay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,47 +8,47 @@
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus

#include <catch2/catch_test_macros.hpp>
#include "mock_observer.hpp"

#include <catch2/catch_test_macros.hpp>
#include <rpp/operators/delay.hpp>
#include <rpp/schedulers/trampoline_scheduler.hpp>
#include <rpp/sources/empty.hpp>
#include <rpp/sources/error.hpp>
#include <rpp/sources/just.hpp>

#include "mock_observer.hpp"

SCENARIO("delay mirrors both source observable and trigger observable", "[delay]")
{
std::chrono::milliseconds delay_duration{300};

GIVEN("observable of -1-|")
{
auto mock = mock_observer<int>{};
const auto now = rpp::schedulers::clock_type::now();
auto mock = mock_observer<int>{};
const auto now = rpp::schedulers::clock_type::now();

rpp::source::just(1)
.delay(delay_duration, rpp::schedulers::trampoline{})
.as_blocking()
.subscribe([&](auto&& v)
{
THEN("should see event after the delay")
.delay(delay_duration, rpp::schedulers::trampoline{})
.as_blocking()
.subscribe(
[&](auto&& v)
{
CHECK(rpp::schedulers::clock_type::now() >= now + std::chrono::duration_cast<rpp::schedulers::duration>(delay_duration));
}
THEN("should see event after the delay")
{
CHECK(rpp::schedulers::clock_type::now() >= now + delay_duration);
}

mock.on_next(v);
},
[&](const std::exception_ptr& err) { mock.on_error(err); },
[&]()
{
THEN("should see event after the delay")
mock.on_next(v);
},
[&](const std::exception_ptr& err) { mock.on_error(err); },
[&]()
{
CHECK(rpp::schedulers::clock_type::now() >= now + std::chrono::duration_cast<rpp::schedulers::duration>(delay_duration));
}
THEN("should see event after the delay")
{
CHECK(rpp::schedulers::clock_type::now() >= now + delay_duration);
}

mock.on_completed();
});
mock.on_completed();
});

THEN("should see -1-|")
{
Expand All @@ -60,22 +60,22 @@ SCENARIO("delay mirrors both source observable and trigger observable", "[delay]

GIVEN("observable of -x")
{
auto mock = mock_observer<int>{};
const auto now = rpp::schedulers::clock_type::now();
auto mock = mock_observer<int>{};
const auto now = rpp::schedulers::clock_type::now();

rpp::source::error<int>(std::make_exception_ptr(std::runtime_error{""}))
.delay(delay_duration, rpp::schedulers::trampoline{})
.as_blocking()
.subscribe([&](auto&& v) { mock.on_next(v); },
[&](const std::exception_ptr& err)
{
THEN("should see event after the delay")
.delay(delay_duration, rpp::schedulers::trampoline{})
.as_blocking()
.subscribe([&](auto&& v) { mock.on_next(v); },
[&](const std::exception_ptr& err)
{
CHECK(rpp::schedulers::clock_type::now() >= now + std::chrono::duration_cast<rpp::schedulers::duration>(delay_duration));
}
mock.on_error(err);
},
[&]() { mock.on_completed(); });
THEN("should see event immediately")
{
CHECK(rpp::schedulers::clock_type::now() < now + delay_duration);
}
mock.on_error(err);
},
[&]() { mock.on_completed(); });

THEN("should see -x after the delay")
{
Expand All @@ -87,22 +87,22 @@ SCENARIO("delay mirrors both source observable and trigger observable", "[delay]

GIVEN("observable of -|")
{
auto mock = mock_observer<int>{};
const auto now = rpp::schedulers::clock_type::now();
auto mock = mock_observer<int>{};
const auto now = rpp::schedulers::clock_type::now();

rpp::source::empty<int>()
.delay(delay_duration, rpp::schedulers::trampoline{})
.as_blocking()
.subscribe([&](auto&& v) { mock.on_next(v); },
[&](const std::exception_ptr& err) { mock.on_error(err); },
[&]()
{
THEN("should see event after delay")
.delay(delay_duration, rpp::schedulers::trampoline{})
.as_blocking()
.subscribe([&](auto&& v) { mock.on_next(v); },
[&](const std::exception_ptr& err) { mock.on_error(err); },
[&]()
{
CHECK(rpp::schedulers::clock_type::now() >= now + std::chrono::duration_cast<rpp::schedulers::duration>(delay_duration));
}
mock.on_completed();
});
THEN("should see event after delay")
{
CHECK(rpp::schedulers::clock_type::now() >= now + delay_duration);
}
mock.on_completed();
});

THEN("should see -|")
{
Expand Down