Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/rpp/rpp/disposables/details/base_disposable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ namespace rpp::details
}

protected:
virtual void base_dispose_impl(interface_disposable::Mode mode) noexcept = 0;
virtual void base_dispose_impl(interface_disposable::Mode) noexcept {};

private:
std::atomic_bool m_disposed{};
Expand Down
15 changes: 9 additions & 6 deletions src/rpp/rpp/operators/debounce.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ namespace rpp::operators::details
{
std::shared_ptr<debounce_state<Observer, Worker>> state{};

bool is_disposed() const { return state->get_observer_under_lock()->is_disposed(); }
bool is_disposed() const { return state->is_disposed(); }

void on_error(const std::exception_ptr& err) const { state->get_observer_under_lock()->on_error(err); }
};

template<rpp::constraint::observer Observer, typename Worker>
class debounce_state final : public std::enable_shared_from_this<debounce_state<Observer, Worker>>
class debounce_state final : public rpp::details::enable_wrapper_from_this<debounce_state<Observer, Worker>>
, public rpp::details::base_disposable
{
using T = rpp::utils::extract_observer_type_t<Observer>;

Expand Down Expand Up @@ -79,7 +80,7 @@ namespace rpp::operators::details

return std::nullopt;
},
debounce_state_wrapper<Observer, Worker>{this->shared_from_this()});
debounce_state_wrapper<Observer, Worker>{this->wrapper_from_this().lock()});
}

std::variant<std::monostate, T, schedulers::time_point> extract_value_or_time()
Expand Down Expand Up @@ -120,7 +121,7 @@ namespace rpp::operators::details

bool is_disposed() const
{
return state->get_observer_under_lock()->is_disposed();
return state->is_disposed();
}

template<typename T>
Expand Down Expand Up @@ -153,7 +154,7 @@ namespace rpp::operators::details
};

template<rpp::details::observables::constraint::disposable_strategy Prev>
using updated_disposable_strategy = Prev;
using updated_disposable_strategy = typename Prev::template add<1>;

