Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 78 additions & 17 deletions src/tests/test_interval.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,45 +20,106 @@
SCENARIO("interval emit values with provided interval", "[interval]")
{
auto scheduler = test_scheduler{};
auto mock = mock_observer<size_t>{};
auto mock = mock_observer<size_t>{};
GIVEN("interval observable")
{
auto interval = std::chrono::seconds{ 1 };
auto obs = rpp::source::interval(interval, scheduler);
auto interval = std::chrono::seconds{1};
auto obs = rpp::source::interval(interval, scheduler);
auto initial_time = test_scheduler::worker_strategy::now();

WHEN("subscribe on it via take 3")
{
obs.take(3).subscribe(mock);
THEN("observer obtains sequence of values")
THEN("nothing happens immediately till scheduler advanced")
{
CHECK(mock.get_received_values() == std::vector<size_t>{0, 1, 2});
CHECK(mock.get_received_values() == std::vector<size_t>{});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
CHECK(mock.get_on_completed_count() == 0);
CHECK(scheduler.get_schedulings() == std::vector{ initial_time + interval});
CHECK(scheduler.get_executions().empty());
}
AND_WHEN("move time in advance on interval once")
{
scheduler.time_advance(interval);
THEN("observer obtains first value")
{
CHECK(mock.get_received_values() == std::vector<size_t>{0});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 0);
}
THEN("interval schedules schedulable with provided interval")
{
CHECK(scheduler.get_schedulings() == std::vector{ initial_time + interval,
initial_time + 2*interval});
CHECK(scheduler.get_executions() == std::vector{initial_time+interval});
}
}
THEN("interval schedules schedulable with provided interval")
AND_WHEN("move time in advance on interval enough amount of time")
{
CHECK(scheduler.get_schedulings() == std::vector{ s_current_time + interval, s_current_time + 2*interval, s_current_time + 3*interval});
for (size_t i = 0; i < 5; ++i)
scheduler.time_advance(interval);

THEN("observer obtains sequence of values")
{
CHECK(mock.get_received_values() == std::vector<size_t>{0, 1, 2});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
}
THEN("interval schedules schedulable with provided interval")
{
CHECK(scheduler.get_executions() == std::vector{ initial_time + interval,
initial_time + 2*interval,
initial_time + 3*interval});
CHECK(scheduler.get_schedulings() == std::vector{ initial_time + interval,
initial_time + 2*interval,
initial_time + 3*interval,
initial_time + 4*interval});
}
}
}
}

GIVEN("interval observable with initial delay")
{
auto initial_delay = std::chrono::seconds{ 2 };
auto interval = std::chrono::seconds{ 1 };
auto obs = rpp::source::interval(initial_delay, interval, scheduler);
auto initial_delay = std::chrono::seconds{2};
auto interval = std::chrono::seconds{1};
auto obs = rpp::source::interval(initial_delay, interval, scheduler);
auto initial_time = test_scheduler::worker_strategy::now();

WHEN("subscribe on it via take 3")
{
obs.take(3).subscribe(mock);
THEN("observer obtains sequence of values")
THEN("nothing happens immediately till scheduler advanced")
{
CHECK(mock.get_received_values() == std::vector<size_t>{0, 1, 2});
CHECK(mock.get_received_values() == std::vector<size_t>{});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
CHECK(mock.get_on_completed_count() == 0);
CHECK(scheduler.get_schedulings() == std::vector{ initial_time + initial_delay});
CHECK(scheduler.get_executions().empty());
}
THEN("interval schedules schedulable with provided interval")

AND_WHEN("move time in advance on interval enough amount of time")
{
CHECK(scheduler.get_schedulings() == std::vector{ s_current_time + initial_delay, s_current_time + initial_delay + interval, s_current_time + initial_delay + 2 * interval });
for (size_t i = 0; i < 5; ++i)
scheduler.time_advance(interval);
scheduler.time_advance(interval);
THEN("observer obtains sequence of values")
{
CHECK(mock.get_received_values() == std::vector<size_t>{0, 1, 2});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
}
THEN("interval schedules schedulable with provided interval")
{
CHECK(scheduler.get_schedulings() == std::vector{ initial_time + initial_delay,
initial_time + initial_delay + interval,
initial_time + initial_delay + 2 * interval,
initial_time + initial_delay + 3 * interval });
CHECK(scheduler.get_executions() == std::vector{ initial_time + initial_delay,
initial_time + initial_delay + interval,
initial_time + initial_delay + 2 * interval });
}
}
}
}
}
}
10 changes: 7 additions & 3 deletions src/tests/test_observe_on.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ SCENARIO("observe_on transfers emssions to scheduler", "[operators][observe_on]"
{
auto mock = mock_observer<std::string>{};
auto scheduler = test_scheduler{};
auto initial_time = test_scheduler::worker_strategy::now();

GIVEN("observable with item")
{
Expand All @@ -38,7 +39,8 @@ SCENARIO("observe_on transfers emssions to scheduler", "[operators][observe_on]"

CHECK(mock.get_received_values() == vals);
CHECK(mock.get_on_completed_count() == 1);
CHECK(scheduler.get_schedulings() == std::vector{ s_current_time, s_current_time, s_current_time });//2 items + on_completed
CHECK(scheduler.get_schedulings() == std::vector{ initial_time, initial_time, initial_time });//2 items + on_completed
CHECK(scheduler.get_executions() == std::vector{ initial_time, initial_time, initial_time });//2 items + on_completed
}
}
}
Expand All @@ -56,7 +58,8 @@ SCENARIO("observe_on transfers emssions to scheduler", "[operators][observe_on]"

CHECK(mock.get_on_error_count() == 1);
CHECK(mock.get_on_completed_count() == 0);
CHECK(scheduler.get_schedulings() == std::vector{ s_current_time });
CHECK(scheduler.get_schedulings() == std::vector{ initial_time });
CHECK(scheduler.get_executions() == std::vector{ initial_time });

}
}
Expand All @@ -76,7 +79,8 @@ SCENARIO("observe_on transfers emssions to scheduler", "[operators][observe_on]"
CHECK(mock.get_total_on_next_count() == 0);
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
CHECK(scheduler.get_schedulings() == std::vector{ s_current_time });
CHECK(scheduler.get_schedulings() == std::vector{ initial_time });
CHECK(scheduler.get_executions() == std::vector{ initial_time });

}
}
Expand Down
80 changes: 59 additions & 21 deletions src/tests/test_scheduler.hpp
Original file line number Diff line number Diff line change
@@ -1,58 +1,96 @@
// ReactivePlusPlus library
//
//
// Copyright Aleksey Loginov 2022 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus

