diff --git a/src/rpp/rpp/operators/buffer.hpp b/src/rpp/rpp/operators/buffer.hpp index f2e772ea6..325b38292 100644 --- a/src/rpp/rpp/operators/buffer.hpp +++ b/src/rpp/rpp/operators/buffer.hpp @@ -41,8 +41,11 @@ namespace rpp::operators::details m_bucket.push_back(std::forward(v)); if (m_bucket.size() == m_bucket.capacity()) { + const auto capacity = m_bucket.capacity(); m_observer.on_next(std::move(m_bucket)); + m_bucket.clear(); + m_bucket.reserve(capacity); } } diff --git a/src/tests/rpp/test_buffer.cpp b/src/tests/rpp/test_buffer.cpp index 82964c588..a045ce13c 100644 --- a/src/tests/rpp/test_buffer.cpp +++ b/src/tests/rpp/test_buffer.cpp @@ -104,6 +104,18 @@ TEST_CASE("buffer bundles items") | rpp::ops::subscribe(mock); } } + SECTION("accept by moving") + { + REQUIRE_CALL(*mock, on_next_rvalue(std::vector{1, 2})).IN_SEQUENCE(s); + REQUIRE_CALL(*mock, on_next_rvalue(std::vector{3, 4})).IN_SEQUENCE(s); + + rpp::source::just(1, 2, 3, 4) + | rpp::ops::buffer(2) + | rpp::ops::subscribe([&mock](std::vector b) // NOLINT + { + mock.on_next(std::move(b)); + }); + } } TEST_CASE("buffer satisfies disposable contracts")