retry_when operator#616
Conversation
for more information, see https://pre-commit.ci
WalkthroughWalkthroughThis update introduces the Changes
Sequence Diagram(s)sequenceDiagram
participant Observable
participant Notifier
participant RetryLogic
Observable->>RetryLogic: Emit value
RetryLogic-->>Observable: Forward value
Observable->>RetryLogic: Emit error
RetryLogic->>Notifier: Notify error
Notifier-->>RetryLogic: Return new observable
RetryLogic->>Observable: Resubscribe to new observable
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (4)
- src/rpp/rpp/operators.hpp (1 hunks)
- src/rpp/rpp/operators/fwd.hpp (1 hunks)
- src/rpp/rpp/operators/retry_when.hpp (1 hunks)
- src/tests/rpp/test_retry_when.cpp (1 hunks)
Additional comments not posted (14)
src/rpp/rpp/operators.hpp (1)
132-132: Include directive forretry_when.hppis appropriate.The addition of the
#include <rpp/operators/retry_when.hpp>directive is necessary for integrating theretry_whenoperator into the module.src/rpp/rpp/operators/retry_when.hpp (4)
21-53: Implementation ofretry_when_impl_inner_strategylooks correct.The
retry_when_impl_inner_strategyis well-implemented with appropriate handling ofon_next,on_error, andon_completedmethods. The use ofRPP_NO_UNIQUE_ADDRESSis suitable for optimizing memory layout.
55-97: Implementation ofretry_when_impl_strategyis robust.The
retry_when_impl_strategycorrectly handles the error notification and resubscription logic. The use ofstd::optionalfor the notifier observable ensures safe handling of potential exceptions.
117-135: Implementation ofretry_when_tis appropriate.The
retry_when_tstructure provides a clean interface for applying theretry_whenoperator to observables. The use of perfect forwarding ensures efficient handling of the notifier.
138-157:retry_whenoperator function is well-defined.The
retry_whenfunction is correctly constrained to ensure the notifier returns an observable. The documentation provides clear guidance on its usage.src/tests/rpp/test_retry_when.cpp (8)
25-47: Test forretry_whenwith no error emission is comprehensive.The test correctly verifies that the observable behaves as expected when no errors occur. The checks for subscription count and observer notifications are appropriate.
51-77: Test forretry_whenwith a single error is thorough.The test accurately simulates an error scenario and verifies that the observable resubscribes correctly. The assertions ensure the operator's behavior aligns with expectations.
80-91: Test forretry_whenwith multiple emissions is valid.The test checks that the observable resubscribes only once despite multiple emissions from the notifier, confirming the operator's correct behavior.
94-108: Test forretry_whenwith a throwing notifier is effective.The test ensures that if the notifier throws an exception, the observable does not resubscribe and the error is propagated correctly.
111-124: Test forretry_whenwith an error notifier is correct.The test verifies that when the notifier returns an error, the observable does not resubscribe and the error is handled appropriately.
127-140: Test forretry_whenwith an empty notifier is accurate.The test confirms that if the notifier returns an empty observable, the original observable completes without resubscribing.
145-157: Test forretry_whencopy behavior is precise.The test ensures that the
retry_whenoperator does not produce unnecessary copies, maintaining efficient memory usage.
159-171: Test forretry_whendisposable contracts is valid.The test verifies that the
retry_whenoperator adheres to disposable contracts, ensuring proper resource management.src/rpp/rpp/operators/fwd.hpp (1)
185-187: Addition ofretry_whento forward declarations is appropriate.The
retry_whenfunction is correctly constrained and its inclusion in the forward declarations aligns with the overall design of the operators module.
BENCHMARK RESULTS (AUTOGENERATED)
|
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| Subscribe empty callbacks to empty observable | 322.29 ns | 2.20 ns | 2.22 ns | 0.99 |
| Subscribe empty callbacks to empty observable via pipe operator | 305.59 ns | 2.16 ns | 2.16 ns | 1.00 |
Sources
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| from array of 1 - create + subscribe + immediate | 731.99 ns | 0.31 ns | 0.31 ns | 1.01 |
| from array of 1 - create + subscribe + current_thread | 1083.43 ns | 3.73 ns | 4.02 ns | 0.93 |
| concat_as_source of just(1 immediate) create + subscribe | 2306.57 ns | 115.03 ns | 114.56 ns | 1.00 |
| defer from array of 1 - defer + create + subscribe + immediate | 865.46 ns | 0.31 ns | 0.31 ns | 1.00 |
| interval - interval + take(3) + subscribe + immediate | 2152.31 ns | 59.23 ns | 59.23 ns | 1.00 |
| interval - interval + take(3) + subscribe + current_thread | 3220.85 ns | 32.42 ns | 32.42 ns | 1.00 |
| from array of 1 - create + as_blocking + subscribe + new_thread | 29785.00 ns | 27700.68 ns | 28741.19 ns | 0.96 |
| from array of 1000 - create + as_blocking + subscribe + new_thread | 40466.28 ns | 51358.50 ns | 49464.57 ns | 1.04 |
| concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 3557.33 ns | 124.03 ns | 132.49 ns | 0.94 |
Filtering Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take(1)+subscribe | 1118.31 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+filter(true)+subscribe | 895.12 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2)+skip(1)+subscribe | 1051.89 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,1,2)+distinct_until_changed()+subscribe | 947.29 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2)+first()+subscribe | 1251.39 ns | 0.62 ns | 0.62 ns | 1.00 |
| immediate_just(1,2)+last()+subscribe | 992.22 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+take_last(1)+subscribe | 1156.44 ns | 17.60 ns | 17.60 ns | 1.00 |
| immediate_just(1,2,3)+element_at(1)+subscribe | 893.23 ns | 0.31 ns | 0.31 ns | 1.00 |
Schedulers
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate scheduler create worker + schedule | 268.43 ns | 2.16 ns | 2.16 ns | 1.00 |
| current_thread scheduler create worker + schedule | 372.04 ns | 5.86 ns | 5.87 ns | 1.00 |
| current_thread scheduler create worker + schedule + recursive schedule | 940.16 ns | 56.88 ns | 57.20 ns | 0.99 |
Transforming Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+map(v*2)+subscribe | 891.83 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+scan(10, std::plus)+subscribe | 923.94 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+flat_map(immediate_just(v*2))+subscribe | 2373.80 ns | 163.21 ns | 162.26 ns | 1.01 |
| immediate_just+buffer(2)+subscribe | 1627.74 ns | 13.91 ns | 13.90 ns | 1.00 |
| immediate_just+window(2)+subscribe + subscsribe inner | 2480.53 ns | 1035.92 ns | 1095.39 ns | 0.95 |
Conditional Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take_while(false)+subscribe | 878.95 ns | - | - | 0.00 |
| immediate_just+take_while(true)+subscribe | 886.33 ns | 0.31 ns | 0.31 ns | 1.00 |
Utility Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(1)+subscribe_on(immediate)+subscribe | 1965.68 ns | 0.31 ns | 0.31 ns | 1.00 |
Combining Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 3422.75 ns | 180.94 ns | 176.98 ns | 1.02 |
| immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 3654.95 ns | 171.77 ns | 165.39 ns | 1.04 |
| immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 150.80 ns | 143.08 ns | 1.05 |
| immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 3563.77 ns | 917.32 ns | 973.55 ns | 0.94 |
| immediate_just(1) + zip(immediate_just(2)) + subscribe | 2159.35 ns | 213.81 ns | 209.29 ns | 1.02 |
Subjects
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| publish_subject with 1 observer - on_next | 34.61 ns | 14.71 ns | 14.99 ns | 0.98 |
| subscribe 100 observers to publish_subject | 209795.40 ns | 15538.99 ns | 15549.80 ns | 1.00 |
| 100 on_next to 100 observers to publish_subject | 32696.68 ns | 20171.48 ns | 20082.22 ns | 1.00 |
Scenarios
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| basic sample | 1457.45 ns | 12.66 ns | 12.65 ns | 1.00 |
| basic sample with immediate scheduler | 1419.92 ns | 5.55 ns | 5.55 ns | 1.00 |
Aggregating Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+reduce(10, std::plus)+subscribe | 936.30 ns | 0.31 ns | 0.31 ns | 1.00 |
Error Handling Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 2074.39 ns | 976.24 ns | 985.15 ns | 0.99 |
| create(on_error())+retry(1)+subscribe | 673.09 ns | 119.19 ns | 119.02 ns | 1.00 |
ci-macos
General
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| Subscribe empty callbacks to empty observable | 982.25 ns | 3.92 ns | 4.78 ns | 0.82 |
| Subscribe empty callbacks to empty observable via pipe operator | 989.79 ns | 4.02 ns | 4.78 ns | 0.84 |
Sources
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| from array of 1 - create + subscribe + immediate | 1941.74 ns | 0.23 ns | 0.29 ns | 0.82 |
| from array of 1 - create + subscribe + current_thread | 2421.15 ns | 33.57 ns | 43.41 ns | 0.77 |
| concat_as_source of just(1 immediate) create + subscribe | 5371.34 ns | 334.48 ns | 409.43 ns | 0.82 |
| defer from array of 1 - defer + create + subscribe + immediate | 1943.18 ns | 0.23 ns | 0.29 ns | 0.81 |
| interval - interval + take(3) + subscribe + immediate | 4765.79 ns | 113.41 ns | 141.04 ns | 0.80 |
| interval - interval + take(3) + subscribe + current_thread | 5888.96 ns | 94.14 ns | 118.04 ns | 0.80 |
| from array of 1 - create + as_blocking + subscribe + new_thread | 81811.00 ns | 87220.75 ns | 102220.82 ns | 0.85 |
| from array of 1000 - create + as_blocking + subscribe + new_thread | 85052.07 ns | 84338.77 ns | 107671.22 ns | 0.78 |
| concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 8124.80 ns | 375.84 ns | 463.93 ns | 0.81 |
Filtering Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take(1)+subscribe | 2847.52 ns | 0.23 ns | 0.29 ns | 0.81 |
| immediate_just+filter(true)+subscribe | 2101.50 ns | 0.23 ns | 0.29 ns | 0.81 |
| immediate_just(1,2)+skip(1)+subscribe | 2744.08 ns | 0.23 ns | 0.29 ns | 0.81 |
| immediate_just(1,1,2)+distinct_until_changed()+subscribe | 2070.22 ns | 0.47 ns | 0.58 ns | 0.81 |
| immediate_just(1,2)+first()+subscribe | 3170.61 ns | 0.23 ns | 0.29 ns | 0.81 |
| immediate_just(1,2)+last()+subscribe | 2396.18 ns | 0.23 ns | 0.29 ns | 0.81 |
| immediate_just+take_last(1)+subscribe | 3003.91 ns | 0.23 ns | 0.32 ns | 0.73 |
| immediate_just(1,2,3)+element_at(1)+subscribe | 2165.75 ns | 0.23 ns | 0.29 ns | 0.81 |
Schedulers
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate scheduler create worker + schedule | 823.57 ns | 4.06 ns | 5.10 ns | 0.79 |
| current_thread scheduler create worker + schedule | 1201.45 ns | 35.81 ns | 45.03 ns | 0.80 |
| current_thread scheduler create worker + schedule + recursive schedule | 1968.95 ns | 197.66 ns | 248.11 ns | 0.80 |
Transforming Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+map(v*2)+subscribe | 2237.96 ns | 4.40 ns | 5.44 ns | 0.81 |
| immediate_just+scan(10, std::plus)+subscribe | 2452.41 ns | 0.50 ns | 0.57 ns | 0.88 |
| immediate_just+flat_map(immediate_just(v*2))+subscribe | 5693.73 ns | 439.86 ns | 500.12 ns | 0.88 |
| immediate_just+buffer(2)+subscribe | 2533.85 ns | 64.55 ns | 118.37 ns | 0.55 |
| immediate_just+window(2)+subscribe + subscsribe inner | 5338.59 ns | 2417.08 ns | 2961.78 ns | 0.82 |
Conditional Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take_while(false)+subscribe | 2166.18 ns | - | - | 0.00 |
| immediate_just+take_while(true)+subscribe | 2257.60 ns | 0.25 ns | 0.29 ns | 0.88 |
Utility Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(1)+subscribe_on(immediate)+subscribe | 4912.77 ns | 4.90 ns | 6.02 ns | 0.81 |
Combining Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 7386.34 ns | 442.21 ns | 541.66 ns | 0.82 |
| immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 8540.69 ns | 441.29 ns | 540.14 ns | 0.82 |
| immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 480.98 ns | 565.46 ns | 0.85 |
| immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 8182.92 ns | 1993.27 ns | 2345.32 ns | 0.85 |
| immediate_just(1) + zip(immediate_just(2)) + subscribe | 5284.04 ns | 814.97 ns | 990.04 ns | 0.82 |
Subjects
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| publish_subject with 1 observer - on_next | 75.72 ns | 50.13 ns | 60.20 ns | 0.83 |
| subscribe 100 observers to publish_subject | 345017.67 ns | 40678.52 ns | 50191.35 ns | 0.81 |
| 100 on_next to 100 observers to publish_subject | 50205.50 ns | 16876.83 ns | 20655.67 ns | 0.82 |
Scenarios
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| basic sample | 2755.78 ns | 67.84 ns | 83.24 ns | 0.81 |
| basic sample with immediate scheduler | 2757.23 ns | 18.76 ns | 22.98 ns | 0.82 |
Aggregating Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+reduce(10, std::plus)+subscribe | 2378.10 ns | 0.23 ns | 0.29 ns | 0.81 |
Error Handling Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 6436.16 ns | 4058.66 ns | 4982.32 ns | 0.81 |
| create(on_error())+retry(1)+subscribe | 1802.36 ns | 295.91 ns | 364.22 ns | 0.81 |
ci-ubuntu-clang
General
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| Subscribe empty callbacks to empty observable | 294.66 ns | 1.56 ns | 1.54 ns | 1.01 |
| Subscribe empty callbacks to empty observable via pipe operator | 296.40 ns | 1.56 ns | 1.54 ns | 1.01 |
Sources
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| from array of 1 - create + subscribe + immediate | 611.22 ns | 0.31 ns | 0.31 ns | 1.00 |
| from array of 1 - create + subscribe + current_thread | 839.68 ns | 4.32 ns | 4.32 ns | 1.00 |
| concat_as_source of just(1 immediate) create + subscribe | 2437.99 ns | 134.59 ns | 135.22 ns | 1.00 |
| defer from array of 1 - defer + create + subscribe + immediate | 829.87 ns | 0.31 ns | 0.31 ns | 1.01 |
| interval - interval + take(3) + subscribe + immediate | 2335.02 ns | 58.27 ns | 58.27 ns | 1.00 |
| interval - interval + take(3) + subscribe + current_thread | 3318.60 ns | 30.88 ns | 30.86 ns | 1.00 |
| from array of 1 - create + as_blocking + subscribe + new_thread | 29871.14 ns | 28405.29 ns | 30085.74 ns | 0.94 |
| from array of 1000 - create + as_blocking + subscribe + new_thread | 40799.48 ns | 37398.96 ns | 36005.55 ns | 1.04 |
| concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 3910.18 ns | 157.35 ns | 155.77 ns | 1.01 |
Filtering Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take(1)+subscribe | 1319.59 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+filter(true)+subscribe | 934.26 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2)+skip(1)+subscribe | 1175.20 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,1,2)+distinct_until_changed()+subscribe | 967.74 ns | 0.62 ns | 0.62 ns | 1.00 |
| immediate_just(1,2)+first()+subscribe | 1532.29 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2)+last()+subscribe | 1133.33 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+take_last(1)+subscribe | 1296.97 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2,3)+element_at(1)+subscribe | 942.95 ns | 0.31 ns | 0.31 ns | 1.00 |
Schedulers
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate scheduler create worker + schedule | 318.78 ns | 1.56 ns | 1.54 ns | 1.01 |
| current_thread scheduler create worker + schedule | 455.42 ns | 4.63 ns | 4.79 ns | 0.97 |
| current_thread scheduler create worker + schedule + recursive schedule | 950.95 ns | 58.15 ns | 54.78 ns | 1.06 |
Transforming Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+map(v*2)+subscribe | 946.47 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+scan(10, std::plus)+subscribe | 1059.76 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+flat_map(immediate_just(v*2))+subscribe | 2376.29 ns | 139.31 ns | 137.13 ns | 1.02 |
| immediate_just+buffer(2)+subscribe | 1693.31 ns | 14.04 ns | 13.58 ns | 1.03 |
| immediate_just+window(2)+subscribe + subscsribe inner | 2559.97 ns | 1025.54 ns | 921.17 ns | 1.11 |
Conditional Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take_while(false)+subscribe | 927.38 ns | - | - | 0.00 |
| immediate_just+take_while(true)+subscribe | 922.14 ns | 0.31 ns | 0.31 ns | 1.00 |
Utility Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(1)+subscribe_on(immediate)+subscribe | 2104.11 ns | 0.31 ns | 0.31 ns | 1.00 |
Combining Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 3445.51 ns | 158.62 ns | 158.41 ns | 1.00 |
| immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 3958.63 ns | 146.44 ns | 146.37 ns | 1.00 |
| immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 145.13 ns | 144.05 ns | 1.01 |
| immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 3517.62 ns | 848.45 ns | 850.46 ns | 1.00 |
| immediate_just(1) + zip(immediate_just(2)) + subscribe | 2475.94 ns | 204.87 ns | 205.94 ns | 0.99 |
Subjects
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| publish_subject with 1 observer - on_next | 54.03 ns | 17.56 ns | 17.65 ns | 0.99 |
| subscribe 100 observers to publish_subject | 225228.60 ns | 16156.95 ns | 15997.28 ns | 1.01 |
| 100 on_next to 100 observers to publish_subject | 35834.72 ns | 17552.43 ns | 20432.88 ns | 0.86 |
Scenarios
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| basic sample | 1389.87 ns | 11.72 ns | 11.73 ns | 1.00 |
| basic sample with immediate scheduler | 1493.59 ns | 6.17 ns | 6.17 ns | 1.00 |
Aggregating Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+reduce(10, std::plus)+subscribe | 1092.96 ns | 0.31 ns | 0.31 ns | 1.00 |
Error Handling Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 2193.09 ns | 1210.08 ns | 1207.77 ns | 1.00 |
| create(on_error())+retry(1)+subscribe | 727.10 ns | 145.54 ns | 146.31 ns | 0.99 |
ci-windows
General
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| Subscribe empty callbacks to empty observable | 557.44 ns | 4.32 ns | 4.32 ns | 1.00 |
| Subscribe empty callbacks to empty observable via pipe operator | 583.32 ns | 4.32 ns | 4.32 ns | 1.00 |
Sources
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| from array of 1 - create + subscribe + immediate | 1142.84 ns | 4.63 ns | 4.62 ns | 1.00 |
| from array of 1 - create + subscribe + current_thread | 1415.21 ns | 15.43 ns | 15.44 ns | 1.00 |
| concat_as_source of just(1 immediate) create + subscribe | 3710.87 ns | 174.79 ns | 173.72 ns | 1.01 |
| defer from array of 1 - defer + create + subscribe + immediate | 1196.62 ns | 4.93 ns | 4.94 ns | 1.00 |
| interval - interval + take(3) + subscribe + immediate | 3047.83 ns | 133.54 ns | 133.45 ns | 1.00 |
| interval - interval + take(3) + subscribe + current_thread | 3702.17 ns | 54.33 ns | 58.94 ns | 0.92 |
| from array of 1 - create + as_blocking + subscribe + new_thread | 121422.22 ns | 112844.44 ns | 113800.00 ns | 0.99 |
| from array of 1000 - create + as_blocking + subscribe + new_thread | 128622.22 ns | 129325.00 ns | 131325.00 ns | 0.98 |
| concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 5367.54 ns | 206.87 ns | 202.37 ns | 1.02 |
Filtering Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take(1)+subscribe | 1832.77 ns | 12.87 ns | 12.88 ns | 1.00 |
| immediate_just+filter(true)+subscribe | 1324.51 ns | 11.70 ns | 11.68 ns | 1.00 |
| immediate_just(1,2)+skip(1)+subscribe | 1969.36 ns | 13.07 ns | 13.05 ns | 1.00 |
| immediate_just(1,1,2)+distinct_until_changed()+subscribe | 1365.76 ns | 15.80 ns | 15.81 ns | 1.00 |
| immediate_just(1,2)+first()+subscribe | 2366.53 ns | 12.63 ns | 12.63 ns | 1.00 |
| immediate_just(1,2)+last()+subscribe | 1803.20 ns | 14.13 ns | 14.13 ns | 1.00 |
| immediate_just+take_last(1)+subscribe | 2035.16 ns | 59.39 ns | 59.02 ns | 1.01 |
| immediate_just(1,2,3)+element_at(1)+subscribe | 1375.64 ns | 13.79 ns | 13.78 ns | 1.00 |
Schedulers
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate scheduler create worker + schedule | 677.06 ns | 6.17 ns | 6.18 ns | 1.00 |
| current_thread scheduler create worker + schedule | 654.66 ns | 14.15 ns | 14.20 ns | 1.00 |
| current_thread scheduler create worker + schedule + recursive schedule | 1092.78 ns | 101.86 ns | 102.15 ns | 1.00 |
Transforming Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+map(v*2)+subscribe | 1319.05 ns | 11.11 ns | 10.66 ns | 1.04 |
| immediate_just+scan(10, std::plus)+subscribe | 1415.93 ns | 21.29 ns | 21.29 ns | 1.00 |
| immediate_just+flat_map(immediate_just(v*2))+subscribe | 3547.64 ns | 202.07 ns | 200.40 ns | 1.01 |
| immediate_just+buffer(2)+subscribe | 2586.55 ns | 58.15 ns | 57.92 ns | 1.00 |
| immediate_just+window(2)+subscribe + subscsribe inner | 4231.95 ns | 1302.54 ns | 1306.14 ns | 1.00 |
Conditional Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take_while(false)+subscribe | 1309.46 ns | 11.44 ns | 11.60 ns | 0.99 |
| immediate_just+take_while(true)+subscribe | 1337.79 ns | 11.70 ns | 11.70 ns | 1.00 |
Utility Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(1)+subscribe_on(immediate)+subscribe | 3472.97 ns | 7.40 ns | 7.40 ns | 1.00 |
Combining Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 5153.95 ns | 221.39 ns | 222.32 ns | 1.00 |
| immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 5490.91 ns | 215.64 ns | 214.85 ns | 1.00 |
| immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 212.32 ns | 194.78 ns | 1.09 |
| immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 5481.82 ns | 937.55 ns | 958.71 ns | 0.98 |
| immediate_just(1) + zip(immediate_just(2)) + subscribe | 3542.16 ns | 554.89 ns | 531.58 ns | 1.04 |
Subjects
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| publish_subject with 1 observer - on_next | 37.95 ns | 20.09 ns | 19.74 ns | 1.02 |
| subscribe 100 observers to publish_subject | 268000.00 ns | 28840.54 ns | 28521.05 ns | 1.01 |
| 100 on_next to 100 observers to publish_subject | 52215.00 ns | 38723.33 ns | 39975.00 ns | 0.97 |
Scenarios
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| basic sample | 1897.43 ns | 57.55 ns | 56.93 ns | 1.01 |
| basic sample with immediate scheduler | 1911.32 ns | 36.75 ns | 36.72 ns | 1.00 |
Aggregating Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+reduce(10, std::plus)+subscribe | 1461.48 ns | 19.97 ns | 19.99 ns | 1.00 |
Error Handling Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 1937.25 ns | 343.85 ns | 331.13 ns | 1.04 |
| create(on_error())+retry(1)+subscribe | 1180.02 ns | 146.90 ns | 146.33 ns | 1.00 |
There was a problem hiding this comment.
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (2)
- src/rpp/rpp/operators/retry_when.hpp (1 hunks)
- src/tests/rpp/test_retry_when.cpp (1 hunks)
Files skipped from review as they are similar to previous changes (2)
- src/rpp/rpp/operators/retry_when.hpp
- src/tests/rpp/test_retry_when.cpp
There was a problem hiding this comment.
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (1)
- src/rpp/rpp/operators/retry_when.hpp (1 hunks)
Files skipped from review as they are similar to previous changes (1)
- src/rpp/rpp/operators/retry_when.hpp
There was a problem hiding this comment.
Actionable comments posted: 2
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (2)
- src/rpp/rpp/operators/retry_when.hpp (1 hunks)
- src/tests/rpp/test_retry_when.cpp (1 hunks)
Files skipped from review as they are similar to previous changes (1)
- src/tests/rpp/test_retry_when.cpp
Additional comments not posted (4)
src/rpp/rpp/operators/retry_when.hpp (4)
1-20: Header and includes are correct.The file header and include directives are appropriate and complete.
82-119: Logic inretry_when_impl_strategyis correct.The handling of
on_next,on_error, andon_completedis well-implemented, and the use of try-catch for notifier exceptions is appropriate.
121-142: Logic indrainfunction is robust.The
drainfunction effectively manages subscription and resubscription, with appropriate exception handling.
170-189:retry_whenoperator and documentation are well-defined.The
retry_whenoperator is correctly implemented, and the accompanying documentation is clear and informative.
There was a problem hiding this comment.
Actionable comments posted: 1
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (1)
- src/rpp/rpp/operators/retry_when.hpp (1 hunks)
Additional comments not posted (5)
src/rpp/rpp/operators/retry_when.hpp (5)
48-77: Ensureon_nextis called once.Consider adding protection to ensure that
on_nextis called only once, possibly with atake(1)on the notifier returned observable.
145-167: Verify notifier behavior.Ensure that the notifier behaves as expected, especially in edge cases where it might throw or return an empty observable.
Run the following script to verify the notifier behavior:
Verification successful
Notifier behavior is adequately verified.
The
retry_whenoperator's notifier behavior is well-tested intest_retry_when.cpp, covering scenarios such as exceptions, empty observables, and resubscription logic. No further verification is needed.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify notifier behavior in the retry_when operator. # Test: Check for proper notifier behavior. rg --type cpp -A 5 $'retry_when'Length of output: 13266
82-119: Verify error handling and resource management.Ensure that the implementation correctly handles errors and manages resources efficiently to prevent leaks or unintended behavior.
Run the following script to verify error handling and resource management:
Verification successful
Error handling and resource management verified.
The
retry_whenoperator's implementation is thoroughly tested intest_retry_when.cpp, covering various scenarios of error handling and resource management. The tests ensure that the operator behaves correctly under different conditions, including proper disposal and resubscription logic.
- Test file:
src/tests/rpp/test_retry_when.cppScripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify error handling and resource management in the retry_when operator. # Test: Check for proper error handling and resource management. rg --type cpp -A 5 $'retry_when'Length of output: 13266
121-142: Verify handling of infinite retries.Ensure that the implementation correctly handles scenarios with infinite retries to prevent resource exhaustion.
Run the following script to verify the handling of infinite retries:
184-189: Verify constraints and requirements.Ensure that the constraints and requirements for the
retry_whenfunction are correctly defined and enforced.Run the following script to verify the constraints and requirements:
Verification successful
Verify
rpp::constraint::observableDefinition and EnforcementEnsure that the constraint
rpp::constraint::observableis correctly defined and enforced within the codebase to validate theretry_whenfunction's requirements.
Verify the Definition of
rpp::constraint::observableEnsure that the
rpp::constraint::observableis correctly defined to validate its use in theretry_whenfunction.
Constraints and Requirements Verified
The constraints and requirements for the
retry_whenfunction are correctly defined and enforced. Therpp::constraint::observableensures that the notifier returns a valid observable type, meeting the function's requirements.
- The
rpp::constraint::observableconcept is defined to check if a type is derived fromrpp::observable.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify constraints and requirements for the retry_when function. # Test: Check for proper constraints and requirements. rg --type cpp -A 5 $'retry_when'Length of output: 13266
Script:
#!/bin/bash # Description: Verify the definition and enforcement of `rpp::constraint::observable`. # Search for the definition of `rpp::constraint::observable`. rg --type cpp 'rpp::constraint::observable' # Search for any related constraints or requirements in the codebase. rg --type cpp 'constraint'Length of output: 98707
Script:
#!/bin/bash # Description: Search for the definition of `rpp::constraint::observable`. # Locate the definition of `rpp::constraint::observable` in the codebase. rg --type cpp 'namespace rpp::constraint' -A 10 | rg 'observable'Length of output: 3803
There was a problem hiding this comment.
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (2)
- src/rpp/rpp/operators/retry_when.hpp (1 hunks)
- src/tests/rpp/test_retry_when.cpp (1 hunks)
Files skipped from review as they are similar to previous changes (2)
- src/rpp/rpp/operators/retry_when.hpp
- src/tests/rpp/test_retry_when.cpp
…into retry_when
There was a problem hiding this comment.
Actionable comments posted: 2
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (3)
- src/examples/rpp/doxygen/retry_when.cpp (1 hunks)
- src/rpp/rpp/operators.hpp (1 hunks)
- src/rpp/rpp/operators/fwd.hpp (1 hunks)
Files skipped from review as they are similar to previous changes (2)
- src/rpp/rpp/operators.hpp
- src/rpp/rpp/operators/fwd.hpp
Additional comments not posted (1)
src/examples/rpp/doxygen/retry_when.cpp (1)
1-5: Ensure necessary headers are included.The file includes
<rpp/rpp.hpp>,<iostream>, and<string>. Ensure that these headers are sufficient for the functionality demonstrated and that no additional headers are needed.
| //! [retry_when delay] | ||
| size_t retry_count = 0; | ||
| rpp::source::create<std::string>([&retry_count](const auto& sub) { | ||
| if (retry_count != 4) | ||
| { | ||
| sub.on_error({}); | ||
| } | ||
| else | ||
| { | ||
| sub.on_next(std::string{"success"}); | ||
| sub.on_completed(); | ||
| } | ||
| }) | ||
| | rpp::operators::retry_when([](const std::exception_ptr&) { | ||
| return rpp::source::timer(std::chrono::seconds{5}, rpp::schedulers::current_thread{}); | ||
| }) | ||
| | rpp::operators::subscribe([](const std::string& v) { std::cout << v << std::endl; }); | ||
| // Source observable is resubscribed after 5 seconds on each error emission | ||
| //! [retry_when delay] |
There was a problem hiding this comment.
Clarify the retry logic and increment retry_count.
The example demonstrates retrying with a delay but does not increment retry_count, which could lead to infinite retries.
Apply this diff to increment retry_count:
rpp::source::create<std::string>([&retry_count](const auto& sub) {
if (retry_count != 4)
{
+ ++retry_count;
sub.on_error({});
}
else
{
sub.on_next(std::string{"success"});
sub.on_completed();
}
})Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| //! [retry_when delay] | |
| size_t retry_count = 0; | |
| rpp::source::create<std::string>([&retry_count](const auto& sub) { | |
| if (retry_count != 4) | |
| { | |
| sub.on_error({}); | |
| } | |
| else | |
| { | |
| sub.on_next(std::string{"success"}); | |
| sub.on_completed(); | |
| } | |
| }) | |
| | rpp::operators::retry_when([](const std::exception_ptr&) { | |
| return rpp::source::timer(std::chrono::seconds{5}, rpp::schedulers::current_thread{}); | |
| }) | |
| | rpp::operators::subscribe([](const std::string& v) { std::cout << v << std::endl; }); | |
| // Source observable is resubscribed after 5 seconds on each error emission | |
| //! [retry_when delay] | |
| //! [retry_when delay] | |
| size_t retry_count = 0; | |
| rpp::source::create<std::string>([&retry_count](const auto& sub) { | |
| if (retry_count != 4) | |
| { | |
| ++retry_count; | |
| sub.on_error({}); | |
| } | |
| else | |
| { | |
| sub.on_next(std::string{"success"}); | |
| sub.on_completed(); | |
| } | |
| }) | |
| | rpp::operators::retry_when([](const std::exception_ptr&) { | |
| return rpp::source::timer(std::chrono::seconds{5}, rpp::schedulers::current_thread{}); | |
| }) | |
| | rpp::operators::subscribe([](const std::string& v) { std::cout << v << std::endl; }); | |
| // Source observable is resubscribed after 5 seconds on each error emission | |
| //! [retry_when delay] |
There was a problem hiding this comment.
wow, looks like pretty valid point
There was a problem hiding this comment.
@victimsnino, I'm glad you found the point valid! If you need any further assistance or have more questions, feel free to ask.
(\(\
( -.-)
o_(")(")
Learnings added
Learnt from: victimsnino
PR: victimsnino/ReactivePlusPlus#616
File: src/examples/rpp/doxygen/retry_when.cpp:12-30
Timestamp: 2024-08-21T21:06:32.053Z
Learning: In the `retry_when` example in `retry_when.cpp`, incrementing `retry_count` is necessary to prevent infinite retries.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
There was a problem hiding this comment.
Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!
| //! [retry_when] | ||
| retry_count = 0; | ||
| rpp::source::create<std::string>([&retry_count](const auto& sub) { | ||
| if (retry_count != 4) | ||
| { | ||
| sub.on_error({}); | ||
| } | ||
| else | ||
| { | ||
| sub.on_next(std::string{"success"}); | ||
| sub.on_completed(); | ||
| } | ||
| }) | ||
| | rpp::operators::retry_when([](const std::exception_ptr& ep) { | ||
| try | ||
| { | ||
| std::rethrow_exception(ep); | ||
| } | ||
| catch (const std::runtime_error&) | ||
| { | ||
| return rpp::source::timer(std::chrono::seconds{5}, rpp::schedulers::current_thread{}); | ||
| } | ||
| catch (...) | ||
| { | ||
| throw; | ||
| } | ||
| }) | ||
| | rpp::operators::subscribe([](const std::string& v) { std::cout << v << std::endl; }); | ||
| // Source observable is resubscribed after 5 seconds only on particular error emissions | ||
| //! [retry_when] |
There was a problem hiding this comment.
Clarify the error handling logic and increment retry_count.
The example demonstrates retrying based on specific error types but does not increment retry_count, which could lead to infinite retries.
Apply this diff to increment retry_count:
rpp::source::create<std::string>([&retry_count](const auto& sub) {
if (retry_count != 4)
{
+ ++retry_count;
sub.on_error({});
}
else
{
sub.on_next(std::string{"success"});
sub.on_completed();
}
})Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| //! [retry_when] | |
| retry_count = 0; | |
| rpp::source::create<std::string>([&retry_count](const auto& sub) { | |
| if (retry_count != 4) | |
| { | |
| sub.on_error({}); | |
| } | |
| else | |
| { | |
| sub.on_next(std::string{"success"}); | |
| sub.on_completed(); | |
| } | |
| }) | |
| | rpp::operators::retry_when([](const std::exception_ptr& ep) { | |
| try | |
| { | |
| std::rethrow_exception(ep); | |
| } | |
| catch (const std::runtime_error&) | |
| { | |
| return rpp::source::timer(std::chrono::seconds{5}, rpp::schedulers::current_thread{}); | |
| } | |
| catch (...) | |
| { | |
| throw; | |
| } | |
| }) | |
| | rpp::operators::subscribe([](const std::string& v) { std::cout << v << std::endl; }); | |
| // Source observable is resubscribed after 5 seconds only on particular error emissions | |
| //! [retry_when] | |
| //! [retry_when] | |
| retry_count = 0; | |
| rpp::source::create<std::string>([&retry_count](const auto& sub) { | |
| if (retry_count != 4) | |
| { | |
| ++retry_count; | |
| sub.on_error({}); | |
| } | |
| else | |
| { | |
| sub.on_next(std::string{"success"}); | |
| sub.on_completed(); | |
| } | |
| }) | |
| | rpp::operators::retry_when([](const std::exception_ptr& ep) { | |
| try | |
| { | |
| std::rethrow_exception(ep); | |
| } | |
| catch (const std::runtime_error&) | |
| { | |
| return rpp::source::timer(std::chrono::seconds{5}, rpp::schedulers::current_thread{}); | |
| } | |
| catch (...) | |
| { | |
| throw; | |
| } | |
| }) | |
| | rpp::operators::subscribe([](const std::string& v) { std::cout << v << std::endl; }); | |
| // Source observable is resubscribed after 5 seconds only on particular error emissions | |
| //! [retry_when] |
| rpp::source::create<std::string>([&retry_count](const auto& sub) { | ||
| if (retry_count != 4) | ||
| { | ||
| sub.on_error({}); |
There was a problem hiding this comment.
IIUC, the fundamental guarantees of observables are broken in this example:
After on_error, no other message must be emitted. This includes the emission of on_completed. See the observable contract for reference.
OnNext
conveys an item that is emitted by the Observable to the observer
OnCompleted
indicates that the Observable has completed successfully and that it will be emitting no further items
OnError
indicates that the Observable has terminated with a specified error condition and that it will be emitting no further items
I am having difficulty understanding how the semantics of retry_when fit into this contract. Operators usually act on the output of an observable. In this example, we have a restart functionality that backpropagates into the generator of a cold observable and acts on the observer itself. I am not happy with that design.
Please explain why we need retry_when.
There was a problem hiding this comment.
Actually not due to as required no any other messages emitted from this observable, but they would be emitted from new one (after resubscribe). Any observer obtained on_error wouldn't obtain any new messages.
Actually we have chain like this
observable->observer_inside_retry_when->final_observer
when observable emits error, then only observer_inside_retry_when actually obtains error. Instead of forwarding error to final_observer it does attempt to subscribe new observer_inside_retry_when (let's say observer_inside_retry_when_2) observer to new observable_2. so old chain is partially destructed
observable->observer_inside_retry_when->| final_observer
and it becomes like
observable_2->observer_inside_retry_when_2->final_observer
No any guarantees are broken in this case.
Why do we need it? For example, to implement error handling loic with custom delaying. Like this
rpp::source::create<int>([](const auto& observer)
{
// some hard job to construct state
while(true) {
observer.on_next(std::rand());
if (get_current_cpu_temp() > 95) { // emulating some issues
observer.on_error(.....);
break;
}
}
// some hard job to destruct state
})
| rpp::operators::retry_when([](const std::exception_ptr&) {
return rpp::source::timer(std::chrono::seconds{5}, rpp::schedulers::new_thread{}); // kind of "ok, we are obtained error, it is ok, let's wait 5 seconds to make our cpu cooler and try again"
})
| rpp::operators::subscribe(...);There was a problem hiding this comment.
no any other messages emitted from this observable, but they would be emitted from new one (after resubscribe)
That is precisely my problem with this operator. This only works and makes sense for cold observables. What happens if this operator is applied on a hot observable? It would subscribe to an observable that has been completed. Can we obtain a compile-time error to avoid this?
There was a problem hiding this comment.
In this case hot observable should be in “error state”. For example, subject caches last error and emits it on new subscriptions. In this case it would be infinite loop, but it is up to user to control it.
Not sure if it is possible to add compile time error. Anyway it can be easily suppressed just via converting observable to dynamic version (and losing all meta info)
There was a problem hiding this comment.
Given your arguments, I drop my request for a compile-time error message. Also, I accept your explanation:
The current design means that if the hot observable goes into an error state, you obtain zero emissions on consecutive subscriptions—unless you add a stateful relay like subject. This behavior follows the contract.
I recommend that the difference in behavior for hot and cold observables is explicitly mentioned and explained by example in the documentation of retry_when to avoid people running into a pitfall.
There was a problem hiding this comment.
Yeah, sure, thank you!
@CorentinBT , could you please handle this documentation request?
There was a problem hiding this comment.
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (1)
- src/examples/rpp/doxygen/retry_when.cpp (1 hunks)
Files skipped from review as they are similar to previous changes (1)
- src/examples/rpp/doxygen/retry_when.cpp
|
https://sonarcloud.io/code?id=victimsnino_ReactivePlusPlus&pullRequest=616&selected=victimsnino_ReactivePlusPlus%3Asrc%2Frpp%2Frpp%2Foperators%2Fretry_when.hpp&line=62 |
| template<rpp::constraint::decayed_type TNotifier> | ||
| struct retry_when_t | ||
| { | ||
| TNotifier notifier; |
There was a problem hiding this comment.
so its UB in msvc only if there are other members?
There was a problem hiding this comment.
If inherited from base class with vtable. I think , it is partially overrides vtable in this case
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## v2 #616 +/- ##
==========================================
- Coverage 94.88% 94.87% -0.01%
==========================================
Files 97 98 +1
Lines 1857 1894 +37
==========================================
+ Hits 1762 1797 +35
- Misses 95 97 +2 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Actionable comments posted: 2
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (2)
- src/rpp/rpp/operators/retry_when.hpp (1 hunks)
- src/tests/rpp/test_retry_when.cpp (1 hunks)
Files skipped from review as they are similar to previous changes (1)
- src/tests/rpp/test_retry_when.cpp
Additional comments not posted (4)
src/rpp/rpp/operators/retry_when.hpp (4)
37-39: Address potential UB withRPP_NO_UNIQUE_ADDRESS.The first member with
RPP_NO_UNIQUE_ADDRESSfor a class with a base class with a vtable could cause undefined behavior on MSVC. Consider placing theboolas the first member or removingRPP_NO_UNIQUE_ADDRESSfor the first member.- RPP_NO_UNIQUE_ADDRESS TObserver observer; + TObserver observer;
196-201: Verify the requirements for theretry_whenfunction.Ensure that the notifier passed to the
retry_whenfunction meets the requirement of returning an observable. The use ofstd::forwardis appropriate for perfect forwarding.Verification successful
Verified: The
retry_whenfunction's notifier requirement is satisfied.The test cases and examples confirm that the notifiers used return observable types, meeting the function's requirements.
- The notifiers in
test_retry_when.cppreturn observable types likerpp::source::just,rpp::source::empty, andrpp::source::error.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Verify that the notifier passed to the `retry_when` function returns an observable. rg --type cpp -A 5 'retry_when'Length of output: 17610
48-80: Verify the effectiveness oflocally_disposedinretry_when_impl_inner_strategy.The use of
locally_disposedhelps prevent multiple calls toon_next. Ensure that this mechanism is effective in all scenarios, especially with complex observables.
147-170: Verify the use ofRPP_NO_UNIQUE_ADDRESSand subscription logic inretry_when_t.Ensure that the use of
RPP_NO_UNIQUE_ADDRESSfor thenotifiermember does not cause issues. Verify that the subscription logic correctly sets up the state and initiates the drain process.Verification successful
Verification successful for
RPP_NO_UNIQUE_ADDRESSand subscription logic inretry_when_t.The use of
RPP_NO_UNIQUE_ADDRESSin theretry_when_tclass is consistent with its usage across the codebase, indicating a deliberate design choice. The subscription logic correctly sets up the state and initiates the drain process, following a standard pattern.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Verify the use of `RPP_NO_UNIQUE_ADDRESS` and the subscription logic in `retry_when_t`. rg --type cpp -A 5 'RPP_NO_UNIQUE_ADDRESS'Length of output: 35078
|

Summary by CodeRabbit
New Features
retry_whenoperator for improved error handling and retry logic in reactive programming.Tests
retry_whenoperator to validate its functionality and ensure reliability in various scenarios.Documentation
retry_whenoperator for conditional retries based on error types, serving as a practical guide for developers.