fix infinite looping#628
Conversation
WalkthroughWalkthroughThe changes include enhancements to the Changes
Sequence Diagram(s)sequenceDiagram
participant State
participant Details
State->>Details: Check state
Details->>State: is_stopped()?
alt Stopped
Details-->>State: return true
else Not Stopped
Details->>State: is_disposed()?
alt Disposed
Details-->>State: return true
else Not Disposed
Details-->>State: return false
end
end
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (1)
- src/rpp/rpp/operators/take_until.hpp (1 hunks)
Additional comments not posted (1)
src/rpp/rpp/operators/take_until.hpp (1)
68-68: Enhanced state checking inis_disposedmethod.The modification to include both
is_stopped()andis_disposed()in theis_disposedmethod is a sensible change that aligns with the PR objective to fix infinite looping issues. This broader check should help prevent scenarios where the object might be stopped but not yet disposed, which could lead to unwanted behavior.The code changes are approved.
Run the following script to verify the integration of this change with other parts of the codebase:
Verification successful
Integration of
is_disposedmethod change verified.The modification to include both
is_stopped()andis_disposed()in theis_disposedmethod has been reviewed across the codebase. The broader check aligns with the PR's objective to prevent infinite loops and does not introduce any new issues. The integration is consistent with the method's usage in various parts of the codebase.
- No issues related to infinite looping or improper state checks were found.
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the integration of the `is_disposed` method change with other parts of the codebase. # Test: Search for the method usage. Expect: No issues related to infinite looping or improper state checks. rg --type cpp -A 5 $'is_disposed'Length of output: 117991
for more information, see https://pre-commit.ci
There was a problem hiding this comment.
Actionable comments posted: 1
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (1)
- src/tests/rpp/test_take_until.cpp (2 hunks)
Additional comments not posted (1)
src/tests/rpp/test_take_until.cpp (1)
16-21: Headers addition approved.The newly added headers are essential for the functionality of the new test case, ensuring that the operators needed for composing the reactive streams are available.
Also applies to: 29-29
BENCHMARK RESULTS (AUTOGENERATED)
|
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| Subscribe empty callbacks to empty observable | 305.99 ns | 2.19 ns | 2.16 ns | 1.01 |
| Subscribe empty callbacks to empty observable via pipe operator | 306.74 ns | 2.16 ns | 2.16 ns | 1.00 |
Sources
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| from array of 1 - create + subscribe + immediate | 701.62 ns | 0.31 ns | 0.31 ns | 1.00 |
| from array of 1 - create + subscribe + current_thread | 1058.74 ns | 3.71 ns | 3.42 ns | 1.08 |
| concat_as_source of just(1 immediate) create + subscribe | 2335.89 ns | 100.19 ns | 101.54 ns | 0.99 |
| defer from array of 1 - defer + create + subscribe + immediate | 728.22 ns | 0.31 ns | 0.36 ns | 0.86 |
| interval - interval + take(3) + subscribe + immediate | 2100.75 ns | 59.23 ns | 59.23 ns | 1.00 |
| interval - interval + take(3) + subscribe + current_thread | 3027.46 ns | 32.46 ns | 32.42 ns | 1.00 |
| from array of 1 - create + as_blocking + subscribe + new_thread | 29269.86 ns | 28576.03 ns | 28396.97 ns | 1.01 |
| from array of 1000 - create + as_blocking + subscribe + new_thread | 42838.08 ns | 52819.21 ns | 50697.28 ns | 1.04 |
| concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 3519.43 ns | 121.82 ns | 133.90 ns | 0.91 |
Filtering Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take(1)+subscribe | 1090.74 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+filter(true)+subscribe | 849.57 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2)+skip(1)+subscribe | 992.43 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,1,2)+distinct_until_changed()+subscribe | 863.58 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2)+first()+subscribe | 1254.96 ns | 0.62 ns | 0.62 ns | 1.00 |
| immediate_just(1,2)+last()+subscribe | 912.02 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+take_last(1)+subscribe | 1125.81 ns | 17.30 ns | 17.28 ns | 1.00 |
| immediate_just(1,2,3)+element_at(1)+subscribe | 837.97 ns | 0.31 ns | 0.31 ns | 1.00 |
Schedulers
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate scheduler create worker + schedule | 265.93 ns | 2.16 ns | 2.16 ns | 1.00 |
| current_thread scheduler create worker + schedule | 378.20 ns | 5.56 ns | 5.56 ns | 1.00 |
| current_thread scheduler create worker + schedule + recursive schedule | 869.29 ns | 57.17 ns | 56.76 ns | 1.01 |
Transforming Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+map(v*2)+subscribe | 906.45 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+scan(10, std::plus)+subscribe | 899.23 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+flat_map(immediate_just(v*2))+subscribe | 2305.75 ns | 168.96 ns | 166.74 ns | 1.01 |
| immediate_just+buffer(2)+subscribe | 1564.69 ns | 13.59 ns | 13.59 ns | 1.00 |
| immediate_just+window(2)+subscribe + subscsribe inner | 2391.64 ns | 1126.76 ns | 1098.39 ns | 1.03 |
Conditional Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take_while(false)+subscribe | 827.24 ns | - | - | 0.00 |
| immediate_just+take_while(true)+subscribe | 913.52 ns | 0.74 ns | 0.31 ns | 2.41 |
Utility Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(1)+subscribe_on(immediate)+subscribe | 1947.57 ns | 0.31 ns | 0.31 ns | 1.00 |
Combining Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 3516.01 ns | 172.84 ns | 176.73 ns | 0.98 |
| immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 3701.77 ns | 171.67 ns | 169.29 ns | 1.01 |
| immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 131.14 ns | 139.07 ns | 0.94 |
| immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 3535.84 ns | 1000.00 ns | 1050.21 ns | 0.95 |
| immediate_just(1) + zip(immediate_just(2)) + subscribe | 2119.22 ns | 206.43 ns | 210.06 ns | 0.98 |
Subjects
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| publish_subject with 1 observer - on_next | 34.50 ns | 14.61 ns | 14.71 ns | 0.99 |
| subscribe 100 observers to publish_subject | 199269.67 ns | 15660.68 ns | 15189.88 ns | 1.03 |
| 100 on_next to 100 observers to publish_subject | 27333.05 ns | 20121.00 ns | 17137.61 ns | 1.17 |
Scenarios
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| basic sample | 1375.61 ns | 13.90 ns | 12.97 ns | 1.07 |
| basic sample with immediate scheduler | 1443.22 ns | 5.55 ns | 5.55 ns | 1.00 |
Aggregating Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+reduce(10, std::plus)+subscribe | 923.37 ns | 0.31 ns | 0.31 ns | 1.00 |
Error Handling Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 2050.46 ns | 1027.98 ns | 1003.45 ns | 1.02 |
| create(on_error())+retry(1)+subscribe | 593.10 ns | 107.96 ns | 122.05 ns | 0.88 |
ci-macos
General
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| Subscribe empty callbacks to empty observable | 964.13 ns | 3.93 ns | 4.73 ns | 0.83 |
| Subscribe empty callbacks to empty observable via pipe operator | 965.49 ns | 3.93 ns | 4.66 ns | 0.84 |
Sources
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| from array of 1 - create + subscribe + immediate | 1920.32 ns | 0.23 ns | 0.28 ns | 0.83 |
| from array of 1 - create + subscribe + current_thread | 2420.64 ns | 31.18 ns | 41.63 ns | 0.75 |
| concat_as_source of just(1 immediate) create + subscribe | 5351.39 ns | 332.70 ns | 397.98 ns | 0.84 |
| defer from array of 1 - defer + create + subscribe + immediate | 1951.73 ns | 0.23 ns | 0.28 ns | 0.84 |
| interval - interval + take(3) + subscribe + immediate | 4906.50 ns | 113.87 ns | 138.60 ns | 0.82 |
| interval - interval + take(3) + subscribe + current_thread | 5990.65 ns | 95.48 ns | 115.56 ns | 0.83 |
| from array of 1 - create + as_blocking + subscribe + new_thread | 83779.71 ns | 80016.29 ns | 103981.40 ns | 0.77 |
| from array of 1000 - create + as_blocking + subscribe + new_thread | 87456.42 ns | 86883.92 ns | 103356.36 ns | 0.84 |
| concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 8099.22 ns | 380.19 ns | 454.29 ns | 0.84 |
Filtering Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take(1)+subscribe | 2714.68 ns | 0.22 ns | 0.27 ns | 0.82 |
| immediate_just+filter(true)+subscribe | 1993.93 ns | 0.22 ns | 0.27 ns | 0.82 |
| immediate_just(1,2)+skip(1)+subscribe | 2614.10 ns | 0.22 ns | 0.27 ns | 0.81 |
| immediate_just(1,1,2)+distinct_until_changed()+subscribe | 1966.37 ns | 0.45 ns | 0.54 ns | 0.82 |
| immediate_just(1,2)+first()+subscribe | 3025.40 ns | 0.22 ns | 0.27 ns | 0.82 |
| immediate_just(1,2)+last()+subscribe | 2257.65 ns | 0.22 ns | 0.27 ns | 0.82 |
| immediate_just+take_last(1)+subscribe | 2868.78 ns | 0.22 ns | 0.27 ns | 0.82 |
| immediate_just(1,2,3)+element_at(1)+subscribe | 2015.56 ns | 0.22 ns | 0.28 ns | 0.81 |
Schedulers
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate scheduler create worker + schedule | 845.48 ns | 4.09 ns | 4.96 ns | 0.82 |
| current_thread scheduler create worker + schedule | 1157.94 ns | 37.06 ns | 45.18 ns | 0.82 |
| current_thread scheduler create worker + schedule + recursive schedule | 1926.01 ns | 195.98 ns | 240.12 ns | 0.82 |
Transforming Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+map(v*2)+subscribe | 2060.00 ns | 4.01 ns | 5.16 ns | 0.78 |
| immediate_just+scan(10, std::plus)+subscribe | 2313.68 ns | 0.47 ns | 0.55 ns | 0.86 |
| immediate_just+flat_map(immediate_just(v*2))+subscribe | 5253.04 ns | 403.86 ns | 474.52 ns | 0.85 |
| immediate_just+buffer(2)+subscribe | 2488.15 ns | 63.57 ns | 74.93 ns | 0.85 |
| immediate_just+window(2)+subscribe + subscsribe inner | 5324.35 ns | 2401.41 ns | 2807.72 ns | 0.86 |
Conditional Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take_while(false)+subscribe | 1990.02 ns | - | - | 0.00 |
| immediate_just+take_while(true)+subscribe | 1994.29 ns | 0.22 ns | 0.27 ns | 0.82 |
Utility Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(1)+subscribe_on(immediate)+subscribe | 4766.38 ns | 5.01 ns | 5.70 ns | 0.88 |
Combining Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 7224.88 ns | 449.87 ns | 523.94 ns | 0.86 |
| immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 8132.65 ns | 432.25 ns | 517.15 ns | 0.84 |
| immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 450.90 ns | 536.32 ns | 0.84 |
| immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 7581.77 ns | 1835.91 ns | 2231.09 ns | 0.82 |
| immediate_just(1) + zip(immediate_just(2)) + subscribe | 4861.66 ns | 803.74 ns | 1185.22 ns | 0.68 |
Subjects
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| publish_subject with 1 observer - on_next | 73.15 ns | 47.80 ns | 57.04 ns | 0.84 |
| subscribe 100 observers to publish_subject | 336296.33 ns | 40010.62 ns | 47743.43 ns | 0.84 |
| 100 on_next to 100 observers to publish_subject | 49426.36 ns | 16735.54 ns | 21890.15 ns | 0.76 |
Scenarios
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| basic sample | 2674.83 ns | 61.86 ns | 85.73 ns | 0.72 |
| basic sample with immediate scheduler | 3005.57 ns | 18.68 ns | 20.34 ns | 0.92 |
Aggregating Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+reduce(10, std::plus)+subscribe | 2305.88 ns | 0.23 ns | 0.27 ns | 0.84 |
Error Handling Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 6274.87 ns | 4006.73 ns | 4896.22 ns | 0.82 |
| create(on_error())+retry(1)+subscribe | 1763.83 ns | 281.60 ns | 350.56 ns | 0.80 |
ci-ubuntu-clang
General
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| Subscribe empty callbacks to empty observable | 268.05 ns | 1.57 ns | 0.88 ns | 1.79 |
| Subscribe empty callbacks to empty observable via pipe operator | 277.04 ns | 1.56 ns | 0.88 ns | 1.78 |
Sources
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| from array of 1 - create + subscribe + immediate | 564.16 ns | 0.44 ns | 0.42 ns | 1.05 |
| from array of 1 - create + subscribe + current_thread | 782.92 ns | 4.32 ns | 4.02 ns | 1.07 |
| concat_as_source of just(1 immediate) create + subscribe | 2374.27 ns | 131.40 ns | 135.85 ns | 0.97 |
| defer from array of 1 - defer + create + subscribe + immediate | 790.13 ns | 0.31 ns | 0.31 ns | 1.00 |
| interval - interval + take(3) + subscribe + immediate | 2215.00 ns | 58.30 ns | 58.36 ns | 1.00 |
| interval - interval + take(3) + subscribe + current_thread | 3181.03 ns | 30.86 ns | 30.88 ns | 1.00 |
| from array of 1 - create + as_blocking + subscribe + new_thread | 27207.17 ns | 27643.50 ns | 28130.59 ns | 0.98 |
| from array of 1000 - create + as_blocking + subscribe + new_thread | 35788.71 ns | 35809.97 ns | 34108.41 ns | 1.05 |
| concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 3679.47 ns | 149.07 ns | 158.96 ns | 0.94 |
Filtering Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take(1)+subscribe | 1161.42 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+filter(true)+subscribe | 850.80 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2)+skip(1)+subscribe | 1076.43 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,1,2)+distinct_until_changed()+subscribe | 870.05 ns | 0.62 ns | 0.62 ns | 1.00 |
| immediate_just(1,2)+first()+subscribe | 1380.55 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2)+last()+subscribe | 1002.03 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+take_last(1)+subscribe | 1192.17 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2,3)+element_at(1)+subscribe | 884.96 ns | 0.31 ns | 0.31 ns | 1.00 |
Schedulers
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate scheduler create worker + schedule | 291.48 ns | 1.56 ns | 0.88 ns | 1.78 |
| current_thread scheduler create worker + schedule | 395.08 ns | 4.63 ns | 4.01 ns | 1.15 |
| current_thread scheduler create worker + schedule + recursive schedule | 860.03 ns | 67.59 ns | 57.56 ns | 1.17 |
Transforming Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+map(v*2)+subscribe | 855.89 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+scan(10, std::plus)+subscribe | 973.44 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+flat_map(immediate_just(v*2))+subscribe | 2248.56 ns | 138.01 ns | 136.97 ns | 1.01 |
| immediate_just+buffer(2)+subscribe | 1528.73 ns | 13.59 ns | 13.59 ns | 1.00 |
| immediate_just+window(2)+subscribe + subscsribe inner | 2432.11 ns | 934.83 ns | 946.36 ns | 0.99 |
Conditional Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take_while(false)+subscribe | 843.74 ns | - | - | 0.00 |
| immediate_just+take_while(true)+subscribe | 851.04 ns | 0.31 ns | 0.31 ns | 1.00 |
Utility Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(1)+subscribe_on(immediate)+subscribe | 1995.57 ns | 0.31 ns | 0.31 ns | 1.00 |
Combining Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 3278.69 ns | 159.16 ns | 160.33 ns | 0.99 |
| immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 3743.54 ns | 148.29 ns | 146.37 ns | 1.01 |
| immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 144.13 ns | 144.44 ns | 1.00 |
| immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 3426.62 ns | 844.66 ns | 846.46 ns | 1.00 |
| immediate_just(1) + zip(immediate_just(2)) + subscribe | 2293.18 ns | 203.18 ns | 198.96 ns | 1.02 |
Subjects
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| publish_subject with 1 observer - on_next | 54.00 ns | 17.85 ns | 17.83 ns | 1.00 |
| subscribe 100 observers to publish_subject | 213055.80 ns | 16064.20 ns | 15948.90 ns | 1.01 |
| 100 on_next to 100 observers to publish_subject | 35315.59 ns | 20742.52 ns | 20616.54 ns | 1.01 |
Scenarios
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| basic sample | 1302.25 ns | 12.04 ns | 11.44 ns | 1.05 |
| basic sample with immediate scheduler | 1282.47 ns | 5.86 ns | 5.86 ns | 1.00 |
Aggregating Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+reduce(10, std::plus)+subscribe | 1035.75 ns | 0.31 ns | 0.31 ns | 1.00 |
Error Handling Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 2167.08 ns | 1249.03 ns | 1246.50 ns | 1.00 |
| create(on_error())+retry(1)+subscribe | 651.11 ns | 138.77 ns | 146.59 ns | 0.95 |
ci-windows
General
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| Subscribe empty callbacks to empty observable | 575.55 ns | 4.57 ns | 4.01 ns | 1.14 |
| Subscribe empty callbacks to empty observable via pipe operator | 596.31 ns | 4.62 ns | 4.02 ns | 1.15 |
Sources
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| from array of 1 - create + subscribe + immediate | 1159.47 ns | 9.63 ns | 9.63 ns | 1.00 |
| from array of 1 - create + subscribe + current_thread | 1429.18 ns | 17.67 ns | 17.90 ns | 0.99 |
| concat_as_source of just(1 immediate) create + subscribe | 3721.11 ns | 186.27 ns | 174.68 ns | 1.07 |
| defer from array of 1 - defer + create + subscribe + immediate | 1191.23 ns | 9.41 ns | 9.42 ns | 1.00 |
| interval - interval + take(3) + subscribe + immediate | 3664.10 ns | 144.41 ns | 145.36 ns | 0.99 |
| interval - interval + take(3) + subscribe + current_thread | 3436.12 ns | 65.30 ns | 66.95 ns | 0.98 |
| from array of 1 - create + as_blocking + subscribe + new_thread | 124777.78 ns | 120388.89 ns | 112020.00 ns | 1.07 |
| from array of 1000 - create + as_blocking + subscribe + new_thread | 136412.50 ns | 137314.29 ns | 128962.50 ns | 1.06 |
| concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 5363.51 ns | 217.71 ns | 219.16 ns | 0.99 |
Filtering Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take(1)+subscribe | 1846.94 ns | 24.99 ns | 25.28 ns | 0.99 |
| immediate_just+filter(true)+subscribe | 1330.25 ns | 24.06 ns | 24.35 ns | 0.99 |
| immediate_just(1,2)+skip(1)+subscribe | 1744.22 ns | 23.44 ns | 24.06 ns | 0.97 |
| immediate_just(1,1,2)+distinct_until_changed()+subscribe | 1635.84 ns | 26.30 ns | 29.00 ns | 0.91 |
| immediate_just(1,2)+first()+subscribe | 2051.58 ns | 23.76 ns | 22.82 ns | 1.04 |
| immediate_just(1,2)+last()+subscribe | 2061.80 ns | 24.67 ns | 24.06 ns | 1.03 |
| immediate_just+take_last(1)+subscribe | 2025.40 ns | 69.07 ns | 69.46 ns | 0.99 |
| immediate_just(1,2,3)+element_at(1)+subscribe | 1363.10 ns | 26.53 ns | 27.44 ns | 0.97 |
Schedulers
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate scheduler create worker + schedule | 484.88 ns | 6.17 ns | 6.48 ns | 0.95 |
| current_thread scheduler create worker + schedule | 655.45 ns | 14.51 ns | 14.04 ns | 1.03 |
| current_thread scheduler create worker + schedule + recursive schedule | 1105.56 ns | 105.29 ns | 104.86 ns | 1.00 |
Transforming Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+map(v*2)+subscribe | 1331.09 ns | 24.35 ns | 24.35 ns | 1.00 |
| immediate_just+scan(10, std::plus)+subscribe | 1438.76 ns | 26.53 ns | 26.82 ns | 0.99 |
| immediate_just+flat_map(immediate_just(v*2))+subscribe | 3471.02 ns | 204.33 ns | 198.84 ns | 1.03 |
| immediate_just+buffer(2)+subscribe | 2674.71 ns | 68.59 ns | 68.83 ns | 1.00 |
| immediate_just+window(2)+subscribe + subscsribe inner | 4005.21 ns | 1305.72 ns | 1309.47 ns | 1.00 |
Conditional Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take_while(false)+subscribe | 1619.04 ns | 23.14 ns | 23.14 ns | 1.00 |
| immediate_just+take_while(true)+subscribe | 1334.38 ns | 24.07 ns | 24.35 ns | 0.99 |
Utility Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(1)+subscribe_on(immediate)+subscribe | 3163.81 ns | 11.11 ns | 11.10 ns | 1.00 |
Combining Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 5125.47 ns | 220.69 ns | 218.98 ns | 1.01 |
| immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 5483.15 ns | 210.87 ns | 212.89 ns | 0.99 |
| immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 193.30 ns | 191.40 ns | 1.01 |
| immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 5406.83 ns | 933.70 ns | 940.38 ns | 0.99 |
| immediate_just(1) + zip(immediate_just(2)) + subscribe | 3544.81 ns | 512.78 ns | 514.36 ns | 1.00 |
Subjects
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| publish_subject with 1 observer - on_next | 37.21 ns | 20.36 ns | 20.07 ns | 1.01 |
| subscribe 100 observers to publish_subject | 265575.00 ns | 26825.00 ns | 27263.16 ns | 0.98 |
| 100 on_next to 100 observers to publish_subject | 51780.00 ns | 38760.71 ns | 32659.38 ns | 1.19 |
Scenarios
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| basic sample | 1925.75 ns | 102.19 ns | 100.81 ns | 1.01 |
| basic sample with immediate scheduler | 2287.19 ns | 74.67 ns | 73.93 ns | 1.01 |
Aggregating Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+reduce(10, std::plus)+subscribe | 1470.77 ns | 24.66 ns | 24.97 ns | 0.99 |
Error Handling Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 2183.83 ns | 355.45 ns | 350.26 ns | 1.01 |
| create(on_error())+retry(1)+subscribe | 1694.89 ns | 140.69 ns | 145.86 ns | 0.96 |
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## v2 #628 +/- ##
=======================================
Coverage 95.62% 95.62%
=======================================
Files 98 98
Lines 1897 1897
=======================================
Hits 1814 1814
Misses 83 83 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (4)
- src/rpp/rpp/operators/retry.hpp (2 hunks)
- src/rpp/rpp/operators/retry_when.hpp (3 hunks)
- src/rpp/rpp/sources/concat.hpp (2 hunks)
- src/tests/rpp/test_take_until.cpp (2 hunks)
Files skipped from review as they are similar to previous changes (1)
- src/tests/rpp/test_take_until.cpp
Additional comments not posted (4)
src/rpp/rpp/operators/retry.hpp (1)
65-66: Approved: Modification to state clearing logic inon_error.The change to invoke
state->clear()before checkingis_inside_drainis a strategic move to ensure that the state is consistently reset before further operations. This adjustment is crucial for preventing potential infinite loops or incorrect behavior, aligning well with the PR's objective to fix infinite looping issues.src/rpp/rpp/operators/retry_when.hpp (2)
59-63: Approved: Enhanced state management inon_next.The addition of
state->clear()immediately after settinglocally_disposedto true in theon_nextmethod ensures that the state is reset at a critical point. This change enhances the robustness of theretry_whenoperator by ensuring that the state is appropriately managed before proceeding with further operations, which is essential for preventing issues related to state mismanagement.
79-79: Approved: Const-correctness inset_upstream.The modification to make the
set_upstreammethod aconstmethod is a positive change, enhancing code reliability and clarity by enforcing const-correctness. This adjustment ensures that the method's implementation is aligned with modern C++ standards, which emphasize the importance of const-correctness to prevent unintended side effects.src/rpp/rpp/sources/concat.hpp (1)
78-79: Approved: Refinement in state clearing logic inon_completed.The adjustment to clear the state at the beginning of the
on_completedmethod is a thoughtful change that enhances the clarity and efficiency of state management. This modification ensures that the state is reset at a crucial point, potentially improving the performance and predictability of theconcatoperator during the completion process.
There was a problem hiding this comment.
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (3)
- src/tests/rpp/test_concat.cpp (1 hunks)
- src/tests/rpp/test_retry.cpp (1 hunks)
- src/tests/rpp/test_retry_when.cpp (1 hunks)
Additional comments not posted (3)
src/tests/rpp/test_retry.cpp (1)
176-190: Review of the new test case "retry disposes on looping".The test case is well-structured and effectively checks the disposal of resources after an error is emitted using the
retryoperator. The use ofrpp::source::concatto create a source that emits a value and then an error is appropriate for this test scenario.However, it's important to ensure that the test case covers all necessary aspects and does not have any logical errors. The test setup and assertions appear to be correct, and the use of mock observers and sequences is consistent with the rest of the test suite.
src/tests/rpp/test_retry_when.cpp (1)
251-267: Review of the new test case "retry_when disposes on looping".The test case is well-structured and effectively checks the disposal of resources after an error is emitted using the
retry_whenoperator. The use ofrpp::source::concatto create a source that emits a value and then an error is appropriate for this test scenario.However, it's important to ensure that the test case covers all necessary aspects and does not have any logical errors. The test setup and assertions appear to be correct, and the use of mock observers and sequences is consistent with the rest of the test suite.
src/tests/rpp/test_concat.cpp (1)
251-264: Review of the new test case "concat disposes on looping".The test case is well-structured and effectively checks the disposal of resources after a completion signal is emitted using the
concatoperator. The use ofrpp::source::concatto create a source that emits a value and then completes is appropriate for this test scenario.However, it's important to ensure that the test case covers all necessary aspects and does not have any logical errors. The test setup and assertions appear to be correct, and the use of mock observers and sequences is consistent with the rest of the test suite.
There was a problem hiding this comment.
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (3)
- src/tests/rpp/test_repeat.cpp (1 hunks)
- src/tests/rpp/test_retry_when.cpp (2 hunks)
- src/tests/utils/disposable_observable.hpp (1 hunks)
Additional comments not posted (4)
src/tests/rpp/test_repeat.cpp (1)
137-137: Approved: Modification to test case forrepeatoperator.The change to use
repeat(1)instead of a parameterlessrepeatis a good fix to ensure the test does not enter an infinite loop. This modification aligns with the PR's objective to fix infinite looping issues.src/tests/utils/disposable_observable.hpp (2)
102-110: Approved: New test section for disposable disposal on error.The addition of the test section "operator disposes disposable on_error" is crucial for ensuring that disposables are correctly disposed of when an error occurs. This helps in managing resources efficiently and preventing memory leaks.
112-120: Approved: New test section for disposable disposal on completion.The addition of the test section "operator disposes disposable on_completed" is crucial for ensuring that disposables are correctly disposed of when the observable completes. This helps in managing resources efficiently and preventing memory leaks.
src/tests/rpp/test_retry_when.cpp (1)
251-267: Approved: New test case forretry_whenoperator in looping scenarios.The addition of the test case "retry_when disposes on looping" is crucial for verifying that the
retry_whenoperator handles disposals correctly in looping scenarios. This test ensures robust error handling and resource management.
|



Summary by CodeRabbit
New Features
is_disposedmethod to provide a more comprehensive check by considering both stopped and disposed states.take_untiloperator to prevent infinite loops in reactive streams.concat,retry, andretry_whenoperators to verify proper disposal behavior during looping scenarios.Bug Fixes
on_completedmethod to enhance performance and clarity.