Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 107 additions & 2 deletions docs/guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -643,8 +643,113 @@ condy::Coro<void> session(int session_fd) {
}
```

> [!NOTE]
> io_uring also supports Zero Copy Rx. Condy will support this feature in the future.
### Zero Copy Rx

Similar to Zero Copy Tx, io_uring also supports receiving data directly into user-space memory, allowing the NIC to write incoming data without kernel-to-user copies. Condy provides the `condy::ZeroCopyRxBufferPool` type to encapsulate this feature.

`condy::ZeroCopyRxBufferPool` manages a set of pre-registered buffers. When used with `condy::async_recv_multishot()`, each received packet is placed into a buffer from the pool, and the coroutine receives a `condy::ZeroCopyRxBuffer` — an RAII type that automatically returns the buffer to the pool when it goes out of scope.

Constructing a `condy::ZeroCopyRxBufferPool` requires:
- A network interface index (`if_idx`) and receive queue index (`if_rxq`).
- The number of receive queue entries (`rq_entries`).
- A `condy::ZeroCopyRxArea` describing the memory region, or a `condy::ZeroCopyRxDMABufArea` for DMA-BUF scenarios.

The following example demonstrates an echo server using Zero Copy Rx. The structure is similar to the Provided Buffers example above — `condy::ZeroCopyRxBufferPool` and `condy::ProvidedBufferPool` share the same usage pattern with `condy::async_recv_multishot()`.

```cpp
#include <arpa/inet.h>
#include <cassert>
#include <utility>
#include <condy.hpp>

// Background coroutine: drain the channel and echo every received chunk back
// to the peer on session_fd.
//
// Each channel item is a (byte count, zero-copy buffer) pair. The loop ends
// when pop() reports -EPIPE, which this example treats as "channel closed".
condy::Coro<void>
handle_buffers(condy::Channel<std::pair<int, condy::ZeroCopyRxBuffer>> &ch,
               int session_fd) {
    for (;;) {
        auto [status, item] = co_await ch.pop();
        if (status == -EPIPE) {
            // Nothing follows the loop, so finishing the coroutine here is
            // the same as breaking out of it.
            co_return;
        }
        assert(status == 0);

        // Bind by reference: the buffer stays owned by `item` and is released
        // when `item` goes out of scope at the end of this iteration.
        auto &[len, zc_buf] = item;
        co_await condy::async_write(session_fd,
                                    condy::buffer(zc_buf.data(), len), 0);
    }
}

// Main coroutine: accept a single client on 127.0.0.1:8080 and echo its data
// back, receiving through a zero-copy buffer pool.
//
// Returns 0 when the receive side terminates normally and 1 on any setup or
// receive failure. Every exit path after a successful accept waits for the
// writer task and closes both sockets before returning.
condy::Coro<int> co_main() {
    sockaddr_in server_addr{};
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(8080);
    inet_pton(AF_INET, "127.0.0.1", &server_addr.sin_addr);

    int server_fd = co_await condy::async_socket(AF_INET, SOCK_STREAM, 0, 0);
    if (server_fd < 0) {
        co_return 1;
    }

    int r = ::bind(server_fd, reinterpret_cast<sockaddr *>(&server_addr),
                   sizeof(server_addr));
    if (r < 0) {
        co_await condy::async_close(server_fd);
        co_return 1;
    }

    r = ::listen(server_fd, 10);
    if (r < 0) {
        co_await condy::async_close(server_fd);
        co_return 1;
    }

    // Accept one client connection
    sockaddr_in client_addr{};
    socklen_t client_addr_len = sizeof(client_addr);
    int session_fd = co_await condy::async_accept(
        server_fd, reinterpret_cast<sockaddr *>(&client_addr),
        &client_addr_len, 0);
    if (session_fd < 0) {
        co_await condy::async_close(server_fd);
        co_return 1;
    }

    // Create zero-copy receive buffer pool.
    // For demonstration, we use the device-less constructor so the example
    // runs without requiring a specific NIC. In production, provide if_idx
    // and if_rxq matching your hardware.
    condy::ZeroCopyRxBufferPool pool(condy::current_runtime(), 256,
                                     condy::ZeroCopyRxArea{.size = 8ul * 4096});
    condy::Channel<std::pair<int, condy::ZeroCopyRxBuffer>> ch(16);

    // Spawn the background writer. Keep the task handle instead of detaching:
    // the writer holds `ch` by reference (and buffers that belong to `pool`),
    // so it must finish before this coroutine's locals are destroyed.
    auto writer = condy::co_spawn(handle_buffers(ch, session_fd));

    int exit_code = 0;
    while (true) {
        // Multishot receive with zero-copy: callback handles normal results;
        // coroutine resumes on final error or termination
        auto [res, buf] = co_await condy::async_recv_multishot(
            session_fd, pool, 0, condy::will_push(ch));

        if (res == 0) {
            // Termination signal
            break;
        }

        // Error handling (excluding buffer pool full, which just re-arms the
        // receive on the next iteration)
        if (res < 0 && res != -ENOMEM) {
            exit_code = 1;
            break;
        }
    }

    // Shared shutdown path: tell the writer no more data is coming, wait for
    // it to drain the channel, then release the sockets.
    ch.push_close();
    co_await std::move(writer);

    co_await condy::async_close(session_fd);
    co_await condy::async_close(server_fd);

    co_return exit_code;
}

