diff --git a/docs/development/extensions-core/kafka-ingestion.md b/docs/development/extensions-core/kafka-ingestion.md index 3eb3499fb9e7..c128c1b2c476 100644 --- a/docs/development/extensions-core/kafka-ingestion.md +++ b/docs/development/extensions-core/kafka-ingestion.md @@ -33,10 +33,12 @@ manage failures, and ensure that the scalability and replication requirements ar This service is provided in the `druid-kafka-indexing-service` core Apache Druid extension (see [Including Extensions](../../development/extensions.md#loading-extensions)). -> The Kafka indexing service supports transactional topics which were introduced in Kafka 0.11.x. These changes make the +> The Kafka indexing service supports transactional topics, which were introduced in Kafka 0.11.x. This is Druid's default behavior, and it makes the > Kafka consumer that Druid uses incompatible with older brokers. Ensure that your Kafka brokers are version 0.11.x or > better before using this functionality. Refer [Kafka upgrade guide](https://kafka.apache.org/documentation/#upgrade) > if you are using older version of Kafka brokers. +> In addition, users can set `isolation.level` to `read_uncommitted` in `consumerProperties` if they don't need Druid to consume transactional topics or need Druid to consume from older versions of Kafka. +> Make sure offsets are sequential, since Druid no longer performs an offset gap check. ## Tutorial @@ -132,7 +134,7 @@ A sample supervisor spec is shown below: |-----|----|-----------|--------| |`topic`|String|The Kafka topic to read from. This must be a specific topic as topic patterns are not supported.|yes| |`inputFormat`|Object|[`inputFormat`](../../ingestion/data-formats.md#input-format) to specify how to parse input data. See [the below section](#specifying-data-format) for details about specifying the input format.|yes| -|`consumerProperties`|Map|A map of properties to be passed to the Kafka consumer. 
This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`. For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.md) or String password.|yes| +|`consumerProperties`|Map|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`. Users can set `isolation.level` to `read_uncommitted` here if they don't need Druid to consume transactional topics or need Druid to consume from older versions of Kafka. For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.md) or String password.|yes| |`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds|no (default == 100)| |`replicas`|Integer|The number of replica sets, where 1 means a single set of tasks (no replication). Replica tasks will always be assigned to different workers to provide resiliency against process failure.|no (default == 1)| |`taskCount`|Integer|The maximum number of *reading* tasks in a *replica set*. This means that the maximum number of reading tasks will be `taskCount * replicas` and the total number of tasks (*reading* + *publishing*) will be higher than this. See [Capacity Planning](#capacity-planning) below for more details. 
The number of reading tasks will be less than `taskCount` if `taskCount > {numKafkaPartitions}`.|no (default == 1)| diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java index e9c6e79358a3..365be19cc03c 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java @@ -38,7 +38,6 @@ public static Map getConsumerProperties() props.put("group.id", StringUtils.format("kafka-supervisor-%s", IdUtils.getRandomId())); props.put("auto.offset.reset", "none"); props.put("enable.auto.commit", "false"); - props.put("isolation.level", "read_committed"); return props; } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java index fb063532356d..7e09acf8cb89 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java @@ -97,6 +97,7 @@ KafkaConsumer newConsumer() configMapper, ioConfig.getConsumerProperties() ); + props.putIfAbsent("isolation.level", "read_committed"); props.putAll(consumerConfigs); return new KafkaConsumer<>(props); diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java index ad870f3ef000..d26f9340bc79 100644 --- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java @@ -252,6 +252,7 @@ private static KafkaConsumer getKafkaConsumer(ObjectMapper sorti final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(); final Properties props = new Properties(); addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties); + props.putIfAbsent("isolation.level", "read_committed"); props.putAll(consumerConfigs); ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader(); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index aa4385e67335..fa86c49933a6 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2329,6 +2329,47 @@ public void testRunTransactionModeRollback() throws Exception ); } + @Test(timeout = 60_000L) + public void testRunUnTransactionMode() throws Exception + { + Map configs = kafkaServer.consumerProperties(); + configs.put("isolation.level", "read_uncommitted"); + final KafkaIndexTask task = createTask( + null, + new KafkaIndexTaskIOConfig( + 0, + "sequence0", + new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()), + new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 13L)), + configs, + KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, + true, + null, + null, + INPUT_FORMAT + ) + ); + + final ListenableFuture future = runTask(task); + + // Insert 2 records initially + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + 
kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); + for (ProducerRecord record : Iterables.limit(records, 2)) { + kafkaProducer.send(record).get(); + } + kafkaProducer.flush(); + kafkaProducer.abortTransaction(); + } + + while (countEvents(task) != 2) { + Thread.sleep(25); + } + + Assert.assertEquals(2, countEvents(task)); + } + @Test(timeout = 60_000L) public void testCanStartFromLaterThanEarliestOffset() throws Exception {