#pragma once

#include <rpp/schedulers.hpp>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ performance-unnecessary-value-param ⚠️
parameter state is passed by value and only copied once; consider moving it to avoid unnecessary copies

Suggested change
#include <utility>

static rpp::schedulers::time_point s_current_time{ std::chrono::seconds{0} };
static rpp::schedulers::time_point s_current_time{std::chrono::seconds{0}};

class test_scheduler final : public rpp::schedulers::details::scheduler_tag
{
public:
class worker_strategy
struct state
{
public:
worker_strategy(const rpp::subscription_base& sub,
std::shared_ptr<std::vector<rpp::schedulers::time_point>> schedulings)
: m_sub{ sub }
, m_schedulings{ schedulings } {}
state() = default;

void schedule(rpp::schedulers::time_point time_point,
rpp::schedulers::constraint::schedulable_fn auto&& fn)
{
schedulings.push_back(time_point);
queue.emplace(time_point,
static_cast<size_t>(rpp::schedulers::clock_type::now().time_since_epoch().count()),
std::forward<decltype(fn)>(fn));
}

void defer_at(rpp::schedulers::time_point time_point, rpp::schedulers::constraint::schedulable_fn auto&& fn) const
void drain()
{
while (m_sub.is_subscribed())
while (!queue.empty() && sub.is_subscribed())
{
m_schedulings->push_back(time_point);
auto time_point = queue.top().get_time_point();
if (time_point > s_current_time)
return;

auto fn = queue.top().extract_function();
queue.pop();

executions.push_back(s_current_time);
if (auto duration = fn())
time_point = std::max(now(), time_point + duration.value());
else
return;
schedule(std::max(s_current_time, time_point + duration.value()), std::move(fn));
}
}

rpp::subscription_base sub{};
std::vector<rpp::schedulers::time_point> schedulings{};
std::vector<rpp::schedulers::time_point> executions{};
std::priority_queue<rpp::schedulers::details::schedulable<std::function<rpp::schedulers::optional_duration()>>> queue{};
};

class worker_strategy
{
public:
worker_strategy(std::shared_ptr<state> state)
: m_state{state} { }

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ performance-unnecessary-value-param ⚠️
parameter state is passed by value and only copied once; consider moving it to avoid unnecessary copies

Suggested change
: m_state{state} { }
: m_state{std::move(state} { }

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ performance-unnecessary-value-param ⚠️
parameter state is passed by value and only copied once; consider moving it to avoid unnecessary copies

Suggested change
: m_state{state} { }
: m_state{state)} { }


void defer_at(rpp::schedulers::time_point time_point,
rpp::schedulers::constraint::schedulable_fn auto&& fn) const
{
if (m_state->sub.is_subscribed())
{
m_state->schedule(time_point, std::forward<decltype(fn)>(fn));
m_state->drain();
}
}

static rpp::schedulers::time_point now() { return s_current_time; }

private:
rpp::subscription_base m_sub;
std::shared_ptr<std::vector<rpp::schedulers::time_point>> m_schedulings;
std::shared_ptr<state> m_state;
};

test_scheduler() {}

rpp::schedulers::worker<worker_strategy> create_worker(const rpp::subscription_base& sub = {}) const
{
return rpp::schedulers::worker<worker_strategy>{sub, m_schedulings};
m_state->sub = sub;
return rpp::schedulers::worker<worker_strategy>{m_state};
}

const auto& get_schedulings() const { return *m_schedulings; }
const auto& get_schedulings() const { return m_state->schedulings; }
const auto& get_executions() const { return m_state->executions; }

void time_advance(rpp::schedulers::duration dur) const
{
s_current_time += dur;
m_state->drain();
}

private:
std::shared_ptr<std::vector<rpp::schedulers::time_point>> m_schedulings = std::make_shared<std::vector<rpp::schedulers::time_point>>();
};
std::shared_ptr<state> m_state = std::make_shared<state>();
};
28 changes: 22 additions & 6 deletions src/tests/test_schedulers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,34 @@ SCENARIO("scheduler's worker uses time")
GIVEN("test scheduler")
{
auto scheduler = test_scheduler{};
auto initial_time = test_scheduler::worker_strategy::now();

WHEN("Schedule action without time")
{
scheduler.create_worker().schedule([] { return rpp::schedulers::optional_duration{}; });
THEN("worker obtains schedulable once with current time")
{
CHECK(scheduler.get_schedulings() == std::vector{ s_current_time });
CHECK(scheduler.get_schedulings() == std::vector{ initial_time });
CHECK(scheduler.get_executions() == std::vector{ initial_time });
}
}
WHEN("Schedule action with some time")
{
auto time = rpp::schedulers::clock_type::now();
scheduler.create_worker().schedule(time, [] { return rpp::schedulers::optional_duration{}; });
THEN("worker obtains schedulable once with provided time")
auto dur = std::chrono::seconds{3};
scheduler.create_worker().schedule(initial_time+dur, [] { return rpp::schedulers::optional_duration{}; });
THEN("worker obtains schedulable once with provided time, but not execute till time advanced")
{
CHECK(scheduler.get_schedulings() == std::vector{ time });
CHECK(scheduler.get_schedulings() == std::vector{ initial_time+dur });
CHECK(scheduler.get_executions().empty());
AND_WHEN("time advanced")
{
scheduler.time_advance(dur+std::chrono::seconds{1});
THEN("scheduler executes schedulable")
{
CHECK(scheduler.get_executions() == std::vector{ initial_time+dur+std::chrono::seconds{1} });
}
}

}
}
WHEN("Schedule action with zero duration")
Expand Down Expand Up @@ -107,9 +120,12 @@ SCENARIO("scheduler's worker uses time")
return std::nullopt;
return dur;
});
for(size_t i = 0; i < 3;++i)
scheduler.time_advance(dur);

