KAFKA-16448: Add ErrorHandlerContext in deserialization exception handler#16432
Conversation
ade2811 to
0674502
Compare
32c8edf to
8f5decd
Compare
cadonna
left a comment
There was a problem hiding this comment.
Thanks for the PR @sebastienviale , @loicgreffier, and @Dabz!
Here my feeback!
| ); | ||
|
|
||
| final StreamsException e = assertThrows(StreamsException.class, () -> recordDeserializer.deserialize(context, rawRecord)); | ||
| assertEquals(e.getMessage(), "Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately."); |
There was a problem hiding this comment.
This line is definitely too long. Please add some new lines.
| "true, false", | ||
| "false, true" | ||
| }) | ||
| public void shouldThrowStreamsExceptionWhenDeserializationFailsAndDeserializationExceptionHandlerRepliesWithFail(final boolean keyThrowsException, |
There was a problem hiding this comment.
nit:
| public void shouldThrowStreamsExceptionWhenDeserializationFailsAndDeserializationExceptionHandlerRepliesWithFail(final boolean keyThrowsException, | |
| public void shouldThrowStreamsExceptionWhenDeserializationFailsAndExceptionHandlerRepliesWithFail(final boolean keyThrowsException, |
| "true, false", | ||
| "false, true" | ||
| }) | ||
| public void shouldNotThrowStreamsExceptionWhenDeserializationFailsAndDeserializationExceptionHandlerRepliesWithContinue(final boolean keyThrowsException, |
There was a problem hiding this comment.
nit:
| public void shouldNotThrowStreamsExceptionWhenDeserializationFailsAndDeserializationExceptionHandlerRepliesWithContinue(final boolean keyThrowsException, | |
| public void shouldNotThrowStreamsExceptionWhenDeserializationFailsAndExceptionHandlerRepliesWithContinue(final boolean keyThrowsException, |
| @ParameterizedTest | ||
| @CsvSource({ | ||
| "true, false", | ||
| "false, true" |
There was a problem hiding this comment.
Do you also need to test "true, true"?
There was a problem hiding this comment.
true, true should fail and exit on the deserialization of the key just like true, false. As the test is parametized, it does not cost so much to add this scenario.
| sourceNodeName, | ||
| processorContext.taskId()); | ||
| response = deserializationExceptionHandler.handle(errorHandlerContext, rawRecord, deserializationException); | ||
| } catch (final Exception fatalUserException) { |
There was a problem hiding this comment.
I realized now that you did not consider that the exception handler can also throw an exception for the processing exception handler (and I as the reviewer also missed it). Could you open a separate PR that catches exceptions from the processing exception handler and re-throws them as it is done here?
Sorry for not noticing it earlier!
There was a problem hiding this comment.
@cadonna As processing exception handler handles exception at ProcessorNode, indeed exceptions during deserialization are not caught right now. Is it your point?
Maybe we should consider catching and handling exceptions when getting the nextRecord at StreamTask level right before processing it:
Going to sleep on it
There was a problem hiding this comment.
I think, we misunderstood each other. On this line an exception originating from the user-specified handler is caught. The exception can be anything that is thrown by user code. We did not consider exceptions thrown from user code for the processing exception handler.
There was a problem hiding this comment.
done in this PR #16300 for Punctuating part
589b42b to
7add8c7
Compare
cadonna
left a comment
There was a problem hiding this comment.
Thanks for the updates @sebastienviale, @loicgreffier, and @Dabz!
LGTM!
a9d57f8 to
c4c781c
Compare
mjsax
left a comment
There was a problem hiding this comment.
Overall LGTM. A few minor things.
| * @param context processor context | ||
| * @param record record that failed deserialization | ||
| * @param exception the actual exception | ||
| * @deprecated Please {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)} |
There was a problem hiding this comment.
| * @deprecated Please {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)} | |
| * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)} instead. |
There was a problem hiding this comment.
Not sure about already merged PRs, but if they also have deprecation notice, this the the format we usually use. Including the version helps user to see how far behind they are and when the can expect the old API to be remove, so we want to include it.
Can you double check other PRs so we can update stuff if necessary?
There was a problem hiding this comment.
done
I wil check other PRs to be sure
| default DeserializationHandlerResponse handle(final ProcessorContext context, | ||
| final ConsumerRecord<byte[], byte[]> record, | ||
| final Exception exception) { | ||
| throw new UnsupportedOperationException(); |
There was a problem hiding this comment.
Nit. KIP says NotImplementedException() -- we should update the KIP accordingly.
There was a problem hiding this comment.
done KIP has been updated
| final ConsumerRecord<byte[], byte[]> rawRecord, | ||
| final Logger log, | ||
| final Sensor droppedRecordsSensor) { | ||
| handleDeserializationFailure(deserializationExceptionHandler, processorContext, deserializationException, rawRecord, log, droppedRecordsSensor, null); |
There was a problem hiding this comment.
Why do we still need this one? Can't we update GlobalStateManagerImpl to also pass in a sourceNode.name ?
There was a problem hiding this comment.
@sebastienviale What about this comments? Merge the PR for now to make progress.
There was a problem hiding this comment.
@mjsax I've been looking for a way to update GlobalStateManagerImpl so we can remove this method. But it looks like we have no source node name in GlobalStateManagerImpl. There's a "source" processor:
But it doesn't look to be named. E.g., in the following scenario:
streamsBuilder
.addGlobalStore(storeBuilder, "PERSON_TOPIC",
Consumed.with(Serdes.String(), SerdesUtils.specificAvroValueSerdes()),
() -> (Processor<String, SpecificRecord, Void, Void>) record -> log.info("Processing record with key = {}, value = {}", record.key(), record.value()));
// Source var is this processor ⬆️There was a problem hiding this comment.
Well, the "helper" method also only calls the second one, setting sourceNodeName to null. Why can't we just call the second one from GlobalStateManagerImpl directly also setting it to null, and thus avoiding the need for the overload? The overload does not really add much value, or does it?
| this.taskId = taskId; | ||
| } | ||
|
|
||
| public DefaultErrorHandlerContext(final ProcessorContext processorContext, |
There was a problem hiding this comment.
Should this constructor replace the other one? And we just pass in null for processorContext?
Or we use setRecordContext() -- but having constructor overloads is always something I think is better to avoid if we can.
| return this.taskId; | ||
| } | ||
|
|
||
| public ProcessorContext processorContext() { |
There was a problem hiding this comment.
Should this return Optional<ProcessorContext> given that it could be null ?
…ndler Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com>
…ndler Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com>
…ndler remove rawKey and rawValue Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com>
…ndler remove rawKey and rawValue Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com>
7ffcc19 to
c5fecd7
Compare
…ndler Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com>
…dler (#16432) This PR is part of KIP1033 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing. This PR expose the new ErrorHandlerContext as a parameter to the Deserialization exception handlers and deprecate the previous handle signature. Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com> Reviewers: Bruno Cadonna <bruno@confluent.io>, Matthias J. Sax <matthias@confluent.io>
|
Thanks for the PR. Merged to |
…dler (apache#16432) This PR is part of KIP1033 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing. This PR expose the new ErrorHandlerContext as a parameter to the Deserialization exception handlers and deprecate the previous handle signature. Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com> Reviewers: Bruno Cadonna <bruno@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This PR is part of KAFKA-16448 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing.
This PR expose the new ErrorHandlerContext as a parameter to the Deserialization exception handlers and deprecate the previous handle signatur
Jira: https://issues.apache.org/jira/browse/KAFKA-16448.
Contributors
@Dabz
@sebastienviale
@loicgreffier