rpp::schedulers::duration duration;
RPP_NO_UNIQUE_ADDRESS Scheduler scheduler;
Expand All @@ -163,7 +164,9 @@ namespace rpp::operators::details
{
using worker_t = rpp::schedulers::utils::get_worker_t<Scheduler>;

auto ptr = std::make_shared<debounce_state<std::decay_t<Observer>, worker_t>>(std::forward<Observer>(observer), scheduler.create_worker(), duration);
auto d = rpp::disposable_wrapper_impl<debounce_state<std::decay_t<Observer>, worker_t>>::make(std::forward<Observer>(observer), scheduler.create_worker(), duration);
auto ptr = d.lock();
ptr->get_observer_under_lock()->set_upstream(d.as_weak());
Comment on lines +167 to +169

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure safe usage when locking disposable wrapper

In the lift method, after obtaining ptr via d.lock(), there is a potential for ptr to be null. To prevent a null pointer dereference, ensure that ptr is valid before using it.

Consider adding a check for ptr before dereferencing:

auto ptr = d.lock();
if (ptr)
{
    ptr->get_observer_under_lock()->set_upstream(d.as_weak());
    return rpp::observer<Type, debounce_observer_strategy<std::decay_t<Observer>, worker_t>>{std::move(ptr)};
}
else
{
    // Handle the null case appropriately
}

Alternatively, if you are confident that d.lock() will always return a valid pointer in this context, consider adding an assertion to document this assumption:

auto ptr = d.lock();
assert(ptr != nullptr);
ptr->get_observer_under_lock()->set_upstream(d.as_weak());

return rpp::observer<Type, debounce_observer_strategy<std::decay_t<Observer>, worker_t>>{std::move(ptr)};
}
};
Expand Down
10 changes: 7 additions & 3 deletions src/rpp/rpp/operators/details/combining_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
namespace rpp::operators::details
{
template<rpp::constraint::observer Observer>
class combining_state
class combining_state : public rpp::details::enable_wrapper_from_this<combining_state<Observer>>
, public rpp::details::base_disposable
{
public:
explicit combining_state(Observer&& observer, size_t on_completed_needed)
Expand Down Expand Up @@ -57,7 +58,7 @@ namespace rpp::operators::details

bool is_disposed() const
{
return state->get_observer_under_lock()->is_disposed();
return state->is_disposed();
}

void on_error(const std::exception_ptr& err) const
Expand Down Expand Up @@ -103,7 +104,10 @@ namespace rpp::operators::details
{
using State = TState<Observer, TSelector, Type, rpp::utils::extract_observable_type_t<TObservables>...>;

const auto state = std::make_shared<State>(std::forward<Observer>(observer), selector);
const auto d = rpp::disposable_wrapper_impl<State>::make(std::forward<Observer>(observer), selector);
auto state = d.lock();
state->get_observer_under_lock()->set_upstream(d.as_weak());

subscribe<std::decay_t<Type>>(state, std::index_sequence_for<TObservables...>{}, observables...);

return rpp::observer<Type, TStrategy<0, std::decay_t<Observer>, TSelector, Type, rpp::utils::extract_observable_type_t<TObservables>...>>{std::move(state)};
Expand Down
2 changes: 1 addition & 1 deletion src/rpp/rpp/operators/merge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ namespace rpp::operators::details

bool is_disposed() const
{
return m_state->get_observer_under_lock()->is_disposed();
return m_state->get_disposable().is_disposed();
}

void on_error(const std::exception_ptr& err) const
Expand Down
15 changes: 15 additions & 0 deletions src/tests/rpp/test_combine_latest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,21 @@ TEST_CASE("combine_latest handles race condition")
}
}

TEST_CASE("combine_latest is not deadlocking is_disposed")
{
std::optional<rpp::dynamic_observer<int>> observer{};

rpp::source::create<int>([&observer](auto&& obs) {
observer = std::forward<decltype(obs)>(obs).as_dynamic();
observer->on_next(1);
})
| rpp::operators::combine_latest(rpp::source::just(1))
| rpp::ops::subscribe([&observer](auto) {
CHECK(observer);
CHECK(!observer->is_disposed());
});
}

TEST_CASE("combine_latest satisfies disposable contracts")
{
auto observable_disposable = rpp::composite_disposable_wrapper::make();
Expand Down
20 changes: 20 additions & 0 deletions src/tests/rpp/test_debounce.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,26 @@ TEST_CASE("debounce emit only items where timeout reached")
}
}


TEST_CASE("debounce is not deadlocking is_disposed")
{
std::optional<rpp::dynamic_observer<int>> observer{};

rpp::schedulers::test_scheduler scheduler{};

rpp::source::create<int>([&observer](auto&& obs) {
observer = std::forward<decltype(obs)>(obs).as_dynamic();
observer->on_next(1);
})
| rpp::operators::debounce(std::chrono::seconds{1}, scheduler)
| rpp::ops::subscribe([&observer](int) {
CHECK(observer);
CHECK(!observer->is_disposed());
});
scheduler.time_advance(std::chrono::seconds{1});
}
Comment on lines +124 to +140

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Good addition of a test case for deadlock prevention, but consider enhancing it.

The new test case "debounce is not deadlocking is_disposed" is a valuable addition to ensure that the debounce operator doesn't cause deadlocks when checking the disposed state. However, consider the following enhancements to make the test more robust and informative:

  1. Add a comment explaining the purpose of the test and the potential deadlock scenario it's preventing.
  2. Explicitly dispose of the observer after the scheduler advancement to test the full lifecycle.
  3. Add assertions after advancing the scheduler to verify the expected behavior (e.g., that the lambda was called).
  4. Consider testing with multiple emissions to ensure consistent behavior.

Here's a suggested improvement:

