KAFKA-16448: Add ErrorHandlerContext in production exception handler#16433
Conversation
f512db8 to
c60b79e
Compare
36098ac to
3fe0f59
Compare
cadonna
left a comment
There was a problem hiding this comment.
Thanks for the PR @sebastienviale, @loicgreffier, and @Dabz!
Here my feedback!
| public class DefaultProductionExceptionHandler implements ProductionExceptionHandler { | ||
| @Override | ||
| public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record, | ||
| public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, |
There was a problem hiding this comment.
I do not think we can modify this without deprecating the old method and mentioning it in the KIP since it is part of the public API.
I am aware that this method is only called internally.
There was a problem hiding this comment.
True, we did it for the deserialization exception handler implementations. I'm wondering why we did not do the same for the production.. 😄
There was a problem hiding this comment.
@cadonna I updated KIP-1033 with a word regarding deserialization and production handler implementations
| * @param record the record that failed to serialize | ||
| * @param exception the exception that occurred during serialization |
There was a problem hiding this comment.
Could you please revert the formatting changes in this two lines?
| final ProductionExceptionHandler.SerializationExceptionOrigin origin; | ||
| if (keyBytes == null) { | ||
| origin = ProductionExceptionHandler.SerializationExceptionOrigin.KEY; | ||
| } else { | ||
| origin = ProductionExceptionHandler.SerializationExceptionOrigin.VALUE; | ||
| } |
There was a problem hiding this comment.
I think, it would be better to have something like:
try {
keyBytes = keySerializer.serialize(topic, headers, key);
} catch (final ClassCastException exception) {
handleClassCastException(...)
} catch (final Exception exception) {
handleException(ProductionExceptionHandler.SerializationExceptionOrigin.KEY, ...)
}
try {
valBytes = valueSerializer.serialize(topic, headers, value);
} catch (final ClassCastException exception) {
handleClassCastException(...)
} catch (final Exception exception) {
handleException(ProductionExceptionHandler.SerializationExceptionOrigin.VALUE, ...)
}There was a problem hiding this comment.
Done.
I had to remove the final from keyBytes and valueBytes and initialize them when moving the ClassCastException code to a dedicated method
There was a problem hiding this comment.
If you want to keep the final and make the code a bit better readable, you could rename handleClassCastException() to createStreamsExceptionForClassCastException() and return the StreamsException from there. Then you can do
} catch (final ClassCastException exception) {
throw createStreamsExceptionForClassCastException()(
topic,
key,
value,
keySerializer,
valueSerializer,
exception);With this it is clear that the catch-branch terminates the method.
There was a problem hiding this comment.
The update has been pushed!
| assertEquals(expectedContext.recordContext().rawRecord().key(), context.sourceRawKey()); | ||
| assertEquals(expectedContext.recordContext().rawRecord().value(), context.sourceRawValue()); |
There was a problem hiding this comment.
I think you need to use assertArrayEquals() here to verify the byte arrays.
d6dc8f1 to
162d75b
Compare
94edf82 to
697e0b7
Compare
| } | ||
|
|
||
| @Test | ||
| public void shouldThrowStreamsExceptionWhenValueSerializationFailedAndProductionExceptionHandlerRepliesWithFail() { |
There was a problem hiding this comment.
I am missing the version of this test where the key serialization fails.
There was a problem hiding this comment.
done, unit test added
cadonna
left a comment
There was a problem hiding this comment.
Thanks for the updates!
LGTM!
| * Production exception handler that always instructs streams to continue when an exception | ||
| * happens while attempting to produce result records. | ||
| */ | ||
| public class AlwaysContinueProductionExceptionHandler implements ProductionExceptionHandler { |
There was a problem hiding this comment.
Why does this PR remove this class?
There was a problem hiding this comment.
We have created a parameterized ProductionExceptionHandler (ProductionExceptionHandlerMock) in the RecordCollectorTest class that additionally does some assertions, including the origin of the exception.
Thus AlwaysContinueProductionExceptionHandler was redundant with ProductionExceptionHandlerMock, it has been replaced in this PR while updating and adding some additional unit tests
There was a problem hiding this comment.
But AlwaysContinueProductionExceptionHandler is a public built-in handler users can configure... -- it's public API, and thus has nothing to do with ProductionExceptionHandlerMock and our internal testing.
It's a build-in alternative to the default handler.
If we really want to no have it any longer, we would need to deprecate -- but we cannot just remove public API. But I don't see why we would want to deprecate and remove it?
There was a problem hiding this comment.
@mjsax Unless I'm wrong, AlwaysContinueProductionExceptionHandler belongs to src/test/java and is only used for testing purpose in the RecordCollectorTest.
Given that it is only use for testing, should it be consider public? If yes, we can obviously restore the class
There was a problem hiding this comment.
Ah. You are right. My bad. I guess I mixed it up with the two built-in handler for DeserializationExceptionHandler, but we only have the "alway fail" build-in handler for the production case...
| * | ||
| * @param record The record that failed to produce | ||
| * @param exception The exception that occurred during production | ||
| * @deprecated Use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead |
There was a problem hiding this comment.
| * @deprecated Use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead | |
| * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead. |
| * | ||
| * @param record the record that failed to serialize | ||
| * @param exception the exception that occurred during serialization | ||
| * @deprecated Use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead |
There was a problem hiding this comment.
| * @deprecated Use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead | |
| * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead. |
| @Deprecated | ||
| default ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record, | ||
| final Exception exception) { | ||
| throw new UnsupportedOperationException(); |
There was a problem hiding this comment.
KIP says NotImplementedException -- we should update the KIP -- same for the other method below.
There was a problem hiding this comment.
KIP has been updated
| droppedRecordsSensor.record(); | ||
| } | ||
|
|
||
| private <K, V> StreamsException createStreamsExceptionForClassCastException(final String topic, |
There was a problem hiding this comment.
Given that we introduce a helper and have two dedicated try-catch blocks for key and value now, it seems we can tailor this method to key or value similar to what we do for handleException instead of this generic one covering both at once?
There was a problem hiding this comment.
done, I hope this what you are waiting !
There was a problem hiding this comment.
Yes, and I merged this PR to make progress. However, I think, we can unify both methods into a single one, passing in "key" or "value" as parameter to avoid code duplication (similar to the "origin" parameter use in handleException(...))
The generic type would could be updated to <KV> for "key or value type" or just to <T> if you prefer.
Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com>
Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com>
Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com>
Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com>
…r remove rawKey and rawValue Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com>
…r remove rawKey and rawValue Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com>
…r remove rawKey and rawValue Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com>
42839a4 to
902ae98
Compare
|
|
||
| droppedRecordsSensor.record(); | ||
| } | ||
| private <K> StreamsException createStreamsExceptionForKeyClassCastException(final String topic, |
…16433) This PR is part of KIP-1033 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing. This PR expose the new ErrorHandlerContext as a parameter to the Production exception handler and deprecate the previous handle signature. Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com> Reviewers: Bruno Cadonna <bruno@confluent.io>, Matthias J. Sax <matthias@confluent.io>
|
Thanks for the PR. Merged to |
…pache#16433) This PR is part of KIP-1033 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing. This PR expose the new ErrorHandlerContext as a parameter to the Production exception handler and deprecate the previous handle signature. Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com> Reviewers: Bruno Cadonna <bruno@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This PR is part of KAFKA-16448 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing.
This PR expose the new ErrorHandlerContext as a parameter to the Production exception handler and deprecate the previous handle signature
Jira: https://issues.apache.org/jira/browse/KAFKA-16448.
Contributors
@Dabz
@sebastienviale
@loicgreffier