diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index ba28341d1a3ac..8c96ce71237a1 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -193,7 +193,7 @@
+ files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore|StreamTask).java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 8b253c6e16abb..6f2edd442b049 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -29,12 +29,14 @@
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.TopologyConfig.TaskConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
import org.apache.kafka.streams.errors.internals.FailedProcessingException;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
@@ -63,6 +65,7 @@
import java.util.stream.Collectors;
import static java.util.Collections.singleton;
+import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeRecordSensor;
@@ -101,6 +104,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
private final Sensor restoreRemainingSensor;
private final Sensor punctuateLatencySensor;
private final Sensor bufferedRecordsSensor;
+ private final Sensor droppedRecordsSensor;
private final Map<String, Sensor> e2eLatencySensors = new HashMap<>();
private final RecordQueueCreator recordQueueCreator;
@@ -160,6 +164,7 @@ public StreamTask(final TaskId id,
processLatencySensor = TaskMetrics.processLatencySensor(threadId, taskId, streamsMetrics);
punctuateLatencySensor = TaskMetrics.punctuateSensor(threadId, taskId, streamsMetrics);
bufferedRecordsSensor = TaskMetrics.activeBufferedRecordsSensor(threadId, taskId, streamsMetrics);
+ droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(threadId, taskId, streamsMetrics);
for (final String terminalNodeName : topology.terminalNodes()) {
e2eLatencySensors.put(
@@ -915,15 +920,48 @@ public void punctuate(final ProcessorNode<?, ?, ?, ?> node,
try {
maybeMeasureLatency(() -> punctuator.punctuate(timestamp), time, punctuateLatencySensor);
- } catch (final StreamsException e) {
+ } catch (final FailedProcessingException e) {
+ throw createStreamsException(node.name(), e.getCause());
+ } catch (final TaskCorruptedException | TaskMigratedException e) {
throw e;
- } catch (final RuntimeException e) {
- throw new StreamsException(String.format("%sException caught while punctuating processor '%s'", logPrefix, node.name()), e);
+ } catch (final Exception e) {
+ final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
+ null,
+ recordContext.topic(),
+ recordContext.partition(),
+ recordContext.offset(),
+ recordContext.headers(),
+ node.name(),
+ id()
+ );
+
+ final ProcessingExceptionHandler.ProcessingHandlerResponse response;
+
+ try {
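+ // pass null for the record: punctuation is not triggered by an input record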
+ response = processingExceptionHandler.handle(errorHandlerContext, null, e);
+ } catch (final Exception fatalUserException) {
+ throw new FailedProcessingException(fatalUserException);
+ }
+
+ if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
+ log.error("Processing exception handler is set to fail upon" +
+ " a processing error. If you would rather have the streaming pipeline" +
+ " continue after a processing error, please set the " +
+ PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.");
+
+ throw createStreamsException(node.name(), e);
+ } else {
+ droppedRecordsSensor.record();
+ }
} finally {
processorContext.setCurrentNode(null);
}
}
+
+ private StreamsException createStreamsException(final String processorName, final Throwable cause) {
+ return new StreamsException(String.format("%sException caught while punctuating processor '%s'", logPrefix, processorName), cause);
+ }
+
@SuppressWarnings("unchecked")
private void updateProcessorContext(final ProcessorNode<?, ?, ?, ?> currNode,
final long wallClockTime,
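
Reviewer note on usage (illustrative, not part of the patch): with this change, an exception escaping a punctuator, other than FailedProcessingException, TaskCorruptedException, or TaskMigratedException, is handed to the configured ProcessingExceptionHandler with a null record, since punctuation is not triggered by an input record. A minimal sketch of a custom handler that uses this distinction, assuming only the public interfaces referenced in this patch; the class name is hypothetical:

    import java.util.Map;

    import org.apache.kafka.streams.errors.ErrorHandlerContext;
    import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
    import org.apache.kafka.streams.processor.api.Record;

    // Hypothetical handler: tolerate punctuation failures, fail on record-level errors.
    public class PunctuationTolerantProcessingExceptionHandler implements ProcessingExceptionHandler {

        @Override
        public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
                                                final Record<?, ?> record,
                                                final Exception exception) {
            // The record is null when the failure came from a punctuator (see StreamTask#punctuate above).
            return record == null
                ? ProcessingHandlerResponse.CONTINUE
                : ProcessingHandlerResponse.FAIL;
        }

        @Override
        public void configure(final Map<String, ?> configs) {
            // no-op
        }
    }

Such a handler would be enabled through the same key the patch's log message points at, e.g. props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, PunctuationTolerantProcessingExceptionHandler.class.getName()).
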
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 817ffe1f74dba..a8771c21539da 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -18,6 +18,7 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -45,14 +46,19 @@
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyConfig;
+import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
+import org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
+import org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler;
+import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.errors.internals.FailedProcessingException;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
@@ -121,6 +127,7 @@
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -240,13 +247,18 @@ private static StreamsConfig createConfig(final String enforcedProcessingValue)
}
private static StreamsConfig createConfig(final String eosConfig, final String enforcedProcessingValue) {
- return createConfig(eosConfig, enforcedProcessingValue, LogAndFailExceptionHandler.class.getName());
+ return createConfig(eosConfig, enforcedProcessingValue, LogAndFailExceptionHandler.class.getName(), LogAndFailProcessingExceptionHandler.class.getName());
+ }
+
+ private static StreamsConfig createConfig(final String eosConfig, final String enforcedProcessingValue, final String deserializationExceptionHandler) {
+ return createConfig(eosConfig, enforcedProcessingValue, deserializationExceptionHandler, LogAndFailProcessingExceptionHandler.class.getName());
}
private static StreamsConfig createConfig(
final String eosConfig,
final String enforcedProcessingValue,
- final String deserializationExceptionHandler) {
+ final String deserializationExceptionHandler,
+ final String processingExceptionHandler) {
final String canonicalPath;
try {
canonicalPath = BASE_DIR.getCanonicalPath();
@@ -262,7 +274,8 @@ private static StreamsConfig createConfig(
mkEntry(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName()),
mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig),
mkEntry(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, enforcedProcessingValue),
- mkEntry(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, deserializationExceptionHandler)
+ mkEntry(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, deserializationExceptionHandler),
+ mkEntry(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, processingExceptionHandler)
)));
}
@@ -2647,6 +2660,124 @@ public void shouldNotCheckpointAfterRestorationWhenExactlyOnceEnabled() {
verify(recordCollector, never()).offsets();
}
+ @Test
+ public void shouldPunctuateNotHandleFailedProcessingExceptionAndThrowStreamsException() {
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+ task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100",
+ LogAndFailExceptionHandler.class.getName(), LogAndContinueProcessingExceptionHandler.class.getName()));
+
+ final StreamsException streamsException = assertThrows(StreamsException.class, () ->
+ task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> {
+ throw new FailedProcessingException(
+ new RuntimeException("KABOOM!")
+ );
+ })
+ );
+
+ assertInstanceOf(RuntimeException.class, streamsException.getCause());
+ assertEquals("KABOOM!", streamsException.getCause().getMessage());
+ }
+
+ @Test
+ public void shouldPunctuateNotHandleTaskCorruptedExceptionAndThrowItAsIs() {
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+ task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100",
+ LogAndFailExceptionHandler.class.getName(), LogAndContinueProcessingExceptionHandler.class.getName()));
+
+ final Set<TaskId> taskIds = new HashSet<>();
+ taskIds.add(new TaskId(0, 0));
+ final TaskCorruptedException expectedException = new TaskCorruptedException(taskIds, new InvalidOffsetException("Invalid offset") {
+ @Override
+ public Set<TopicPartition> partitions() {
+ return new HashSet<>(Collections.singletonList(new TopicPartition("topic", 0)));
+ }
+ });
+
+ final TaskCorruptedException taskCorruptedException = assertThrows(TaskCorruptedException.class, () ->
+ task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> {
+ throw expectedException;
+ })
+ );
+
+ assertEquals(expectedException, taskCorruptedException);
+ }
+
+ @Test
+ public void shouldPunctuateNotHandleTaskMigratedExceptionAndThrowItAsIs() {
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+ task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100",
+ LogAndFailExceptionHandler.class.getName(), LogAndContinueProcessingExceptionHandler.class.getName()));
+
+ final TaskMigratedException expectedException = new TaskMigratedException("TaskMigratedException", new RuntimeException("Task migrated cause"));
+
+ final TaskMigratedException taskMigratedException = assertThrows(TaskMigratedException.class, () ->
+ task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> {
+ throw expectedException;
+ })
+ );
+
+ assertEquals(expectedException, taskMigratedException);
+ }
+
+ @Test
+ public void shouldPunctuateNotThrowStreamsExceptionWhenProcessingExceptionHandlerRepliesWithContinue() {
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+ task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100",
+ LogAndFailExceptionHandler.class.getName(), LogAndContinueProcessingExceptionHandler.class.getName()));
+
+ assertDoesNotThrow(() ->
+ task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> {
+ throw new KafkaException("KABOOM!");
+ })
+ );
+ }
+
+ @Test
+ public void shouldPunctuateThrowStreamsExceptionWhenProcessingExceptionHandlerRepliesWithFail() {
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+ task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100",
+ LogAndFailExceptionHandler.class.getName(), LogAndFailProcessingExceptionHandler.class.getName()));
+
+ final StreamsException streamsException = assertThrows(StreamsException.class,
+ () -> task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> {
+ throw new KafkaException("KABOOM!");
+ }));
+
+ assertInstanceOf(KafkaException.class, streamsException.getCause());
+ assertEquals("KABOOM!", streamsException.getCause().getMessage());
+ }
+
+ @Test
+ public void shouldPunctuateThrowFailedProcessingExceptionWhenProcessingExceptionHandlerThrowsAnException() {
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+ task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100",
+ LogAndFailExceptionHandler.class.getName(), ProcessingExceptionHandlerMock.class.getName()));
+
+ final FailedProcessingException failedProcessingException = assertThrows(FailedProcessingException.class,
+ () -> task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> {
+ throw new KafkaException("KABOOM!");
+ }));
+
+ assertInstanceOf(RuntimeException.class, failedProcessingException.getCause());
+ assertEquals("KABOOM from ProcessingExceptionHandlerMock!", failedProcessingException.getCause().getMessage());
+ }
+
+ public static class ProcessingExceptionHandlerMock implements ProcessingExceptionHandler {
+ @Override
+ public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
+ throw new RuntimeException("KABOOM from ProcessingExceptionHandlerMock!");
+ }
+ @Override
+ public void configure(final Map<String, ?> configs) {
+ // No-op
+ }
+ }
private ProcessorStateManager mockStateManager() {
final ProcessorStateManager manager = mock(ProcessorStateManager.class);