Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@

<!-- Streams -->
<suppress checks="ClassFanOutComplexity"
files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore).java"/>
files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore|StreamTask).java"/>
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is a pity 😞 ! But I do not see a quick fix to avoid the change.


<suppress checks="MethodLength"
files="KTableImpl.java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.TopologyConfig.TaskConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
import org.apache.kafka.streams.errors.internals.FailedProcessingException;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
Expand Down Expand Up @@ -63,6 +65,7 @@
import java.util.stream.Collectors;

import static java.util.Collections.singleton;
import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeRecordSensor;

Expand Down Expand Up @@ -101,6 +104,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
private final Sensor restoreRemainingSensor;
private final Sensor punctuateLatencySensor;
private final Sensor bufferedRecordsSensor;
private final Sensor droppedRecordsSensor;
private final Map<String, Sensor> e2eLatencySensors = new HashMap<>();

private final RecordQueueCreator recordQueueCreator;
Expand Down Expand Up @@ -160,6 +164,7 @@ public StreamTask(final TaskId id,
processLatencySensor = TaskMetrics.processLatencySensor(threadId, taskId, streamsMetrics);
punctuateLatencySensor = TaskMetrics.punctuateSensor(threadId, taskId, streamsMetrics);
bufferedRecordsSensor = TaskMetrics.activeBufferedRecordsSensor(threadId, taskId, streamsMetrics);
droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(threadId, taskId, streamsMetrics);

for (final String terminalNodeName : topology.terminalNodes()) {
e2eLatencySensors.put(
Expand Down Expand Up @@ -915,15 +920,48 @@ public void punctuate(final ProcessorNode<?, ?, ?, ?> node,

try {
maybeMeasureLatency(() -> punctuator.punctuate(timestamp), time, punctuateLatencySensor);
} catch (final StreamsException e) {
} catch (final FailedProcessingException e) {
throw createStreamsException(node.name(), e.getCause());
} catch (final TaskCorruptedException | TaskMigratedException e) {
throw e;
} catch (final RuntimeException e) {
throw new StreamsException(String.format("%sException caught while punctuating processor '%s'", logPrefix, node.name()), e);
} catch (final Exception e) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we catch Exception here, but not RuntimeException? If no checked exception is declared, it seem not to be "correct"?

Copy link
Copy Markdown
Contributor

@loicgreffier loicgreffier Jul 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mjsax For this reason: #16300 (comment)

Exception is caught in ProcessorNode#process, RecordCollectorImpl#send and RecordDeserializer#deserialize, so the last discussion mentioned sticking to Exception here to be consistent.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I disagree with @cadonna's conclusion -- we should rather catch RuntimeException everywhere (consistency yes, but the other way around).

I'll cover this on the other PR I did open.

final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
null,
recordContext.topic(),
recordContext.partition(),
recordContext.offset(),
recordContext.headers(),
node.name(),
id()
);

final ProcessingExceptionHandler.ProcessingHandlerResponse response;

try {
response = processingExceptionHandler.handle(errorHandlerContext, null, e);
} catch (final Exception fatalUserException) {
Copy link
Copy Markdown
Member

@mjsax mjsax Jul 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here. Why not RuntimeException -- given that no checked exception is declared for the handler(...) method, it's not possible to get a Exception here.

Same seems to apply to ProcessorNode code which has the same try-catch block.

Let me do a small PR myself to address it :)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mjsax Same answer: #16300 (comment). ProcessorNode#process and StreamTask#punctuate stick with RecordCollectorImpl and RecordDeserializer.

throw new FailedProcessingException(fatalUserException);
}

if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
log.error("Processing exception handler is set to fail upon" +
" a processing error. If you would rather have the streaming pipeline" +
" continue after a processing error, please set the " +
PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.");

throw createStreamsException(node.name(), e);
} else {
droppedRecordsSensor.record();
}
} finally {
processorContext.setCurrentNode(null);
}
}

/**
 * Wraps the given cause into a {@link StreamsException} whose message names the
 * processor node that failed while being punctuated.
 *
 * @param processorName name of the processor node that was being punctuated
 * @param cause         the underlying failure to attach as the cause
 * @return a new {@code StreamsException} carrying {@code cause}
 */
