Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 25 additions & 19 deletions src/rpp/rpp/operators/window_toggle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ struct window_toggle_state

struct state_t
{
RPP_NO_UNIQUE_ADDRESS TObserver observer;
mutable std::list<decltype(std::declval<Subject>().get_observer())> observers{};
RPP_NO_UNIQUE_ADDRESS TObserver observer;
std::list<decltype(std::declval<Subject>().get_observer())> observers{};
};

window_toggle_state(TObserver&& observer, const TClosingsSelectorFn& closings)
Expand All @@ -58,10 +58,10 @@ struct window_toggle_state

auto on_new_subject(const Subject& subject)
{
const auto locked_state = get_state_under_lock();
auto ptr = &locked_state->observers.emplace_back(subject.get_observer());
auto locked_state = get_state_under_lock();
const auto itr = locked_state->observers.insert(locked_state->observers.cend(), subject.get_observer());
locked_state->observer.on_next(subject.get_observable());
return ptr;
return itr;
}

private:
Expand All @@ -72,10 +72,12 @@ struct window_toggle_state
template<rpp::constraint::decayed_type TState>
struct window_toggle_closing_observer_strategy
{
std::shared_ptr<rpp::refcount_disposable> disposable;
std::shared_ptr<TState> state;
rpp::disposable_wrapper subject_disposable;
decltype(std::declval<TState>().on_new_subject(std::declval<typename TState::Subject>())) ptr;
using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy;

std::shared_ptr<rpp::refcount_disposable> disposable;
std::shared_ptr<TState> state;
rpp::composite_disposable_wrapper this_disposable;
decltype(std::declval<TState>().on_new_subject(std::declval<typename TState::Subject>())) itr;

void on_next(const auto&) const
{
Expand All @@ -84,22 +86,25 @@ struct window_toggle_closing_observer_strategy

void on_error(const std::exception_ptr& err) const
{
const auto locked_state = state->get_state_under_lock();
auto locked_state = state->get_state_under_lock();
for (const auto& obs : locked_state->observers)
obs.on_error(err);
locked_state->observer.on_error(err);
}

void on_completed() const
{
disposable->remove(subject_disposable);
ptr->on_completed();

const auto locked_state= state->get_state_under_lock();
locked_state->observers.remove_if([ptr=ptr](const auto& obs) { return &obs == ptr; });
disposable->remove(this_disposable);

itr->on_completed();
this_disposable.dispose();

auto locked_state= state->get_state_under_lock();
locked_state->observers.erase(itr);
}
static void set_upstream(const disposable_wrapper&) { }
static bool is_disposed() { return false; }

void set_upstream(const disposable_wrapper& d) const { this_disposable.add(d); }
bool is_disposed() const { return this_disposable.is_disposed(); }
};

template<rpp::constraint::decayed_type TState>
Expand All @@ -112,9 +117,10 @@ struct window_toggle_opening_observer_strategy
void on_next(T&& v) const
{
typename TState::Subject subject{disposable->wrapper_from_this()};
auto ptr = state->on_new_subject(subject);
const auto itr = state->on_new_subject(subject);

disposable->add(subject.get_disposable());
state->get_closing(std::forward<T>(v)).subscribe(subject.get_disposable(), window_toggle_closing_observer_strategy<TState>{disposable, state, subject.get_disposable(), std::move(ptr)});
state->get_closing(std::forward<T>(v)).subscribe(window_toggle_closing_observer_strategy<TState>{disposable, state, subject.get_disposable(), itr});
}

void on_error(const std::exception_ptr& err) const
Expand Down
75 changes: 44 additions & 31 deletions src/tests/rpp/test_window_toggle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
TEST_CASE("window_toggle")
{
mock_observer_strategy<rpp::window_toggle_observable<int>> mock{};
std::vector<mock_observer_strategy<int>> inner_mocks{};
std::vector<mock_observer_strategy<int>> inner_mocks{};

auto subscribe_mocks = [&mock, &inner_mocks](const auto& observable)
{
Expand All @@ -40,9 +40,9 @@ TEST_CASE("window_toggle")

SECTION("opening - just(1), closing - never()")
{
subscribe_mocks(rpp::source::just(1,2,3)
subscribe_mocks(rpp::source::just(1,2,3)
| rpp::ops::window_toggle(rpp::source::just(1), [](int){return rpp::source::never<int>();}));

CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
REQUIRE(inner_mocks.size() == 1);
Expand All @@ -55,9 +55,9 @@ TEST_CASE("window_toggle")
}
SECTION("opening - just(1), closing - empty()")
{
subscribe_mocks(rpp::source::just(1,2,3)
subscribe_mocks(rpp::source::just(1,2,3)
| rpp::ops::window_toggle(rpp::source::just(1), [](int){return rpp::source::empty<int>();}));

CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
REQUIRE(inner_mocks.size() == 1);
Expand All @@ -70,9 +70,9 @@ TEST_CASE("window_toggle")
}
SECTION("opening - just(1,2,3), closing - empty()")
{
subscribe_mocks(rpp::source::just(1,2,3)
subscribe_mocks(rpp::source::just(1,2,3)
| rpp::ops::window_toggle(rpp::source::just(1,2,3), [](int){return rpp::source::empty<int>();}));

CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
REQUIRE(inner_mocks.size() == 3);
Expand All @@ -85,9 +85,9 @@ TEST_CASE("window_toggle")
}
SECTION("opening - just(1,2,3), closing - never()")
{
subscribe_mocks(rpp::source::just(1,2,3)
subscribe_mocks(rpp::source::just(1,2,3)
| rpp::ops::window_toggle(rpp::source::just(1,2,3), [](int){return rpp::source::never<int>();}));

CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
REQUIRE(inner_mocks.size() == 3);
Expand All @@ -102,9 +102,9 @@ TEST_CASE("window_toggle")
}
SECTION("opening - just(1,2,3), closing - just(1)")
{
subscribe_mocks(rpp::source::just(1,2,3)
subscribe_mocks(rpp::source::just(1,2,3)
| rpp::ops::window_toggle(rpp::source::just(1,2,3), [](int){return rpp::source::just(1);}));

CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
REQUIRE(inner_mocks.size() == 3);
Expand All @@ -119,9 +119,9 @@ TEST_CASE("window_toggle")
}
SECTION("opening - never(), closing - just(1)")
{
subscribe_mocks(rpp::source::never<int>()
subscribe_mocks(rpp::source::never<int>()
| rpp::ops::window_toggle(rpp::source::just(1,2,3), [](int){return rpp::source::just(1);}));

CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 0);
REQUIRE(inner_mocks.size() == 3);
Expand All @@ -133,34 +133,34 @@ TEST_CASE("window_toggle")
}
SECTION("opening - empty(), closing - just(1)")
{
subscribe_mocks(rpp::source::empty<int>()
subscribe_mocks(rpp::source::empty<int>()
| rpp::ops::window_toggle(rpp::source::just(1,2,3), [](int){return rpp::source::just(1);}));

CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
REQUIRE(inner_mocks.size() == 0);
}
SECTION("source - error")
{
subscribe_mocks(rpp::source::error<int>({})
subscribe_mocks(rpp::source::error<int>({})
| rpp::ops::window_toggle(rpp::source::just(rpp::schedulers::immediate{}, 1), [](int){return rpp::source::never<int>(); }));

CHECK(mock.get_on_error_count() == 1);
CHECK(mock.get_on_completed_count() == 0);
}
SECTION("openings - error")
{
subscribe_mocks(rpp::source::never<int>()
subscribe_mocks(rpp::source::never<int>()
| rpp::ops::window_toggle(rpp::source::error<int>({}), [](int){return rpp::source::never<int>(); }));

CHECK(mock.get_on_error_count() == 1);
CHECK(mock.get_on_completed_count() == 0);
}
SECTION("openings - just(1), closings - error")
{
subscribe_mocks(rpp::source::never<int>()
subscribe_mocks(rpp::source::never<int>()
| rpp::ops::window_toggle(rpp::source::just(1), [](int){return rpp::source::error<int>({}); }));

CHECK(mock.get_on_error_count() == 1);
CHECK(mock.get_on_completed_count() == 0);
REQUIRE(inner_mocks.size() == 1);
Expand All @@ -172,9 +172,9 @@ TEST_CASE("window_toggle")
}
SECTION("openings - just(1), closings - throw")
{
subscribe_mocks(rpp::source::never<int>()
subscribe_mocks(rpp::source::never<int>()
| rpp::ops::window_toggle(rpp::source::just(1), [](int){ throw std::runtime_error{""}; return rpp::source::error<int>({}); }));

CHECK(mock.get_on_error_count() == 1);
CHECK(mock.get_on_completed_count() == 0);
REQUIRE(inner_mocks.size() == 1);
Expand All @@ -188,36 +188,49 @@ TEST_CASE("window_toggle")

TEST_CASE("window_toggle disposes original disposable only when everything is disposed")
{
auto source_disposable = rpp::composite_disposable_wrapper::make();
auto obs = rpp::source::create<int>([source_disposable](auto&& obs)

auto make_observable = [](auto d, bool emit = true)
{
obs.set_upstream(source_disposable);
obs.on_next(1);
});
return rpp::source::create<int>([d, emit](auto&& obs)
{
obs.set_upstream(d);
if (emit)
obs.on_next(1);
});
};

auto source_disposable = rpp::composite_disposable_wrapper::make();
auto opening_disposable = rpp::composite_disposable_wrapper::make();
auto closing_disposable = rpp::composite_disposable_wrapper::make();

auto observer_disposable = rpp::composite_disposable_wrapper::make();
auto inner_observer_disposable = rpp::composite_disposable_wrapper::make();
obs
| rpp::ops::window_toggle(rpp::source::just(rpp::schedulers::immediate{}, 1), [](int){return rpp::source::never<int>(); })
make_observable(source_disposable)
| rpp::ops::window_toggle(make_observable(opening_disposable) , [&](int){return make_observable(closing_disposable, false); })
| rpp::ops::subscribe(rpp::composite_disposable_wrapper{observer_disposable}, [inner_observer_disposable](const rpp::window_toggle_observable<int>& new_obs)
{
new_obs.subscribe(rpp::composite_disposable_wrapper{inner_observer_disposable}, [](int){});
});

CHECK(closing_disposable.is_disposed());
CHECK(!source_disposable.is_disposed());
CHECK(!observer_disposable.is_disposed());
CHECK(!inner_observer_disposable.is_disposed());

observer_disposable.dispose();

CHECK(!source_disposable.is_disposed());
CHECK(observer_disposable.is_disposed());
CHECK(!opening_disposable.is_disposed());
CHECK(closing_disposable.is_disposed());
CHECK(!source_disposable.is_disposed());
CHECK(!inner_observer_disposable.is_disposed());

inner_observer_disposable.dispose();

CHECK(source_disposable.is_disposed());
CHECK(observer_disposable.is_disposed());
CHECK(opening_disposable.is_disposed());
CHECK(closing_disposable.is_disposed());
CHECK(inner_observer_disposable.is_disposed());
}

Expand Down