From 6737d8535b11306de59225ba021a6e8e73f6ed70 Mon Sep 17 00:00:00 2001 From: wokron Date: Tue, 2 Jun 2026 20:23:41 +0800 Subject: [PATCH 1/5] zcrx also use SelectBufferCQEHandler --- include/condy/cqe_handler.hpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/include/condy/cqe_handler.hpp b/include/condy/cqe_handler.hpp index 729b94a7..a0641347 100644 --- a/include/condy/cqe_handler.hpp +++ b/include/condy/cqe_handler.hpp @@ -54,7 +54,9 @@ struct SimpleCQEHandler { * result of the operation (the value of `cqe->res`) and the selected buffer, * whose type is determined by the buffer ring. */ -template class SelectBufferCQEHandler { +template + requires(requires(Br *br, io_uring_cqe *cqe) { br->handle_finish(cqe); }) +class SelectBufferCQEHandler { public: SelectBufferCQEHandler(Br *buffers) : buffers_(buffers) {} From c7ea74c19abc37534b098414eca1159f01ba191f Mon Sep 17 00:00:00 2001 From: wokron Date: Tue, 2 Jun 2026 20:25:16 +0800 Subject: [PATCH 2/5] support zcrx --- include/condy.hpp | 1 + include/condy/zcrx.hpp | 296 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 297 insertions(+) create mode 100644 include/condy/zcrx.hpp diff --git a/include/condy.hpp b/include/condy.hpp index 067deb95..ae49ce40 100644 --- a/include/condy.hpp +++ b/include/condy.hpp @@ -27,6 +27,7 @@ #include "condy/sync_wait.hpp" // IWYU pragma: export #include "condy/task.hpp" // IWYU pragma: export #include "condy/version.hpp" // IWYU pragma: export +#include "condy/zcrx.hpp" // IWYU pragma: export /** * @brief The main namespace for the Condy library. 
diff --git a/include/condy/zcrx.hpp b/include/condy/zcrx.hpp new file mode 100644 index 00000000..2868f7ad --- /dev/null +++ b/include/condy/zcrx.hpp @@ -0,0 +1,296 @@ +#pragma once + +#include "condy/detail/buffers.hpp" +#include "condy/detail/context.hpp" +#include "condy/detail/utils.hpp" +#include "condy/runtime.hpp" +#include +#include + +namespace condy { + +#if !IO_URING_CHECK_VERSION(2, 15) // >= 2.15 + +class ZeroCopyRxBufferPool; + +/** + * @brief Buffer from a ZeroCopyRxBufferPool. + * @details This buffer type is used for buffers obtained from a + * ZeroCopyRxBufferPool. It automatically returns the buffer to the pool when it + * goes out of scope. + * @note The lifetime of the buffer must not exceed the lifetime of the + * ZeroCopyRxBufferPool it is associated with. + */ +class ZeroCopyRxBuffer : public detail::ManagedBuffer { +public: + using Base = detail::ManagedBuffer; + using Base::Base; +}; + +/** + * @brief Area for zero-copy receive buffers. + */ +struct ZeroCopyRxArea { + void *addr = nullptr; + size_t size; +}; + +/** + * @brief Area for zero-copy receive buffers using DMA-BUF. + */ +struct ZeroCopyRxDMABufArea { + int dmabuf_fd; + size_t offset; + size_t size; +}; + +/** + * @brief Buffer pool for zero-copy receive buffers. + * @details This buffer pool utilizes the io_uring zcrx feature to provide + * zero-copy receive buffers. It can be used to receive data directly into + * user-space buffers without copying, which can improve performance for + * high-throughput network applications. + * @returns std::pair<int, ZeroCopyRxBuffer> When passed to async + * operations, the return type will be a pair of the operation result and the + * @ref ZeroCopyRxBuffer. + * @note The lifetime of this pool must not exceed the running period of the + * associated Runtime, and the lifetime of any ZeroCopyRxBuffer obtained from + * this pool must not exceed the lifetime of this pool. 
+ */ +class ZeroCopyRxBufferPool { +public: + /** + * @brief Construct a new Zero Copy Rx Buffer Pool object + * @param if_idx Network interface index to register the buffer pool with. + * @param if_rxq Receive queue index to register the buffer pool with. + * @param rq_entries Number of receive queue entries. + * @param area Area for zero-copy receive buffers. + */ + ZeroCopyRxBufferPool(uint32_t if_idx, uint32_t if_rxq, uint32_t rq_entries, + const ZeroCopyRxArea &area) + : ZeroCopyRxBufferPool(*detail::Context::current().runtime(), if_idx, + if_rxq, rq_entries, area) {} + + /** + * @brief Construct a new Zero Copy Rx Buffer Pool object + * @param runtime The runtime to register the buffer pool with. + * @param if_idx Network interface index to register the buffer pool with. + * @param if_rxq Receive queue index to register the buffer pool with. + * @param rq_entries Number of receive queue entries. + * @param area Area for zero-copy receive buffers. + */ + ZeroCopyRxBufferPool(Runtime &runtime, uint32_t if_idx, uint32_t if_rxq, + uint32_t rq_entries, const ZeroCopyRxArea &area) + : ZeroCopyRxBufferPool(runtime.ring_internal(), if_idx, if_rxq, + rq_entries, area, 0) {} + + // Device-less constructor, DO NOT use this in production code if you don't + // know what you are doing. + ZeroCopyRxBufferPool(Runtime &runtime, uint32_t rq_entries, + const ZeroCopyRxArea &area) + : ZeroCopyRxBufferPool(runtime.ring_internal(), 0, 0, rq_entries, area, + ZCRX_REG_NODEV) {} + + /** + * @brief Construct a new Zero Copy Rx Buffer Pool object + * @param if_idx Network interface index to register the buffer pool with. + * @param if_rxq Receive queue index to register the buffer pool with. + * @param rq_entries Number of receive queue entries. + * @param area Area for zero-copy receive buffers using DMA-BUF. 
+ */ + ZeroCopyRxBufferPool(uint32_t if_idx, uint32_t if_rxq, uint32_t rq_entries, + const ZeroCopyRxDMABufArea &area) + : ZeroCopyRxBufferPool(*detail::Context::current().runtime(), if_idx, + if_rxq, rq_entries, area) {} + + /** + * @brief Construct a new Zero Copy Rx Buffer Pool object + * @param runtime The runtime to register the buffer pool with. + * @param if_idx Network interface index to register the buffer pool with. + * @param if_rxq Receive queue index to register the buffer pool with. + * @param rq_entries Number of receive queue entries. + * @param area Area for zero-copy receive buffers using DMA-BUF. + */ + ZeroCopyRxBufferPool(Runtime &runtime, uint32_t if_idx, uint32_t if_rxq, + uint32_t rq_entries, const ZeroCopyRxDMABufArea &area) + : ring_(&runtime.ring_internal()), flags_(0) { + area_size_ = 0; + area_ptr_ = nullptr; + + io_uring_zcrx_area_reg area_reg = {}; + area_reg.addr = area.offset; + area_reg.len = area.size; + area_reg.flags = IORING_ZCRX_AREA_DMABUF; + + register_ifq_(if_idx, if_rxq, rq_entries, area_reg, + sysconf(_SC_PAGESIZE)); + } + + ~ZeroCopyRxBufferPool() { + [[maybe_unused]] int r; + if (area_size_ > 0) { + assert(area_ptr_ != nullptr); + r = munmap(area_ptr_, area_size_); + assert(r == 0); + } + assert(rq_ring_.ring_ptr != nullptr); + r = munmap(rq_ring_.ring_ptr, ring_size_); + assert(r == 0); + // TODO: Unregister ifq + } + + CONDY_DELETE_COPY_MOVE(ZeroCopyRxBufferPool); + +private: + ZeroCopyRxBufferPool(detail::Ring &ring, uint32_t if_idx, uint32_t if_rxq, + uint32_t rq_entries, const ZeroCopyRxArea &area, + uint32_t flags) + : ring_(&ring), flags_(flags) { + const size_t page_size = sysconf(_SC_PAGESIZE); + + if (area.addr == nullptr) { + area_size_ = detail::align_up(area.size, page_size); + area_ptr_ = mmap(nullptr, area_size_, PROT_READ | PROT_WRITE, + MAP_ANONYMOUS | MAP_PRIVATE, 0, 0); + if (area_ptr_ == MAP_FAILED) { + throw detail::make_system_error("mmap"); + } + auto d = detail::defer([&]() { munmap(area_ptr_, 
area_size_); }); + + io_uring_zcrx_area_reg area_reg = {}; + area_reg.addr = reinterpret_cast(area_ptr_); + area_reg.len = area_size_; + area_reg.flags = 0; + + register_ifq_(if_idx, if_rxq, rq_entries, area_reg, page_size); + + d.dismiss(); + } else { + // Not owned, so we don't track the size for unmapping + area_size_ = 0; + area_ptr_ = area.addr; + + io_uring_zcrx_area_reg area_reg = {}; + area_reg.addr = reinterpret_cast(area_ptr_); + area_reg.len = area.size; + area_reg.flags = 0; + + register_ifq_(if_idx, if_rxq, rq_entries, area_reg, page_size); + } + } + +public: + uint32_t zcrx_id() const noexcept { return zcrx_id_; } + + ZeroCopyRxBuffer handle_finish(io_uring_cqe *cqe) noexcept { + if (cqe->res < 0) { + return ZeroCopyRxBuffer(); + } + io_uring_zcrx_cqe *rcqe = + reinterpret_cast(cqe + 1); + void *data = static_cast(area_ptr_) + + (rcqe->off & ~IORING_ZCRX_AREA_MASK); + size_t size = static_cast(cqe->res); + return ZeroCopyRxBuffer(data, size, this); + } + + void add_buffer_back(void *ptr, size_t size) noexcept { + rq_enqueue_(ptr, size); + maybe_flush_rq_(); + } + +private: + void register_ifq_(uint32_t if_idx, uint32_t if_rxq, uint32_t rq_entries, + io_uring_zcrx_area_reg &area_reg, size_t page_size) { + rq_entries = std::bit_ceil(rq_entries); + io_uring_region_desc region_reg = {}; + ring_size_ = get_refill_ring_size_(rq_entries, page_size); + region_reg.user_addr = 0; + region_reg.size = ring_size_; + region_reg.flags = 0; + + io_uring_zcrx_ifq_reg reg = {}; + reg.if_idx = if_idx; + reg.if_rxq = if_rxq; + reg.rq_entries = rq_entries; + reg.area_ptr = reinterpret_cast(&area_reg); + reg.region_ptr = reinterpret_cast(®ion_reg); + reg.flags = flags_; + + int r = io_uring_register_ifq(ring_->ring(), ®); + if (r != 0) { + throw detail::make_system_error("io_uring_register_ifq", -r); + } + // TODO: unregister ifq if any exception + + void *ring_ptr = mmap(nullptr, ring_size_, PROT_READ | PROT_WRITE, + MAP_SHARED | MAP_POPULATE, ring_->ring()->ring_fd, + 
static_cast(region_reg.mmap_offset)); + if (ring_ptr == MAP_FAILED) { + throw detail::make_system_error("mmap"); + } + rq_ring_.khead = (unsigned int *)((char *)ring_ptr + reg.offsets.head); + rq_ring_.ktail = (unsigned int *)((char *)ring_ptr + reg.offsets.tail); + rq_ring_.rqes = + (struct io_uring_zcrx_rqe *)((char *)ring_ptr + reg.offsets.rqes); + rq_ring_.rq_tail = 0; + rq_ring_.ring_entries = reg.rq_entries; + rq_ring_.ring_ptr = ring_ptr; + + zcrx_id_ = reg.zcrx_id; + area_token_ = area_reg.rq_area_token; + } + + static size_t get_refill_ring_size_(uint32_t rq_entries, + size_t page_size) noexcept { + size_t ring_size = rq_entries * sizeof(io_uring_zcrx_rqe); + ring_size += page_size; + ring_size = detail::align_up(ring_size, page_size); + return ring_size; + } + + size_t rq_nr_queued_() const noexcept { + return rq_ring_.rq_tail - io_uring_smp_load_acquire(rq_ring_.khead); + } + + void rq_enqueue_(void *ptr, size_t size) noexcept { + assert(rq_nr_queued_() < rq_ring_.ring_entries); + io_uring_zcrx_rqe *rqe; + unsigned rq_mask = rq_ring_.ring_entries - 1; + rqe = &rq_ring_.rqes[rq_ring_.rq_tail & rq_mask]; + rqe->off = (static_cast(ptr) - static_cast(area_ptr_)) | + area_token_; + rqe->len = static_cast(size); + io_uring_smp_store_release(rq_ring_.ktail, ++rq_ring_.rq_tail); + } + + void flush_rq_() noexcept { + zcrx_ctrl ctrl = {}; + ctrl.zcrx_id = zcrx_id_; + ctrl.op = ZCRX_CTRL_FLUSH_RQ; + [[maybe_unused]] int r = + io_uring_register_zcrx_ctrl(ring_->ring(), &ctrl); + assert(r == 0); + } + + void maybe_flush_rq_() noexcept { + if (rq_nr_queued_() >= rq_ring_.ring_entries || + (flags_ & ZCRX_REG_NODEV)) { + flush_rq_(); + } + } + +private: + detail::Ring *ring_; + void *area_ptr_; + size_t area_size_; + size_t ring_size_; + io_uring_zcrx_rq rq_ring_; + uint32_t zcrx_id_; + uint64_t area_token_; + uint32_t flags_; +}; + +#endif + +} // namespace condy \ No newline at end of file From 3acfc1744bcd4204c84b691036df104dfebdedc3 Mon Sep 17 00:00:00 2001 From: 
wokron Date: Tue, 2 Jun 2026 20:28:07 +0800 Subject: [PATCH 3/5] add zcrx async_recv_multishot --- include/condy/async_operations.hpp | 22 ++++++++++++++++++++++ include/condy/detail/async_operations.hpp | 9 +++++++++ 2 files changed, 31 insertions(+) diff --git a/include/condy/async_operations.hpp b/include/condy/async_operations.hpp index 469115e4..121cc1c5 100644 --- a/include/condy/async_operations.hpp +++ b/include/condy/async_operations.hpp @@ -13,6 +13,7 @@ #include "condy/condy_uring.hpp" #include "condy/detail/async_operations.hpp" #include "condy/detail/helpers.hpp" +#include "condy/zcrx.hpp" namespace condy { @@ -647,6 +648,27 @@ inline auto async_recv_multishot(Fd sockfd, Buffer &buf, int flags, } #endif +#if !IO_URING_CHECK_VERSION(2, 15) // >= 2.15 +/** + * @brief See https://docs.kernel.org/networking/iou-zcrx.html + */ +template +inline auto async_recv_multishot(Fd sockfd, ZeroCopyRxBufferPool &pool, + [[maybe_unused]] int flags, + MultiShotFunc &&func) { + auto zcrx_id = pool.zcrx_id(); + auto prep_func = [=](detail::Ring *ring) { + auto *sqe = ring->get_sqe(); + detail::prep_recv_zc_multishot(sqe, sockfd, zcrx_id); + return sqe; + }; + auto op = build_multishot_op_awaiter< + SelectBufferCQEHandler>( + std::move(prep_func), std::forward(func), &pool); + return detail::maybe_flag_fixed_fd(std::move(op), sockfd); +} +#endif + /** * @brief See io_uring_prep_openat2 */ diff --git a/include/condy/detail/async_operations.hpp b/include/condy/detail/async_operations.hpp index d2764157..425eaab2 100644 --- a/include/condy/detail/async_operations.hpp +++ b/include/condy/detail/async_operations.hpp @@ -191,5 +191,14 @@ inline void prep_sendto_zc_fixed(io_uring_sqe *sqe, int sockfd, const void *buf, sqe->buf_index = buf_index; } +#if !IO_URING_CHECK_VERSION(2, 15) // >= 2.15 +inline void prep_recv_zc_multishot(io_uring_sqe *sqe, int fd, + uint32_t zcrx_id) { + io_uring_prep_rw(IORING_OP_RECV_ZC, sqe, fd, nullptr, 0, 0); + sqe->ioprio |= IORING_RECV_MULTISHOT; 
+ sqe->zcrx_ifq_idx = zcrx_id; +} +#endif + } // namespace detail } // namespace condy \ No newline at end of file From 57ecd5a02910453da72cb4e2e527e351b23b51ec Mon Sep 17 00:00:00 2001 From: wokron Date: Tue, 2 Jun 2026 20:28:35 +0800 Subject: [PATCH 4/5] add zcrx test --- tests/test_async_operations.4.cpp | 54 +++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/tests/test_async_operations.4.cpp b/tests/test_async_operations.4.cpp index 8e6ef260..806f2c1d 100644 --- a/tests/test_async_operations.4.cpp +++ b/tests/test_async_operations.4.cpp @@ -4,6 +4,7 @@ #include "condy/runtime.hpp" #include "condy/runtime_options.hpp" #include "condy/sync_wait.hpp" +#include "condy/zcrx.hpp" #include "helpers.hpp" #include #include @@ -1081,4 +1082,57 @@ TEST_CASE("test async_operations - test pipe - direct") { }; condy::sync_wait(func()); } +#endif + +#if !IO_URING_CHECK_VERSION(2, 15) // >= 2.15 +TEST_CASE("test async_operations - test recv - zc multishot") { + int sv[2]; + create_tcp_socketpair(sv); + + condy::Runtime runtime( + condy::RuntimeOptions().enable_cqe32().enable_defer_taskrun()); + condy::ZeroCopyRxBufferPool pool(runtime, 256, + condy::ZeroCopyRxArea{.size = 8ul * 4096}); + + auto msg = generate_data(9ul * 4096); + ssize_t r = send(sv[1], msg.data(), msg.size(), 0); + REQUIRE(r == msg.size()); + close(sv[1]); + + auto func = [&]() -> condy::Coro { + size_t count = 0; + std::string actual; + + condy::Channel channel(16); + + auto [n, buf] = + co_await condy::async_recv_multishot(sv[0], pool, 0, [&](auto res) { + auto &[n, buf] = res; + REQUIRE(n == 4096); + actual.append(static_cast(buf.data()), n); + count++; + REQUIRE(channel.try_push(std::move(buf)) == 0); + }); + REQUIRE(n == -ENOMEM); + REQUIRE(count == 8); + + auto [r, tmp] = co_await channel.pop(); + tmp.reset(); // Release the buffer back to the pool + + auto [n2, buf2] = + co_await condy::async_recv_multishot(sv[0], pool, 0, [&](auto res) { + auto &[n, buf] = res; + REQUIRE(n == 
4096); + actual.append(static_cast(buf.data()), n); + count++; + }); + REQUIRE(n2 == 0); + REQUIRE(count == 9); + + REQUIRE(actual == msg); + }; + condy::sync_wait(runtime, func()); + + close(sv[0]); +} #endif \ No newline at end of file From 55c7b82d4a3a0fed613b0ae47496849a8b102fcb Mon Sep 17 00:00:00 2001 From: wokron Date: Tue, 30 Jun 2026 15:25:43 +0800 Subject: [PATCH 5/5] add zcrx docs --- docs/guide.md | 116 ++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 112 insertions(+), 4 deletions(-) diff --git a/docs/guide.md b/docs/guide.md index 8f08ba0f..f7375a27 100644 --- a/docs/guide.md +++ b/docs/guide.md @@ -643,9 +643,6 @@ condy::Coro session(int session_fd) { } ``` -> [!NOTE] -> io_uring also supports Zero Copy Rx. Condy will support this feature in the future. - ### File Registration io_uring allows you to register files with the kernel. Normally, each asynchronous operation increments/decrements the file's reference count, but registering files with the kernel can skip this process and improve performance. @@ -832,7 +829,7 @@ condy::Coro co_main() { break; } - // Error handling (excluding buffer pool full) + // Error handling (excluding buffer pool empty) if (res < 0 && res != -ENOBUFS) { ch.push_close(); co_return 1; @@ -845,6 +842,117 @@ condy::Coro co_main() { int main() { return condy::sync_wait(co_main()); } ``` +### Zero Copy Rx + +Similar to Zero Copy Tx, io_uring also supports receiving data directly into user-space memory, allowing the NIC to write incoming data without kernel-to-user copies. Condy provides the `condy::ZeroCopyRxBufferPool` type to encapsulate this feature. + +`condy::ZeroCopyRxBufferPool` manages a set of pre-registered buffers. When used with `condy::async_recv_multishot()`, each received packet is placed into a buffer from the pool, and the coroutine receives a `condy::ZeroCopyRxBuffer` — an RAII type that automatically returns the buffer to the pool when it goes out of scope. 
+ +Constructing a `condy::ZeroCopyRxBufferPool` requires: +- A network interface index (`if_idx`) and receive queue index (`if_rxq`). +- The number of receive queue entries (`rq_entries`). +- A `condy::ZeroCopyRxArea` describing the memory region, or a `condy::ZeroCopyRxDMABufArea` for DMA-BUF scenarios. + +> [!NOTE] +> See https://docs.kernel.org/networking/iou-zcrx.html for more information. + +The following example demonstrates an echo server using Zero Copy Rx. The structure is similar to the Provided Buffers example above — `condy::ZeroCopyRxBufferPool` and `condy::ProvidedBufferPool` share the same usage pattern with `condy::async_recv_multishot()`. + +```cpp +#include +#include + +// Background coroutine: handle buffers from the Channel +condy::Coro +handle_buffers(condy::Channel> &ch, + int session_fd) { + while (true) { + auto [r, data] = co_await ch.pop(); + if (r == -EPIPE) { + break; + } + assert(r == 0); + auto &[n, buffer] = data; + co_await condy::async_write(session_fd, + condy::buffer(buffer.data(), n), 0); + } +} + +// Main coroutine: receive data and use zero-copy buffer pool +condy::Coro co_main() { + sockaddr_in server_addr{}; + server_addr.sin_family = AF_INET; + server_addr.sin_port = htons(8080); + inet_pton(AF_INET, "127.0.0.1", &server_addr.sin_addr); + + int server_fd = co_await condy::async_socket(AF_INET, SOCK_STREAM, 0, 0); + if (server_fd < 0) { + co_return 1; + } + + int r = + ::bind(server_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)); + if (r < 0) { + co_await condy::async_close(server_fd); + co_return 1; + } + + r = ::listen(server_fd, 10); + if (r < 0) { + co_await condy::async_close(server_fd); + co_return 1; + } + + // Accept one client connection + sockaddr_in client_addr; + socklen_t client_addr_len = sizeof(client_addr); + int session_fd = co_await condy::async_accept( + server_fd, (struct sockaddr *)&client_addr, &client_addr_len, 0); + if (session_fd < 0) { + co_await condy::async_close(server_fd); + co_return 
1; + } + + // Create zero-copy receive buffer pool. + // For demonstration, we use the device-less constructor so the example + // runs without requiring a specific NIC. In production, provide if_idx + // and if_rxq matching your hardware. + condy::ZeroCopyRxBufferPool pool(condy::current_runtime(), 256, + condy::ZeroCopyRxArea{.size = 8ul * 4096}); + condy::Channel> ch(16); + + // Spawn background task to handle writes + condy::co_spawn(handle_buffers(ch, session_fd)).detach(); + + while (true) { + // Multishot receive with zero-copy: callback handles normal results; + // coroutine resumes on final error or termination + auto [res, buf] = co_await condy::async_recv_multishot( + session_fd, pool, 0, condy::will_push(ch)); + + if (res == 0) { + // Termination signal + ch.push_close(); + break; + } + + // Error handling (excluding buffer pool empty) + if (res < 0 && res != -ENOMEM) { + ch.push_close(); + co_return 1; + } + } + + co_return 0; +} + +int main() { + condy::Runtime runtime( + condy::RuntimeOptions().enable_cqe32().enable_defer_taskrun()); + return condy::sync_wait(runtime, co_main()); +} +``` + ### Initialization Options As mentioned earlier, the `condy::Runtime` type can accept a `condy::RuntimeOptions` object, which contains a series of configurable initialization parameters for `condy::Runtime`. These parameters can be set using chained calls as shown below: