KAFKA-16448: Handle processing exceptions in punctuate #16300
mjsax merged 2 commits into apache:trunk from
@cadonna This PR contains the handling of processing exceptions in
PR is ready for a first review.
cadonna
left a comment
Thanks for the PR, @Dabz, @sebastienviale, and @loicgreffier!
Here is my feedback:
final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(recordContext.topic(),
    recordContext.partition(), recordContext.offset(), recordContext.headers(), null, null, node.name(), id);
nit:
- final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(recordContext.topic(),
-     recordContext.partition(), recordContext.offset(), recordContext.headers(), null, null, node.name(), id);
+ final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
+     recordContext.topic(),
+     recordContext.partition(),
+     recordContext.offset(),
+     recordContext.headers(),
+     null,
+     null,
+     node.name(),
+     id
+ );
@@ -916,7 +920,17 @@ public void punctuate(final ProcessorNode<?, ?, ?, ?> node,
} catch (final StreamsException e) {
Now we have a weird situation where you can throw a StreamsException from process() and Streams will handle it in the processing handler but when you throw a StreamsException from punctuate() Streams will not handle it.
Correct. As mentioned in my previous comment, we can handle all RuntimeExceptions, and if the exception is already a StreamsException (exception instanceof StreamsException), we can throw e as is.
Oh, I haven't seen your comment. Sorry!
Your proposal does not change anything, right? We will still have the situation that if a user throws a StreamsException from punctuate() it will not be handled, but in process() it will be handled.
I think we have the following options:
- We live with this asymmetry between punctuate() and process().
- We do not handle StreamsException in process().
- We do handle StreamsException in punctuate().
What exactly do you mean by "might end up with handling exceptions that are not desirable"?
Well, in the ProcessorNode we do not handle TaskCorruptedException or TaskMigratedException, which extend StreamsException, for example.
Unless I'm wrong, these exceptions cannot be caught in punctuate(), right? (I don't want to end up in a situation where we do not handle TaskCorruptedException in process(), but we do in punctuate().)
👉 Considering TaskCorruptedException/TaskMigratedException cannot reach punctuate(), we updated the PR to handle all StreamsException in the punctuate.
Just catching up. For process() we currently don't handle ClassCastException as well as FailedProcessingException | TaskCorruptedException | TaskMigratedException.
However, we only rethrow ClassCastException wrapped as a StreamsException, so it might be caught "upstream" -- wondering if this is intended?
I also just looked into the call trace and it seems we actually also have a dedicated
} catch (final StreamsException exception) {
record = null;
throw exception;
} catch (final RuntimeException e) {
inside StreamTask#process, but we only handle FailedProcessingException in the second catch for the RuntimeException. Is this by design? Atm FailedProcessingException extends KafkaException so it still works, but as pointed out on an already merged PR, I think it should actually extend StreamsException (https://github.com/apache/kafka/pull/16093/files#r1687232849), which implies we would need some changes.
In general, StreamsException is actually not designed to be thrown by users (nor to be caught by users), but it can still bubble up from process() -- so I think it does make sense to catch it and pass it into the handler for both cases, process() and punctuate().
If punctuate() calls context.forward() and an exception occurs in a processor, I'm having doubts if the exception will either:
- Go up to the try-catch block we added around punctuate()
- Go up to the try-catch block we added around ProcessorNode#process
The try-catch block on punctuate() was initially added to handle exceptions that can occur in a punctuator:
context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME,
    timestamp -> {
        throw new RuntimeException("Punctuate error");
    });
Doing further tests on this one.
@mjsax NVM, in the following case:
builder
    .stream(...)
    .process(...) // punctuate() calls context.forward() here
    .map((key, value) -> { throw new RuntimeException(); });
the RuntimeException is caught and handled in ProcessorNode#process, then thrown as a FailedProcessingException and caught in StreamTask#punctuate.
So I think that in punctuate we should:
- catch FailedProcessingException coming from nodes, unwrap it and throw a new StreamsException ➡️ This avoids passing the exception through the processing exception handler twice.
- catch TaskCorruptedException | TaskMigratedException and throw it as is ➡️ We do not want to handle them or wrap them in anything else.
- catch Exception, apply the processing exception handler and throw StreamsException ➡️ We handle anything else that is not coming from a processor node.
Yes, SGTM.
It must be a mix of what we do in ProcessorNode#process() and StreamTask#process().
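The three catch branches agreed on in this thread can be sketched roughly as follows. This is only an illustration, not the code merged in the PR: the exception classes are hypothetical stand-ins declared locally so the snippet compiles without Kafka on the classpath, and `Response`, the `Function`-based handler, and the `punctuate` signature are assumptions made for brevity.

```java
import java.util.function.Function;

final class PunctuateCatchSketch {

    // Hypothetical stand-ins for the Kafka Streams exception types named in the thread.
    static class StreamsException extends RuntimeException {
        StreamsException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    static class TaskCorruptedException extends StreamsException {
        TaskCorruptedException(final String message) {
            super(message, null);
        }
    }

    static class TaskMigratedException extends StreamsException {
        TaskMigratedException(final String message) {
            super(message, null);
        }
    }

    static class FailedProcessingException extends RuntimeException {
        FailedProcessingException(final Throwable cause) {
            super(cause);
        }
    }

    // Assumed simplification of the handler response.
    enum Response { CONTINUE, FAIL }

    // Runs a punctuator and applies the three catch branches proposed above.
    static void punctuate(final Runnable punctuator, final Function<Exception, Response> handler) {
        try {
            punctuator.run();
        } catch (final FailedProcessingException e) {
            // Already handled inside a processor node: unwrap, do not call the handler again.
            throw new StreamsException("Exception caught while punctuating", e.getCause());
        } catch (final TaskCorruptedException | TaskMigratedException e) {
            // Rethrow untouched: neither handled nor wrapped.
            throw e;
        } catch (final Exception e) {
            // Anything else was thrown by the punctuator itself: let the handler decide.
            if (handler.apply(e) == Response.FAIL) {
                throw new StreamsException("Exception caught while punctuating", e);
            }
        }
    }

    public static void main(final String[] args) {
        // The handler answers CONTINUE, so the punctuator's error does not escape.
        punctuate(() -> { throw new IllegalStateException("Punctuate error"); }, e -> Response.CONTINUE);
        System.out.println("continued after a handled punctuate error");
    }
}
```

The order of the catch clauses matters: the first two branches keep exceptions that must bypass the handler away from the final `catch (Exception)`.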
@@ -916,7 +920,17 @@ public void punctuate(final ProcessorNode<?, ?, ?, ?> node,
} catch (final StreamsException e) {
    throw e;
} catch (final RuntimeException e) {
For process(), Streams catches an Exception and here it is a RuntimeException. As far as I understand, the difference is that RuntimeException only catches unchecked exceptions whereas Exception catches both unchecked and checked exceptions. Since Streams does not throw any checked exceptions from either process() or punctuate(), it is fine to use RuntimeException in both places.
Could you change this also for process() in this PR?
Sorry, I have to re-iterate on this. I now realized that for all other handlers we catch Exception and not RuntimeException. To be consistent let's catch Exception here and in the ProcessorNode.
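For reference, the difference between the two catch clauses discussed here can be shown with a small standalone sketch. It uses `java.util.concurrent.Callable` only because its `call()` method declares `throws Exception`, which makes it possible to raise a checked exception. The class and method names are hypothetical and unrelated to the Streams code base.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

final class CatchScopeSketch {

    // Mirrors `catch (final RuntimeException e)`: checked exceptions escape to the caller.
    static String catchRuntimeOnly(final Callable<?> body) throws Exception {
        try {
            body.call();
            return "ok";
        } catch (final RuntimeException e) {
            return "caught " + e.getClass().getSimpleName();
        }
    }

    // Mirrors `catch (final Exception e)`: checked and unchecked exceptions are both caught.
    static String catchAny(final Callable<?> body) {
        try {
            body.call();
            return "ok";
        } catch (final Exception e) {
            return "caught " + e.getClass().getSimpleName();
        }
    }

    public static void main(final String[] args) {
        // The checked IOException is caught only by the broader clause.
        System.out.println(catchAny(() -> { throw new IOException("disk"); }));
    }
}
```

When the guarded code declares no checked exceptions, as is the case for process() and punctuate() according to the comment above, both clauses catch the same set of exceptions thrown from plain Java code, so the choice is mainly one of consistency.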
<!-- Streams -->
<suppress checks="ClassFanOutComplexity"
files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore).java"/>
files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore|StreamTask).java"/>
That is a pity 😞 ! But I do not see a quick fix to avoid the change.
.handle(errorHandlerContext, null, e);

if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
    throw new StreamsException(String.format("%sException caught while punctuating processor '%s'", logPrefix, node.name()), e);
Could you add an error log here?
Done. process() and punctuate() will log the same error.
.map(KeyValue::new)
.mapValues(value -> value)
.process(runtimeErrorProcessorSupplierMock())
.process(runtimeErrorPunctuateProcessorSupplierMock())
You should not test punctuate with the TopologyTestDriver. Method punctuate is called outside of the topology in the poll loop. Using the TopologyTestDriver, you basically call task.maybePunctuateSystemTime(); in driver.advanceWallClockTime() which is a helper test infra method. This is not really testing the integration of ProcessingExceptionHandler with punctuate() in the Streams runtime.
Testing the integration of ProcessingExceptionHandler in process() with the TopologyTestDriver makes a little bit more sense because you can test the case where a downstream processor throws and it should not affect an upstream processor, but you do not have that case with punctuate(). However, a real integration test would also be better for process().
So given the tight schedule, my proposal is to revert the changes regarding punctuate() in this file and only have unit tests. Then, you can add a real integration test in a separate PR once we have merged all the rest.
// Rethrow exceptions that should not be handled here
throw e;
} catch (final Exception e) {
} catch (final RuntimeException e) {
cadonna
left a comment
Thanks for the updates!
A last nit.
" continue after a processing error, please set the " +
PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.");

throw e instanceof StreamsException ? (StreamsException) e : new StreamsException(String.format("%sException caught while punctuating processor '%s'", logPrefix, node.name()), e);
nit:
- throw e instanceof StreamsException ? (StreamsException) e : new StreamsException(String.format("%sException caught while punctuating processor '%s'", logPrefix, node.name()), e);
+ throw e instanceof StreamsException ? e : new StreamsException(String.format("%sException caught while punctuating processor '%s'", logPrefix, node.name()), e);
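A minimal sketch of why the cast can be dropped, assuming the caught variable is declared as RuntimeException (as in the diff at this point of the review). `StreamsException` below is a hypothetical local stand-in for the Kafka class, and `rethrow` is an illustrative helper, not a method from the PR.

```java
final class RethrowSketch {

    // Hypothetical stand-in; the real StreamsException is also an unchecked exception.
    static class StreamsException extends RuntimeException {
        StreamsException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    static void rethrow(final RuntimeException e, final String nodeName) {
        // Both operands are RuntimeExceptions, so the conditional expression is typed
        // RuntimeException and can be thrown without a cast or a throws clause.
        throw e instanceof StreamsException
            ? e
            : new StreamsException(String.format("Exception caught while punctuating processor '%s'", nodeName), e);
    }

    public static void main(final String[] args) {
        try {
            rethrow(new IllegalStateException("boom"), "my-node");
        } catch (final StreamsException wrapped) {
            System.out.println(wrapped.getMessage());
        }
    }
}
```

If the variable were declared as Exception instead, the conditional expression would be typed Exception, and throwing it would require either the cast or a `throws` declaration on the enclosing method.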
@Override
public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
    throw new RuntimeException("KABOOM!");
}
nit:
- @Override
- public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
-     throw new RuntimeException("KABOOM!");
- }
+ @Override
+ public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
+     throw new RuntimeException("KABOOM!");
+ }
    throw e;
} catch (final RuntimeException e) {
    throw new StreamsException(String.format("%sException caught while punctuating processor '%s'", logPrefix, node.name()), e);
} catch (final Exception e) {
Why do we catch Exception here, but not RuntimeException? If no checked exception is declared, it does not seem to be "correct"?
@mjsax For this reason: #16300 (comment)
Exception is caught in ProcessorNode#process, RecordCollectorImpl#send and RecordDeserializer#deserialize, so the last discussion mentioned sticking to Exception here to be consistent.
Thanks. I disagree with @cadonna's conclusion -- we should rather catch RuntimeException everywhere (consistency yes, but the other way around).
I'll cover this in the other PR I opened.
try {
    response = processingExceptionHandler.handle(errorHandlerContext, null, e);
} catch (final Exception fatalUserException) {
Same here. Why not RuntimeException -- given that no checked exception is declared for the handle(...) method, it's not possible to get an Exception here.
Same seems to apply to ProcessorNode code which has the same try-catch block.
Let me do a small PR myself to address it :)
@mjsax Same answer: #16300 (comment). ProcessorNode#process and StreamTask#punctuate stick with RecordCollectorImpl and RecordDeserializer.
Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com>
mjsax
left a comment
LGTM. Can merge after Jenkins passes.
Does this complete the KIP?
@mjsax This PR was the last one! If any additional updates are needed, let us know. I'm updating the KIP to remove references to
Great. This would have been my next question. Also, after you have updated the wiki page, please send a follow-up email to the VOTE thread of the KIP, and call out the changes made. It's not just
This PR is part of KIP-1033 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing. This PR actually catches processing exceptions from punctuate. Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: loicgreffier <loic.greffier@michelin.com> Reviewers: Bruno Cadonna <bruno@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Merged to
The number of processing exception handlers that are instantiated: 1 / Stream Task. This was not initially mentioned in KIP-1033. It has been updated.
This PR is part of KAFKA-16448 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing.
This PR actually catches processing exceptions from punctuate.
Jira: https://issues.apache.org/jira/browse/KAFKA-16448.
Contributors
@Dabz
@sebastienviale
@loicgreffier