// Program entry point: build a runtime with the cqe32 and defer_taskrun
// options enabled, run co_main() on it to completion, and use its result as
// the process exit status.
int main() {
    condy::Runtime runtime(
        condy::RuntimeOptions().enable_cqe32().enable_defer_taskrun());

    const int exit_code = condy::sync_wait(runtime, co_main());
    return exit_code;
}
```

### File Registration

Expand Down
1 change: 1 addition & 0 deletions include/condy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "condy/sync_wait.hpp" // IWYU pragma: export
#include "condy/task.hpp" // IWYU pragma: export
#include "condy/version.hpp" // IWYU pragma: export
#include "condy/zcrx.hpp" // IWYU pragma: export

/**
* @brief The main namespace for the Condy library.
Expand Down
22 changes: 22 additions & 0 deletions include/condy/async_operations.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "condy/condy_uring.hpp"
#include "condy/detail/async_operations.hpp"
#include "condy/detail/helpers.hpp"
#include "condy/zcrx.hpp"

namespace condy {

Expand Down Expand Up @@ -647,6 +648,27 @@ inline auto async_recv_multishot(Fd sockfd, Buffer &buf, int flags,
}
#endif

#if !IO_URING_CHECK_VERSION(2, 15) // >= 2.15
/**
 * @brief Multishot zero-copy receive backed by a ZeroCopyRxBufferPool.
 *
 * See https://docs.kernel.org/networking/iou-zcrx.html
 *
 * @param sockfd Socket to receive from.
 * @param pool   Buffer pool; its zcrx id is written into the request and the
 *               pool itself is handed to the CQE handler.
 * @param flags  Not used by this overload.
 * @param func   Multishot callback, forwarded to the awaiter.
 * @return The multishot awaiter after being passed through
 *         detail::maybe_flag_fixed_fd() together with @p sockfd.
 */
template <FdLike Fd, typename MultiShotFunc>
inline auto async_recv_multishot(Fd sockfd, ZeroCopyRxBufferPool &pool,
                                 [[maybe_unused]] int flags,
                                 MultiShotFunc &&func) {
    using Handler = SelectBufferCQEHandler<ZeroCopyRxBufferPool>;

    // Capture the socket and the pool's zcrx id by value; the closure itself
    // never touches `pool`.
    auto fill_sqe = [sockfd, ifq_id = pool.zcrx_id()](detail::Ring *ring) {
        auto *entry = ring->get_sqe();
        detail::prep_recv_zc_multishot(entry, sockfd, ifq_id);
        return entry;
    };

    return detail::maybe_flag_fixed_fd(
        build_multishot_op_awaiter<Handler>(
            std::move(fill_sqe), std::forward<MultiShotFunc>(func), &pool),
        sockfd);
}
#endif

/**
* @brief See io_uring_prep_openat2
*/
Expand Down
4 changes: 3 additions & 1 deletion include/condy/cqe_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ struct SimpleCQEHandler {
* result of the operation (the value of `cqe->res`) and the selected buffer,
* whose type is determined by the buffer ring.
*/
template <BufferRingLike Br> class SelectBufferCQEHandler {
template <typename Br>
requires(requires(Br *br, io_uring_cqe *cqe) { br->handle_finish(cqe); })
class SelectBufferCQEHandler {
public:
SelectBufferCQEHandler(Br *buffers) : buffers_(buffers) {}

Expand Down
9 changes: 9 additions & 0 deletions include/condy/detail/async_operations.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,5 +191,14 @@ inline void prep_sendto_zc_fixed(io_uring_sqe *sqe, int sockfd, const void *buf,
sqe->buf_index = buf_index;
}

#if !IO_URING_CHECK_VERSION(2, 15) // >= 2.15
/**
 * @brief Fill @p sqe as a multishot IORING_OP_RECV_ZC request.
 *
 * @param sqe     Submission queue entry to initialize.
 * @param fd      Socket descriptor to receive from.
 * @param zcrx_id Value stored in the SQE's zcrx_ifq_idx field.
 */
inline void prep_recv_zc_multishot(io_uring_sqe *sqe, int fd,
                                   uint32_t zcrx_id) {
    // No user buffer, length or offset is passed for this opcode.
    io_uring_prep_rw(IORING_OP_RECV_ZC, sqe, fd, /*addr=*/nullptr, /*len=*/0,
                     /*offset=*/0);

    // Both fields are set after io_uring_prep_rw() has initialized the SQE.
    sqe->zcrx_ifq_idx = zcrx_id;
    sqe->ioprio |= IORING_RECV_MULTISHOT;
}
#endif

} // namespace detail
} // namespace condy
Loading
Loading