private StreamsException createStreamsException(final String processorName, final Throwable cause) {
    final String message = String.format("%sException caught while punctuating processor '%s'", logPrefix, processorName);
    return new StreamsException(message, cause);
}

@SuppressWarnings("unchecked")
private void updateProcessorContext(final ProcessorNode<?, ?, ?, ?> currNode,
final long wallClockTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
Expand Down Expand Up @@ -45,14 +46,19 @@
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.errors.internals.FailedProcessingException;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
Expand Down Expand Up @@ -121,6 +127,7 @@
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand Down Expand Up @@ -240,13 +247,18 @@ private static StreamsConfig createConfig(final String enforcedProcessingValue)
}

private static StreamsConfig createConfig(final String eosConfig, final String enforcedProcessingValue) {
return createConfig(eosConfig, enforcedProcessingValue, LogAndFailExceptionHandler.class.getName());
return createConfig(eosConfig, enforcedProcessingValue, LogAndFailExceptionHandler.class.getName(), LogAndFailProcessingExceptionHandler.class.getName());
}

private static StreamsConfig createConfig(final String eosConfig, final String enforcedProcessingValue, final String deserializationExceptionHandler) {
return createConfig(eosConfig, enforcedProcessingValue, deserializationExceptionHandler, LogAndFailProcessingExceptionHandler.class.getName());
}

private static StreamsConfig createConfig(
final String eosConfig,
final String enforcedProcessingValue,
final String deserializationExceptionHandler) {
final String deserializationExceptionHandler,
final String processingExceptionHandler) {
final String canonicalPath;
try {
canonicalPath = BASE_DIR.getCanonicalPath();
Expand All @@ -262,7 +274,8 @@ private static StreamsConfig createConfig(
mkEntry(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName()),
mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig),
mkEntry(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, enforcedProcessingValue),
mkEntry(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, deserializationExceptionHandler)
mkEntry(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, deserializationExceptionHandler),
mkEntry(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, processingExceptionHandler)
)));
}

Expand Down Expand Up @@ -2647,6 +2660,124 @@ public void shouldNotCheckpointAfterRestorationWhenExactlyOnceEnabled() {
verify(recordCollector, never()).offsets();
}

/**
 * A {@code FailedProcessingException} thrown from a punctuation callback must not be routed
 * through the processing exception handler (even one configured to CONTINUE); it is unwrapped
 * and rethrown as a {@code StreamsException} carrying the original cause.
 */
@Test
public void shouldPunctuateNotHandleFailProcessingExceptionAndThrowStreamsException() {
    when(stateManager.taskId()).thenReturn(taskId);
    when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
    task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100",
        LogAndFailExceptionHandler.class.getName(), LogAndContinueProcessingExceptionHandler.class.getName()));

    // The punctuator simulates a processing failure already wrapped by the framework.
    final Punctuator failingPunctuator = timestamp -> {
        throw new FailedProcessingException(new RuntimeException("KABOOM!"));
    };

    final StreamsException thrown = assertThrows(
        StreamsException.class,
        () -> task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, failingPunctuator)
    );

    // The StreamsException must expose the original RuntimeException, not the wrapper.
    final Throwable cause = thrown.getCause();
    assertInstanceOf(RuntimeException.class, cause);
    assertEquals("KABOOM!", cause.getMessage());
}

/**
 * A {@code TaskCorruptedException} thrown from a punctuation callback must bypass the
 * processing exception handler and be rethrown unchanged (same instance).
 */
@Test
public void shouldPunctuateNotHandleTaskCorruptedExceptionAndThrowItAsIs() {
    when(stateManager.taskId()).thenReturn(taskId);
    when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
    task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100",
        LogAndFailExceptionHandler.class.getName(), LogAndContinueProcessingExceptionHandler.class.getName()));

    final Set<TaskId> corruptedTaskIds = new HashSet<>();
    corruptedTaskIds.add(new TaskId(0, 0));
    // Anonymous subclass because InvalidOffsetException#partitions is abstract.
    final InvalidOffsetException invalidOffset = new InvalidOffsetException("Invalid offset") {
        @Override
        public Set<TopicPartition> partitions() {
            return new HashSet<>(Collections.singletonList(new TopicPartition("topic", 0)));
        }
    };
    final TaskCorruptedException expectedException = new TaskCorruptedException(corruptedTaskIds, invalidOffset);

    final TaskCorruptedException thrown = assertThrows(
        TaskCorruptedException.class,
        () -> task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> {
            throw expectedException;
        })
    );

    assertEquals(expectedException, thrown);
}

