Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/Implementation Status.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@
## Subjects

- [x] publish_subject
- [x] behavior_subject
- [ ] serialized_subject
- [ ] replay_subject
- [ ] publish_subject
- [ ] async_subject
3 changes: 2 additions & 1 deletion src/rpp/rpp/subjects.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@
* \ingroup rpp
*/

#include <rpp/subjects/publish_subject.hpp>
#include <rpp/subjects/publish_subject.hpp>
#include <rpp/subjects/behavior_subject.hpp>
139 changes: 139 additions & 0 deletions src/rpp/rpp/subjects/behavior_subject.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2022 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus

#pragma once

#include <rpp/subjects/fwd.hpp>
#include <rpp/utils/constraints.hpp>
#include <rpp/subscribers/dynamic_subscriber.hpp>
#include <rpp/subjects/details/subject_state.hpp>
#include <rpp/subjects/details/base_subject.hpp>

namespace rpp::subjects::details
{
template<rpp::constraint::decayed_type T>
class behavior_strategy
{
public:
template<rpp::constraint::decayed_same_as<T> TT, rpp::constraint::decayed_same_as<composite_subscription> TSub>
behavior_strategy(TT&& v, TSub&& sub)
: m_state{std::make_shared<behavior_state>(std::forward<TT>(v))}
, m_sub{std::forward<TSub>(sub)}
{
m_sub.add([state = std::weak_ptr{m_state}]
{
if (const auto locked = state.lock())
locked->on_unsubscribe();
});
}

void on_subscribe(const dynamic_subscriber<T>& sub) const
{
if (m_sub.is_subscribed())
sub.on_next(m_state->get_value());

m_state->on_subscribe(sub);
}

auto get_subscriber() const
{
return rpp::make_specific_subscriber<T>(m_sub,
[state = m_state](const T& v)
{
state->set_value(v);
state->on_next(v);
},
[state = m_state](const std::exception_ptr& err)
{
state->on_error(err);
},
[state = m_state]()
{
state->on_completed();
});
}

T get_value() const
{
return m_state->get_value();
}

private:
class behavior_state : public subject_state<T>
{
public:
behavior_state(const T& v)
: subject_state<T>{}
, value{v} {}

behavior_state(T&& v)
: subject_state<T>{}
, value{std::move(v)} {}

T get_value()
{
std::lock_guard lock{mutex};
return value;
}


void set_value(const T& v)
{
std::lock_guard lock{mutex};
value = v;
}

private:

std::mutex mutex;
T value;
};

std::shared_ptr<behavior_state> m_state;
composite_subscription m_sub{};
};
} // namespace rpp::subjects::details

namespace rpp::subjects
{
/**
* \brief Subject which multicasts values to observers subscribed on it and sends last emitted value (or initial value) on subscribe. It contains two parts: subscriber and observable at the same time.
*
* \details Each subscriber obtains only last/initial value + values which emitted after corresponding subscribe. on_error/on_completer/unsubscribe cached and provided to new subscribers if any
*
* \warning this subject is not synchronized/serialized! It means, that expected to call callbacks of subscriber in the serialized way to follow observable contract: "Observables must issue notifications to observers serially (not in parallel).". If you are not sure or need extra serialization, please, use serialized_subject.
*
* \tparam T value provided by this subject
*
* \ingroup subjects
* \see https://reactivex.io/documentation/subject.html
*/
template<rpp::constraint::decayed_type T>
class behavior_subject final : public details::base_subject<T, details::behavior_strategy<T>>
{
public:
behavior_subject(const T& initial_value, const composite_subscription& sub)
: details::base_subject<T, details::behavior_strategy<T>>{initial_value, sub} {}

behavior_subject(T&& initial_value, const composite_subscription& sub)
: details::base_subject<T, details::behavior_strategy<T>>{std::move(initial_value), sub} {}

behavior_subject(const T& initial_value, composite_subscription&& sub = composite_subscription{})
: details::base_subject<T, details::behavior_strategy<T>>{initial_value, std::move(sub)} {}

behavior_subject(T&& initial_value, composite_subscription&& sub = composite_subscription{})
: details::base_subject<T, details::behavior_strategy<T>>{std::move(initial_value), std::move(sub)} {}


T get_value() const
{
return details::base_subject<T, details::behavior_strategy<T>>::get_strategy().get_value();
}
};
} // namespace rpp::subjects
11 changes: 7 additions & 4 deletions src/rpp/rpp/subjects/details/base_subject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ template<rpp::constraint::decayed_type T, subject_strategy<T> Strategy>
class base_subject : public subject_tag
{
public:
base_subject(const composite_subscription& sub = composite_subscription{})
: m_strategy{sub} {}

auto get_subscriber() const
{
return m_strategy.get_subscriber();
Expand All @@ -38,7 +35,13 @@ class base_subject : public subject_tag
});
}