THEN("worker obtains schedulable three times with current time and time+diff")
{
CHECK(scheduler.get_schedulings() == std::vector{ s_current_time, s_current_time + dur , s_current_time + dur + dur });
CHECK(scheduler.get_schedulings() == std::vector{ initial_time, initial_time + dur , initial_time + dur + dur });
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions src/tests/test_subscribe_on.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ TEST_CASE("subscribe_on schedules job in another scheduler")
{
auto mock = mock_observer<int>{};
auto scheduler = test_scheduler{};
auto initial_time = test_scheduler::worker_strategy::now();

GIVEN("observable")
{
auto obs = rpp::source::create<int>([&](const auto& sub)
Expand All @@ -35,7 +37,8 @@ TEST_CASE("subscribe_on schedules job in another scheduler")
{
REQUIRE(mock.get_total_on_next_count() == 1);
REQUIRE(mock.get_on_completed_count() == 1);
REQUIRE(scheduler.get_schedulings() == std::vector{ s_current_time });
REQUIRE(scheduler.get_schedulings() == std::vector{ initial_time });
REQUIRE(scheduler.get_executions() == std::vector{ initial_time });
}
}
}
Expand All @@ -50,7 +53,8 @@ TEST_CASE("subscribe_on schedules job in another scheduler")
REQUIRE(mock.get_total_on_next_count() == 0);
REQUIRE(mock.get_on_error_count() == 1);
REQUIRE(mock.get_on_completed_count() == 0);
REQUIRE(scheduler.get_schedulings() == std::vector{ s_current_time });
REQUIRE(scheduler.get_schedulings() == std::vector{ initial_time });
REQUIRE(scheduler.get_executions() == std::vector{ initial_time });
}
}
}
Expand Down