KAFKA-16448: Catch and handle processing exceptions #16093
cadonna merged 1 commit into apache:trunk from
Conversation
apoorvmittal10
left a comment
Thanks for the PR, some minor comments around code.
Thanks for the review @apoorvmittal10. As mentioned, this PR is part of a bigger PR that is being split up (#15973). It should be rebased (to ease the final review) and merged after the following PRs have been merged:
Sounds good!
cadonna
left a comment
Thanks for the PR, @loicgreffier!
Here is my feedback!
Force-pushed 700404e to b6007b6
cadonna
left a comment
Thanks @loicgreffier @sebastienviale @Dabz!
Here is my feedback.
@cadonna The last PR update addressed most of the points. I left some conversations open awaiting your feedback.
Force-pushed b04f7ab to 7c8430b
@cadonna Updated! I think only this conversation remains actually open: #16093 (comment)
cadonna
left a comment
Thanks for the PR, @loicgreffier, @sebastienviale, and @Dabz!
LGTM!
I restarted the build since it ran into a timeout.
The failures of
The e.getMessage() does not contain "Injected test exception" anymore. It is now getCause().getMessage() that contains "Injected test exception". Fix is pushed.
@cadonna This is because the processing exception is now caught and wrapped:

```java
throw new StreamsException("Processing exception handler is set to fail upon" +
    " a processing error. If you would rather have the streaming pipeline" +
    " continue after a processing error, please set the " +
    PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.",
    e);
```

Thus, in the uncaught exception handler:
Maybe additional assertions should be added in these tests to validate the exception and the cause.
I did not realize this! Thank you for raising it! Your code breaks uncaught exception handlers set with the old, deprecated uncaught exception handler API. I have to ask how we handle backward compatibility of deprecated APIs. I would say we need to be backwards compatible.
Thanks for the remarks, I did not realize that either! I see two solutions for backward compatibility:

1/ Log an error and throw the original exception:

```java
private final Logger log = LoggerFactory.getLogger(ProcessorNode.class);

} catch (final Exception e) {
    final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
        internalProcessorContext.topic(),
        internalProcessorContext.partition(),
        internalProcessorContext.offset(),
        internalProcessorContext.headers(),
        internalProcessorContext.recordContext().rawRecord().key(),
        internalProcessorContext.recordContext().rawRecord().value(),
        internalProcessorContext.currentNode().name(),
        internalProcessorContext.taskId());

    final ProcessingExceptionHandler.ProcessingHandlerResponse response = processingExceptionHandler
        .handle(errorHandlerContext, record, e);

    if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
        log.error("Processing exception handler is set to fail upon" +
            " a processing error. If you would rather have the streaming pipeline" +
            " continue after a processing error, please set the " +
            PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.");
        throw new StreamsException(e);
    } else {
        droppedRecordsSensor.record();
    }
}
```

2/ Append the original error message to the new exception:

```java
} catch (final Exception e) {
    final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
        internalProcessorContext.topic(),
        internalProcessorContext.partition(),
        internalProcessorContext.offset(),
        internalProcessorContext.headers(),
        internalProcessorContext.recordContext().rawRecord().key(),
        internalProcessorContext.recordContext().rawRecord().value(),
        internalProcessorContext.currentNode().name(),
        internalProcessorContext.taskId());

    final ProcessingExceptionHandler.ProcessingHandlerResponse response = processingExceptionHandler
        .handle(errorHandlerContext, record, e);

    if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
        throw new StreamsException(String.format("Processing exception handler is set to fail upon" +
            " a processing error. If you would rather have the streaming pipeline" +
            " continue after a processing error, please set the " +
            PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately. %s", e.getMessage()),
            e);
    } else {
        droppedRecordsSensor.record();
    }
}
```

Or maybe you are thinking of another solution?
Is only the message the issue or also the type of the exception? What exception is passed to the old uncaught exception handler in the eos tests without this PR? Is it a
Force-pushed 8f80225 to 35b9b99
```java
if (isTopologyOverride(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, topologyOverrides)) {
    processingExceptionHandlerSupplier = getClass(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG);
    log.info("Topology {} is overriding {} to {}", topologyName, PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, getClass(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG));
} else {
    processingExceptionHandlerSupplier = globalAppConfigs.getClass(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG);
}
```
Why do you not call the same methods for creating the instance as for the DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG on line 232?
In your approach, I think configure() is never called, and the variable names are really confusing, because processingExceptionHandlerSupplier is not a supplier.
@cadonna Sorry, that was my mistake. I updated the PR to invoke getConfiguredInstance.
If I remember well, during the discussions around KIP-1033, we ruled out the possibility of overriding the processing exception handler from the topology config (which is the reason why the config is not called DEFAULT_...PROCESSING_EXCEPTION_HANDLER). Considering this, I removed the isTopologyOverride condition for the processing exception handler.
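The difference between resolving a class and obtaining a configured instance can be sketched with plain Java. This is an illustration, not Kafka code: `Configurable` and `getConfiguredInstance` below are simplified stand-ins for Kafka's `org.apache.kafka.common.Configurable` and `AbstractConfig.getConfiguredInstance`, under the assumption that the real method instantiates the class and then calls `configure()` on it.

```java
import java.util.Map;

public class ConfiguredInstanceSketch {
    // Stand-in for org.apache.kafka.common.Configurable.
    interface Configurable {
        void configure(Map<String, ?> configs);
    }

    static class MyHandler implements Configurable {
        boolean configured = false;

        @Override
        public void configure(final Map<String, ?> configs) {
            configured = true;
        }
    }

    // Simplified stand-in for AbstractConfig.getConfiguredInstance: instantiate, then configure.
    static <T> T getConfiguredInstance(final Class<T> cls, final Map<String, ?> configs) throws Exception {
        final T instance = cls.getDeclaredConstructor().newInstance();
        if (instance instanceof Configurable) {
            ((Configurable) instance).configure(configs);
        }
        return instance;
    }

    public static void main(final String[] args) throws Exception {
        // Merely instantiating the resolved class (the getClass(...) path) never calls configure().
        final MyHandler bare = MyHandler.class.getDeclaredConstructor().newInstance();
        System.out.println(bare.configured);

        // The configured-instance path does call configure().
        final MyHandler configured = getConfiguredInstance(MyHandler.class, Map.of());
        System.out.println(configured.configured);
    }
}
```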
```java
    this.namedTopology = namedTopology;
}

public final synchronized void setProcessingExceptionHandler(final ProcessingExceptionHandler processingExceptionHandler) {
```
@loicgreffier What do you think of passing a supplier () -> getConfiguredInstance(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ProcessingExceptionHandler.class) for the processing exception handler to the node, similar to how you passed in the processing exception handler instance with node.setProcessingExceptionHandler(...)? Then get the instance from the supplier during node initialization.
Sorry, the following comment is not correct: #16093 (comment) With my proposal we would get a processing exception handler per processor node. Let me re-iterate on this.
I think it is fine to set the processing exception handler in the
Force-pushed d0ac3b4 to 1106826
Force-pushed a5453ca to e8b560c
cadonna
left a comment
Thanks for the updates!
Sorry for the delay in the review, but I was quite busy in the last weeks.
```java
        keyClass,
        valueClass),
    e);
} catch (final FailedProcessingException | TaskCorruptedException e) {
```
Are you sure you do not also need to consider TaskMigratedException here? I think you do. Could you please also check other exceptions?
A test is needed that verifies whether the TaskCorruptedException etc. are ignored by the handler.
Exceptions with special treatment in the StreamThread have been ignored:
TaskMigratedException (added to the ignore list) and TaskCorruptedException are rethrown as-is. I believe users won't expect these exceptions to be handled, which is why they are simply rethrown.
UnsupportedVersionException should not need to be ignored, as it cannot be thrown during process.
I've added a @ParameterizedTest that asserts FailedProcessingException, TaskCorruptedException and TaskMigratedException are ignored on purpose.
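The rethrow-as-is behaviour can be demonstrated with a self-contained sketch. This is not the actual Kafka Streams code: the exception classes and the `process` body below are simplified stand-ins. Exceptions on the ignore list pass through untouched, while any other runtime exception would be routed to wrapping.

```java
import java.util.List;

public class IgnoredExceptionsSketch {
    // Stand-ins for Kafka's internal exception types.
    static class TaskCorruptedException extends RuntimeException { }
    static class TaskMigratedException extends RuntimeException { }

    // Simplified processing wrapper: special-cased exceptions are rethrown as-is.
    static void process(final RuntimeException toThrow) {
        try {
            throw toThrow;
        } catch (final TaskCorruptedException | TaskMigratedException e) {
            throw e; // ignored by the handler on purpose
        } catch (final RuntimeException e) {
            throw new RuntimeException("handled: " + e.getMessage(), e);
        }
    }

    public static void main(final String[] args) {
        // Parameterized-test style: the ignored exceptions come back unchanged.
        for (final RuntimeException ignored : List.of(new TaskCorruptedException(), new TaskMigratedException())) {
            try {
                process(ignored);
            } catch (final RuntimeException caught) {
                System.out.println(caught == ignored); // same instance, rethrown as-is
            }
        }
    }
}
```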
```java
assertEquals("java.lang.RuntimeException: FailedProcessingException should not be caught and handled by the processing exception handler.",
    failedProcessingException.getMessage());
```
I would prefer to verify the cause and the message of the cause. I am not sure the format of the message, when the exception does not have its own message but uses the message of the cause, is standardized in Java. Different Java implementations might have slightly different formats that would fail the test.
I updated this test to a parameterized test that checks the cause and the cause message for a given exception to ignore. Let me know if you find it acceptable.
Force-pushed 0eb864e to 342ab79
cadonna
left a comment
Thanks for the updates @loicgreffier , @sebastienviale , and @Dabz!
I just have one comment on my own proposal 🙂
For the rest LGTM!
Could you write a message to the mailing thread [DISCUSS] Apache Kafka 3.9.0 release asking if KIP-1033 could be added to that release?
That is the last PR for KIP-1033, right?
```java
}

@Test
public void shouldThrowFailedProcessingExceptionWhenProcessingExceptionHandlerResponsesWithFail() {
```
nit: I realized that in English there is no verb "to response", so you should call this method shouldThrowFailedProcessingExceptionWhenProcessingExceptionHandlerRepliesWithFail().
The same applies to the method below.
Definitely my fault caused by me not being a native speaker.
KAFKA-16448: Catch and handle processing exceptions
Co-authored-by: Dabz <d.gasparina@gmail.com>
Co-authored-by: sebastienviale <sebastien.viale@michelin.com>
Co-authored-by: loicgreffier <loic.greffier@michelin.com>
Force-pushed c559dee to 8eb1300
Regarding KIP-1033, there are 3 PRs remaining:
All PRs would need to be rebased after this one has been merged to ease the review.
```java
this.applicationConfigs = globalAppConfigs;
this.topologyOverrides = topologyOverrides;
this.processingExceptionHandlerSupplier = () -> globalAppConfigs.getConfiguredInstance(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ProcessingExceptionHandler.class);
```
Why do we need to use a supplier here?
Because Streams creates a processing exception handler per task.
See
Sure, but line 288 could just be
globalAppConfigs.getConfiguredInstance(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ProcessingExceptionHandler.class)
Not sure why we need the supplier indirection?
For the existing timestampExtractorSupplier and deserializationExceptionHandlerSupplier it is a little different, because we set up different suppliers for different cases (so using suppliers simplifies the code), but for the new one we have only one supplier and thus it seems unnecessary.
No need to change anything. We follow an established pattern, but it just was confusing to me, as it seems (strictly speaking) unnecessary to have the layer of indirection.
processingExceptionHandler has been created following the deserializationExceptionHandler. Thus a supplier has been used.
The supplier can be removed to make the code clearer if needed.
Yes. Understood. -- I think it's not necessary to have a supplier, but also ok to keep the code as-is.
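The "handler per task" point can be sketched with plain Java. This is an illustration, not Kafka code: the `Handler` class is a hypothetical minimal stand-in for a configured `ProcessingExceptionHandler`, and the `Supplier` stands in for the lambda stored in processingExceptionHandlerSupplier. Deferring creation behind a supplier means each `get()` yields a fresh, independent instance.

```java
import java.util.function.Supplier;

public class SupplierSketch {
    // Hypothetical minimal handler type, standing in for a configured ProcessingExceptionHandler.
    static class Handler { }

    public static void main(final String[] args) {
        // The supplier defers creation, so each task can obtain its own instance.
        final Supplier<Handler> handlerSupplier = Handler::new;

        final Handler forTask0 = handlerSupplier.get();
        final Handler forTask1 = handlerSupplier.get();
        System.out.println(forTask0 != forTask1); // distinct instances, one per task
    }
}
```

Without the indirection, a single pre-built instance would be shared by every task, which is exactly the distinction discussed above.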
```java
 * {@link FailedProcessingException} is the top-level exception type generated by Kafka Streams, and indicates errors have
 * occurred during a {@link org.apache.kafka.streams.processor.internals.ProcessorNode ProcessorNode's} processing.
 */
public class FailedProcessingException extends KafkaException {
```
Should we extend StreamsException instead of KafkaException?
```java
public class FailedProcessingException extends KafkaException {
    private static final long serialVersionUID = 1L;

    public FailedProcessingException(final Throwable throwable) {
```
Do we really want to catch Throwable? From my understanding, it's (1) not best practice, and (2) we only hand (correctly IMHO) Exception into the new handler, so if we would really have a Throwable at hand, we could not pass it into the handler anyway.
```java
        PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.");
    throw new FailedProcessingException(e);
} else {
    droppedRecordsSensor.record();
```
Should we log here? How do we do this elsewhere? I know that the default handlers log already (so might be redundant), but a custom handler might not log...? Just curious.
I also thought about logging here. But since we had quite some issues with log pressure due to dropped records, I thought, it would be better to let the handler log.
```java
    throw exception;
} catch (final RuntimeException e) {
    // Do not keep the failed processing exception in the stack trace
    final Throwable processingException = e instanceof FailedProcessingException ? e.getCause() : e;
```
We should not use instanceof but add a new catch (FailedProcessingException) block to cover this case.
Having a catch (FailedProcessingException) block will now be required anyway, since FailedProcessingException will extend StreamsException, and we don't want FailedProcessingException to be caught by the catch (StreamsException) block above.
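The suggested dedicated catch block can be sketched stand-alone. These are not the real Kafka classes: `StreamsException` and `FailedProcessingException` below are minimal stand-ins mirroring the proposed hierarchy. Because `FailedProcessingException` is the subtype, its catch clause must come before `catch (StreamsException)`, and the cause is unwrapped without any `instanceof` check.

```java
public class CatchOrderSketch {
    // Minimal stand-ins for the proposed Kafka Streams exception hierarchy.
    static class StreamsException extends RuntimeException {
        StreamsException(final Throwable cause) {
            super(cause);
        }
    }

    static class FailedProcessingException extends StreamsException {
        FailedProcessingException(final Throwable cause) {
            super(cause);
        }
    }

    static Throwable unwrap(final RuntimeException thrown) {
        try {
            throw thrown;
        } catch (final FailedProcessingException e) {
            // Dedicated block, placed before catch (StreamsException) since it is a subtype:
            // strip the wrapper and surface only the original exception.
            return e.getCause();
        } catch (final StreamsException e) {
            return e;
        }
    }

    public static void main(final String[] args) {
        final RuntimeException original = new RuntimeException("boom");
        System.out.println(unwrap(new FailedProcessingException(original)) == original); // unwrapped to the cause
    }
}
```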
```java
}

@Override
public boolean equals(final Object other) {
```
Why do we add this overload? (Same for hashCode() below). -- Seems unnecessary.
I see. Thanks. "Interesting" -- or should I rather say "annoying" 😁
This PR is part of KAFKA-16448 (KIP-1033) which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing. This PR actually catches processing exceptions. Authors: @Dabz @sebastienviale @loicgreffier Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Bruno Cadonna <cadonna@apache.org>
This PR is part of KAFKA-16448 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing.
This PR actually catches processing exceptions.
Jira: https://issues.apache.org/jira/browse/KAFKA-16448.
Contributors
@Dabz
@sebastienviale
@loicgreffier
Depends On
#16092
#16090
Committer Checklist (excluded from commit message)