protected:
base_subject(auto&& ...args)
: m_strategy{std::forward<decltype(args)>(args)...} {}

const Strategy& get_strategy() const { return m_strategy; }

private:
Strategy m_strategy{};
};
} // namespace rpp::subjects::details
} // namespace rpp::subjects::details
5 changes: 4 additions & 1 deletion src/rpp/rpp/subjects/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace rpp::subjects::details
struct subject_tag;

template<typename Strategy, typename T>
concept subject_strategy = std::constructible_from<Strategy, rpp::composite_subscription> && requires(Strategy t)
concept subject_strategy = requires(Strategy t)
{
{t.get_subscriber()} -> rpp::constraint::subscriber;
t.on_subscribe(std::declval<rpp::dynamic_subscriber<T>>());
Expand All @@ -32,4 +32,7 @@ namespace rpp::subjects
{
template<rpp::constraint::decayed_type T>
class publish_subject;

template<rpp::constraint::decayed_type T>
class behavior_subject;
} // namespace rpp::subjects
14 changes: 11 additions & 3 deletions src/rpp/rpp/subjects/publish_subject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ template<rpp::constraint::decayed_type T>
class publish_strategy
{
public:
publish_strategy(const composite_subscription& sub)
: m_sub{sub}
template<rpp::constraint::decayed_same_as<composite_subscription> TSub>
publish_strategy(TSub&& sub)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ bugprone-forwarding-reference-overload ⚠️
constructor accepting a forwarding reference can hide the copy and move constructors

: m_sub{std::forward<TSub>(sub)}
{
m_sub.add([state = std::weak_ptr{m_state}]
{
Expand Down Expand Up @@ -74,5 +75,12 @@ namespace rpp::subjects
* \see https://reactivex.io/documentation/subject.html
*/
template<rpp::constraint::decayed_type T>
class publish_subject final : public details::base_subject<T, details::publish_strategy<T>>{};
class publish_subject final : public details::base_subject<T, details::publish_strategy<T>>{
public:
publish_subject(const composite_subscription& sub)
: details::base_subject<T, details::publish_strategy<T>>{sub} {}

publish_subject(composite_subscription&& sub = composite_subscription{})
: details::base_subject<T, details::publish_strategy<T>>{std::move(sub)} {}
};
} // namespace rpp::subjects
50 changes: 49 additions & 1 deletion src/tests/rpp/test_subjects.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "mock_observer.hpp"

#include <catch2/catch_test_macros.hpp>
#include <rpp/subjects/behavior_subject.hpp>
#include <rpp/subjects/publish_subject.hpp>

SCENARIO("publish subject multicasts values")
Expand Down Expand Up @@ -231,4 +232,51 @@ SCENARIO("publish subject caches error/completed/unsubscribe")
}
}
}
}
}

SCENARIO("behavior subject caches last emission")
{
auto mock = mock_observer<int>{};
GIVEN("behavior subject")
{
auto subj = rpp::subjects::behavior_subject<int>{100};
WHEN("subscribe on it")
{
subj.get_observable().subscribe(mock);
THEN("subject emits initial value on subscribe")
{
CHECK(mock.get_received_values() == std::vector{100});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 0);
}
}
WHEN("send new value to subject")
{
subj.get_subscriber().on_next(5);
AND_WHEN("subscribe on it")
{
subj.get_observable().subscribe(mock);
THEN("subject emits last value on subscribe")
{
CHECK(mock.get_received_values() == std::vector{5});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 0);
}
}
}
WHEN("complete subject")
{
subj.get_subscriber().on_completed();
AND_WHEN("subscribe on it")
{
subj.get_observable().subscribe(mock);
THEN("subject emits completion on subscribe")
{
CHECK(mock.get_received_values() == std::vector<int>{});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
}
}
}
}
}