Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions docs/development/extensions-core/kafka-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ manage failures, and ensure that the scalability and replication requirements ar
This service is provided in the `druid-kafka-indexing-service` core Apache Druid extension (see
[Including Extensions](../../development/extensions.md#loading-extensions)).

> The Kafka indexing service supports transactional topics which were introduced in Kafka 0.11.x. These changes make the
> The Kafka indexing service supports transactional topics which were introduced in Kafka 0.11.x. It is the default behavior of Druid and make the
> Kafka consumer that Druid uses incompatible with older brokers. Ensure that your Kafka brokers are version 0.11.x or
> better before using this functionality. Refer [Kafka upgrade guide](https://kafka.apache.org/documentation/#upgrade)
> if you are using older version of Kafka brokers.
> In addition, users could set `isolation.level` `read_uncommitted` in `consumerProperties`, if don't need Druid to consume transactional topics or need Druid to consume older versions of Kafka.
> Make sure offsets are sequential, since there is no offset gap check in Druid anymore.

## Tutorial

Expand Down Expand Up @@ -132,7 +134,7 @@ A sample supervisor spec is shown below:
|-----|----|-----------|--------|
|`topic`|String|The Kafka topic to read from. This must be a specific topic as topic patterns are not supported.|yes|
|`inputFormat`|Object|[`inputFormat`](../../ingestion/data-formats.md#input-format) to specify how to parse input data. See [the below section](#specifying-data-format) for details about specifying the input format.|yes|
|`consumerProperties`|Map<String, Object>|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`. For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.md) or String password.|yes|
|`consumerProperties`|Map<String, Object>|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`. Users can set `isolation.level` `read_uncommitted` here if don't need Druid to consume transactional topics or need Druid to consume older versions of Kafka. For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.md) or String password.|yes|
|`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds|no (default == 100)|
|`replicas`|Integer|The number of replica sets, where 1 means a single set of tasks (no replication). Replica tasks will always be assigned to different workers to provide resiliency against process failure.|no (default == 1)|
|`taskCount`|Integer|The maximum number of *reading* tasks in a *replica set*. This means that the maximum number of reading tasks will be `taskCount * replicas` and the total number of tasks (*reading* + *publishing*) will be higher than this. See [Capacity Planning](#capacity-planning) below for more details. The number of reading tasks will be less than `taskCount` if `taskCount > {numKafkaPartitions}`.|no (default == 1)|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public static Map<String, Object> getConsumerProperties()
props.put("group.id", StringUtils.format("kafka-supervisor-%s", IdUtils.getRandomId()));
props.put("auto.offset.reset", "none");
props.put("enable.auto.commit", "false");
props.put("isolation.level", "read_committed");
Copy link
Copy Markdown
Member

@FrankChen021 FrankChen021 Nov 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think to remove isolation.level here and leaves the configuration to users changes the current behavior.

Currently users care nothing about this kafka configuration when using a higher version of kafka such as 0.11. By removing it, they need to set this property in supervisor spec, or the default value, which is read_uncommitted, will be applied, which may be not what they expect.

If this is the root cause that limits the Druid to use Kafka lower than 0.11, I think maybe we can introduce another property, as the same way as pollTimeout property does, then we can unset isolation.level property according to the value of this new property. Only those who want to use Druid with older kafka need to set this new property.

Copy link
Copy Markdown
Contributor Author

@zhangyue19921010 zhangyue19921010 Nov 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, It will change the current behavior.

You are right, most people really don't care this Kafka configuration(Let it be default behavior). As I know the default logic of the Kafka Producer is not to enable transactions feature and the same as Kafka Consumer. So that maybe Druid Kafka indexing service keep the same default logic is more reasonable.

Furthermore, if a Druid user want Druid to consume transactional Kafka topics, I think it is more reasonable to let users set this parameter in consumerProperties like 'bootstrap.servers' because he knows what he is going to do and why.

By the way, Druid from 0.15 to better can't consume old version Kafka is really confused me(I believe it's not just me). Generally speaking, higher Kafka consumer client is able to consume old version Kafka cluster unless it involves high-version-specific api. And this hard limitation block us to upgrade Druid cluster from 0.14.2 for a long time. If we don't remove this config, we have to upgrade PRD Kafka cluster, which is a much more heavy work to do because there are too many consumers of Kafka such as Spark and Flink. Orz...

Thanks for reviewing!

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't know whether or not the users have enabled transactional message in their clusters, so changing the current behavior may cause backward compatibility problem.

Copy link
Copy Markdown
Contributor Author

@zhangyue19921010 zhangyue19921010 Nov 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense! I add a new property named consumeTransactionally in consumerProperties, which is a Kafka consumer level property to control isolation.level.
If users don't set consumeTransactionally or set consumeTransactionally true, druid will consume Kafka Transactionally by default.
Set consumeTransactionally false here can disable druid to consume Kafka Transactionally through unset isolation.level property, which means druid can consume lower version of Kafka now like 0.10.x
In this way, we don't change current Druid default behavior and provide a way to let druid consume lower version Kafka.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FrankChen021 Hi Frank, sorry to bother you. I have tested this change in our Druid cluster recently. What should I do next ?

return props;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ KafkaConsumer<byte[], byte[]> newConsumer()
configMapper,
ioConfig.getConsumerProperties()
);
props.putIfAbsent("isolation.level", "read_committed");
props.putAll(consumerConfigs);

return new KafkaConsumer<>(props);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ private static KafkaConsumer<byte[], byte[]> getKafkaConsumer(ObjectMapper sorti
final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
final Properties props = new Properties();
addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties);
props.putIfAbsent("isolation.level", "read_committed");
props.putAll(consumerConfigs);

ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2329,6 +2329,47 @@ public void testRunTransactionModeRollback() throws Exception
);
}

@Test(timeout = 60_000L)
public void testRunUnTransactionMode() throws Exception
{
Map<String, Object> configs = kafkaServer.consumerProperties();
configs.put("isolation.level", "read_uncommitted");
final KafkaIndexTask task = createTask(
null,
new KafkaIndexTaskIOConfig(
0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 13L)),
configs,
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
INPUT_FORMAT
)
);

final ListenableFuture<TaskStatus> future = runTask(task);

// Insert 2 records initially
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
for (ProducerRecord<byte[], byte[]> record : Iterables.limit(records, 2)) {
kafkaProducer.send(record).get();
}
kafkaProducer.flush();
kafkaProducer.abortTransaction();
}

while (countEvents(task) != 2) {
Thread.sleep(25);
}

Assert.assertEquals(2, countEvents(task));
}

@Test(timeout = 60_000L)
public void testCanStartFromLaterThanEarliestOffset() throws Exception
{
Expand Down