repeat_when operator#715
Conversation
📝 Walkthrough""" WalkthroughA generic repeating strategy for observable subscriptions has been introduced, enabling the implementation of the Changes
Sequence Diagram(s)sequenceDiagram
participant Observer
participant repeat_when
participant SourceObservable
participant NotifierObservable
Observer->>repeat_when: subscribe()
repeat_when->>SourceObservable: subscribe()
SourceObservable-->>repeat_when: on_next/on_error/on_completed
alt on_completed
repeat_when->>NotifierObservable: subscribe()
NotifierObservable-->>repeat_when: on_next
repeat_when->>SourceObservable: resubscribe()
end
repeat_when-->>Observer: on_next/on_error/on_completed
Possibly related PRs
Poem
📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (3)
✅ Files skipped from review due to trivial changes (1)
🚧 Files skipped from review as they are similar to previous changes (1)
⏰ Context from checks skipped due to timeout of 90000ms (4)
🔇 Additional comments (1)
✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Actionable comments posted: 0
🧹 Nitpick comments (2)
src/rpp/rpp/operators/details/repeating_strategy.hpp (1)
106-113: Consider relaxing memory ordering for performance.The code uses
std::memory_order::seq_cstfor all atomic operations, which provides the strongest guarantees but may impact performance. Consider usingmemory_order::acquirefor the load in line 112 andmemory_order::releasefor the store in line 106, as full sequential consistency might not be required here.- state->is_inside_drain.store(true, std::memory_order::seq_cst); + state->is_inside_drain.store(true, std::memory_order::release); try { using value_type = rpp::utils::extract_observer_type_t<TObserver>; state->observable.subscribe(rpp::observer<value_type, TStrategy>{state}); - if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) + if (state->is_inside_drain.exchange(false, std::memory_order::acq_rel)) return;src/rpp/rpp/operators/repeat_when.hpp (1)
70-71: Split long line for better readability.Line 70 is excessively long, making the code harder to read and maintain. Consider splitting it into multiple lines.
- const auto d = disposable_wrapper_impl<repeating_state<std::decay_t<TObserver>, std::decay_t<TObservable>, std::decay_t<TNotifier>>>::make(std::forward<TObserver>(observer), std::forward<TObservable>(observable), notifier); + using state_type = repeating_state<std::decay_t<TObserver>, std::decay_t<TObservable>, std::decay_t<TNotifier>>; + const auto d = disposable_wrapper_impl<state_type>::make( + std::forward<TObserver>(observer), + std::forward<TObservable>(observable), + notifier);
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
.clangd(1 hunks)src/rpp/rpp/operators/details/repeating_strategy.hpp(1 hunks)src/rpp/rpp/operators/repeat_when.hpp(1 hunks)src/rpp/rpp/operators/retry_when.hpp(3 hunks)src/tests/rpp/test_repeat_when.cpp(1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
src/rpp/rpp/operators/retry_when.hpp (3)
src/rpp/rpp/operators/details/repeating_strategy.hpp (10)
state(98-98)err(70-74)err(70-70)d(82-82)d(82-82)d(96-96)d(96-96)drain(45-45)drain(102-121)drain(102-102)src/rpp/rpp/operators/repeat_when.hpp (3)
state(40-50)err(35-38)err(35-35)src/rpp/rpp/operators/retry.hpp (9)
state(70-73)state(80-80)err(54-68)err(54-54)d(75-78)d(75-75)drain(39-39)drain(84-105)drain(84-84)
src/rpp/rpp/operators/details/repeating_strategy.hpp (2)
src/rpp/rpp/operators/repeat_when.hpp (3)
state(40-50)err(35-38)err(35-35)src/rpp/rpp/operators/retry_when.hpp (3)
state(47-50)err(35-45)err(35-35)
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: Cache deps on ci-windows for Debug
- GitHub Check: Cache deps on ci-windows for Release
- GitHub Check: Test RPP as package - ubuntu-22.04
🔇 Additional comments (6)
.clangd (1)
1-2: LGTM!Standard clangd configuration pointing to the compilation database in the build directory.
src/tests/rpp/test_repeat_when.cpp (1)
1-174: Excellent test coverage!The test suite comprehensively covers all aspects of the
repeat_whenoperator including edge cases, performance considerations, and resource management. The tests are well-structured and use appropriate testing patterns.src/rpp/rpp/operators/retry_when.hpp (1)
16-76: Excellent refactoring to use the shared repeating strategy!The refactoring successfully eliminates code duplication by leveraging the generic
repeating_observer_strategy,repeating_inner_observer_strategy, andrepeating_state. This improves maintainability and ensures consistent behavior across retry and repeat operators.src/rpp/rpp/operators/details/repeating_strategy.hpp (2)
25-42: Well-designed state management structure.The
repeating_statestruct correctly inherits fromcomposite_disposablefor disposal management and uses an atomic flag for thread-safe reentrancy control. The use ofRPP_NO_UNIQUE_ADDRESSfor space optimization is a good practice.
59-68:Details
❌ Incorrect review comment
Potential race condition in drain triggering logic.
The pattern on lines 64-67 has a subtle race condition. Between the
exchangereturning false and callingdrain, another thread could setis_inside_drainto true, potentially leading to missed drain cycles. Consider using a more robust synchronization pattern or documenting this edge case if it's acceptable behavior.
🏁 Script executed:
#!/bin/bash # Search for similar atomic exchange patterns in the codebase to see if this is a known pattern rg -A 5 -B 5 "is_inside_drain.*exchange.*false" --type cppLength of output: 4764
🏁 Script executed:
#!/bin/bash set -e echo "===== Head of repeating_strategy.hpp =====" sed -n '1,200p' src/rpp/rpp/operators/details/repeating_strategy.hpp || true echo -e "\n===== All occurrences of is_inside_drain in repeating_strategy.hpp =====" rg -n "is_inside_drain" src/rpp/rpp/operators/details/repeating_strategy.hpp || trueLength of output: 4847
No race in drain synchronization guard
The atomic
is_inside_drain.exchange(false, std::memory_order::seq_cst)pattern inrepeating_strategy.hppis identical to the logic inconcat.hppandretry.hpp. This ensures that if a drain is already in progress, any concurrenton_nextwill seeexchange(false)returntrueand bail out, preventing overlapping drains. Given thatsubscribeis invoked synchronously on the current thread, there’s no window for a missed drain cycle in practice.
I'll resolve this comment.Likely an incorrect or invalid review comment.
src/rpp/rpp/operators/repeat_when.hpp (1)
20-51: Correct implementation of repeat_when semantics.The strategy correctly forwards values and errors to the observer, and properly triggers resubscription by subscribing to the notifier on completion. The exception handling ensures errors during notifier subscription are properly propagated.
BENCHMARK RESULTS (AUTOGENERATED)
|
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| Subscribe empty callbacks to empty observable | 306.34 ns | 1.88 ns | 1.87 ns | 1.01 | 1.87 ns |
| Subscribe empty callbacks to empty observable via pipe operator | 307.39 ns | 1.86 ns | 1.87 ns | 1.00 | 1.86 ns |
Sources
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| from array of 1 - create + subscribe + immediate | 697.71 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
| from array of 1 - create + subscribe + current_thread | 1055.76 ns | 3.73 ns | 3.73 ns | 1.00 | 3.73 ns |
| concat_as_source of just(1 immediate) create + subscribe | 2322.56 ns | 114.96 ns | 117.42 ns | 0.98 | 115.81 ns |
| defer from array of 1 - defer + create + subscribe + immediate | 743.68 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
| interval - interval + take(3) + subscribe + immediate | 2139.61 ns | 59.62 ns | 59.60 ns | 1.00 | 59.59 ns |
| interval - interval + take(3) + subscribe + current_thread | 3041.49 ns | 32.70 ns | 32.68 ns | 1.00 | 34.49 ns |
| from array of 1 - create + as_blocking + subscribe + new_thread | 30772.67 ns | 31484.03 ns | 30058.05 ns | 1.05 | 31883.41 ns |
| from array of 1000 - create + as_blocking + subscribe + new_thread | 48384.36 ns | 54432.58 ns | 53818.15 ns | 1.01 | 53306.30 ns |
| concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 3591.75 ns | 135.15 ns | 135.31 ns | 1.00 | 150.38 ns |
Filtering Operators
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| immediate_just+take(1)+subscribe | 1105.94 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
| immediate_just+filter(true)+subscribe | 846.06 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
| immediate_just(1,2)+skip(1)+subscribe | 1036.15 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
| immediate_just(1,1,2)+distinct_until_changed()+subscribe | 879.20 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
| immediate_just(1,2)+first()+subscribe | 1280.75 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
| immediate_just(1,2)+last()+subscribe | 989.30 ns | 0.31 ns | 0.31 ns | 0.99 | 0.31 ns |
| immediate_just+take_last(1)+subscribe | 1125.43 ns | 18.64 ns | 18.65 ns | 1.00 | 19.58 ns |
| immediate_just(1,2,3)+element_at(1)+subscribe | 839.75 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
Schedulers
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| immediate scheduler create worker + schedule | 267.66 ns | 1.55 ns | 0.47 ns | 3.33 | 1.55 ns |
| current_thread scheduler create worker + schedule | 386.29 ns | 4.41 ns | 4.35 ns | 1.01 | 4.67 ns |
| current_thread scheduler create worker + schedule + recursive schedule | 842.91 ns | 61.10 ns | 61.23 ns | 1.00 | 61.39 ns |
Transforming Operators
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| immediate_just+map(v*2)+subscribe | 864.08 ns | 0.33 ns | 0.31 ns | 1.04 | 0.31 ns |
| immediate_just+scan(10, std::plus)+subscribe | 901.55 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
| immediate_just+flat_map(immediate_just(v*2))+subscribe | 2351.41 ns | 123.63 ns | 156.62 ns | 0.79 | 174.04 ns |
| immediate_just+buffer(2)+subscribe | 1537.42 ns | 13.68 ns | 13.99 ns | 0.98 | 17.41 ns |
| immediate_just+window(2)+subscribe + subscsribe inner | 2456.44 ns | 1259.89 ns | 1279.34 ns | 0.98 | 1364.62 ns |
Conditional Operators
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| immediate_just+take_while(false)+subscribe | 834.60 ns | - | - | 0.00 | - |
| immediate_just+take_while(true)+subscribe | 856.32 ns | 0.35 ns | 0.31 ns | 1.13 | 0.31 ns |
Utility Operators
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| immediate_just(1)+subscribe_on(immediate)+subscribe | 1968.89 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
Combining Operators
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 3434.82 ns | 157.92 ns | 196.28 ns | 0.80 | 190.28 ns |
| immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 3699.67 ns | 157.41 ns | 170.49 ns | 0.92 | 163.07 ns |
| immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 143.03 ns | 131.54 ns | 1.09 | 148.56 ns |
| immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 3540.64 ns | 374.08 ns | 421.51 ns | 0.89 | 446.96 ns |
| immediate_just(1) + zip(immediate_just(2)) + subscribe | 2282.93 ns | 218.70 ns | 217.55 ns | 1.01 | 210.15 ns |
| immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe | 3137.99 ns | 241.65 ns | 257.26 ns | 0.94 | 267.07 ns |
Subjects
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| publish_subject with 1 observer - on_next | 34.80 ns | 14.79 ns | 14.79 ns | 1.00 | 14.96 ns |
| subscribe 100 observers to publish_subject | 202714.20 ns | 17885.09 ns | 17571.09 ns | 1.02 | 17727.64 ns |
| 100 on_next to 100 observers to publish_subject | 27114.79 ns | 16915.45 ns | 16876.93 ns | 1.00 | 16900.93 ns |
Scenarios
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| basic sample | 1477.26 ns | 13.06 ns | 13.05 ns | 1.00 | 23.02 ns |
| basic sample with immediate scheduler | 1423.22 ns | 5.28 ns | 5.28 ns | 1.00 | 17.07 ns |
| mix operators with disposables and without disposables | 6376.08 ns | 1426.17 ns | 1441.52 ns | 0.99 | 1845.47 ns |
| single disposable and looooooong indentity chain | 24873.85 ns | 1035.81 ns | 1065.28 ns | 0.97 | 5263.59 ns |
Aggregating Operators
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| immediate_just+reduce(10, std::plus)+subscribe | 927.53 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
Error Handling Operators
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 2120.30 ns | 999.88 ns | 996.44 ns | 1.00 | 1000.00 ns |
| create(on_error())+retry(1)+subscribe | 619.51 ns | 109.13 ns | 158.54 ns | 0.69 | 118.18 ns |
ci-macos
General
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| Subscribe empty callbacks to empty observable | 392.13 ns | 0.47 ns | 1.02 ns | 0.46 | 0.47 ns |
| Subscribe empty callbacks to empty observable via pipe operator | 359.01 ns | 0.47 ns | 1.03 ns | 0.45 | 0.47 ns |
Sources
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| from array of 1 - create + subscribe + immediate | 737.88 ns | 0.31 ns | 0.34 ns | 0.92 | 0.31 ns |
| from array of 1 - create + subscribe + current_thread | 904.84 ns | 4.06 ns | 4.51 ns | 0.90 | 4.06 ns |
| concat_as_source of just(1 immediate) create + subscribe | 2064.19 ns | 159.85 ns | 175.05 ns | 0.91 | 160.66 ns |
| defer from array of 1 - defer + create + subscribe + immediate | 744.44 ns | 0.31 ns | 0.34 ns | 0.93 | 0.31 ns |
| interval - interval + take(3) + subscribe + immediate | 1925.69 ns | 49.59 ns | 54.81 ns | 0.90 | 49.67 ns |
| interval - interval + take(3) + subscribe + current_thread | 2353.06 ns | 29.34 ns | 31.68 ns | 0.93 | 29.75 ns |
| from array of 1 - create + as_blocking + subscribe + new_thread | 23605.57 ns | 21798.78 ns | 38938.96 ns | 0.56 | 21758.87 ns |
| from array of 1000 - create + as_blocking + subscribe + new_thread | 29060.03 ns | 25577.97 ns | 23779.41 ns | 1.08 | 25004.17 ns |
| concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 2978.30 ns | 175.89 ns | 199.02 ns | 0.88 | 178.85 ns |
Filtering Operators
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| immediate_just+take(1)+subscribe | 1074.57 ns | 0.31 ns | 0.48 ns | 0.64 | 0.31 ns |
| immediate_just+filter(true)+subscribe | 804.18 ns | 0.31 ns | 0.55 ns | 0.57 | 0.31 ns |
| immediate_just(1,2)+skip(1)+subscribe | 1033.83 ns | 0.31 ns | 0.38 ns | 0.82 | 0.31 ns |
| immediate_just(1,1,2)+distinct_until_changed()+subscribe | 804.46 ns | 0.31 ns | 0.37 ns | 0.84 | 0.31 ns |
| immediate_just(1,2)+first()+subscribe | 1307.07 ns | 0.31 ns | 0.73 ns | 0.43 | 0.31 ns |
| immediate_just(1,2)+last()+subscribe | 946.99 ns | 0.53 ns | 1.23 ns | 0.43 | 0.53 ns |
| immediate_just+take_last(1)+subscribe | 1151.00 ns | 0.31 ns | 0.56 ns | 0.55 | 0.31 ns |
| immediate_just(1,2,3)+element_at(1)+subscribe | 798.12 ns | 0.31 ns | 0.35 ns | 0.89 | 0.31 ns |
Schedulers
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| immediate scheduler create worker + schedule | 314.17 ns | 0.47 ns | 1.02 ns | 0.46 | 0.47 ns |
| current_thread scheduler create worker + schedule | 418.64 ns | 4.09 ns | 4.53 ns | 0.90 | 4.06 ns |
| current_thread scheduler create worker + schedule + recursive schedule | 687.86 ns | 62.74 ns | 70.16 ns | 0.89 | 62.06 ns |
Transforming Operators
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| immediate_just+map(v*2)+subscribe | 795.98 ns | 2.45 ns | 2.84 ns | 0.86 | 2.40 ns |
| immediate_just+scan(10, std::plus)+subscribe | 988.54 ns | 0.31 ns | 0.35 ns | 0.91 | 0.31 ns |
| immediate_just+flat_map(immediate_just(v*2))+subscribe | 2126.93 ns | 205.10 ns | 234.19 ns | 0.88 | 182.82 ns |
| immediate_just+buffer(2)+subscribe | 961.81 ns | 15.87 ns | 23.05 ns | 0.69 | 15.54 ns |
| immediate_just+window(2)+subscribe + subscsribe inner | 1894.87 ns | 967.72 ns | 1374.70 ns | 0.70 | 971.76 ns |
Conditional Operators
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| immediate_just+take_while(false)+subscribe | 786.03 ns | - | - | 0.00 | - |
| immediate_just+take_while(true)+subscribe | 812.12 ns | 0.31 ns | 0.40 ns | 0.78 | 0.31 ns |
Utility Operators
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| immediate_just(1)+subscribe_on(immediate)+subscribe | 1818.57 ns | 2.08 ns | 3.92 ns | 0.53 | 1.89 ns |
Combining Operators
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 2787.34 ns | 201.12 ns | 225.60 ns | 0.89 | 201.81 ns |
| immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 3034.56 ns | 199.00 ns | 225.15 ns | 0.88 | 200.64 ns |
| immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 202.99 ns | 230.01 ns | 0.88 | 202.94 ns |
| immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 2907.26 ns | 496.56 ns | 565.02 ns | 0.88 | 500.83 ns |
| immediate_just(1) + zip(immediate_just(2)) + subscribe | 1973.26 ns | 323.51 ns | 440.77 ns | 0.73 | 310.84 ns |
| immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe | 2819.52 ns | 319.94 ns | 361.47 ns | 0.89 | 317.80 ns |
Subjects
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| publish_subject with 1 observer - on_next | 41.40 ns | 21.46 ns | 34.99 ns | 0.61 | 21.77 ns |
| subscribe 100 observers to publish_subject | 127180.56 ns | 16449.52 ns | 20139.95 ns | 0.82 | 16468.25 ns |
| 100 on_next to 100 observers to publish_subject | 31660.89 ns | 13690.64 ns | 25853.41 ns | 0.53 | 13713.33 ns |
Scenarios
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| basic sample | 1163.84 ns | 11.25 ns | 19.00 ns | 0.59 | 23.72 ns |
| basic sample with immediate scheduler | 1162.42 ns | 5.02 ns | 5.75 ns | 0.87 | 12.79 ns |
| mix operators with disposables and without disposables | 5224.24 ns | 1322.14 ns | 1778.07 ns | 0.74 | 1584.25 ns |
| single disposable and looooooong indentity chain | 14929.96 ns | 1542.91 ns | 1694.56 ns | 0.91 | 3461.85 ns |
Aggregating Operators
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| immediate_just+reduce(10, std::plus)+subscribe | 938.95 ns | 0.31 ns | 0.40 ns | 0.78 | 0.31 ns |
Error Handling Operators
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 3321.48 ns | 2564.02 ns | 4826.39 ns | 0.53 | 2589.76 ns |
| create(on_error())+retry(1)+subscribe | 665.25 ns | 167.76 ns | 184.75 ns | 0.91 | 168.24 ns |
ci-ubuntu-clang
General
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| Subscribe empty callbacks to empty observable | 271.64 ns | 1.56 ns | 0.64 ns | 2.43 | 1.56 ns |
| Subscribe empty callbacks to empty observable via pipe operator | 272.28 ns | 1.55 ns | 0.64 ns | 2.43 | 1.55 ns |
Sources
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| from array of 1 - create + subscribe + immediate | 571.27 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
| from array of 1 - create + subscribe + current_thread | 797.01 ns | 4.04 ns | 4.04 ns | 1.00 | 4.04 ns |
| concat_as_source of just(1 immediate) create + subscribe | 2360.34 ns | 131.37 ns | 130.21 ns | 1.01 | 131.45 ns |
| defer from array of 1 - defer + create + subscribe + immediate | 778.33 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
| interval - interval + take(3) + subscribe + immediate | 2227.31 ns | 58.68 ns | 58.66 ns | 1.00 | 58.67 ns |
| interval - interval + take(3) + subscribe + current_thread | 3160.94 ns | 31.06 ns | 31.14 ns | 1.00 | 31.73 ns |
| from array of 1 - create + as_blocking + subscribe + new_thread | 29769.51 ns | 30015.86 ns | 30078.81 ns | 1.00 | 31750.86 ns |
| from array of 1000 - create + as_blocking + subscribe + new_thread | 40751.54 ns | 37800.23 ns | 38709.83 ns | 0.98 | 37006.52 ns |
| concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 3693.90 ns | 149.67 ns | 147.58 ns | 1.01 | 148.82 ns |
Filtering Operators
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| immediate_just+take(1)+subscribe | 1147.42 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
| immediate_just+filter(true)+subscribe | 848.70 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
| immediate_just(1,2)+skip(1)+subscribe | 1093.54 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
| immediate_just(1,1,2)+distinct_until_changed()+subscribe | 882.56 ns | 0.33 ns | 0.32 ns | 1.03 | 0.33 ns |
| immediate_just(1,2)+first()+subscribe | 1385.66 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
| immediate_just(1,2)+last()+subscribe | 1015.42 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
| immediate_just+take_last(1)+subscribe | 1197.09 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
| immediate_just(1,2,3)+element_at(1)+subscribe | 868.04 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
Schedulers
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| immediate scheduler create worker + schedule | 280.18 ns | 0.64 ns | 1.55 ns | 0.41 | 0.64 ns |
| current_thread scheduler create worker + schedule | 390.20 ns | 4.35 ns | 4.05 ns | 1.08 | 4.35 ns |
| current_thread scheduler create worker + schedule + recursive schedule | 850.60 ns | 55.29 ns | 55.86 ns | 0.99 | 55.61 ns |
Transforming Operators
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| immediate_just+map(v*2)+subscribe | 857.46 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
| immediate_just+scan(10, std::plus)+subscribe | 973.13 ns | 0.62 ns | 0.62 ns | 1.00 | 0.33 ns |
| immediate_just+flat_map(immediate_just(v*2))+subscribe | 2291.36 ns | 139.66 ns | 144.09 ns | 0.97 | 134.70 ns |
| immediate_just+buffer(2)+subscribe | 1558.46 ns | 13.97 ns | 14.30 ns | 0.98 | 14.61 ns |
| immediate_just+window(2)+subscribe + subscsribe inner | 2514.38 ns | 901.80 ns | 913.34 ns | 0.99 | 926.89 ns |
Conditional Operators
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| immediate_just+take_while(false)+subscribe | 849.22 ns | - | - | 0.00 | - |
| immediate_just+take_while(true)+subscribe | 861.67 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
Utility Operators
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| immediate_just(1)+subscribe_on(immediate)+subscribe | 2090.09 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
Combining Operators
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 3293.94 ns | 161.71 ns | 159.92 ns | 1.01 | 153.70 ns |
| immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 3744.61 ns | 139.85 ns | 139.68 ns | 1.00 | 138.51 ns |
| immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 143.39 ns | 143.18 ns | 1.00 | 138.37 ns |
| immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 3392.45 ns | 380.71 ns | 380.39 ns | 1.00 | 381.67 ns |
| immediate_just(1) + zip(immediate_just(2)) + subscribe | 2251.11 ns | 199.93 ns | 200.80 ns | 1.00 | 200.19 ns |
| immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe | 3261.17 ns | 224.05 ns | 225.63 ns | 0.99 | 225.41 ns |
Subjects
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| publish_subject with 1 observer - on_next | 55.13 ns | 18.41 ns | 19.80 ns | 0.93 | 22.77 ns |
| subscribe 100 observers to publish_subject | 215392.80 ns | 18184.52 ns | 18428.41 ns | 0.99 | 17607.33 ns |
| 100 on_next to 100 observers to publish_subject | 51667.96 ns | 20279.08 ns | 20219.98 ns | 1.00 | 20280.69 ns |
Scenarios
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| basic sample | 1307.54 ns | 11.49 ns | 11.50 ns | 1.00 | 21.43 ns |
| basic sample with immediate scheduler | 1315.09 ns | 5.90 ns | 5.90 ns | 1.00 | 6.52 ns |
| mix operators with disposables and without disposables | 6700.55 ns | 1165.87 ns | 1166.92 ns | 1.00 | 1480.68 ns |
| single disposable and looooooong indentity chain | 27502.44 ns | 1252.36 ns | 1249.12 ns | 1.00 | 4670.42 ns |
Aggregating Operators
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| immediate_just+reduce(10, std::plus)+subscribe | 1005.44 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
Error Handling Operators
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 2161.60 ns | 1161.51 ns | 1180.27 ns | 0.98 | 1183.64 ns |
| create(on_error())+retry(1)+subscribe | 687.87 ns | 138.57 ns | 139.40 ns | 0.99 | 139.15 ns |
ci-windows
General
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| Subscribe empty callbacks to empty observable | 578.32 ns | 2.16 ns | 2.16 ns | 1.00 | 2.16 ns |
| Subscribe empty callbacks to empty observable via pipe operator | 586.31 ns | 2.16 ns | 2.33 ns | 0.93 | 2.16 ns |
Sources
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| from array of 1 - create + subscribe + immediate | 1232.90 ns | 4.93 ns | 4.94 ns | 1.00 | 5.55 ns |
| from array of 1 - create + subscribe + current_thread | 1599.73 ns | 15.74 ns | 15.74 ns | 1.00 | 15.47 ns |
| concat_as_source of just(1 immediate) create + subscribe | 3750.92 ns | 175.09 ns | 175.57 ns | 1.00 | 177.58 ns |
| defer from array of 1 - defer + create + subscribe + immediate | 1189.51 ns | 5.24 ns | 5.24 ns | 1.00 | 5.24 ns |
| interval - interval + take(3) + subscribe + immediate | 3357.60 ns | 140.95 ns | 139.77 ns | 1.01 | 141.50 ns |
| interval - interval + take(3) + subscribe + current_thread | 3411.29 ns | 60.49 ns | 60.39 ns | 1.00 | 61.66 ns |
| from array of 1 - create + as_blocking + subscribe + new_thread | 125275.00 ns | 119055.56 ns | 114090.00 ns | 1.04 | 126475.00 ns |
| from array of 1000 - create + as_blocking + subscribe + new_thread | 131644.44 ns | 135750.00 ns | 131475.00 ns | 1.03 | 143675.00 ns |
| concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 5346.88 ns | 202.11 ns | 199.98 ns | 1.01 | 222.01 ns |
Filtering Operators
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| immediate_just+take(1)+subscribe | 1803.69 ns | 19.73 ns | 19.42 ns | 1.02 | 20.36 ns |
| immediate_just+filter(true)+subscribe | 1617.54 ns | 18.81 ns | 18.50 ns | 1.02 | 21.31 ns |
| immediate_just(1,2)+skip(1)+subscribe | 1722.75 ns | 18.51 ns | 17.89 ns | 1.03 | 20.38 ns |
| immediate_just(1,1,2)+distinct_until_changed()+subscribe | 1364.29 ns | 23.44 ns | 20.90 ns | 1.12 | 25.55 ns |
| immediate_just(1,2)+first()+subscribe | 2373.24 ns | 17.28 ns | 18.20 ns | 0.95 | 18.51 ns |
| immediate_just(1,2)+last()+subscribe | 1464.24 ns | 18.52 ns | 19.13 ns | 0.97 | 22.23 ns |
| immediate_just+take_last(1)+subscribe | 1998.66 ns | 64.95 ns | 66.33 ns | 0.98 | 69.79 ns |
| immediate_just(1,2,3)+element_at(1)+subscribe | 1645.80 ns | 21.90 ns | 21.05 ns | 1.04 | 22.84 ns |
Schedulers
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| immediate scheduler create worker + schedule | 474.10 ns | 4.32 ns | 4.32 ns | 1.00 | 4.32 ns |
| current_thread scheduler create worker + schedule | 645.47 ns | 11.11 ns | 11.11 ns | 1.00 | 11.17 ns |
| current_thread scheduler create worker + schedule + recursive schedule | 1342.48 ns | 99.32 ns | 98.96 ns | 1.00 | 99.87 ns |
Transforming Operators
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| immediate_just+map(v*2)+subscribe | 1303.40 ns | 18.82 ns | 18.81 ns | 1.00 | 21.30 ns |
| immediate_just+scan(10, std::plus)+subscribe | 1429.87 ns | 21.27 ns | 20.96 ns | 1.01 | 22.83 ns |
| immediate_just+flat_map(immediate_just(v*2))+subscribe | 3826.87 ns | 183.85 ns | 184.91 ns | 0.99 | 208.77 ns |
| immediate_just+buffer(2)+subscribe | 2274.36 ns | 64.47 ns | 66.59 ns | 0.97 | 73.07 ns |
| immediate_just+window(2)+subscribe + subscsribe inner | 3938.97 ns | 1210.26 ns | 1214.69 ns | 1.00 | 1231.28 ns |
Conditional Operators
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| immediate_just+take_while(false)+subscribe | 1303.06 ns | 17.57 ns | 17.57 ns | 1.00 | 19.11 ns |
| immediate_just+take_while(true)+subscribe | 1311.24 ns | 18.81 ns | 18.50 ns | 1.02 | 20.98 ns |
Utility Operators
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| immediate_just(1)+subscribe_on(immediate)+subscribe | 3222.39 ns | 11.10 ns | 11.10 ns | 1.00 | 11.11 ns |
Combining Operators
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 5170.31 ns | 202.74 ns | 198.24 ns | 1.02 | 222.96 ns |
| immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 5704.46 ns | 183.15 ns | 178.24 ns | 1.03 | 203.52 ns |
| immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 195.36 ns | 194.05 ns | 1.01 | 196.35 ns |
| immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 6076.14 ns | 444.55 ns | 439.05 ns | 1.01 | 456.89 ns |
| immediate_just(1) + zip(immediate_just(2)) + subscribe | 3852.33 ns | 513.33 ns | 560.04 ns | 0.92 | 514.10 ns |
| immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe | 4871.98 ns | 315.47 ns | 306.46 ns | 1.03 | 326.86 ns |
Subjects
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| publish_subject with 1 observer - on_next | 36.80 ns | 29.85 ns | 29.29 ns | 1.02 | 30.73 ns |
| subscribe 100 observers to publish_subject | 260300.00 ns | 25580.49 ns | 24497.83 ns | 1.04 | 25726.09 ns |
| 100 on_next to 100 observers to publish_subject | 51955.00 ns | 35925.00 ns | 35900.00 ns | 1.00 | 33022.58 ns |
Scenarios
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| basic sample | 1859.22 ns | 95.56 ns | 96.67 ns | 0.99 | 112.43 ns |
| basic sample with immediate scheduler | 1858.19 ns | 68.15 ns | 68.58 ns | 0.99 | 83.82 ns |
| mix operators with disposables and without disposables | 9211.32 ns | 1800.74 ns | 1793.26 ns | 1.00 | 2560.00 ns |
| single disposable and looooooong indentity chain | 25635.56 ns | 1634.29 ns | 1714.40 ns | 0.95 | 6251.91 ns |
Aggregating Operators
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| immediate_just+reduce(10, std::plus)+subscribe | 1445.70 ns | 19.42 ns | 19.11 ns | 1.02 | 21.92 ns |
Error Handling Operators
| name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
|---|---|---|---|---|---|
| create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 1902.89 ns | 370.76 ns | 367.99 ns | 1.01 | 380.75 ns |
| create(on_error())+retry(1)+subscribe | 1596.50 ns | 138.43 ns | 137.60 ns | 1.01 | 134.77 ns |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## v2 #715 +/- ##
========================================
Coverage 98.65% 98.66%
========================================
Files 156 159 +3
Lines 9804 9926 +122
========================================
+ Hits 9672 9793 +121
- Misses 132 133 +1 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
|



Summary by CodeRabbit
New Features
Refactor
Tests