Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/rpp/rpp/observers/details/disposable_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,15 @@ namespace rpp::details::observers

static void dispose() {}
};

struct locally_disposable_strategy
{
static void add(const rpp::disposable_wrapper&) {}

bool is_disposed() const noexcept { return state; }

void dispose() const { state = true; }

mutable bool state{};
};
} // namespace rpp::details::observers
5 changes: 5 additions & 0 deletions src/rpp/rpp/observers/details/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ namespace rpp::details::observers
*/
struct none_disposable_strategy;

/**
* @brief Just bool over is_disposed/dispose logic with no any add logic
*/
struct locally_disposable_strategy;

/**
* @brief Dynamic disposable logic based on pre-allocated vector
*/
Expand Down
3 changes: 3 additions & 0 deletions src/rpp/rpp/operators/concat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ namespace rpp::operators::details
template<rpp::constraint::observable TObservable, rpp::constraint::observer TObserver>
struct concat_observer_strategy_base
{

concat_observer_strategy_base(std::shared_ptr<concat_state_t<TObservable, TObserver>> state, rpp::composite_disposable_wrapper refcounted)
: state{std::move(state)}
, refcounted{std::move(refcounted)}
Expand Down Expand Up @@ -138,6 +139,8 @@ namespace rpp::operators::details
template<rpp::constraint::observable TObservable, rpp::constraint::observer TObserver>
struct concat_inner_observer_strategy : public concat_observer_strategy_base<TObservable, TObserver>
{
using preferred_disposable_strategy = rpp::details::observers::locally_disposable_strategy;

using base = concat_observer_strategy_base<TObservable, TObserver>;

using base::concat_observer_strategy_base;
Expand Down
2 changes: 2 additions & 0 deletions src/rpp/rpp/operators/delay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ namespace rpp::operators::details
template<rpp::constraint::observer Observer, typename Worker, bool ClearOnError>
struct delay_observer_strategy
{
using preferred_disposable_strategy = rpp::details::observers::locally_disposable_strategy;

std::shared_ptr<delay_state<Observer, Worker>> state{};

void set_upstream(const rpp::disposable_wrapper& d) const
Expand Down
2 changes: 2 additions & 0 deletions src/rpp/rpp/operators/details/combining_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ namespace rpp::operators::details
template<typename TState>
struct combining_observer_strategy
{
using preferred_disposable_strategy = rpp::details::observers::locally_disposable_strategy;

std::shared_ptr<TState> state{};

void set_upstream(const rpp::disposable_wrapper& d) const
Expand Down
2 changes: 2 additions & 0 deletions src/rpp/rpp/operators/merge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ namespace rpp::operators::details
template<rpp::constraint::observer TObserver>
struct merge_observer_base_strategy
{
using preferred_disposable_strategy = rpp::details::observers::locally_disposable_strategy;

merge_observer_base_strategy(std::shared_ptr<merge_state<TObserver>>&& state)
: m_state{std::move(state)}
{
Expand Down
7 changes: 2 additions & 5 deletions src/rpp/rpp/operators/retry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,9 @@ namespace rpp::operators::details
template<rpp::constraint::observer TObserver, typename TObservable>
struct retry_observer_strategy
{
using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy;
using preferred_disposable_strategy = rpp::details::observers::locally_disposable_strategy;

std::shared_ptr<retry_state_t<TObserver, TObservable>> state;
mutable bool locally_disposed{};

template<typename T>
void on_next(T&& v) const
Expand All @@ -57,7 +56,6 @@ namespace rpp::operators::details

void on_error(const std::exception_ptr& err) const
{
locally_disposed = true;
if (state->count == 0)
{
state->observer.on_error(err);
Expand All @@ -74,7 +72,6 @@ namespace rpp::operators::details

void on_completed() const
{
locally_disposed = true;
state->observer.on_completed();
}

Expand All @@ -83,7 +80,7 @@ namespace rpp::operators::details
state->disposable.add(d);
}

bool is_disposed() const { return locally_disposed || state->disposable.is_disposed(); }
bool is_disposed() const { return state->disposable.is_disposed(); }

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Issues Found with Removal of 'locally_disposed'

The locally_disposed variable is still referenced in src/rpp/rpp/operators/retry_when.hpp:

  • Line: mutable bool locally_disposed{};
  • Multiple assignments: locally_disposed = true;
  • Updated is_disposed() method still checks locally_disposed || state->disposable.is_disposed();

These remaining references indicate that the removal of locally_disposed is incomplete and may affect the disposal logic as originally intended.

🔗 Analysis chain

Verify the impact of removing 'locally_disposed' on disposal logic

The is_disposed() method now solely checks state->disposable.is_disposed() after the removal of locally_disposed. Ensure that this change does not affect the correctness of disposal checks, especially in scenarios where local disposal status was significant for stopping retries.

To confirm that the removal of locally_disposed does not introduce issues, run the following script to check for any remaining references and to assess disposal behavior:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify that 'locally_disposed' is no longer referenced and that disposal logic functions correctly.

# Check for any remaining references to 'locally_disposed' in the codebase.
rg --type cpp 'locally_disposed'

# Inspect all usages of 'is_disposed()' to ensure they rely solely on 'state->disposable.is_disposed()'.
rg --type cpp 'is_disposed\(\)'

Length of output: 34493

};

template<rpp::constraint::observer TObserver, typename TObservable>
Expand Down
2 changes: 2 additions & 0 deletions src/rpp/rpp/operators/with_latest_from.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ namespace rpp::operators::details
template<size_t I, rpp::constraint::observer Observer, typename TSelector, rpp::constraint::decayed_type... RestArgs>
struct with_latest_from_inner_observer_strategy
{
using preferred_disposable_strategy = rpp::details::observers::locally_disposable_strategy;

std::shared_ptr<with_latest_from_state<Observer, TSelector, RestArgs...>> state{};

void set_upstream(const rpp::disposable_wrapper& d) const
Expand Down
7 changes: 2 additions & 5 deletions src/rpp/rpp/sources/concat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,9 @@ namespace rpp::details
template<rpp::constraint::observer TObserver, constraint::decayed_type PackedContainer>
struct concat_source_observer_strategy
{
using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy;
using preferred_disposable_strategy = rpp::details::observers::locally_disposable_strategy;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Inconsistent preferred_disposable_strategy Usage Detected

The switch to locally_disposable_strategy in concat.hpp is not consistently reflected across the codebase. Numerous files still utilize none_disposable_strategy, which may lead to inconsistent resource management behavior.

🔗 Analysis chain

Approve change to locally_disposable_strategy

The change from none_disposable_strategy to locally_disposable_strategy is a good improvement in resource management. This modification suggests that disposables are now managed locally within the strategy, which could lead to better cleanup and resource handling.

To ensure this change doesn't introduce any unintended side effects, please verify:

  1. The behavior of the concat_source_observer_strategy remains consistent with the expected functionality.
  2. There are no memory leaks or resource management issues introduced by this change.

You may want to run the following command to check for any other occurrences of none_disposable_strategy that might need similar updates:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

rg "none_disposable_strategy" --type cpp

Length of output: 6313


std::shared_ptr<concat_state_t<TObserver, PackedContainer>> state{};
mutable bool locally_disposed{};

template<typename T>
void on_next(T&& v) const
Expand All @@ -65,17 +64,15 @@ namespace rpp::details

void on_error(const std::exception_ptr& err) const
{
locally_disposed = true;
state->observer.on_error(err);
}

void set_upstream(const disposable_wrapper& d) { state->disposable.add(d); }

bool is_disposed() const { return locally_disposed || state->disposable.is_disposed(); }
bool is_disposed() const { return state->disposable.is_disposed(); }

void on_completed() const
{
locally_disposed = true;
state->disposable.clear();

if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst))
Expand Down
26 changes: 25 additions & 1 deletion src/tests/rpp/test_delay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <rpp/subjects/publish_subject.hpp>

#include "disposable_observable.hpp"
#include "rpp_trompeloil.hpp"

namespace
{
Expand Down Expand Up @@ -232,9 +233,32 @@ TEST_CASE("delay delays observable's emissions")
}
}

TEST_CASE("delay is not disposing early")
{
mock_observer<int> mock{};
trompeloeil::sequence s{};

rpp::schedulers::test_scheduler scheduler{};

std::optional<rpp::composite_disposable_wrapper> d{};
rpp::source::create<int>([&d](auto&& obs) {
d = rpp::composite_disposable_wrapper::make();
obs.set_upstream(d.value());
obs.on_completed();
CHECK(obs.is_disposed());
})
| rpp::ops::delay(std::chrono::seconds{1}, scheduler)
| rpp::ops::subscribe(mock);

CHECK(!d->is_disposed());
REQUIRE_CALL(*mock, on_completed()).LR_WITH(!d->is_disposed()).IN_SEQUENCE(s);
scheduler.time_advance(std::chrono::seconds{1});
CHECK(d->is_disposed());
}

TEST_CASE("delay satisfies disposable contracts")
{
test_operator_with_disposable<int>(rpp::ops::delay(std::chrono::seconds{0}, manual_scheduler{}));
test_operator_with_disposable<int>(rpp::ops::delay(std::chrono::seconds{0}, rpp::schedulers::immediate{}));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Consistent Scheduler Usage Recommended in Delay Tests

The recent change replaces manual_scheduler with rpp::schedulers::immediate in the "delay satisfies disposable contracts" test, enhancing its reliability by using a synchronous scheduler.

However, additional instances of manual_scheduler were found in test_delay.cpp. To maintain consistency and ensure comprehensive test coverage, it's recommended to update these instances to use rpp::schedulers::immediate as well.

  • Update the following lines in src/tests/rpp/test_delay.cpp:
    • rpp::ops::delay(std::chrono::seconds{30000}, manual_scheduler{})
    • manual_scheduler::worker_strategy::s_test_queue = rpp::schedulers::details::schedulables_queue<manual_scheduler::worker_strategy>{};

This will standardize the scheduler usage across all delay operator tests, ensuring consistent behavior and improved test reliability.

🔗 Analysis chain

Good update to use immediate scheduler in disposable contract test.

The change from manual_scheduler to rpp::schedulers::immediate in the "delay satisfies disposable contracts" test is a positive modification. It tests the delay operator under immediate scheduling conditions, which could potentially uncover different behaviors or edge cases.

This change enhances the test coverage by verifying the disposable contracts under a different scheduling strategy. It's a good practice to test operators with various scheduler types to ensure consistent behavior across different execution contexts.

To ensure consistency across the test suite, let's verify if similar changes are needed in other test cases:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for other uses of manual_scheduler in delay tests

# Search for other occurrences of manual_scheduler in test_delay.cpp
echo "Occurrences of manual_scheduler in test_delay.cpp:"
rg "manual_scheduler" src/tests/rpp/test_delay.cpp

# Search for other delay operator tests that might benefit from using immediate scheduler
echo "\nOther delay operator tests:"
rg "ops::delay" src/tests/rpp/test_delay.cpp

Length of output: 1243

}

TEST_CASE("observe_on forward error immediately")
Expand Down
5 changes: 4 additions & 1 deletion src/tests/rpp/test_merge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,18 +216,21 @@ TEST_CASE_TEMPLATE("merge handles race condition", TestType, rpp::memory_model::
extracted_obs.emplace(std::forward<decltype(obs)>(obs).as_dynamic());
});


auto test = [&](auto source) {
SUBCASE("subscribe on it")
{
SUBCASE("on_error can't interleave with on_next")
{
std::mutex m{};
source
| rpp::ops::as_blocking()
| rpp::ops::subscribe([&](auto&&) {
REQUIRE(extracted_obs.has_value());
CHECK(!on_error_called);
std::thread{[extracted_obs]
std::thread{[extracted_obs,&m]
{
std::lock_guard lock{m};
Comment on lines +225 to +233

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential data race when accessing 'extracted_obs'

The variable extracted_obs is accessed from multiple threads without proper synchronization. While the access to extracted_obs->on_error(...) inside the detached thread is protected by a mutex, the accesses to extracted_obs in the subscriber lambda (e.g., REQUIRE(extracted_obs.has_value());) are not synchronized. This can lead to data races and undefined behavior.

Consider synchronizing all accesses to extracted_obs or using thread-safe constructs to ensure that shared data is accessed safely across threads.


⚠️ Potential issue

Undefined behavior due to capturing local mutex by reference in detached thread

The mutex m declared at line 225 is a local variable within the lambda passed to rpp::ops::subscribe. It is captured by reference (&m) in the detached thread created at line 231. Since the lambda scope may exit and m may be destroyed before the detached thread completes execution, this can lead to undefined behavior when the detached thread tries to lock m.

To fix this issue, consider extending the lifetime of the mutex by moving it outside the lambda or by capturing it by value using a std::shared_ptr.

Apply this diff to fix the issue:

-                        std::mutex m{};
+                        auto m = std::make_shared<std::mutex>();
                         source
                             | rpp::ops::as_blocking()
                             | rpp::ops::subscribe([&](auto&&) {
                                         REQUIRE(extracted_obs.has_value());
                                         CHECK(!on_error_called);
-                                        std::thread{[extracted_obs,&m]
+                                        std::thread{[extracted_obs, m]
                                         {
-                                            std::lock_guard lock{m};
+                                            std::lock_guard lock{*m};
                                             extracted_obs->on_error(std::exception_ptr{});
                                         }}.detach();
                                         std::this_thread::sleep_for(std::chrono::seconds{1});
                                         CHECK(!on_error_called); },
                                                   [&](auto) { on_error_called = true; });
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
std::mutex m{};
source
| rpp::ops::as_blocking()
| rpp::ops::subscribe([&](auto&&) {
REQUIRE(extracted_obs.has_value());
CHECK(!on_error_called);
std::thread{[extracted_obs]
std::thread{[extracted_obs,&m]
{
std::lock_guard lock{m};
auto m = std::make_shared<std::mutex>();
source
| rpp::ops::as_blocking()
| rpp::ops::subscribe([&](auto&&) {
REQUIRE(extracted_obs.has_value());
CHECK(!on_error_called);
std::thread{[extracted_obs, m]
{
std::lock_guard lock{*m};

extracted_obs->on_error(std::exception_ptr{});
}}.detach();
std::this_thread::sleep_for(std::chrono::seconds{1});
Expand Down
24 changes: 16 additions & 8 deletions src/tests/utils/disposable_observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,22 +115,30 @@ void test_operator_over_observable_with_disposable(auto&& op)

SUBCASE("operator disposes disposable on_error")
{
op(rpp::source::create<T>([](auto&& obs) {
const auto d = rpp::composite_disposable_wrapper::make();
obs.set_upstream(d);
std::optional<rpp::composite_disposable_wrapper> d{};
op(rpp::source::create<T>([&d](auto&& obs) {
d = rpp::composite_disposable_wrapper::make();
obs.set_upstream(d.value());
obs.on_error({});
CHECK(d.is_disposed());
CHECK(obs.is_disposed());
})).subscribe([](const auto&) {}, [](const std::exception_ptr&) {});

CHECK(d);
CHECK(d->is_disposed());
}

SUBCASE("operator disposes disposable on_completed")
{
op(rpp::source::create<T>([](auto&& obs) {
const auto d = rpp::composite_disposable_wrapper::make();
obs.set_upstream(d);
std::optional<rpp::composite_disposable_wrapper> d{};
op(rpp::source::create<T>([&d](auto&& obs) {
d = rpp::composite_disposable_wrapper::make();
obs.set_upstream(d.value());
obs.on_completed();
CHECK(d.is_disposed());
CHECK(obs.is_disposed());
})).subscribe([](const auto&) {}, [](const std::exception_ptr&) {});

CHECK(d);
CHECK(d->is_disposed());
}

SUBCASE("set_upstream with fixed_disposable_strategy_selector<1>")
Expand Down