KAFKA-16448: Handle fatal user exception during processing error#16675
Conversation
cadonna
left a comment
There was a problem hiding this comment.
Thanks for the PR, @sebastienviale !
Here my comments!
| } catch (final Exception fatalUserException) { | ||
| log.error( | ||
| "Processing error callback failed after processing error for record {}", | ||
| record, |
There was a problem hiding this comment.
We should not log records for privacy reasons. The exception should be enough.
| response = processingExceptionHandler | ||
| .handle(errorHandlerContext, record, e); |
There was a problem hiding this comment.
nit:
| response = processingExceptionHandler | |
| .handle(errorHandlerContext, record, e); | |
| response = processingExceptionHandler.handle(errorHandlerContext, record, e); |
| try { | ||
| response = processingExceptionHandler | ||
| .handle(errorHandlerContext, record, e); | ||
| } catch (final Exception fatalUserException) { |
There was a problem hiding this comment.
Could you please also add a unit test for this?
cadonna
left a comment
There was a problem hiding this comment.
Thanks for the updates!
LGTM!
| try { | ||
| response = processingExceptionHandler.handle(errorHandlerContext, record, e); | ||
| } catch (final Exception fatalUserException) { | ||
| throw new StreamsException("Fatal user code error in processing error callback", fatalUserException); |
There was a problem hiding this comment.
Should we add an error log before throwing?
Btw: it seems we have such a guard only for the DeserializationExceptionHandler#handle(...) call, but not for the ProductionExceptionHanlder#call(...)? Might be good to do a follow up PR to add it to the proction-error-handler case, too?
There was a problem hiding this comment.
One more thing: if we throw StreamsException here, we might catch it in an upstream process(), right? So should we rather wrap this StreamsException with a FailedProcessingException?
Do we actually have a test for "passing through" of FailedProcessingException (ie, a test which checks if upstream ProcessingExceptionHandler are not called any longer, if a downstream processor had an error and the corresponding handler did return FAIL? If yes, we could re-use this test to also test a crashing handler. If not, let's add new tests for both cases :)
There was a problem hiding this comment.
done:
I changed the StreamException into a FailedProcessingException
and added some test to check that no more processor is executed when an exception occurs or when an exception occurs in a handler.
There was a problem hiding this comment.
Should we add an error log before throwing?
Btw: it seems we have such a guard only for the
DeserializationExceptionHandler#handle(...)call, but not for theProductionExceptionHanlder#call(...)? Might be good to do a follow up PR to add it to the proction-error-handler case, too?
@mjsax is it mandatory for this KIP ?
There was a problem hiding this comment.
One more thing: if we throw
StreamsExceptionhere, we might catch it in an upstreamprocess(), right? So should we rather wrap thisStreamsExceptionwith aFailedProcessingException?
@mjsax Correct. FailedProcessingException is thrown as is:
Do we actually have a test for "passing through" of
FailedProcessingException
We do:
It asserts FailedProcessingException is not handled and thrown as is. Another test has been added to ensure that the cause of the exception is from the processing exception handler (if it is actually thrown by it).
There was a problem hiding this comment.
Btw: it seems we have such a guard only for the DeserializationExceptionHandler#handle(...) call, but not for the ProductionExceptionHanlder#call(...)? Might be good to do a follow up PR to add it to the proction-error-handler case, too?
@mjsax is it mandatory for this KIP ?
Well, we don't want a handler which throws an exception to not crash KS, because the exception is caught in an upstream handler, right? But should be a simple thing to add?
But I see your point that it's not really related to the KIP. I can do a follow up myself.
There was a problem hiding this comment.
Btw: it seems we have such a guard only for the DeserializationExceptionHandler#handle(...) call, but not for the ProductionExceptionHanlder#call(...)? Might be good to do a follow up PR to add it to the proction-error-handler case, too?
@mjsax is it mandatory for this KIP ?
Well, we don't want a handler which throws an exception to not crash KS, because the exception is caught in an upstream handler, right? But should be a simple thing to add?
But I see your point that it's not really related to the KIP. I can do a follow up myself.
Ok, I will open a new PR for that !
77d2ed2 to
4d50ab3
Compare
|
@sebastienviale -- The PR has merge conflicts and needs to be rebased. Maybe also address the open comments from the other PRs on this one. |
9661027 to
9c6f40e
Compare
Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com> KAFKA-16448 Handle fatal user exception during processing error Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com> KAFKA-16448 Handle fatal user exception during processing error Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com> KAFKA-16448 Handle fatal user exception during processing error Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com> KAFKA-16448 Handle fatal user exception during processing error Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com>
9c6f40e to
2be7af8
Compare
Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com>
Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com>
) This PR is part of KIP-1033 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing. This PR catch the exceptions thrown while handling a processing exception Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com> Reviewers: Bruno Cadonna <bruno@confluent.io>, Matthias J. Sax <matthias@confluent.io>
|
Thanks for the PR. Merged to |
This PR is part of KAFKA-16448 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing.
This PR catch the exceptions thrown while handling a processing exception
Jira: https://issues.apache.org/jira/browse/KAFKA-16448.
Contributors
@Dabz
@sebastienviale
@loicgreffier