/**
 * A {@code TaskMigratedException} thrown from a punctuation callback must bypass the
 * processing exception handler and be rethrown unchanged (same instance).
 */
@Test
public void shouldPunctuateNotHandleTaskMigratedExceptionAndThrowItAsIs() {
    when(stateManager.taskId()).thenReturn(taskId);
    when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
    task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100",
        LogAndFailExceptionHandler.class.getName(), LogAndContinueProcessingExceptionHandler.class.getName()));

    final TaskMigratedException expectedException = new TaskMigratedException("TaskMigratedException", new RuntimeException("Task migrated cause"));

    // Renamed from 'taskCorruptedException' (copy-paste leftover): the caught exception
    // here is a TaskMigratedException.
    final TaskMigratedException taskMigratedException = assertThrows(TaskMigratedException.class, () ->
        task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> {
            throw expectedException;
        })
    );

    assertEquals(expectedException, taskMigratedException);
}

/**
 * When the configured processing exception handler replies with CONTINUE, an exception
 * thrown from a punctuation callback is handled and punctuation completes without throwing.
 */
@Test
public void shouldPunctuateNotThrowStreamsExceptionWhenProcessingExceptionHandlerRepliesWithContinue() {
    when(stateManager.taskId()).thenReturn(taskId);
    when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
    task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100",
        LogAndFailExceptionHandler.class.getName(), LogAndContinueProcessingExceptionHandler.class.getName()));

    final Punctuator failingPunctuator = timestamp -> {
        throw new KafkaException("KABOOM!");
    };

    assertDoesNotThrow(() -> task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, failingPunctuator));
}

/**
 * When the configured processing exception handler replies with FAIL, an exception thrown
 * from a punctuation callback is wrapped into a {@code StreamsException} whose cause is the
 * original exception.
 */
@Test
public void shouldPunctuateThrowStreamsExceptionWhenProcessingExceptionHandlerRepliesWithFail() {
    when(stateManager.taskId()).thenReturn(taskId);
    when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
    task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100",
        LogAndFailExceptionHandler.class.getName(), LogAndFailProcessingExceptionHandler.class.getName()));

    final StreamsException thrown = assertThrows(
        StreamsException.class,
        () -> task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> {
            throw new KafkaException("KABOOM!");
        })
    );

    final Throwable cause = thrown.getCause();
    assertInstanceOf(KafkaException.class, cause);
    assertEquals("KABOOM!", cause.getMessage());
}

/**
 * When the processing exception handler itself throws, the handler's exception is wrapped
 * into a {@code FailedProcessingException} that propagates out of punctuate().
 */
@Test
public void shouldPunctuateThrowFailedProcessingExceptionWhenProcessingExceptionHandlerThrowsAnException() {
    when(stateManager.taskId()).thenReturn(taskId);
    when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
    task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100",
        LogAndFailExceptionHandler.class.getName(), ProcessingExceptionHandlerMock.class.getName()));

    // Renamed from 'streamsException': assertThrows returns a FailedProcessingException
    // here, not a StreamsException.
    final FailedProcessingException failedProcessingException = assertThrows(FailedProcessingException.class,
        () -> task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> {
            throw new KafkaException("KABOOM!");
        }));

    assertInstanceOf(RuntimeException.class, failedProcessingException.getCause());
    assertEquals("KABOOM from ProcessingExceptionHandlerMock!", failedProcessingException.getCause().getMessage());
}

// Processing exception handler stub that always throws from handle(), used to verify that a
// failure inside the handler itself surfaces as a FailedProcessingException.
public static class ProcessingExceptionHandlerMock implements ProcessingExceptionHandler {
// Always throws instead of returning a ProcessingHandlerResponse.
@Override
public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
throw new RuntimeException("KABOOM from ProcessingExceptionHandlerMock!");
}
// No configuration needed for this stub.
@Override
public void configure(final Map<String, ?> configs) {
// No-op
}
}

private ProcessorStateManager mockStateManager() {
final ProcessorStateManager manager = mock(ProcessorStateManager.class);
Expand Down