Fix early disposing inside delay#662
Conversation
📝 WalkthroughWalkthroughThe changes introduce several new type aliases named Changes
Possibly related PRs
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (1)
src/rpp/rpp/operators/delay.hpp (1)
71-72: LGTM! Consider adding a brief comment.The addition of
preferred_disposable_strategyusingnone_disposable_strategyappears to address the issue of early disposing mentioned in the PR title. This change should prevent premature disposal of resources in the delay operator.Consider adding a brief comment explaining why this specific strategy is preferred for the delay operator. This would enhance code maintainability and make the intent clearer for future developers. For example:
// Use none_disposable_strategy to prevent early disposing of resources in the delay operator using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy;
| using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; | ||
|
|
There was a problem hiding this comment.
💡 Codebase verification
Impact of the change requires attention.
preferred_disposable_strategyis used in multiple files and may need corresponding updates.- No existing tests for
delay_observer_strategywere found; consider adding tests. - A TODO related to disposal exists in
combining_strategy.hppthat should be addressed.
🔗 Analysis chain
Verify the impact of the change.
Let's ensure this change is properly implemented and doesn't have unintended consequences.
Run the following script to verify the change:
This script will help us verify:
- If there are any other occurrences of
preferred_disposable_strategythat might need similar changes. - If there are any tests related to
delay_observer_strategythat might need updating. - How
delay_observer_strategyis used in the codebase to ensure compatibility with this change. - If there are any remaining TODOs or FIXMEs related to disposal in the delay operator.
Please review the results of this script to ensure the change is comprehensive and doesn't introduce any inconsistencies.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the impact of adding preferred_disposable_strategy
# Test 1: Check for other occurrences of preferred_disposable_strategy
echo "Checking for other occurrences of preferred_disposable_strategy:"
rg "preferred_disposable_strategy" --type cpp
# Test 2: Look for tests related to delay_observer_strategy
echo "Checking for tests related to delay_observer_strategy:"
rg "delay_observer_strategy.*test" --type cpp
# Test 3: Check for usage of delay_observer_strategy
echo "Checking for usage of delay_observer_strategy:"
rg "delay_observer_strategy" --type cpp
# Test 4: Verify if there are any TODOs or FIXMEs related to disposal in delay
echo "Checking for TODOs or FIXMEs related to disposal in delay:"
rg "TODO|FIXME.*dispos.*delay" --type cpp
Length of output: 7173
There was a problem hiding this comment.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
- src/tests/rpp/test_delay.cpp (2 hunks)
🧰 Additional context used
🔇 Additional comments (1)
src/tests/rpp/test_delay.cpp (1)
236-256: Excellent addition of a new test case!This new test case "delay is not disposing early" is a valuable addition to the test suite. It effectively verifies that the
delayoperator doesn't prematurely dispose of resources, which is crucial for maintaining correct behavior in asynchronous operations.The test logic is well-structured:
- It sets up a mock observer and a test scheduler.
- Creates a source observable that completes without emitting values.
- Applies the
delayoperator with a 1-second delay.- Verifies that the disposable is not disposed before advancing time.
- Ensures that
on_completedis called before disposal.- Confirms that the disposable is disposed after advancing time.
This test case enhances the robustness of the
delayoperator implementation by covering an important edge case.
| TEST_CASE("delay satisfies disposable contracts") | ||
| { | ||
| test_operator_with_disposable<int>(rpp::ops::delay(std::chrono::seconds{0}, manual_scheduler{})); | ||
| test_operator_with_disposable<int>(rpp::ops::delay(std::chrono::seconds{0}, rpp::schedulers::immediate{})); |
There was a problem hiding this comment.
💡 Codebase verification
Consistent Scheduler Usage Recommended in Delay Tests
The recent change replaces manual_scheduler with rpp::schedulers::immediate in the "delay satisfies disposable contracts" test, enhancing its reliability by using a synchronous scheduler.
However, additional instances of manual_scheduler were found in test_delay.cpp. To maintain consistency and ensure comprehensive test coverage, it's recommended to update these instances to use rpp::schedulers::immediate as well.
- Update the following lines in
src/tests/rpp/test_delay.cpp:rpp::ops::delay(std::chrono::seconds{30000}, manual_scheduler{})manual_scheduler::worker_strategy::s_test_queue = rpp::schedulers::details::schedulables_queue<manual_scheduler::worker_strategy>{};
This will standardize the scheduler usage across all delay operator tests, ensuring consistent behavior and improved test reliability.
🔗 Analysis chain
Good update to use immediate scheduler in disposable contract test.
The change from manual_scheduler to rpp::schedulers::immediate in the "delay satisfies disposable contracts" test is a positive modification. It tests the delay operator under immediate scheduling conditions, which could potentially uncover different behaviors or edge cases.
This change enhances the test coverage by verifying the disposable contracts under a different scheduling strategy. It's a good practice to test operators with various scheduler types to ensure consistent behavior across different execution contexts.
To ensure consistency across the test suite, let's verify if similar changes are needed in other test cases:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for other uses of manual_scheduler in delay tests
# Search for other occurrences of manual_scheduler in test_delay.cpp
echo "Occurrences of manual_scheduler in test_delay.cpp:"
rg "manual_scheduler" src/tests/rpp/test_delay.cpp
# Search for other delay operator tests that might benefit from using immediate scheduler
echo "\nOther delay operator tests:"
rg "ops::delay" src/tests/rpp/test_delay.cpp
Length of output: 1243
BENCHMARK RESULTS (AUTOGENERATED)
|
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| Subscribe empty callbacks to empty observable | 312.05 ns | 2.16 ns | 2.16 ns | 1.00 |
| Subscribe empty callbacks to empty observable via pipe operator | 302.10 ns | 2.16 ns | 2.16 ns | 1.00 |
Sources
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| from array of 1 - create + subscribe + immediate | 690.52 ns | 0.31 ns | 0.31 ns | 1.00 |
| from array of 1 - create + subscribe + current_thread | 1036.85 ns | 3.71 ns | 3.71 ns | 1.00 |
| concat_as_source of just(1 immediate) create + subscribe | 2255.05 ns | 141.51 ns | 141.53 ns | 1.00 |
| defer from array of 1 - defer + create + subscribe + immediate | 737.97 ns | 0.31 ns | 0.31 ns | 1.00 |
| interval - interval + take(3) + subscribe + immediate | 2153.45 ns | 59.19 ns | 59.23 ns | 1.00 |
| interval - interval + take(3) + subscribe + current_thread | 2964.93 ns | 32.43 ns | 32.44 ns | 1.00 |
| from array of 1 - create + as_blocking + subscribe + new_thread | 28352.62 ns | 27852.71 ns | 27426.70 ns | 1.02 |
| from array of 1000 - create + as_blocking + subscribe + new_thread | 40199.52 ns | 51780.23 ns | 48029.52 ns | 1.08 |
| concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 3468.21 ns | 216.63 ns | 216.81 ns | 1.00 |
Filtering Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take(1)+subscribe | 1139.91 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+filter(true)+subscribe | 883.39 ns | 0.31 ns | 0.31 ns | 1.01 |
| immediate_just(1,2)+skip(1)+subscribe | 978.56 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,1,2)+distinct_until_changed()+subscribe | 844.34 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2)+first()+subscribe | 1272.11 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2)+last()+subscribe | 914.32 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+take_last(1)+subscribe | 1140.28 ns | 18.20 ns | 18.83 ns | 0.97 |
| immediate_just(1,2,3)+element_at(1)+subscribe | 849.49 ns | 0.31 ns | 0.31 ns | 1.00 |
Schedulers
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate scheduler create worker + schedule | 265.86 ns | 2.16 ns | 2.16 ns | 1.00 |
| current_thread scheduler create worker + schedule | 370.26 ns | 5.86 ns | 5.87 ns | 1.00 |
| current_thread scheduler create worker + schedule + recursive schedule | 809.95 ns | 55.52 ns | 54.10 ns | 1.03 |
Transforming Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+map(v*2)+subscribe | 881.94 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+scan(10, std::plus)+subscribe | 906.98 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+flat_map(immediate_just(v*2))+subscribe | 2344.92 ns | 192.63 ns | 192.64 ns | 1.00 |
| immediate_just+buffer(2)+subscribe | 1583.10 ns | 13.89 ns | 13.41 ns | 1.04 |
| immediate_just+window(2)+subscribe + subscsribe inner | 2442.18 ns | 1328.60 ns | 1387.12 ns | 0.96 |
Conditional Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take_while(false)+subscribe | 886.15 ns | - | - | 0.00 |
| immediate_just+take_while(true)+subscribe | 914.18 ns | 0.31 ns | 0.31 ns | 1.00 |
Utility Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(1)+subscribe_on(immediate)+subscribe | 2030.03 ns | 0.31 ns | 0.31 ns | 1.00 |
Combining Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 3445.92 ns | 237.12 ns | 237.04 ns | 1.00 |
| immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 3660.68 ns | 192.19 ns | 180.17 ns | 1.07 |
| immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 179.62 ns | 174.81 ns | 1.03 |
| immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 3526.37 ns | 1239.97 ns | 1316.08 ns | 0.94 |
| immediate_just(1) + zip(immediate_just(2)) + subscribe | 2086.71 ns | 224.12 ns | 227.52 ns | 0.99 |
Subjects
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| publish_subject with 1 observer - on_next | 34.51 ns | 14.63 ns | 14.64 ns | 1.00 |
| subscribe 100 observers to publish_subject | 199373.60 ns | 16014.55 ns | 15936.85 ns | 1.00 |
| 100 on_next to 100 observers to publish_subject | 27110.37 ns | 17085.46 ns | 17126.42 ns | 1.00 |
Scenarios
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| basic sample | 1473.16 ns | 12.65 ns | 12.66 ns | 1.00 |
| basic sample with immediate scheduler | 1366.77 ns | 5.55 ns | 5.55 ns | 1.00 |
Aggregating Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+reduce(10, std::plus)+subscribe | 933.82 ns | 0.31 ns | 0.31 ns | 1.00 |
Error Handling Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 2071.43 ns | 915.65 ns | 939.01 ns | 0.98 |
| create(on_error())+retry(1)+subscribe | 601.52 ns | 121.61 ns | 121.33 ns | 1.00 |
ci-macos
General
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| Subscribe empty callbacks to empty observable | 977.50 ns | 3.95 ns | 5.68 ns | 0.70 |
| Subscribe empty callbacks to empty observable via pipe operator | 971.77 ns | 3.92 ns | 8.25 ns | 0.48 |
Sources
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| from array of 1 - create + subscribe + immediate | 1928.43 ns | 0.23 ns | 0.29 ns | 0.81 |
| from array of 1 - create + subscribe + current_thread | 2442.67 ns | 33.11 ns | 40.92 ns | 0.81 |
| concat_as_source of just(1 immediate) create + subscribe | 5427.55 ns | 427.04 ns | 786.24 ns | 0.54 |
| defer from array of 1 - defer + create + subscribe + immediate | 1969.89 ns | 0.23 ns | 0.31 ns | 0.77 |
| interval - interval + take(3) + subscribe + immediate | 5066.84 ns | 114.20 ns | 139.38 ns | 0.82 |
| interval - interval + take(3) + subscribe + current_thread | 6079.95 ns | 100.21 ns | 140.23 ns | 0.71 |
| from array of 1 - create + as_blocking + subscribe + new_thread | 92376.33 ns | 81204.57 ns | 114780.33 ns | 0.71 |
| from array of 1000 - create + as_blocking + subscribe + new_thread | 89856.45 ns | 90478.08 ns | 126040.78 ns | 0.72 |
| concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 8263.18 ns | 595.50 ns | 772.44 ns | 0.77 |
Filtering Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take(1)+subscribe | 2924.10 ns | 0.24 ns | 0.29 ns | 0.83 |
| immediate_just+filter(true)+subscribe | 2330.73 ns | 0.26 ns | 0.29 ns | 0.90 |
| immediate_just(1,2)+skip(1)+subscribe | 2829.04 ns | 0.24 ns | 0.29 ns | 0.84 |
| immediate_just(1,1,2)+distinct_until_changed()+subscribe | 2134.18 ns | 0.49 ns | 0.58 ns | 0.85 |
| immediate_just(1,2)+first()+subscribe | 3299.06 ns | 0.25 ns | 0.29 ns | 0.87 |
| immediate_just(1,2)+last()+subscribe | 2436.21 ns | 0.24 ns | 0.29 ns | 0.83 |
| immediate_just+take_last(1)+subscribe | 3158.64 ns | 0.24 ns | 0.29 ns | 0.83 |
| immediate_just(1,2,3)+element_at(1)+subscribe | 2343.76 ns | 0.24 ns | 0.31 ns | 0.78 |
Schedulers
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate scheduler create worker + schedule | 882.96 ns | 4.66 ns | 5.39 ns | 0.86 |
| current_thread scheduler create worker + schedule | 1238.38 ns | 38.24 ns | 44.78 ns | 0.85 |
| current_thread scheduler create worker + schedule + recursive schedule | 2091.73 ns | 213.44 ns | 245.31 ns | 0.87 |
Transforming Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+map(v*2)+subscribe | 2119.37 ns | 4.64 ns | 5.16 ns | 0.90 |
| immediate_just+scan(10, std::plus)+subscribe | 2375.32 ns | 0.47 ns | 0.57 ns | 0.82 |
| immediate_just+flat_map(immediate_just(v*2))+subscribe | 5673.61 ns | 537.37 ns | 628.18 ns | 0.86 |
| immediate_just+buffer(2)+subscribe | 2641.43 ns | 68.21 ns | 80.09 ns | 0.85 |
| immediate_just+window(2)+subscribe + subscsribe inner | 5568.64 ns | 2542.69 ns | 2923.12 ns | 0.87 |
Conditional Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take_while(false)+subscribe | 2227.44 ns | - | - | 0.00 |
| immediate_just+take_while(true)+subscribe | 2220.82 ns | 0.25 ns | 0.29 ns | 0.88 |
Utility Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(1)+subscribe_on(immediate)+subscribe | 4976.64 ns | 5.42 ns | 6.41 ns | 0.85 |
Combining Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 7609.22 ns | 617.74 ns | 732.89 ns | 0.84 |
| immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 8601.13 ns | 525.07 ns | 631.44 ns | 0.83 |
| immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 620.83 ns | 727.34 ns | 0.85 |
| immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 8346.80 ns | 2031.03 ns | 2462.28 ns | 0.82 |
| immediate_just(1) + zip(immediate_just(2)) + subscribe | 5283.96 ns | 971.87 ns | 1107.04 ns | 0.88 |
Subjects
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| publish_subject with 1 observer - on_next | 80.27 ns | 49.82 ns | 60.62 ns | 0.82 |
| subscribe 100 observers to publish_subject | 362185.00 ns | 43885.71 ns | 50372.39 ns | 0.87 |
| 100 on_next to 100 observers to publish_subject | 57770.10 ns | 20429.25 ns | 23329.30 ns | 0.88 |
Scenarios
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| basic sample | 2829.70 ns | 70.43 ns | 83.11 ns | 0.85 |
| basic sample with immediate scheduler | 2878.69 ns | 18.71 ns | 23.01 ns | 0.81 |
Aggregating Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+reduce(10, std::plus)+subscribe | 2420.20 ns | 0.24 ns | 0.29 ns | 0.83 |
Error Handling Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 7035.14 ns | 4366.85 ns | 4865.12 ns | 0.90 |
| create(on_error())+retry(1)+subscribe | 1895.00 ns | 380.84 ns | 446.98 ns | 0.85 |
ci-ubuntu-clang
General
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| Subscribe empty callbacks to empty observable | 272.56 ns | 1.60 ns | 1.55 ns | 1.03 |
| Subscribe empty callbacks to empty observable via pipe operator | 264.85 ns | 1.56 ns | 1.55 ns | 1.01 |
Sources
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| from array of 1 - create + subscribe + immediate | 560.21 ns | 0.31 ns | 0.31 ns | 1.00 |
| from array of 1 - create + subscribe + current_thread | 807.05 ns | 4.02 ns | 4.01 ns | 1.00 |
| concat_as_source of just(1 immediate) create + subscribe | 2394.72 ns | 189.22 ns | 186.37 ns | 1.02 |
| defer from array of 1 - defer + create + subscribe + immediate | 779.41 ns | 0.31 ns | 0.31 ns | 1.00 |
| interval - interval + take(3) + subscribe + immediate | 2223.88 ns | 58.26 ns | 58.31 ns | 1.00 |
| interval - interval + take(3) + subscribe + current_thread | 3170.58 ns | 30.88 ns | 30.88 ns | 1.00 |
| from array of 1 - create + as_blocking + subscribe + new_thread | 27398.46 ns | 28100.54 ns | 32978.20 ns | 0.85 |
| from array of 1000 - create + as_blocking + subscribe + new_thread | 39964.81 ns | 37097.52 ns | 37651.41 ns | 0.99 |
| concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 3709.58 ns | 303.90 ns | 303.27 ns | 1.00 |
Filtering Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take(1)+subscribe | 1141.85 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+filter(true)+subscribe | 841.86 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2)+skip(1)+subscribe | 1123.20 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,1,2)+distinct_until_changed()+subscribe | 859.23 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2)+first()+subscribe | 1360.11 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2)+last()+subscribe | 1011.83 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+take_last(1)+subscribe | 1211.33 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2,3)+element_at(1)+subscribe | 854.28 ns | 0.31 ns | 0.31 ns | 1.00 |
Schedulers
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate scheduler create worker + schedule | 276.23 ns | 1.57 ns | 1.54 ns | 1.02 |
| current_thread scheduler create worker + schedule | 388.44 ns | 4.32 ns | 4.58 ns | 0.94 |
| current_thread scheduler create worker + schedule + recursive schedule | 848.11 ns | 56.01 ns | 63.35 ns | 0.88 |
Transforming Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+map(v*2)+subscribe | 839.74 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+scan(10, std::plus)+subscribe | 965.53 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+flat_map(immediate_just(v*2))+subscribe | 2244.13 ns | 228.21 ns | 228.13 ns | 1.00 |
| immediate_just+buffer(2)+subscribe | 1504.91 ns | 13.58 ns | 13.59 ns | 1.00 |
| immediate_just+window(2)+subscribe + subscsribe inner | 2461.37 ns | 941.72 ns | 932.57 ns | 1.01 |
Conditional Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take_while(false)+subscribe | 833.56 ns | - | - | 0.00 |
| immediate_just+take_while(true)+subscribe | 855.49 ns | 0.31 ns | 0.31 ns | 1.00 |
Utility Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(1)+subscribe_on(immediate)+subscribe | 2018.39 ns | 0.31 ns | 0.31 ns | 1.00 |
Combining Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 3288.85 ns | 292.05 ns | 291.00 ns | 1.00 |
| immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 3723.14 ns | 220.47 ns | 214.84 ns | 1.03 |
| immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 198.88 ns | 198.60 ns | 1.00 |
| immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 3387.99 ns | 841.61 ns | 840.78 ns | 1.00 |
| immediate_just(1) + zip(immediate_just(2)) + subscribe | 2202.66 ns | 196.71 ns | 196.89 ns | 1.00 |
Subjects
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| publish_subject with 1 observer - on_next | 54.66 ns | 21.73 ns | 18.25 ns | 1.19 |
| subscribe 100 observers to publish_subject | 210492.20 ns | 16010.53 ns | 16110.75 ns | 0.99 |
| 100 on_next to 100 observers to publish_subject | 37382.16 ns | 20461.38 ns | 20570.95 ns | 0.99 |
Scenarios
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| basic sample | 1305.99 ns | 11.42 ns | 11.42 ns | 1.00 |
| basic sample with immediate scheduler | 1291.00 ns | 6.17 ns | 6.17 ns | 1.00 |
Aggregating Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+reduce(10, std::plus)+subscribe | 991.06 ns | 0.31 ns | 0.31 ns | 1.00 |
Error Handling Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 2174.13 ns | 1001.69 ns | 1008.80 ns | 0.99 |
| create(on_error())+retry(1)+subscribe | 670.72 ns | 163.46 ns | 159.66 ns | 1.02 |
ci-windows
General
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| Subscribe empty callbacks to empty observable | 559.28 ns | 4.02 ns | 4.94 ns | 0.81 |
| Subscribe empty callbacks to empty observable via pipe operator | 576.51 ns | 4.01 ns | 4.94 ns | 0.81 |
Sources
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| from array of 1 - create + subscribe + immediate | 1151.51 ns | 9.63 ns | 9.64 ns | 1.00 |
| from array of 1 - create + subscribe + current_thread | 1422.35 ns | 17.91 ns | 17.69 ns | 1.01 |
| concat_as_source of just(1 immediate) create + subscribe | 3805.73 ns | 233.14 ns | 234.01 ns | 1.00 |
| defer from array of 1 - defer + create + subscribe + immediate | 1189.11 ns | 9.44 ns | 9.43 ns | 1.00 |
| interval - interval + take(3) + subscribe + immediate | 3355.56 ns | 145.43 ns | 144.67 ns | 1.01 |
| interval - interval + take(3) + subscribe + current_thread | 3397.63 ns | 68.26 ns | 65.38 ns | 1.04 |
| from array of 1 - create + as_blocking + subscribe + new_thread | 116666.67 ns | 111830.00 ns | 109530.00 ns | 1.02 |
| from array of 1000 - create + as_blocking + subscribe + new_thread | 126233.33 ns | 128175.00 ns | 127975.00 ns | 1.00 |
| concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 5376.96 ns | 307.89 ns | 308.32 ns | 1.00 |
Filtering Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take(1)+subscribe | 1810.64 ns | 25.28 ns | 24.99 ns | 1.01 |
| immediate_just+filter(true)+subscribe | 1305.22 ns | 24.36 ns | 24.06 ns | 1.01 |
| immediate_just(1,2)+skip(1)+subscribe | 1749.66 ns | 24.07 ns | 23.43 ns | 1.03 |
| immediate_just(1,1,2)+distinct_until_changed()+subscribe | 1605.08 ns | 28.99 ns | 26.24 ns | 1.10 |
| immediate_just(1,2)+first()+subscribe | 2037.62 ns | 22.82 ns | 23.76 ns | 0.96 |
| immediate_just(1,2)+last()+subscribe | 1780.09 ns | 24.08 ns | 24.69 ns | 0.98 |
| immediate_just+take_last(1)+subscribe | 1998.78 ns | 69.98 ns | 70.04 ns | 1.00 |
| immediate_just(1,2,3)+element_at(1)+subscribe | 1341.54 ns | 27.45 ns | 26.53 ns | 1.03 |
Schedulers
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate scheduler create worker + schedule | 473.53 ns | 6.17 ns | 6.17 ns | 1.00 |
| current_thread scheduler create worker + schedule | 645.02 ns | 13.78 ns | 14.33 ns | 0.96 |
| current_thread scheduler create worker + schedule + recursive schedule | 1346.95 ns | 103.33 ns | 102.94 ns | 1.00 |
Transforming Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+map(v*2)+subscribe | 1308.06 ns | 24.36 ns | 24.37 ns | 1.00 |
| immediate_just+scan(10, std::plus)+subscribe | 1440.65 ns | 26.55 ns | 26.53 ns | 1.00 |
| immediate_just+flat_map(immediate_just(v*2))+subscribe | 3537.57 ns | 264.77 ns | 267.27 ns | 0.99 |
| immediate_just+buffer(2)+subscribe | 2653.62 ns | 68.76 ns | 68.97 ns | 1.00 |
| immediate_just+window(2)+subscribe + subscsribe inner | 4051.32 ns | 1309.84 ns | 1329.72 ns | 0.99 |
Conditional Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take_while(false)+subscribe | 1607.16 ns | 23.12 ns | 23.14 ns | 1.00 |
| immediate_just+take_while(true)+subscribe | 1315.55 ns | 24.37 ns | 24.06 ns | 1.01 |
Utility Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(1)+subscribe_on(immediate)+subscribe | 3455.56 ns | 11.10 ns | 11.10 ns | 1.00 |
Combining Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 5155.28 ns | 304.60 ns | 303.09 ns | 1.00 |
| immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 5496.15 ns | 261.75 ns | 263.17 ns | 0.99 |
| immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 301.29 ns | 307.60 ns | 0.98 |
| immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 5332.57 ns | 937.46 ns | 948.18 ns | 0.99 |
| immediate_just(1) + zip(immediate_just(2)) + subscribe | 3561.37 ns | 564.64 ns | 571.95 ns | 0.99 |
Subjects
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| publish_subject with 1 observer - on_next | 36.71 ns | 20.36 ns | 20.08 ns | 1.01 |
| subscribe 100 observers to publish_subject | 260450.00 ns | 28485.00 ns | 28994.29 ns | 0.98 |
| 100 on_next to 100 observers to publish_subject | 54836.84 ns | 32629.03 ns | 33011.76 ns | 0.99 |
Scenarios
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| basic sample | 1878.12 ns | 101.47 ns | 102.29 ns | 0.99 |
| basic sample with immediate scheduler | 1892.30 ns | 78.81 ns | 73.56 ns | 1.07 |
Aggregating Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+reduce(10, std::plus)+subscribe | 1444.93 ns | 24.99 ns | 24.66 ns | 1.01 |
Error Handling Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 2158.37 ns | 243.28 ns | 246.52 ns | 0.99 |
| create(on_error())+retry(1)+subscribe | 1174.59 ns | 190.89 ns | 194.11 ns | 0.98 |
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## v2 #662 +/- ##
=======================================
Coverage 98.28% 98.29%
=======================================
Files 154 154
Lines 7717 7732 +15
=======================================
+ Hits 7585 7600 +15
Misses 132 132 ☔ View full report in Codecov by Sentry. |
|
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (4)
src/rpp/rpp/observers/details/fwd.hpp (1)
32-35: LGTM! Consider a minor improvement to the comment.The addition of
locally_disposable_strategyis well-placed and consistent with the existing code structure. Its purpose is clear from the comment.Consider slightly rewording the comment for better clarity:
- * @brief Just bool over is_disposed/dispose logic with no any add logic + * @brief Simple boolean-based is_disposed/dispose logic without additional functionalitysrc/rpp/rpp/operators/details/combining_strategy.hpp (1)
52-53: LGTM! Consider adding documentation for clarity.The addition of the
preferred_disposable_strategytype alias is a good improvement for specifying the disposable strategy used by thecombining_observer_strategy. It aligns well with reactive programming principles where resource management is crucial.Consider adding a brief comment explaining the rationale behind using
none_disposable_strategyfor this particular observer strategy. This would enhance code readability and maintainability.src/rpp/rpp/sources/concat.hpp (2)
72-72: Approve simplification ofis_disposed()methodThe simplification of the
is_disposed()method is a good change. It aligns well with the removal of thelocally_disposedmember variable and the shift tolocally_disposable_strategy. This change reduces complexity and potential sources of bugs related to multiple disposal flags.For improved readability, consider using an explicit return statement:
-bool is_disposed() const { return state->disposable.is_disposed(); } +bool is_disposed() const +{ + return state->disposable.is_disposed(); +}This multi-line format can be easier to read and maintain, especially if the method needs to be expanded in the future.
Line range hint
55-72: Summary of changes in concat.hppThe modifications in this file focus on improving the disposable strategy and simplifying the disposal logic in the
concat_source_observer_strategystruct. The key changes are:
- Updating the
preferred_disposable_strategyto uselocally_disposable_strategy.- Removing the
locally_disposedmember variable.- Simplifying the
is_disposed()method.These changes appear to be part of a broader effort to enhance resource management in the library. They should lead to more straightforward and potentially more efficient disposal of resources.
To ensure these changes are effective across the entire codebase:
- Review other similar structures to see if they could benefit from similar improvements.
- Update relevant documentation to reflect the new disposal strategy.
- Consider adding or updating unit tests to verify the new disposal behavior.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (11)
- src/rpp/rpp/observers/details/disposable_strategy.hpp (1 hunks)
- src/rpp/rpp/observers/details/fwd.hpp (1 hunks)
- src/rpp/rpp/operators/concat.hpp (2 hunks)
- src/rpp/rpp/operators/delay.hpp (1 hunks)
- src/rpp/rpp/operators/details/combining_strategy.hpp (1 hunks)
- src/rpp/rpp/operators/merge.hpp (1 hunks)
- src/rpp/rpp/operators/retry.hpp (2 hunks)
- src/rpp/rpp/operators/with_latest_from.hpp (1 hunks)
- src/rpp/rpp/sources/concat.hpp (2 hunks)
- src/tests/rpp/test_delay.cpp (2 hunks)
- src/tests/utils/disposable_observable.hpp (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/rpp/rpp/operators/delay.hpp
🧰 Additional context used
🔇 Additional comments (11)
src/rpp/rpp/observers/details/disposable_strategy.hpp (1)
107-116: Verify the intention of theaddmethod and consider thread safetyThe new
locally_disposable_strategystruct introduces a simplified disposable strategy. However, there are a few points to consider:
The
addmethod is empty. Is this intentional? If disposables are meant to be managed, this could lead to resource leaks.Unlike
local_disposable_strategy, this implementation doesn't consider thread safety. In multi-threaded scenarios, this could lead to race conditions. Consider usingstd::atomic<bool>forstateif thread safety is required.The use of a mutable boolean
stateallows changes in const contexts. While this enables thedisposemethod to be const, it might lead to unexpected behavior. Consider makingdisposenon-const to clearly communicate that it modifies the object's state.To ensure this implementation aligns with the intended use case, please run the following script to check for any existing usages or similar patterns:
This will help verify if the simplified implementation is consistent with existing patterns and use cases in the codebase.
✅ Verification successful
The
locally_disposable_strategyaligns with existing no-op disposable strategies and is suitable for scenarios where disposable management is unnecessary. Ensure it's used appropriately to prevent resource leaks and consider thread safety by selecting the appropriate internal strategy (atomic_boolif needed).🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check for similar disposable strategies and their usages # Search for similar disposable strategy implementations echo "Similar disposable strategy implementations:" rg --type cpp -e "struct.*disposable_strategy" -e "class.*disposable_strategy" # Search for usages of disposable strategies echo "\nUsages of disposable strategies:" rg --type cpp -e "disposable_strategy" -A 5 -B 5Length of output: 136994
src/rpp/rpp/observers/details/fwd.hpp (1)
32-35: Verify usage of the newlocally_disposable_strategyThe addition of
locally_disposable_strategyexpands the available disposable strategies without affecting existing code. To ensure its proper integration:Run the following script to check for usage of the new structure across the codebase:
This will help ensure that the new structure is being utilized as intended and that no further adjustments are needed in other parts of the codebase.
✅ Verification successful
Usage of the new
locally_disposable_strategyVerifiedThe
locally_disposable_strategyis appropriately utilized across the codebase, ensuring its integration is correct and non-breaking.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check for usage of locally_disposable_strategy across the codebase # Search for files that include this header echo "Files including fwd.hpp:" rg --type cpp -l 'include.*fwd\.hpp' # Search for usage of locally_disposable_strategy echo "\nUsage of locally_disposable_strategy:" rg --type cpp 'locally_disposable_strategy'Length of output: 5652
src/tests/utils/disposable_observable.hpp (1)
132-132: Approved: Enhanced test coverage for observer disposalThe addition of
CHECK(obs.is_disposed());improves the test case by verifying that the observer itself is disposed after theon_completedevent is triggered. This check complements the existing assertion for the disposable and ensures a more comprehensive validation of the disposal process.src/rpp/rpp/operators/concat.hpp (3)
Line range hint
114-122: LGTM: Improved disposable management in constructorThe addition of the
rpp::composite_disposable_wrapper refcountedparameter to the constructor enhances the management of disposables. This change aligns well with the overall goal of improving disposable handling in the library.
142-143: LGTM: Appropriate disposable strategy for inner observerThe addition of the
preferred_disposable_strategyalias set torpp::details::observers::locally_disposable_strategyis a good choice for theconcat_inner_observer_strategy. This allows the inner observer to manage its own disposables, which is consistent with its role in the concat operation.
Line range hint
170-171: Please clarify the choice of disposable strategyThe
preferred_disposable_strategyforconcat_observer_strategyis set torpp::details::observers::none_disposable_strategy. This choice seems to indicate that this observer doesn't manage its own disposables, which is different from theconcat_inner_observer_strategy.Could you please explain the rationale behind using
none_disposable_strategyhere? Is there a specific reason why this observer doesn't need to manage disposables, unlike its inner counterpart?To help understand the impact of this choice, let's examine how disposables are used in the
concat_observer_strategy:src/rpp/rpp/operators/merge.hpp (1)
54-55: LGTM: Appropriate use of type alias for disposable strategy.The addition of the
preferred_disposable_strategytype alias is a good approach to specify the disposable strategy for merge operations. This change aligns with the PR's objective of fixing early disposing issues and provides flexibility for future modifications if needed.src/rpp/rpp/operators/with_latest_from.hpp (1)
53-54: Approved: Enhanced disposable management strategyThe addition of the
preferred_disposable_strategytype alias, defined asrpp::details::observers::locally_disposable_strategy, is a positive change. This refinement in the disposable management strategy for the inner observer aligns with the broader goal of enhancing disposable handling across various observer strategies in the ReactivePlusPlus library.This change:
- Provides a more specific disposable strategy for the
with_latest_from_inner_observer_strategy.- Improves code clarity by explicitly defining the preferred strategy.
- Maintains consistency with other parts of the library that have undergone similar enhancements.
The modification doesn't alter the existing functionality but sets the groundwork for more efficient and localized disposable management.
src/tests/rpp/test_delay.cpp (2)
236-257: Excellent addition of edge case test for delay operatorThis new test case is a valuable addition as it verifies an important behavior of the delay operator: ensuring that resources are not disposed of prematurely. The test is well-structured and covers the following key points:
- Immediate completion of the source without emitting values.
- Verification of the disposable's state before and after the delay period.
- Correct order of operations using
trompeloeil::sequence.This test enhances the robustness of the delay operator implementation and helps prevent potential issues with early resource disposal.
261-261: Approved: Consistent use of immediate schedulerThis change aligns with the previous suggestion to use
rpp::schedulers::immediatefor consistency across delay operator tests. By replacing themanual_schedulerwithrpp::schedulers::immediate, we ensure that:
- The disposable contracts are verified under synchronous execution conditions.
- The test suite maintains consistency in scheduler usage.
- The delay operator's behavior is tested with different scheduling strategies, improving overall test coverage.
This modification enhances the reliability and comprehensiveness of the test suite.
src/rpp/rpp/operators/retry.hpp (1)
47-47: Update to 'preferred_disposable_strategy' aligns with disposal requirementsChanging
preferred_disposable_strategytorpp::details::observers::locally_disposable_strategyensures thatretry_observer_strategymanages its disposables appropriately. This update aligns with the intended behavior of the retry mechanism by allowing local disposal, which is necessary for controlling resubscription attempts.
| struct concat_source_observer_strategy | ||
| { | ||
| using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; | ||
| using preferred_disposable_strategy = rpp::details::observers::locally_disposable_strategy; |
There was a problem hiding this comment.
💡 Codebase verification
Inconsistent preferred_disposable_strategy Usage Detected
The switch to locally_disposable_strategy in concat.hpp is not consistently reflected across the codebase. Numerous files still utilize none_disposable_strategy, which may lead to inconsistent resource management behavior.
🔗 Analysis chain
Approve change to locally_disposable_strategy
The change from none_disposable_strategy to locally_disposable_strategy is a good improvement in resource management. This modification suggests that disposables are now managed locally within the strategy, which could lead to better cleanup and resource handling.
To ensure this change doesn't introduce any unintended side effects, please verify:
- The behavior of the
concat_source_observer_strategyremains consistent with the expected functionality. - There are no memory leaks or resource management issues introduced by this change.
You may want to run the following command to check for any other occurrences of none_disposable_strategy that might need similar updates:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
rg "none_disposable_strategy" --type cpp
Length of output: 6313
| } | ||
|
|
||
| bool is_disposed() const { return locally_disposed || state->disposable.is_disposed(); } | ||
| bool is_disposed() const { return state->disposable.is_disposed(); } |
There was a problem hiding this comment.
💡 Codebase verification
Issues Found with Removal of 'locally_disposed'
The locally_disposed variable is still referenced in src/rpp/rpp/operators/retry_when.hpp:
- Line:
mutable bool locally_disposed{}; - Multiple assignments:
locally_disposed = true; - Updated
is_disposed()method still checkslocally_disposed || state->disposable.is_disposed();
These remaining references indicate that the removal of locally_disposed is incomplete and may affect the disposal logic as originally intended.
🔗 Analysis chain
Verify the impact of removing 'locally_disposed' on disposal logic
The is_disposed() method now solely checks state->disposable.is_disposed() after the removal of locally_disposed. Ensure that this change does not affect the correctness of disposal checks, especially in scenarios where local disposal status was significant for stopping retries.
To confirm that the removal of locally_disposed does not introduce issues, run the following script to check for any remaining references and to assess disposal behavior:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify that 'locally_disposed' is no longer referenced and that disposal logic functions correctly.
# Check for any remaining references to 'locally_disposed' in the codebase.
rg --type cpp 'locally_disposed'
# Inspect all usages of 'is_disposed()' to ensure they rely solely on 'state->disposable.is_disposed()'.
rg --type cpp 'is_disposed\(\)'
Length of output: 34493
for more information, see https://pre-commit.ci
There was a problem hiding this comment.
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
- src/tests/utils/disposable_observable.hpp (1 hunks)
🧰 Additional context used
🪛 GitHub Check: tests ci-macos Release
src/tests/utils/disposable_observable.hpp
[failure] 118-118:
expected ';' at end of declaration
[failure] 131-131:
expected ';' at end of declaration
[failure] 118-118:
expected ';' at end of declaration
[failure] 131-131:
expected ';' at end of declaration
🪛 GitHub Check: Sanitize - ubsan
src/tests/utils/disposable_observable.hpp
[failure] 118-118:
expected ';' at end of declaration
[failure] 131-131:
expected ';' at end of declaration
🪛 GitHub Check: Sanitize - lsan
src/tests/utils/disposable_observable.hpp
[failure] 118-118:
expected ';' at end of declaration
[failure] 131-131:
expected ';' at end of declaration
🪛 GitHub Check: Sanitize - asan
src/tests/utils/disposable_observable.hpp
[failure] 118-118:
expected ';' at end of declaration
[failure] 131-131:
expected ';' at end of declaration
🪛 GitHub Check: Sanitize - tsan
src/tests/utils/disposable_observable.hpp
[failure] 118-118:
expected ';' at end of declaration
[failure] 131-131:
expected ';' at end of declaration
🔇 Additional comments (1)
src/tests/utils/disposable_observable.hpp (1)
118-139: Overall improvement in disposable management and test coverageThe changes to both test cases ("operator disposes disposable on_error" and "operator disposes disposable on_completed") significantly improve the management of disposables and enhance the test coverage. By using
std::optional, the code now handles the lifecycle of disposables more robustly. The additional checks for both the observable's disposal state and the optional disposable provide better verification of the expected behavior.However, the introduced syntax errors in both test cases need to be addressed to ensure the tests compile and run correctly. Once these issues are fixed, the test cases will provide more comprehensive coverage of the disposable behavior in both error and completion scenarios.
These changes represent a positive step towards more robust and thorough testing of the disposable observable functionality. After addressing the syntax errors, consider adding similar improvements to other relevant test cases in the file for consistency.
🧰 Tools
🪛 GitHub Check: tests ci-macos Release
[failure] 118-118:
expected ';' at end of declaration
[failure] 131-131:
expected ';' at end of declaration
[failure] 118-118:
expected ';' at end of declaration
[failure] 131-131:
expected ';' at end of declaration🪛 GitHub Check: Sanitize - ubsan
[failure] 118-118:
expected ';' at end of declaration
[failure] 131-131:
expected ';' at end of declaration🪛 GitHub Check: Sanitize - lsan
[failure] 118-118:
expected ';' at end of declaration
[failure] 131-131:
expected ';' at end of declaration🪛 GitHub Check: Sanitize - asan
[failure] 118-118:
expected ';' at end of declaration
[failure] 131-131:
expected ';' at end of declaration🪛 GitHub Check: Sanitize - tsan
[failure] 118-118:
expected ';' at end of declaration
[failure] 131-131:
expected ';' at end of declaration
| std::optional<rpp::composite_disposable_wrapper> d{} : op(rpp::source::create<T>([&d](auto&& obs) { | ||
| d = rpp::composite_disposable_wrapper::make(); | ||
| obs.set_upstream(d.value()); | ||
| obs.on_error({}); | ||
| CHECK(d.is_disposed()); | ||
| CHECK(obs.is_disposed()); | ||
| })).subscribe([](const auto&) {}, [](const std::exception_ptr&) {}); | ||
|
|
||
| CHECK(d); | ||
| CHECK(d->is_disposed()); |
There was a problem hiding this comment.
Fix syntax error and consider additional improvements
There's a syntax error in the lambda capture on line 118. The colon : should be replaced with an equals sign =. Additionally, consider the following improvements:
- Use
std::optional<rpp::composite_disposable_wrapper>instead ofrpp::composite_disposable_wrapperto match the type ofd. - Use
CHECK(d.has_value())instead ofCHECK(d)for clarity.
Apply this diff to fix the syntax error and implement the suggested improvements:
- std::optional<rpp::composite_disposable_wrapper> d{} : op(rpp::source::create<T>([&d](auto&& obs) {
+ std::optional<rpp::composite_disposable_wrapper> d{};
+ op(rpp::source::create<T>([&d](auto&& obs) {
d = rpp::composite_disposable_wrapper::make();
- obs.set_upstream(d.value());
+ obs.set_upstream(*d);
obs.on_error({});
CHECK(obs.is_disposed());
})).subscribe([](const auto&) {}, [](const std::exception_ptr&) {});
- CHECK(d);
+ CHECK(d.has_value());
CHECK(d->is_disposed());These changes will fix the syntax error and improve the readability and robustness of the test case.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| std::optional<rpp::composite_disposable_wrapper> d{} : op(rpp::source::create<T>([&d](auto&& obs) { | |
| d = rpp::composite_disposable_wrapper::make(); | |
| obs.set_upstream(d.value()); | |
| obs.on_error({}); | |
| CHECK(d.is_disposed()); | |
| CHECK(obs.is_disposed()); | |
| })).subscribe([](const auto&) {}, [](const std::exception_ptr&) {}); | |
| CHECK(d); | |
| CHECK(d->is_disposed()); | |
| std::optional<rpp::composite_disposable_wrapper> d{}; | |
| op(rpp::source::create<T>([&d](auto&& obs) { | |
| d = rpp::composite_disposable_wrapper::make(); | |
| obs.set_upstream(*d); | |
| obs.on_error({}); | |
| CHECK(obs.is_disposed()); | |
| })).subscribe([](const auto&) {}, [](const std::exception_ptr&) {}); | |
| CHECK(d.has_value()); | |
| CHECK(d->is_disposed()); |
🧰 Tools
🪛 GitHub Check: tests ci-macos Release
[failure] 118-118:
expected ';' at end of declaration
[failure] 118-118:
expected ';' at end of declaration
🪛 GitHub Check: Sanitize - ubsan
[failure] 118-118:
expected ';' at end of declaration
🪛 GitHub Check: Sanitize - lsan
[failure] 118-118:
expected ';' at end of declaration
🪛 GitHub Check: Sanitize - asan
[failure] 118-118:
expected ';' at end of declaration
🪛 GitHub Check: Sanitize - tsan
[failure] 118-118:
expected ';' at end of declaration
| std::optional<rpp::composite_disposable_wrapper> d{} : op(rpp::source::create<T>([&d](auto&& obs) { | ||
| d = rpp::composite_disposable_wrapper::make(); | ||
| obs.set_upstream(d.value()); | ||
| obs.on_completed(); | ||
| CHECK(d.is_disposed()); | ||
| CHECK(obs.is_disposed()); | ||
| })).subscribe([](const auto&) {}, [](const std::exception_ptr&) {}); | ||
|
|
||
| CHECK(d); | ||
| CHECK(d->is_disposed()); |
There was a problem hiding this comment.
Fix syntax error and consider additional improvements
There's a syntax error in the lambda capture on line 131, similar to the previous test case. The colon : should be replaced with an equals sign =. Additionally, consider the following improvements:
- Use
std::optional<rpp::composite_disposable_wrapper>instead ofrpp::composite_disposable_wrapperto match the type ofd. - Use
CHECK(d.has_value())instead ofCHECK(d)for clarity.
Apply this diff to fix the syntax error and implement the suggested improvements:
- std::optional<rpp::composite_disposable_wrapper> d{} : op(rpp::source::create<T>([&d](auto&& obs) {
+ std::optional<rpp::composite_disposable_wrapper> d{};
+ op(rpp::source::create<T>([&d](auto&& obs) {
d = rpp::composite_disposable_wrapper::make();
- obs.set_upstream(d.value());
+ obs.set_upstream(*d);
obs.on_completed();
CHECK(obs.is_disposed());
})).subscribe([](const auto&) {}, [](const std::exception_ptr&) {});
- CHECK(d);
+ CHECK(d.has_value());
CHECK(d->is_disposed());These changes will fix the syntax error and improve the readability and robustness of the test case.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| std::optional<rpp::composite_disposable_wrapper> d{} : op(rpp::source::create<T>([&d](auto&& obs) { | |
| d = rpp::composite_disposable_wrapper::make(); | |
| obs.set_upstream(d.value()); | |
| obs.on_completed(); | |
| CHECK(d.is_disposed()); | |
| CHECK(obs.is_disposed()); | |
| })).subscribe([](const auto&) {}, [](const std::exception_ptr&) {}); | |
| CHECK(d); | |
| CHECK(d->is_disposed()); | |
| std::optional<rpp::composite_disposable_wrapper> d{}; | |
| op(rpp::source::create<T>([&d](auto&& obs) { | |
| d = rpp::composite_disposable_wrapper::make(); | |
| obs.set_upstream(*d); | |
| obs.on_completed(); | |
| CHECK(obs.is_disposed()); | |
| })).subscribe([](const auto&) {}, [](const std::exception_ptr&) {}); | |
| CHECK(d.has_value()); | |
| CHECK(d->is_disposed()); |
🧰 Tools
🪛 GitHub Check: tests ci-macos Release
[failure] 131-131:
expected ';' at end of declaration
[failure] 131-131:
expected ';' at end of declaration
🪛 GitHub Check: Sanitize - ubsan
[failure] 131-131:
expected ';' at end of declaration
🪛 GitHub Check: Sanitize - lsan
[failure] 131-131:
expected ';' at end of declaration
🪛 GitHub Check: Sanitize - asan
[failure] 131-131:
expected ';' at end of declaration
🪛 GitHub Check: Sanitize - tsan
[failure] 131-131:
expected ';' at end of declaration
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (1)
src/tests/rpp/test_merge.cpp (1)
232-234: Avoid using 'std::this_thread::sleep_for' in testsUsing
std::this_thread::sleep_forto synchronize threads in tests can lead to flaky tests due to timing dependencies. The test may pass or fail unpredictably based on system load or scheduling.Consider using synchronization primitives such as condition variables or promises/futures to coordinate between threads reliably.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
- src/rpp/rpp/operators/details/combining_strategy.hpp (1 hunks)
- src/tests/rpp/test_merge.cpp (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/rpp/rpp/operators/details/combining_strategy.hpp
🧰 Additional context used
| std::mutex m{}; | ||
| source | ||
| | rpp::ops::as_blocking() | ||
| | rpp::ops::subscribe([&](auto&&) { | ||
| REQUIRE(extracted_obs.has_value()); | ||
| CHECK(!on_error_called); | ||
| std::thread{[extracted_obs] | ||
| std::thread{[extracted_obs,&m] | ||
| { | ||
| std::lock_guard lock{m}; |
There was a problem hiding this comment.
Potential data race when accessing 'extracted_obs'
The variable extracted_obs is accessed from multiple threads without proper synchronization. While the access to extracted_obs->on_error(...) inside the detached thread is protected by a mutex, the accesses to extracted_obs in the subscriber lambda (e.g., REQUIRE(extracted_obs.has_value());) are not synchronized. This can lead to data races and undefined behavior.
Consider synchronizing all accesses to extracted_obs or using thread-safe constructs to ensure that shared data is accessed safely across threads.
Undefined behavior due to capturing local mutex by reference in detached thread
The mutex m declared at line 225 is a local variable within the lambda passed to rpp::ops::subscribe. It is captured by reference (&m) in the detached thread created at line 231. Since the lambda scope may exit and m may be destroyed before the detached thread completes execution, this can lead to undefined behavior when the detached thread tries to lock m.
To fix this issue, consider extending the lifetime of the mutex by moving it outside the lambda or by capturing it by value using a std::shared_ptr.
Apply this diff to fix the issue:
- std::mutex m{};
+ auto m = std::make_shared<std::mutex>();
source
| rpp::ops::as_blocking()
| rpp::ops::subscribe([&](auto&&) {
REQUIRE(extracted_obs.has_value());
CHECK(!on_error_called);
- std::thread{[extracted_obs,&m]
+ std::thread{[extracted_obs, m]
{
- std::lock_guard lock{m};
+ std::lock_guard lock{*m};
extracted_obs->on_error(std::exception_ptr{});
}}.detach();
std::this_thread::sleep_for(std::chrono::seconds{1});
CHECK(!on_error_called); },
[&](auto) { on_error_called = true; });📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| std::mutex m{}; | |
| source | |
| | rpp::ops::as_blocking() | |
| | rpp::ops::subscribe([&](auto&&) { | |
| REQUIRE(extracted_obs.has_value()); | |
| CHECK(!on_error_called); | |
| std::thread{[extracted_obs] | |
| std::thread{[extracted_obs,&m] | |
| { | |
| std::lock_guard lock{m}; | |
| auto m = std::make_shared<std::mutex>(); | |
| source | |
| | rpp::ops::as_blocking() | |
| | rpp::ops::subscribe([&](auto&&) { | |
| REQUIRE(extracted_obs.has_value()); | |
| CHECK(!on_error_called); | |
| std::thread{[extracted_obs, m] | |
| { | |
| std::lock_guard lock{*m}; |



Summary by CodeRabbit
New Features
mergeandmerge_withfunctionalities, including error handling and concurrency scenarios.Bug Fixes