Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/rpp/rpp/operators/retry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ namespace rpp::operators::details
return;
}

state->clear();

if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst))
return;

Expand Down Expand Up @@ -89,7 +91,6 @@ namespace rpp::operators::details
{
if (state->count)
--state->count.value();
state->clear();
state->is_inside_drain.store(true, std::memory_order::seq_cst);
try
{
Expand Down
5 changes: 3 additions & 2 deletions src/rpp/rpp/operators/retry_when.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@ namespace rpp::operators::details
void on_next(T&&) const
{
locally_disposed = true;
state->clear();

if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst))
return;

drain<TObserver, TObservable, TNotifier>(state);
}

Expand All @@ -74,7 +76,7 @@ namespace rpp::operators::details
state->observer.on_completed();
}

void set_upstream(const disposable_wrapper& d) { state->add(d); }
void set_upstream(const disposable_wrapper& d) const { state->add(d); }

bool is_disposed() const { return locally_disposed || state->is_disposed(); }
};
Expand Down Expand Up @@ -121,7 +123,6 @@ namespace rpp::operators::details
{
while (!state->is_disposed())
{
state->clear();
state->is_inside_drain.store(true, std::memory_order::seq_cst);
try
{
Expand Down
3 changes: 2 additions & 1 deletion src/rpp/rpp/sources/concat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ namespace rpp::details
void on_completed() const
{
locally_disposed = true;
state->clear();

if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst))
return;

Expand All @@ -95,7 +97,6 @@ namespace rpp::details
}

using value_type = rpp::utils::extract_observable_type_t<utils::iterable_value_t<PackedContainer>>;
state->clear();
state->is_inside_drain.store(true, std::memory_order::seq_cst);
try
{
Expand Down
15 changes: 15 additions & 0 deletions src/tests/rpp/test_concat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,21 @@ TEST_CASE("concat as operator async completiton")
subj.get_observer().on_completed();
}

TEST_CASE("concat disposes on looping")
{
mock_observer<int> mock{};
REQUIRE_CALL(*mock, on_next_rvalue(1));
REQUIRE_CALL(*mock, on_completed());

rpp::source::concat(rpp::source::create<int>([](auto&& subscriber) {
auto d = rpp::composite_disposable_wrapper::make();
subscriber.set_upstream(d);
subscriber.on_next(1);
subscriber.on_completed();
CHECK(d.is_disposed());
})) | rpp::ops::subscribe(mock);
}

TEST_CASE("concat doesn't produce extra copies")
{
copy_count_tracker tracker{};
Expand Down
2 changes: 1 addition & 1 deletion src/tests/rpp/test_repeat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ TEST_CASE("repeat doesn't produce extra copies")

TEST_CASE("repeat satisfies disposable contracts")
{
test_operator_with_disposable<int>(rpp::ops::repeat());
test_operator_with_disposable<int>(rpp::ops::repeat(1));
}


Expand Down
16 changes: 16 additions & 0 deletions src/tests/rpp/test_retry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,22 @@ TEST_CASE("retry handles stack overflow")
| rpp::operators::subscribe(mock);
}

TEST_CASE("retry disposes on looping")
{
mock_observer<int> mock{};
REQUIRE_CALL(*mock, on_next_rvalue(1)).TIMES(2);
REQUIRE_CALL(*mock, on_error(trompeloeil::_));

rpp::source::concat(rpp::source::create<int>([](auto&& subscriber) {
auto d = rpp::composite_disposable_wrapper::make();
subscriber.set_upstream(d);
subscriber.on_next(1);
subscriber.on_error({});
CHECK(d.is_disposed());
})) | rpp::ops::retry(1)
| rpp::ops::subscribe(mock);
}

TEST_CASE("retry doesn't produce extra copies")
{
SECTION("retry(2)")
Expand Down
39 changes: 25 additions & 14 deletions src/tests/rpp/test_retry_when.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,24 @@ TEST_CASE("repeat_when does not stack overflow")
| rpp::operators::subscribe(mock);
}

TEST_CASE("retry_when disposes on looping")
{
mock_observer<int> mock{};
REQUIRE_CALL(*mock, on_next_rvalue(1)).TIMES(2);
REQUIRE_CALL(*mock, on_error(trompeloeil::_));

size_t i = 0;

rpp::source::concat(rpp::source::create<int>([](auto&& subscriber) {
auto d = rpp::composite_disposable_wrapper::make();
subscriber.set_upstream(d);
subscriber.on_next(1);
subscriber.on_error({});
CHECK(d.is_disposed());
})) | rpp::ops::retry_when([&i](const std::exception_ptr& e) { return i++ ? rpp::source::error<int>(e).as_dynamic() : rpp::source::just(1).as_dynamic(); })
| rpp::ops::subscribe(mock);
}

TEST_CASE("retry_when doesn't produce extra copies")
{
SECTION("retry_when(empty_notifier)")
Expand All @@ -264,20 +282,13 @@ TEST_CASE("retry_when doesn't produce extra copies")

TEST_CASE("retry_when satisfies disposable contracts")
{
auto observable_disposable = rpp::composite_disposable_wrapper::make();
{
auto observable = observable_with_disposable<int>(observable_disposable);
auto op = rpp::ops::retry_when([](const std::exception_ptr&) { return rpp::source::empty<int>(); });

test_operator_with_disposable<int>(op);
test_operator_finish_before_dispose<int>(op);
test_operator_with_disposable<int>(rpp::ops::retry_when([](const std::exception_ptr&) { return rpp::source::empty<int>(); }));
test_operator_finish_before_dispose<int>(rpp::ops::retry_when([](const std::exception_ptr&) { return rpp::source::empty<int>(); }));

test_operator_over_observable_with_disposable<int>(
[](auto observable) {
return rpp::source::concat(observable, rpp::source::error<int>(std::make_exception_ptr(std::runtime_error{"error"})))
| rpp::ops::retry_when([](const std::exception_ptr&) { return rpp::source::just(1); });
});
}

CHECK((observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2));
test_operator_over_observable_with_disposable<int>(
[](auto observable) {
return rpp::source::concat(observable, rpp::source::error<int>(std::make_exception_ptr(std::runtime_error{"error"})))
| rpp::ops::retry_when([](const std::exception_ptr&) { return rpp::source::just(1); });
});
}
20 changes: 20 additions & 0 deletions src/tests/utils/disposable_observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,26 @@ void test_operator_over_observable_with_disposable(auto&& op)
CHECK(observable_disposable.is_disposed());
}

SECTION("operator disposes disposable on_error")
{
op(rpp::source::create<T>([](auto&& obs) {
const auto d = rpp::composite_disposable_wrapper::make();
obs.set_upstream(d);
obs.on_error({});
CHECK(d.is_disposed());
})).subscribe([](const auto&) {}, [](const std::exception_ptr&) {});
}

SECTION("operator disposes disposable on_completed")
{
op(rpp::source::create<T>([](auto&& obs) {
const auto d = rpp::composite_disposable_wrapper::make();
obs.set_upstream(d);
obs.on_completed();
CHECK(d.is_disposed());
})).subscribe([](const auto&) {}, [](const std::exception_ptr&) {});
}

SECTION("set_upstream with fixed_disposable_strategy_selector<1>")
{
CHECK_NOTHROW(op(rpp::observable<T, wrapped_observable_strategy_set_upstream<T, rpp::details::observables::fixed_disposable_strategy_selector<1>>>{})
Expand Down