49 commits
3d9735c
Add AcknowledgingRecordSupplier interface and AcknowledgeType enum fo…
Shekharrajak Apr 14, 2026
8351a82
Add ShareGroupIndexTask and ShareGroupIndexTaskIOConfig for Kafka sha…
Shekharrajak Apr 14, 2026
ac827e7
Add KafkaShareGroupRecordSupplier wrapping KafkaShareConsumer with ex…
Shekharrajak Apr 14, 2026
da77a47
Add ShareGroupIndexTaskRunner with poll-parse-publish-ack ingestion loop
Shekharrajak Apr 14, 2026
10fa462
Register ShareGroupIndexTask in KafkaIndexTaskModule and bump Kafka c…
Shekharrajak Apr 14, 2026
24f1b19
Add unit tests for ShareGroupIndexTaskIOConfig, KafkaShareGroupRecord…
Shekharrajak Apr 14, 2026
6b4a40e
Add E2E test for share group ingestion with embedded Druid cluster an…
Shekharrajak Apr 14, 2026
3187c96
Add share group ingestion documentation with usage guide and examples
Shekharrajak Apr 14, 2026
6ffbfcf
Add ShareGroupKafkaResource with share group broker config for E2E tests
Shekharrajak Apr 14, 2026
57fbdfd
Fix IOConfig test: add type field to JSON deserialization test
Shekharrajak Apr 14, 2026
cb8c96d
Fix unit tests: add TimestampSpec to DataSchema, use multi-topic Kafk…
Shekharrajak Apr 14, 2026
6593b57
Add end-to-end demo runbook for share group ingestion with Druid UI
Shekharrajak Apr 15, 2026
1e13898
Implement PendingSegmentAllocatingTask for segment allocation compat
Shekharrajak Apr 15, 2026
f0aecb1
Enable APPEND lock type for share group task to support concurrent se…
Shekharrajak Apr 15, 2026
5703d98
Set share.acknowledgement.mode=explicit for ShareConsumer to enable e…
Shekharrajak Apr 15, 2026
928c0e5
Add RENEW ack type, wakeup() and acquisitionLockTimeoutMs() to Acknow…
Shekharrajak May 7, 2026
0a52999
Promote StreamChunkReader to public and widen parse() with PECS for c…
Shekharrajak May 7, 2026
9cc61a7
Add ShareGroupConsumerProperties.sanitize() to strip Kafka 4.2.0 shar…
Shekharrajak May 7, 2026
7b9ea57
Use clean acknowledge overload, add RENEW/wakeup/lockTimeout/sanitize…
Shekharrajak May 7, 2026
0f9bb3c
Fix multi-row data loss, add DIP factory, push/persist, try-finally, …
Shekharrajak May 7, 2026
5b33214
Add embedded IT validating multi-row JSON-array ingestion via share g…
Shekharrajak May 7, 2026
1413d39
Document consumer-property restrictions, graceful stop, lock duration…
Shekharrajak May 7, 2026
431e192
Fix share-consumer ack to use cached ConsumerRecord reference
Shekharrajak May 8, 2026
436f818
Register TaskRealtimeMetricsMonitor in share-group runner
Shekharrajak May 8, 2026
31579e6
Tighten share-group task and consumer-property javadocs
Shekharrajak May 8, 2026
7d7bd13
Tighten seekablestream common javadocs
Shekharrajak May 8, 2026
fd09eca
Update share-group unit tests for new ack contract
Shekharrajak May 8, 2026
bc61817
Move share-group ITs to embedded-tests and add probe test
Shekharrajak May 8, 2026
210cd04
Pin kafka-clients 4.2.0 in embedded-tests
Shekharrajak May 8, 2026
2356d65
Update docs for embedded-tests share-group IT location
Shekharrajak May 8, 2026
d2e94de
Trim share-group code comments to essentials
Shekharrajak May 8, 2026
446e004
Tighten share-group docs and fix spellcheck
Shekharrajak May 8, 2026
461b50b
Wait for segments before SQL in share-group IT
Shekharrajak May 8, 2026
1226944
Retry SQL row count to absorb broker catalog refresh lag
Shekharrajak May 8, 2026
52dfecc
fix: checkstyle violations in share-group sources and tests
Shekharrajak May 8, 2026
0b53ae7
fix: forbidden-API violations in share-group code
Shekharrajak May 8, 2026
d2882c0
ShareGroupIndexTask: use realtime task priority (75)
Shekharrajak May 9, 2026
1a8bed2
chore(test): use ${apache.kafka.version} for kafka-clients in embedde…
Shekharrajak May 13, 2026
244ffde
docs: drop Phase 1/Phase 2 phrasing in share-group ingestion doc
Shekharrajak May 13, 2026
6a25c78
docs: drop remaining Phase 1/Phase 2 phrasing in tuningConfig section
Shekharrajak May 13, 2026
5231b7c
docs: drop Phase 1 phrasing in tuningConfig intro
Shekharrajak May 13, 2026
40d5ed7
docs(share-group): replace download steps with source build for the demo
Shekharrajak May 13, 2026
626b468
docs(share-group): offer source-build (recommended) and JAR-overlay o…
Shekharrajak May 14, 2026
f182aae
fix(checkstyle): drop unused mockito imports and move static block to…
Shekharrajak May 14, 2026
a25b7e7
fix(forbidden-api): pass StandardCharsets.UTF_8 to String.getBytes in…
Shekharrajak May 14, 2026
539f3c8
fix(share-group): re-sanitise consumer props after dynamic-config pro…
Shekharrajak May 14, 2026
8cbbd70
docs(share-group): set share.auto.offset.reset=earliest before produc…
Shekharrajak May 14, 2026
bff1344
docs(share-group): backtick ABI and 37.x to satisfy mdspell
Shekharrajak May 14, 2026
2d094d5
test(share-group): align assertions with Optional topic and non-null …
Shekharrajak May 14, 2026
350 changes: 350 additions & 0 deletions docs/ingestion/kafka-share-group-ingestion.md
@@ -0,0 +1,350 @@
---
id: kafka-share-group-ingestion
title: "Kafka share group ingestion"
sidebar_label: "Kafka share group ingestion"
description: "Queue-semantics ingestion from Apache Kafka using share groups (KIP-932). Scale consumers beyond partition count with at-least-once delivery."
---

<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->

:::info
Requires Apache Kafka 4.0 or higher with share groups (KIP-932) enabled on the broker.
:::

## Overview

Kafka share groups (KIP-932) let multiple consumers read from the same partition concurrently. The broker manages per-record acquisition locks and explicit acknowledgement, so consumer count is not capped by partition count, joining or leaving consumers does not pause the group, and a slow record does not block its partition.

Druid's `ShareGroupIndexTask` consumes from a share group and publishes segments with at-least-once delivery: records are acknowledged only after their segments are atomically registered in the metadata store.

## When to use share group ingestion

| Scenario | Consumer group | Share group |
|----------|---------------|-------------|
| Workers needed exceed partition count | Idle workers | All workers active |
| Elastic scaling (auto-scale events) | Rebalancing pause (30-60s) | Zero pause |
| Per-message processing time varies | Head-of-line blocking | Independent processing |
| Ordered processing required per partition | Yes | No (delivery order not guaranteed) |

Choose share groups when throughput and elastic scaling matter more than strict per-partition ordering.

## Task spec

Submit a `ShareGroupIndexTask` to the Overlord. There are no start/end offsets -- the broker tracks them.

```json
{
  "type": "index_kafka_share_group",
  "dataSchema": {
    "dataSource": "my_datasource",
    "timestampSpec": {
      "column": "__time",
      "format": "auto"
    },
    "dimensionsSpec": {
      "useSchemaDiscovery": true
    },
    "granularitySpec": {
      "segmentGranularity": "DAY",
      "queryGranularity": "NONE"
    }
  },
  "ioConfig": {
    "type": "kafka_share_group",
    "topic": "my_topic",
    "groupId": "druid-share-group",
    "consumerProperties": {
      "bootstrap.servers": "kafka-broker:9092"
    },
    "inputFormat": {
      "type": "json"
    },
    "pollTimeout": 2000
  },
  "tuningConfig": {
    "type": "KafkaTuningConfig",
    "maxRowsPerSegment": 5000000
  }
}
```

## IO configuration

| Property | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `topic` | String | Yes | -- | Kafka topic to consume from. |
| `groupId` | String | Yes | -- | Share group identifier. Multiple tasks with the same `groupId` share the workload. |
| `consumerProperties` | Map | Yes | -- | Kafka consumer properties. Must include `bootstrap.servers`. See [Consumer property restrictions](#consumer-property-restrictions). |
| `inputFormat` | Object | Yes | -- | Input format for parsing records (json, csv, avro, etc.). |
| `pollTimeout` | Long | No | 2000 | Poll timeout in milliseconds. |

### Consumer property restrictions

Share consumers (KIP-932) reject some configuration keys that are valid for regular consumer groups. Druid strips the keys below from `consumerProperties` (logging a `WARN` per stripped key) before constructing the `KafkaShareConsumer`:

| Stripped key | Why |
|--------------|-----|
| `auto.offset.reset` | Initial position is broker-controlled for share groups. |
| `enable.auto.commit` | Share consumers always require explicit `acknowledge()` + `commitSync()`. |
| `group.instance.id` | Share groups do not support static membership. |
| `isolation.level` | Always read-committed for share groups. |
| `partition.assignment.strategy` | Broker controls per-record delivery for share groups. |
| `interceptor.classes` | Not supported for share consumers. |
| `session.timeout.ms` | Share groups have no consumer-group session model. |
| `heartbeat.interval.ms` | Share groups have no heartbeat. |
| `group.protocol` | Always `SHARE` for share consumers. |
| `group.remote.assignor` | Not applicable to share groups. |

`share.acknowledgement.mode=explicit` is set automatically and must not be overridden.
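
The stripping behavior can be sketched as a pure map filter. This is an illustrative sketch, not the actual `ShareGroupConsumerProperties.sanitize()` implementation; the class name `SanitizeSketch` is invented for the example:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public final class SanitizeSketch
{
  // Keys rejected by KafkaShareConsumer; mirrors the table above.
  private static final Set<String> DISALLOWED = Set.of(
      "auto.offset.reset",
      "enable.auto.commit",
      "group.instance.id",
      "isolation.level",
      "partition.assignment.strategy",
      "interceptor.classes",
      "session.timeout.ms",
      "heartbeat.interval.ms",
      "group.protocol",
      "group.remote.assignor"
  );

  /** Drops disallowed keys and forces explicit acknowledgement mode. */
  public static Map<String, Object> sanitize(Map<String, Object> props)
  {
    Map<String, Object> out = new HashMap<>();
    for (Map.Entry<String, Object> e : props.entrySet()) {
      if (!DISALLOWED.contains(e.getKey())) {
        out.put(e.getKey(), e.getValue());
      }
    }
    // Set automatically by Druid; user-supplied overrides are replaced.
    out.put("share.acknowledgement.mode", "explicit");
    return out;
  }
}
```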

### Tuning configuration

`tuningConfig` accepts the standard `KafkaTuningConfig` fields. The runner currently honors:

- `maxRowsInMemory` / `maxBytesInMemory`: trigger a mid-batch persist when the appenderator signals `isPersistRequired`.
- `maxRowsPerSegment`: when reached during a batch, the runner logs the event; over-threshold segments are pushed at the end-of-batch publish boundary.

Mid-batch checkpoint and sequence rollover are not supported.

## How it works

1. The task subscribes to the topic with a `KafkaShareConsumer` using the configured `groupId`.
2. The broker delivers batches of records with per-record acquisition locks.
3. Each polled record is parsed by `StreamChunkReader` (the same multi-row parser as `KafkaIndexTask`); a record may produce zero, one, or many `InputRow`s. All resulting rows are added to the appenderator before the record is acknowledged.
4. Parse failures go through `ParseExceptionHandler` (so `maxParseExceptions` is honored). Bytes/processed/unparseable counters are incremented exactly once per row.
5. Segments persist mid-batch on memory pressure and unconditionally at end-of-batch, then publish atomically via `SegmentTransactionalAppendAction`.
6. After a successful publish, every offset in the batch is acknowledged with `ACCEPT` and a `commitSync()` flushes acknowledgements to the broker.
7. On task failure or graceful stop before publish, unacknowledged records are redelivered by the broker after the acquisition lock expires.
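
The ordering in steps 3 through 6 is the crux: every row lands in the appenderator and the publish succeeds before any record is acknowledged. A minimal sketch of that control flow, using stand-in interfaces (`Publisher`, `Acker`, and `processBatch` are placeholders for illustration, not Druid or Kafka API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public final class AckAfterPublishSketch
{
  // Stand-ins for the real Appenderator / KafkaShareConsumer machinery.
  interface Publisher { void publish(List<String> rows); }       // atomic segment publish
  interface Acker { void accept(long offset); void commit(); }   // acknowledge + commitSync

  /**
   * One batch: parse every record into rows, add all rows, publish
   * atomically, and only then ACCEPT and commit the offsets.
   */
  public static void processBatch(
      List<String> records,                        // raw payloads; index stands in for offset
      Function<String, List<String>> parser,       // one record may yield zero, one, or many rows
      Publisher publisher,
      Acker acker
  )
  {
    List<String> rows = new ArrayList<>();
    for (String record : records) {
      rows.addAll(parser.apply(record));           // all rows added before any ack
    }
    publisher.publish(rows);                       // throws on failure -> nothing is acked
    for (long offset = 0; offset < records.size(); offset++) {
      acker.accept(offset);                        // ACCEPT only after publish succeeded
    }
    acker.commit();                                // flush acknowledgements to the broker
  }
}
```

If `publish` throws, the loop never acknowledges, and the broker redelivers the batch after the acquisition lock expires, which is exactly the at-least-once contract described above.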

## Safety invariants

1. **ACK after publish:** `ACCEPT` is sent only after the segment is registered in the metadata store. No data loss on task failure.
2. **Multi-row safe:** every row produced from a record is added to the appenderator before that record is acknowledged.
3. **Resource safe:** `Appenderator` and `KafkaShareConsumer` are released on every exit path.
4. **Terminal state:** every polled record reaches exactly one terminal state -- `ACCEPT`, `RELEASE`, or broker redelivery after lock expiry.

## Graceful stop

When the Overlord asks a task to stop, the runner calls `KafkaShareConsumer.wakeup()`. The in-flight `poll()` throws `WakeupException`; the runner exits the loop after committing any in-flight batch. Records polled but not yet published remain unacknowledged and are redelivered by the broker after the acquisition lock expires.

## Acquisition lock duration

The broker controls the lock via `group.share.record.lock.duration.ms`. The runner logs the effective value once after the first poll:

```
Effective broker acquisition lock timeout for share-group[my-group]: 30000 ms
```

A single thread does both poll and publish. If a batch exceeds the lock duration, in-flight records may be redelivered (duplicates). Tune `pollTimeout`, `maxRowsInMemory`, and `maxRowsPerSegment` so each cycle stays well under the lock window.
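
A rough cycle-time budget makes the tuning concrete. The numbers below are illustrative, not measured Druid figures, and the helper names are invented for the sketch:

```java
public final class LockBudgetSketch
{
  /** Conservative cycle estimate: poll wait + per-row processing + publish overhead. */
  public static long estimateCycleMs(
      long pollTimeoutMs, long rowsPerBatch, double msPerRow, long publishOverheadMs)
  {
    return pollTimeoutMs + (long) (rowsPerBatch * msPerRow) + publishOverheadMs;
  }

  /** Require 50% headroom so jitter is unlikely to push the cycle past the lock. */
  public static boolean fitsLockWindow(long cycleMs, long lockDurationMs)
  {
    return cycleMs * 2 <= lockDurationMs;
  }
}
```

For example, with `pollTimeout` of 2000 ms, 100,000 rows per batch at an assumed 0.05 ms/row, and 1000 ms of publish overhead, the estimated cycle is 8000 ms, which fits comfortably inside a 30,000 ms lock window with headroom to spare.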

## Scaling

Tasks with the same `groupId` share the workload automatically; you can run more tasks than partitions:

```
Topic: 4 partitions
Tasks with same groupId: 20
Result: All 20 tasks actively consuming (broker distributes records)
```

Adding or removing tasks does not trigger a rebalancing pause.

## Delivery semantics

At-least-once. On task failure, records between the last committed acknowledgement and the failure point are redelivered, which may produce duplicates across restarts. A deduplication cache is planned.
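
Until that cache lands, consumers of the datasource must tolerate duplicates or deduplicate downstream. One possible shape for such a cache, keyed by partition and offset, is sketched below; this is hypothetical and not part of this patch:

```java
import java.util.HashSet;
import java.util.Set;

public final class DedupSketch
{
  // Seen (partition, offset) pairs; a real cache would bound this (e.g. LRU or TTL).
  private final Set<Long> seen = new HashSet<>();

  /** Returns true exactly once per (partition, offset); redeliveries return false. */
  public boolean firstDelivery(int partition, long offset)
  {
    // Pack the pair into one long; assumes offsets stay below 2^40.
    return seen.add(((long) partition << 40) | offset);
  }
}
```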

## Metrics

In addition to the standard ingestion metrics (`ingest/events/processed`, `ingest/events/unparseable`, `ingest/persists/count`, etc.), share-group ingestion emits:

| Metric | Description |
|--------|-------------|
| `ingest/shareGroup/commitFailures` | Per-batch count of partitions whose `commitSync()` failed. A non-zero value means the affected records will be redelivered; alert on sustained non-zero values. |

## Limitations (current release)

- Single-threaded ingestion per task; a future enhancement may add a background `RENEW` thread to extend the broker lock for long-running batches.
- No supervisor integration; tasks are submitted manually via the Overlord API. A `KafkaShareGroupSupervisor` is planned as a future enhancement.
- No deduplication cache (at-least-once).
- Delivery order within a partition is not guaranteed.
- Mid-batch checkpoint / sequence rollover is not supported. If a batch grossly exceeds `maxRowsPerSegment` the runner still publishes correctly (multiple segments per batch), but the threshold is only checked at end-of-batch boundaries.

## Demo: end-to-end validation with Druid UI

### Prerequisites

- Java 17
- Kafka 4.2.0 (with share groups enabled)
- Druid checked out from this repository (built from source)

### Step 1: Start Kafka with share groups

```bash
cd kafka_2.13-4.2.0

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format --standalone -t "$KAFKA_CLUSTER_ID" -c config/server.properties

echo "group.share.enable=true" >> config/server.properties
echo "group.share.record.lock.duration.ms=30000" >> config/server.properties

bin/kafka-server-start.sh config/server.properties
```

### Step 2: Create topic and configure the share group

```bash
cd kafka_2.13-4.2.0

bin/kafka-topics.sh --create --topic druid-share-test --partitions 4 --bootstrap-server localhost:9092

# Set share-group reset to earliest so the task picks up records that already exist
# in the topic. The default broker setting is 'latest', which would skip pre-existing
# records and ingest zero rows even though the producer ran successfully.
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --entity-type groups --entity-name druid-demo-share-group \
  --add-config share.auto.offset.reset=earliest
```

### Step 3: Produce sample messages

```bash
cd kafka_2.13-4.2.0

bin/kafka-console-producer.sh --topic druid-share-test --bootstrap-server localhost:9092
```

Paste these JSON records:

```json
{"__time":"2025-06-01T00:00:00.000Z","item":"widget_a","value":100,"category":"electronics"}
{"__time":"2025-06-01T01:00:00.000Z","item":"widget_b","value":250,"category":"clothing"}
{"__time":"2025-06-01T02:00:00.000Z","item":"widget_c","value":50,"category":"electronics"}
{"__time":"2025-06-01T03:00:00.000Z","item":"widget_d","value":175,"category":"food"}
{"__time":"2025-06-01T04:00:00.000Z","item":"widget_e","value":320,"category":"electronics"}
```

### Step 4: Build Druid and run it

You can run the demo against either a freshly built Druid distribution or an existing stable Druid binary with the share-group JARs overlaid. Pick the one that matches your environment.

#### Option A: Build the full Druid distribution from source (recommended)

Builds the full distribution from this repository so the share-group code is packaged natively, with no JAR overlay required:

```bash
cd /path/to/druid
JAVA_HOME=$(/usr/libexec/java_home -v 17) \
  mvn clean install -Pdist -T1C -DskipTests \
  -Dforbiddenapis.skip=true -Dcheckstyle.skip=true \
  -Dpmd.skip=true -Dmaven.javadoc.skip=true -Denforcer.skip=true

tar -xzf distribution/target/apache-druid-*-bin.tar.gz -C /tmp
cd /tmp/apache-druid-*

bin/start-druid
```

> Tip: For faster iteration, build only the `kafka-indexing-service` module with `mvn package -pl extensions-core/kafka-indexing-service -am -DskipTests -T1C` and overlay the resulting JAR onto the distribution from a previous full build (Option B steps below).

#### Option B: Overlay the share-group JAR onto a downloaded Druid binary (faster, best-effort)

If you already have a Druid release binary and want to avoid a full source build, you can replace the kafka-indexing-service JAR in that distribution with the one built from this branch.

> Caveat: The branch builds against `38.0.0-SNAPSHOT`. Druid does **not** guarantee extension `ABI` compatibility across major versions, so the overlay may fail at runtime against an older binary (`37.x` or earlier). Use the most recent stable Druid release available, and prefer Option A for a reliable demo.

```bash
cd /path/to/druid
JAVA_HOME=$(/usr/libexec/java_home -v 17) mvn package \
  -pl extensions-core/kafka-indexing-service -am \
  -Pskip-static-checks -DskipTests -T1C -q

# Use the latest stable Druid release available; 37.0.0 is the example below.
DRUID_VERSION=37.0.0
cd /tmp
curl -O "https://dlcdn.apache.org/druid/${DRUID_VERSION}/apache-druid-${DRUID_VERSION}-bin.tar.gz"
tar -xzf "apache-druid-${DRUID_VERSION}-bin.tar.gz"
cd "apache-druid-${DRUID_VERSION}"

rm extensions/druid-kafka-indexing-service/*.jar
cp /path/to/druid/extensions-core/kafka-indexing-service/target/druid-kafka-indexing-service-*.jar \
  extensions/druid-kafka-indexing-service/
cp ~/.m2/repository/org/apache/kafka/kafka-clients/4.2.0/kafka-clients-4.2.0.jar \
  extensions/druid-kafka-indexing-service/

bin/start-druid
```

### Step 5: Submit task via Druid console

Open `http://localhost:8888`, go to the **Ingestion** tab, click **Submit JSON task**, and paste:

```json
{
  "type": "index_kafka_share_group",
  "dataSchema": {
    "dataSource": "share_group_demo",
    "timestampSpec": {"column": "__time", "format": "auto"},
    "dimensionsSpec": {"useSchemaDiscovery": true},
    "granularitySpec": {"segmentGranularity": "DAY", "queryGranularity": "NONE"}
  },
  "ioConfig": {
    "type": "kafka_share_group",
    "topic": "druid-share-test",
    "groupId": "druid-demo-share-group",
    "consumerProperties": {"bootstrap.servers": "localhost:9092"},
    "inputFormat": {"type": "json"},
    "pollTimeout": 2000
  },
  "tuningConfig": {"type": "KafkaTuningConfig"}
}
```

### Step 6: Query data

Go to the **Query** tab and run:

```sql
SELECT COUNT(*) AS total_rows FROM share_group_demo;
SELECT category, COUNT(*) AS cnt, SUM(value) AS total FROM share_group_demo GROUP BY category;
```

## Running tests

Unit tests:

```bash
mvn test -pl extensions-core/kafka-indexing-service \
-Dtest="org.apache.druid.indexing.kafka.ShareGroupIndexTaskIOConfigTest,\
org.apache.druid.indexing.kafka.KafkaShareGroupRecordSupplierTest,\
org.apache.druid.indexing.kafka.ShareGroupIndexTaskTest,\
org.apache.druid.indexing.kafka.ShareGroupIndexTaskRunnerTest,\
org.apache.druid.indexing.kafka.ShareGroupConsumerPropertiesTest" \
-Dsurefire.failIfNoSpecifiedTests=false \
-Pskip-static-checks -Dweb.console.skip=true -T1C
```

E2E test (requires Docker; Testcontainers starts an `apache/kafka:4.1.1` broker with `group.share.enable=true`):

```bash
mvn test -pl embedded-tests -am \
  -Dtest="org.apache.druid.testing.embedded.indexing.EmbeddedShareGroupIngestionTest" \
  -Dsurefire.failIfNoSpecifiedTests=false \
  -Pskip-static-checks -Dweb.console.skip=true -T1C
```
17 changes: 11 additions & 6 deletions embedded-tests/pom.xml
@@ -40,6 +40,17 @@
</parent>

<dependencies>
    <!-- Declared first so its classes win on the test classpath over older
         kafka classes shaded into druid-protobuf-extensions (Confluent
         transitive). Required for share-group APIs (KIP-932) such as
         ConfigResource.Type.GROUP. -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>${apache.kafka.version}</version>
      <scope>test</scope>
    </dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.apache.druid</groupId>
@@ -532,12 +543,6 @@
<artifactId>commons-codec</artifactId>
<scope>test</scope>
</dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>${apache.kafka.version}</version>
      <scope>test</scope>
    </dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>