Support for zero-copying and alloc-free publish #1912
PauloHMattos wants to merge 6 commits into rabbitmq:main
Conversation
Force-pushed ad16cde to 4df1a69
This commit replaces the generic `RentedMemory` wrapper with a domain-specific `OutgoingFrameMemory` struct, laying the groundwork for streamed, zero-copy frame writing. By introducing the `WriteTo(IBufferWriter<byte>)` method, we invert the control of how frames are written to the pipeline. While this implementation still relies on an intermediate rented array from the ArrayPool, this seam allows us to eventually serialize data directly into the PipeWriter's memory segments.
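The inversion of control described above — the frame writing itself into a caller-supplied sink instead of returning a finished array — can be sketched as follows. The struct and member names here are illustrative stand-ins, not the PR's actual `OutgoingFrameMemory` API:

```csharp
using System;
using System.Buffers;

// Hypothetical sketch: instead of the frame handing back a finished byte[],
// the caller passes an IBufferWriter<byte> and the frame pushes its bytes
// into it (a PipeWriter in the real client, an ArrayBufferWriter here).
readonly struct OutgoingFrameSketch
{
    private readonly ReadOnlyMemory<byte> _header;
    private readonly ReadOnlyMemory<byte> _body;

    public OutgoingFrameSketch(ReadOnlyMemory<byte> header, ReadOnlyMemory<byte> body)
    {
        _header = header;
        _body = body;
    }

    // The frame writes itself into whatever sink the caller provides.
    public void WriteTo(IBufferWriter<byte> writer)
    {
        writer.Write(_header.Span);
        writer.Write(_body.Span);
    }
}

class Demo
{
    static void Main()
    {
        var frame = new OutgoingFrameSketch(
            header: new byte[] { 0x01, 0x00, 0x01 },
            body: new byte[] { 0xCA, 0xFE });

        var sink = new ArrayBufferWriter<byte>();
        frame.WriteTo(sink);
        Console.WriteLine(sink.WrittenCount); // 5 bytes: 3 header + 2 body
    }
}
```

Because the sink is an abstraction, the same `WriteTo` works against a rented array today and against the `PipeWriter`'s own segments later.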
- Converts `SocketFrameHandler` into an abstract base class.
- Moves the `System.Threading.Channels` queue and `WriteLoopAsync` task into a new `BackgroundSocketFrameHandler` implementation.

This separates the asynchronous dispatching logic from the core socket handling. It sets the stage for future performance optimizations, specifically the addition of a synchronous/inline frame handler that writes directly to the pipeline.
This commit shifts the frame serialization model from pre-allocating a single, fully framed byte array to a deferred, streamed writing model using `IBufferWriter<byte>`. Key changes:

- Added `BodySegment.WriteTo(IBufferWriter<byte>, ...)` to allow body chunks to be written directly into the pipeline's memory via `GetSpan()` and `Advance()`, bypassing intermediate framing arrays.
- Refactored `OutgoingFrameMemory` to store the pre-serialized method/header separate from the payload body. The final assembly and chunking now happen dynamically when `WriteTo` is called.
- In `SerializeToFrames`, the payload body is still temporarily copied into a rented array (`rentedBody`). This preserves thread safety for the existing background dispatch queue.

This is a transitional architectural step. By moving the chunking logic into the push-based `WriteTo` phase, we are now prepared to introduce an inline socket writer and completely eliminate the body copy.
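The `GetSpan()`/`Advance()` mechanism the commit relies on can be illustrated with a small self-contained helper. This is not the PR's `BodySegment` code; `maxChunk` stands in for the AMQP frame-max splitting logic:

```csharp
using System;
using System.Buffers;

// Illustrative sketch of a chunked, push-based body write: the body is
// copied into the sink's own memory segments via GetSpan/Advance, split
// into pieces of at most maxChunk bytes.
static class BodyWriter
{
    public static int WriteChunked(IBufferWriter<byte> writer, ReadOnlyMemory<byte> body, int maxChunk)
    {
        int chunks = 0;
        ReadOnlySpan<byte> remaining = body.Span;
        while (!remaining.IsEmpty)
        {
            int take = Math.Min(maxChunk, remaining.Length);
            // Ask the sink for space, copy directly into it, then commit
            // the written bytes. No intermediate framing array is created.
            Span<byte> dest = writer.GetSpan(take);
            remaining.Slice(0, take).CopyTo(dest);
            writer.Advance(take);
            remaining = remaining.Slice(take);
            chunks++;
        }
        return chunks;
    }
}
```

For example, a 10-byte body with `maxChunk: 4` lands in the sink as three chunks (4 + 4 + 2 bytes).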
This commit extracts the copying of the message body out of the core framing logic (`Frame.cs`) and isolates it to the edge of the background dispatch queue (`BackgroundSocketFrameHandler`). Key changes:

- Modifies `Frame.SerializeToFrames` to no longer allocate a rented array for the message body. The framing phase now strictly returns an `OutgoingFrameMemory` containing the pre-serialized headers and the raw, uncopied `ReadOnlyMemory<byte>` body.
- Adds an ownership-transferring copy method to `OutgoingFrameMemory` to materialize the payload into a rented array safely.
- Updates `BackgroundSocketFrameHandler.InternalWriteAsync` to invoke this copy right before pushing to the channel, ensuring the memory remains valid for the background writer loop.

By deferring the body copy to the specific handler that requires it, we isolate the performance penalty of copying payloads strictly to the background queueing model. Because `OutgoingFrameMemory` is a struct, materializing the body requires transferring ownership of the underlying rented arrays to a new struct instance to prevent double-dispose bugs on the ArrayPool. This decoupling is the final structural prerequisite for zero-copy publish.
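The ownership-transfer rule above can be sketched as a minimal struct. Names and layout are illustrative, not the PR's actual `OutgoingFrameMemory`; the point is that the copy produces a *new* instance that owns the rented buffer, while the original keeps no pooled state, so disposing both cannot double-return an array:

```csharp
using System;
using System.Buffers;

// Hypothetical sketch of ownership transfer across a struct copy.
readonly struct FrameSketch : IDisposable
{
    private readonly byte[]? _rented;          // owned pooled buffer, if any
    public readonly ReadOnlyMemory<byte> Body; // what the writer sees

    private FrameSketch(byte[]? rented, ReadOnlyMemory<byte> body)
    {
        _rented = rented;
        Body = body;
    }

    // Wraps caller-owned memory; nothing is rented yet.
    public static FrameSketch Borrowed(ReadOnlyMemory<byte> body) => new(null, body);

    // Materialize the caller's memory into a rented array and hand
    // ownership to the returned instance. `this` keeps no pooled state,
    // so disposing both instances cannot double-return to the pool.
    public FrameSketch TransferOwnershipAndCopyBody()
    {
        byte[] rented = ArrayPool<byte>.Shared.Rent(Body.Length);
        Body.CopyTo(rented);
        return new FrameSketch(rented, rented.AsMemory(0, Body.Length));
    }

    public void Dispose()
    {
        if (_rented is not null)
            ArrayPool<byte>.Shared.Return(_rented);
    }
}
```

In the PR's background handler, the copy happens at the channel boundary; the inline handler skips it entirely.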
This commit introduces `InlineSocketFrameHandler`, the culmination of the recent framing refactoring designed to achieve zero-copy payload serialization. Unlike the `BackgroundSocketFrameHandler`, this inline implementation bypasses the `System.Threading.Channels` queue entirely. By executing synchronously on the caller's thread, it can take the raw `OutgoingFrameMemory` and stream the user's `ReadOnlyMemory<byte>` body directly to the `PipeWriter` without triggering the expensive `TransferOwnershipAndCopyBody()` array allocation/copy. Key details:

- Achieves true zero-copy for message bodies by leveraging `IBufferWriter<byte>.GetSpan()` directly.
- Introduces a `SemaphoreSlim(1, 1)` to serialize access to the `PipeWriter`, ensuring thread safety since multiple threads can invoke `TransmitAsync` concurrently, and `PipeWriter` does not support concurrent writes.
- Disposes of the frame inline to immediately return the rented header arrays to the ArrayPool.

Note: This commit only adds the implementation. The active factory logic to select between the background and inline handlers will be introduced via configuration in the next commit.
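The concurrency shape of the inline path — a `SemaphoreSlim(1, 1)` guarding a `PipeWriter` that multiple threads hit concurrently — can be sketched as follows. This is not the PR's handler code, just the mechanism it describes:

```csharp
using System;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;

// Minimal sketch of the inline write path: concurrent callers serialize on a
// SemaphoreSlim because PipeWriter has no internal synchronization for
// concurrent writers, and the caller's buffer is streamed straight into the
// pipe with no intermediate rented copy.
sealed class InlineWriterSketch
{
    private readonly PipeWriter _writer;
    private readonly SemaphoreSlim _gate = new(1, 1);

    public InlineWriterSketch(PipeWriter writer) => _writer = writer;

    public async ValueTask SendAsync(ReadOnlyMemory<byte> frame, CancellationToken ct = default)
    {
        await _gate.WaitAsync(ct).ConfigureAwait(false);
        try
        {
            // The caller's buffer goes into the pipe's own segments;
            // no pooled intermediate copy of the payload is rented here.
            await _writer.WriteAsync(frame, ct).ConfigureAwait(false);
        }
        finally
        {
            _gate.Release();
        }
    }
}
```

The trade-off described in the commit is visible here: callers now wait for the socket write under the gate instead of returning immediately after enqueueing.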
This commit introduces a configuration flag on `ConnectionFactory` to allow users to toggle between the default background frame writer and the newly implemented zero-copy inline frame writer. Key changes:

- Added `UseBackgroundFrameWriter` (default: true) to `ConnectionFactory`.
- Updated `SocketFrameHandler.CreateAsync` to act as a factory, returning either `BackgroundSocketFrameHandler` or `InlineSocketFrameHandler` based on the configuration.

By defaulting to `true`, the standard behavior is strictly maintained: publish calls will continue to push frames to an internal queue and return immediately. Users who publish large payloads, or who want to eliminate the copying overhead, can now opt in to the inline writer. This achieves true zero-copy publishing by writing uncopied payloads directly to the socket pipeline within the caller's execution context, completely bypassing the ArrayPool and LOH for message bodies and trading an immediate return for significantly reduced GC pressure and eliminated copy overhead.
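Opting in would look like the following. Note that `UseBackgroundFrameWriter` is the flag proposed in this PR and is not part of any released RabbitMQ.Client version; the surrounding connection calls are the v7 async API:

```csharp
using RabbitMQ.Client;

// UseBackgroundFrameWriter is the flag added by this PR (not yet released).
// Setting it to false selects the inline, zero-copy writer.
var factory = new ConnectionFactory
{
    HostName = "localhost",
    UseBackgroundFrameWriter = false
};

await using var connection = await factory.CreateConnectionAsync();
await using var channel = await connection.CreateChannelAsync();
// Publishes on this channel now stream the caller's body directly to the
// socket pipeline, at the cost of awaiting the actual socket write.
```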
Force-pushed 4df1a69 to 4949e28
@PauloHMattos thanks for this work. I've marked this PR as draft since it seems like you're still working on it. Take your time and when you are ready for a review, mark it as ready. Thanks.
A couple of clarification questions regarding the write path:
Regarding the memory ownership model:
Yep - @PauloHMattos please ensure that both the
I immediately removed this PR from the 7.3.0 milestone given its scope and @danielmarbach's comments, but have moved it back for now, provided we can address them.
@lukebakken I was just cleaning some changes from the commits. Now it's "ready", other than any requested changes.
@danielmarbach I'm AFK today, so I will only be able to give you detailed answers tomorrow. But I think that most of my design decisions were made with the objective of avoiding breaking changes to allow this PR to land on v7.x. If we don't care about breaking changes, or if this will be a strictly 8.x change, it would change a lot of the decisions here. My applications primarily publish large payloads, so the rent/copy overhead is affecting us a lot. So the objective was to have this improvement available as quickly as possible.
I will take a look at this tomorrow. An easy way to handle this only on CI would be reading the config from an environment variable and configuring a test matrix. |
Proposed Changes
This PR introduces an architectural refactoring of the outgoing framing pipeline to support zero-copy payload serialization, significantly reducing memory allocations, GC pressure, and Large Object Heap (LOH) fragmentation for high-throughput publishers.
Currently, the client uses a background task to dispatch frames to the socket. To safely cross this asynchronous boundary, message payloads must be copied into rented `ArrayPool<byte>` buffers. While amortized, copying large messages (>85 KB) forces the ArrayPool to rent from the Large Object Heap, leading to severe LOH fragmentation and expensive Gen 2 garbage collections under heavy load.

To solve this while maintaining strict backward compatibility, this PR:
- Refactors Framing Lifecycle: Replaces `RentedMemory` with a custom `OutgoingFrameMemory` struct, deferring payload buffer allocations until the exact moment the thread boundary is crossed.
- Decouples Dispatch Models: Extracts the existing queue logic into `BackgroundSocketFrameHandler` and introduces a new `InlineSocketFrameHandler`.
- Implements Zero-Copy: The new `InlineSocketFrameHandler` uses `IBufferWriter<byte>.GetSpan()` to stream the caller's original `ReadOnlyMemory<byte>` payload directly to the socket pipeline. This completely bypasses the ArrayPool rent + copy for the message body.
- Exposes Configuration: Adds `UseBackgroundFrameWriter` (defaults to true) to `ConnectionFactory`. By defaulting to true, existing applications see no behavioral changes. Users can set this to false to opt in to inline, zero-copy publishing (trading immediate task return for lower CPU/GC overhead).

Further Comments
I recommend reviewing one commit at a time. Each commit contains a small and complete change that compiles and passes all the tests. If preferred, I can split this into multiple PRs.
Public API naming
I went with the name `UseBackgroundFrameWriter` for the `ConnectionFactory` option, but I'm not sure it is clear enough, or whether it follows the conventions of the project.
Because C# lacks strict move semantics, crossing the background thread boundary now requires an explicit call to `OutgoingFrameMemory.TransferOwnershipAndCopyBody()`. This materializes the payload while transferring ownership of the rented header arrays, preventing double-dispose bugs on the ArrayPool.
`PipeWriter` does not support concurrent writes. Because `SessionBase.TransmitAsync` can be called concurrently by multiple threads, the new `InlineSocketFrameHandler` uses a `SemaphoreSlim(1, 1)` to synchronize access to the pipe writer.
I've updated `TestBasicPublishAsync` to test both implementations, but I'm unsure how to assert that the config is working as expected and that the correct `SocketFrameHandler` implementation is being used. I also don't know which other tests should be updated to run with both implementations.
Benchmark
Using the program available at https://github.com/PauloHMattos/RabbitMQ.MemoryTest, I get the following results:
Official version
Body size : 1 MB
Iterations : 100
Tasks : 16
Non-copying : False
Startup memory : 16 MB

--- Start ---
Memory usage: 17 MB
Memory usage: 19 MB
Memory usage: 20 MB
Memory usage: 22 MB
Memory usage: 23 MB
Memory usage: 2239 MB
Memory usage: 2247 MB
Memory usage: 2252 MB
Memory usage: 2252 MB
Memory usage: 2252 MB
Memory usage: 2252 MB
Memory usage: 2252 MB
--- Results ---
Avg time : 9002 ms
Min time : 7358 ms
Max time : 9361 ms
Memory : 2254 MB
Queue length : 5867 / 8000
Valid messages : 100 / 100 (first 100 of 5867)
PR + UseBackgroundFrameWriter = true
Body size : 1 MB
Iterations : 100
Tasks : 16
Non-copying : False
Startup memory : 17 MB

--- Start ---
Memory usage: 17 MB
Memory usage: 19 MB
Memory usage: 20 MB
Memory usage: 22 MB
Memory usage: 23 MB
Memory usage: 2115 MB
Memory usage: 2128 MB
Memory usage: 2133 MB
Memory usage: 2134 MB
Memory usage: 2134 MB
Memory usage: 2135 MB
--- Results ---
Avg time : 8780 ms
Min time : 8546 ms
Max time : 9027 ms
Memory : 2136 MB
Queue length : 8000 / 8000
Valid messages : 100 / 100 (first 100 of 8000)
PR + UseBackgroundFrameWriter = false
Body size : 1 MB
Iterations : 100
Tasks : 16
Non-copying : True
Startup memory : 16 MB

--- Start ---
Memory usage: 17 MB
Memory usage: 19 MB
Memory usage: 19 MB
Memory usage: 22 MB
Memory usage: 22 MB
Memory usage: 98 MB
Memory usage: 47 MB
Memory usage: 98 MB
Memory usage: 76 MB
Memory usage: 70 MB
Memory usage: 66 MB
--- Results ---
Avg time : 8756 ms
Min time : 8470 ms
Max time : 9017 ms
Memory : 51 MB
Queue length : 8000 / 8000
Valid messages : 100 / 100 (first 100 of 8000)
Linked issues
Supersedes #1445
Closes (?) #1446
It is also somewhat related to #1826.