TEST_CASE("debounce is not deadlocking is_disposed")
{
    // This test ensures that checking is_disposed() within the debounce operator
    // does not cause a deadlock, even when the observer is disposed.
    std::optional<rpp::dynamic_observer<int>> observer{};
    rpp::schedulers::test_scheduler scheduler{};
    bool lambda_called = false;

    auto subscription = rpp::source::create<int>([&observer](auto&& obs) {
        observer = std::forward<decltype(obs)>(obs).as_dynamic();
        observer->on_next(1);
        observer->on_next(2);  // Add another emission to test multiple debounces
    })
    | rpp::operators::debounce(std::chrono::seconds{1}, scheduler)
    | rpp::ops::subscribe([&observer, &lambda_called](int value) {
        CHECK(observer);
        CHECK(!observer->is_disposed());
        lambda_called = true;
        CHECK(value == 2);  // Ensure we got the last emitted value
    });

    scheduler.time_advance(std::chrono::milliseconds{500});
    CHECK_FALSE(lambda_called);  // Ensure the lambda hasn't been called yet

    scheduler.time_advance(std::chrono::seconds{1});
    CHECK(lambda_called);  // Ensure the lambda was called after debounce period

    subscription.dispose();  // Explicitly dispose of the subscription
    CHECK(observer->is_disposed());  // Verify that the observer is now disposed
}

This enhanced version provides more comprehensive testing of the debounce behavior and explicitly checks for the non-deadlocking property when disposing of the observer.



TEST_CASE("debounce forwards error")
{
auto mock = mock_observer_strategy<int>{};
Expand Down
14 changes: 14 additions & 0 deletions src/tests/rpp/test_merge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,20 @@ TEST_CASE("merge dispose inner_disposable immediately")
| rpp::ops::subscribe([](int) {});
}

TEST_CASE("merge is not deadlocking is_disposed")
{
std::optional<rpp::dynamic_observer<int>> observer{};
rpp::source::create<int>([&observer](auto&& obs) {
observer = std::forward<decltype(obs)>(obs).as_dynamic();
observer->on_next(1);
})
| rpp::ops::merge_with(rpp::source::never<int>())
| rpp::ops::subscribe([&observer](int) {
CHECK(observer);
CHECK(!observer->is_disposed());
});
}
Comment on lines +261 to +273

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Enhance test case clarity and robustness

The new test case effectively verifies that the merge operator doesn't cause deadlocks when checking if the observer is disposed. However, consider the following improvements:

  1. Use a more descriptive name for the test case, e.g., "merge_operator_does_not_deadlock_when_checking_observer_disposal".
  2. Consider using REQUIRE instead of CHECK for critical assertions to ensure the test fails immediately if these conditions are not met.
  3. Add a check to verify the observer's state before emission to ensure it's not disposed prematurely.

Here's a suggested refactoring of the test case:

TEST_CASE("merge_operator_does_not_deadlock_when_checking_observer_disposal")
{
    rpp::dynamic_observer<int> observer;
    bool value_emitted = false;

    rpp::source::create<int>([&](auto&& obs) {
        observer = std::forward<decltype(obs)>(obs).as_dynamic();
        REQUIRE(observer);
        REQUIRE(!observer.is_disposed());
        observer.on_next(1);
        value_emitted = true;
    })
        | rpp::ops::merge_with(rpp::source::never<int>())
        | rpp::ops::subscribe([&](int) {
              REQUIRE(value_emitted);
              REQUIRE(!observer.is_disposed());
          });

    REQUIRE(value_emitted);
}

This refactored version:

  • Uses a more descriptive test case name
  • Replaces std::optional with a direct rpp::dynamic_observer<int>
  • Adds checks before and after emission
  • Uses REQUIRE for critical assertions
  • Adds a flag to ensure the value was actually emitted


TEST_CASE("merge doesn't produce extra copies")
{
SUBCASE("send value by copy")
Expand Down
15 changes: 15 additions & 0 deletions src/tests/rpp/test_zip.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,21 @@ TEST_CASE("zip doesn't produce extra copies")
}
}

TEST_CASE("zip is not deadlocking is_disposed")
{
std::optional<rpp::dynamic_observer<int>> observer{};

rpp::source::create<int>([&observer](auto&& obs) {
observer = std::forward<decltype(obs)>(obs).as_dynamic();
observer->on_next(1);
})
| rpp::operators::zip(rpp::source::just(1))
| rpp::ops::subscribe([&observer](auto) {
CHECK(observer);
CHECK(!observer->is_disposed());
});
}

TEST_CASE("zip satisfies disposable contracts")
{
auto observable_disposable = rpp::composite_disposable_wrapper::make();
Expand Down