KAFKA-16448: Add ProcessingExceptionHandler in Streams configuration#16092
Conversation
…ions Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: sebastienviale <sebastien.viale@michelin.com>
Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: sebastienviale <sebastien.viale@michelin.com>
…ions Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: sebastienviale <sebastien.viale@michelin.com>
Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: sebastienviale <sebastien.viale@michelin.com>
Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: sebastienviale <sebastien.viale@michelin.com>
Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: sebastienviale <sebastien.viale@michelin.com>
cadonna
left a comment
There was a problem hiding this comment.
@loicgreffier Thanks for the PR!
Here my comment.
Since today is feature freeze, it would be great if you could make the changes as soon as possible.
| .define(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, | ||
| Type.CLASS, | ||
| LogAndFailProcessingExceptionHandler.class.getName(), | ||
| Importance.MEDIUM, | ||
| PROCESSING_EXCEPTION_HANDLER_CLASS_DOC) |
There was a problem hiding this comment.
Could you please add unit tests in StreamsConfigTest?
I think you need the following unit tests:
- One where config is not set to verify the default value
- One where the config is set and you get the correct instance from
processingExceptionHandler().
…ndler Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com>
…ndler Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com>
…fig' of https://github.com/loicgreffier/kafka into KAFKA-16448-Add-Processing-Exception-Handler-StreamsConfig
…fig' of https://github.com/loicgreffier/kafka into KAFKA-16448-Add-Processing-Exception-Handler-StreamsConfig Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com>
…fig' of https://github.com/loicgreffier/kafka into KAFKA-16448-Add-Processing-Exception-Handler-StreamsConfig
…eptionHandler Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com>
cadonna
left a comment
There was a problem hiding this comment.
Thanks for the updates, @loicgreffier and @sebastienviale!
After addressing my comment, I will approve and merge this PR.
| @Test | ||
| public void testInvalidProcessingExceptionHandler() { | ||
| props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, "org.apache.kafka.streams.errors.InvalidProcessingExceptionHandler"); | ||
| assertThrows(ConfigException.class, () -> new StreamsConfig(props)); |
There was a problem hiding this comment.
Could you please also verify the message?
| assertThrows(ConfigException.class, () -> new StreamsConfig(props)); | |
| final Exception exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props)); | |
| assertThat( | |
| exception.getMessage(), | |
| containsString("<the message to verify>") | |
| ); |
There was a problem hiding this comment.
Verification has been added
…ptionHandler Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com>
cadonna
left a comment
There was a problem hiding this comment.
Thanks @loicgreffier @sebastienviale !
LGTM!
Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: sebastienviale <sebastien.viale@michelin.com>
85060e6 to
60bba3b
Compare
Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: sebastienviale <sebastien.viale@michelin.com>
|
@cadonna I've rebased this PR so file changes are now clear. Do we need to squash commits manually or is it automatically done when merging ? |
…fig' of https://github.com/loicgreffier/kafka into KAFKA-16448-Add-Processing-Exception-Handler-StreamsConfig
Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com>
Thanks! Squashing happens when merging. Don't worry! |
…uration (apache#16092)" This reverts commit 3f70c46.
…pache#16092) This PR is part of KAFKA-16448 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing. This PR brings ProcessingExceptionHandler in Streams configuration. Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: sebastienviale <sebastien.viale@michelin.com> Reviewer: Bruno Cadonna <cadonna@apache.org>
…uration (apache#16092)" (apache#16141) This reverts commit 3f70c46. Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
…pache#16092) This PR is part of KAFKA-16448 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing. This PR brings ProcessingExceptionHandler in Streams configuration. Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: sebastienviale <sebastien.viale@michelin.com> Reviewer: Bruno Cadonna <cadonna@apache.org>
…uration (apache#16092)" (apache#16141) This reverts commit 3f70c46. Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
…pache#16092) This PR is part of KAFKA-16448 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing. This PR brings ProcessingExceptionHandler in Streams configuration. Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: sebastienviale <sebastien.viale@michelin.com> Reviewer: Bruno Cadonna <cadonna@apache.org>
…uration (apache#16092)" (apache#16141) This reverts commit 3f70c46. Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
…pache#16092) This PR is part of KAFKA-16448 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing. This PR brings ProcessingExceptionHandler in Streams configuration. Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: sebastienviale <sebastien.viale@michelin.com> Reviewer: Bruno Cadonna <cadonna@apache.org>
…uration (apache#16092)" (apache#16141) This reverts commit 3f70c46. Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
This PR is part of KAFKA-16448 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing.
This PR brings
ProcessingExceptionHandlerin Streams configuration.Jira: https://issues.apache.org/jira/browse/KAFKA-16448.
Contributors
@Dabz
@sebastienviale
@loicgreffier
Depends On
#16090
Committer Checklist (excluded from commit message)