diff --git a/docs/ingestion/kafka-share-group-ingestion.md b/docs/ingestion/kafka-share-group-ingestion.md
new file mode 100644
index 000000000000..4a75a1a59223
--- /dev/null
+++ b/docs/ingestion/kafka-share-group-ingestion.md
@@ -0,0 +1,350 @@
+---
+id: kafka-share-group-ingestion
+title: "Kafka share group ingestion"
+sidebar_label: "Kafka share group ingestion"
+description: "Queue-semantics ingestion from Apache Kafka using share groups (KIP-932). Scale consumers beyond partition count with at-least-once delivery."
+---
+
+:::info
+Requires Apache Kafka 4.0 or higher with share groups (KIP-932) enabled on the broker.
+:::
+
+## Overview
+
+Kafka share groups (KIP-932) let multiple consumers read from the same partition concurrently. The broker manages per-record acquisition locks and explicit acknowledgement, so consumer count is not capped by partition count, joining or leaving consumers does not pause the group, and a slow record does not block its partition.
+
+Druid's `ShareGroupIndexTask` consumes from a share group and publishes segments with at-least-once delivery: records are acknowledged only after their segments are atomically registered in the metadata store.
+
+## When to use share group ingestion
+
+| Scenario | Consumer group | Share group |
+|----------|---------------|-------------|
+| Workers needed exceed partition count | Idle workers | All workers active |
+| Elastic scaling (auto-scale events) | Rebalancing pause (30-60s) | Zero pause |
+| Per-message processing time varies | Head-of-line blocking | Independent processing |
+| Ordered processing required per partition | Yes | No (delivery order not guaranteed) |
+
+Choose share groups when throughput and elastic scaling matter more than strict per-partition ordering.
+
+## Task spec
+
+Submit a `ShareGroupIndexTask` to the Overlord. There are no start or end offsets to manage -- the broker tracks per-record delivery state.
+
+```json
+{
+  "type": "index_kafka_share_group",
+  "dataSchema": {
+    "dataSource": "my_datasource",
+    "timestampSpec": {
+      "column": "__time",
+      "format": "auto"
+    },
+    "dimensionsSpec": {
+      "useSchemaDiscovery": true
+    },
+    "granularitySpec": {
+      "segmentGranularity": "DAY",
+      "queryGranularity": "NONE"
+    }
+  },
+  "ioConfig": {
+    "type": "kafka_share_group",
+    "topic": "my_topic",
+    "groupId": "druid-share-group",
+    "consumerProperties": {
+      "bootstrap.servers": "kafka-broker:9092"
+    },
+    "inputFormat": {
+      "type": "json"
+    },
+    "pollTimeout": 2000
+  },
+  "tuningConfig": {
+    "type": "KafkaTuningConfig",
+    "maxRowsPerSegment": 5000000
+  }
+}
+```
+
+## IO configuration
+
+| Property | Type | Required | Default | Description |
+|----------|------|----------|---------|-------------|
+| `type` | String | Yes | -- | Must be `kafka_share_group`. |
+| `topic` | String | Yes | -- | Kafka topic to consume from. |
+| `groupId` | String | Yes | -- | Share group identifier. Multiple tasks with the same `groupId` share the workload. |
+| `consumerProperties` | Map | Yes | -- | Kafka consumer properties. Must include `bootstrap.servers`. See [Consumer property restrictions](#consumer-property-restrictions). |
+| `inputFormat` | Object | Yes | -- | Input format for parsing records (json, csv, avro, etc.). |
+| `pollTimeout` | Long | No | 2000 | Poll timeout in milliseconds. |
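+
+The `consumerProperties` map is passed through to the Kafka client after sanitization (described in the next section). As with Druid's regular Kafka ingestion, secrets can be pulled from a dynamic config provider instead of being written inline. The sketch below is illustrative: the `environment` provider is a standard Druid option, but the SASL keys and the `KAFKA_JAAS_CONFIG` variable name are placeholders for your own setup:
+
+```json
+"consumerProperties": {
+  "bootstrap.servers": "kafka-broker:9092",
+  "security.protocol": "SASL_SSL",
+  "sasl.mechanism": "PLAIN",
+  "druid.dynamic.config.provider": {
+    "type": "environment",
+    "variables": {
+      "sasl.jaas.config": "KAFKA_JAAS_CONFIG"
+    }
+  }
+}
+```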
+
+### Consumer property restrictions
+
+Share consumers (KIP-932) reject some keys that are valid for regular consumer groups. Druid silently strips the keys below from `consumerProperties` (with a `WARN` log per stripped key) before constructing the `KafkaShareConsumer`:
+
+| Stripped key | Why |
+|--------------|-----|
+| `auto.offset.reset` | Initial position is broker-controlled for share groups. |
+| `enable.auto.commit` | Share consumers always require explicit `acknowledge()` + `commitSync()`. |
+| `group.instance.id` | Share groups do not support static membership. |
+| `isolation.level` | Always read-committed for share groups. |
+| `partition.assignment.strategy` | The broker controls per-record delivery for share groups. |
+| `interceptor.classes` | Not supported for share consumers. |
+| `session.timeout.ms` | Session lifetime is broker-controlled under the share-group protocol. |
+| `heartbeat.interval.ms` | Heartbeat cadence is broker-controlled under the share-group protocol. |
+| `group.protocol` | Always `SHARE` for share consumers. |
+| `group.remote.assignor` | Not applicable to share groups. |
+
+`share.acknowledgement.mode=explicit` is set automatically and must not be overridden.
+
+### Tuning configuration
+
+`tuningConfig` accepts the standard `KafkaTuningConfig` fields. The runner currently honors:
+
+- `maxRowsInMemory` / `maxBytesInMemory`: trigger a mid-batch persist when the appenderator signals `isPersistRequired`.
+- `maxRowsPerSegment`: checked at the end-of-batch publish boundary. If a batch crosses the threshold mid-stream, the runner only logs the event; the over-threshold rows are still pushed at the next publish.
+
+Mid-batch checkpoint and sequence rollover are not supported.
+
+## How it works
+
+1. The task subscribes to the topic with a `KafkaShareConsumer` using the configured `groupId`.
+2. The broker delivers batches of records with per-record acquisition locks.
+3. Each polled record is parsed by `StreamChunkReader` (the same multi-row parser as `KafkaIndexTask`); a record may produce zero, one, or many `InputRow`s. All resulting rows are added to the appenderator before the record is acknowledged.
+4. Parse failures go through `ParseExceptionHandler` (so `maxParseExceptions` is honored). Bytes/processed/unparseable counters are incremented exactly once per row.
+5. Segments persist mid-batch on memory pressure and unconditionally at end-of-batch, then publish atomically via `SegmentTransactionalAppendAction`.
+6. After a successful publish, every offset in the batch is acknowledged with `ACCEPT` and a `commitSync()` flushes acknowledgements to the broker.
+7. On task failure or graceful stop before publish, unacknowledged records are redelivered by the broker after the acquisition lock expires.
+
+## Safety invariants
+
+1. **ACK after publish:** `ACCEPT` is sent only after the segment is registered in the metadata store. No data loss on task failure.
+2. **Multi-row safe:** every row produced from a record is added to the appenderator before that record is acknowledged.
+3. **Resource safe:** `Appenderator` and `KafkaShareConsumer` are released on every exit path.
+4. **Terminal state:** every polled record reaches exactly one terminal state -- `ACCEPT`, `RELEASE`, or broker redelivery after lock expiry.
+
+## Graceful stop
+
+When the Overlord asks a task to stop, the runner calls `KafkaShareConsumer.wakeup()`. The in-flight `poll()` throws `WakeupException`; the runner exits the loop after committing any in-flight batch. Records polled but not yet published remain unacknowledged and are redelivered by the broker after the acquisition lock expires.
+
+## Acquisition lock duration
+
+The broker controls the lock via `group.share.record.lock.duration.ms`.
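+To check the broker's configured value directly, you can describe the broker configs with the standard Kafka CLI. A quick sanity check; `--entity-name 1` assumes the node id from the demo's `server.properties`:
+
+```bash
+bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --all \
+  --entity-type brokers --entity-name 1 \
+  | grep group.share.record.lock.duration.ms
+```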
The runner logs the effective value once after the first poll: + +``` +Effective broker acquisition lock timeout for share-group[my-group]: 30000 ms +``` + +A single thread does both poll and publish. If a batch exceeds the lock duration, in-flight records may be redelivered (duplicates). Tune `pollTimeout`, `maxRowsInMemory`, and `maxRowsPerSegment` so each cycle stays well under the lock window. + +## Scaling + +Tasks with the same `groupId` share the workload automatically; you can run more tasks than partitions: + +``` +Topic: 4 partitions +Tasks with same groupId: 20 +Result: All 20 tasks actively consuming (broker distributes records) +``` + +Adding or removing tasks does not trigger a rebalancing pause. + +## Delivery semantics + +At-least-once. On task failure, records between the last committed acknowledgement and the failure point are redelivered, which may produce duplicates across restarts. A deduplication cache is planned. + +## Metrics + +In addition to the standard ingestion metrics (`ingest/events/processed`, `ingest/events/unparseable`, `ingest/persists/count`, etc.), share-group ingestion emits: + +| Metric | Description | +|--------|-------------| +| `ingest/shareGroup/commitFailures` | Per-batch count of partitions whose `commitSync()` failed. A non-zero value means the affected records will be redelivered; alert on sustained non-zero values. | + +## Limitations (current release) + +- Single-threaded ingestion per task; a future enhancement may add a background `RENEW` thread to extend the broker lock for long-running batches. +- No supervisor integration; tasks are submitted manually via the Overlord API. A `KafkaShareGroupSupervisor` is planned as a future enhancement. +- No deduplication cache (at-least-once). +- Delivery order within a partition is not guaranteed. +- Mid-batch checkpoint / sequence rollover is not supported. If a batch grossly exceeds `maxRowsPerSegment` the runner still publishes correctly (multiple segments per batch), but the threshold is only checked at end-of-batch boundaries. + +## Demo: end-to-end validation with Druid UI + +### Prerequisites + +- Java 17 +- Kafka 4.2.0 (with share groups enabled) +- Druid checked out from this repository (built from source) + +### Step 1: Start Kafka with share groups + +```bash +cd kafka_2.13-4.2.0 + +KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)" +bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties + +echo "group.share.enable=true" >> config/server.properties +echo "group.share.record.lock.duration.ms=30000" >> config/server.properties + +bin/kafka-server-start.sh config/server.properties +``` + +### Step 2: Create topic and configure the share group + +```bash +cd kafka_2.13-4.2.0 + +bin/kafka-topics.sh --create --topic druid-share-test --partitions 4 --bootstrap-server localhost:9092 + +# Set share-group reset to earliest so the task picks up records that already exist +# in the topic. The default broker setting is 'latest', which would skip pre-existing +# records and ingest zero rows even though the producer ran successfully. 
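+# (share.auto.offset.reset is a group-level config managed through the Admin API
+# with --entity-type groups; the per-consumer auto.offset.reset key is rejected by
+# share consumers and is stripped by Druid -- see the restrictions table above.)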
+bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \ + --entity-type groups --entity-name druid-demo-share-group \ + --add-config share.auto.offset.reset=earliest +``` + +### Step 3: Produce sample messages + +```bash +cd kafka_2.13-4.2.0 + +bin/kafka-console-producer.sh --topic druid-share-test --bootstrap-server localhost:9092 +``` + +Paste these JSON records: + +```json +{"__time":"2025-06-01T00:00:00.000Z","item":"widget_a","value":100,"category":"electronics"} +{"__time":"2025-06-01T01:00:00.000Z","item":"widget_b","value":250,"category":"clothing"} +{"__time":"2025-06-01T02:00:00.000Z","item":"widget_c","value":50,"category":"electronics"} +{"__time":"2025-06-01T03:00:00.000Z","item":"widget_d","value":175,"category":"food"} +{"__time":"2025-06-01T04:00:00.000Z","item":"widget_e","value":320,"category":"electronics"} +``` + +### Step 4: Build Druid and run it + +You can run the demo against either a freshly built Druid distribution or an existing stable Druid binary with the share-group JARs overlaid. Pick the one that matches your environment. + +#### Option A: Build the full Druid distribution from source (recommended) + +Builds the full distribution from this repository so the share-group code is packaged natively, with no JAR overlay required: + +```bash +cd /path/to/druid +JAVA_HOME=$(/usr/libexec/java_home -v 17) \ + mvn clean install -Pdist -T1C -DskipTests \ + -Dforbiddenapis.skip=true -Dcheckstyle.skip=true \ + -Dpmd.skip=true -Dmaven.javadoc.skip=true -Denforcer.skip=true + +tar -xzf distribution/target/apache-druid-*-bin.tar.gz -C /tmp +cd /tmp/apache-druid-* + +bin/start-druid +``` + +> Tip: For faster iteration, build only the `kafka-indexing-service` module with `mvn package -pl extensions-core/kafka-indexing-service -am -DskipTests -T1C` and overlay the resulting JAR onto the distribution from a previous full build (Option B steps below). + +#### Option B: Overlay the share-group JAR onto a downloaded Druid binary (faster, best-effort) + +If you already have a Druid release binary and want to avoid a full source build, you can replace the kafka-indexing-service JAR in that distribution with the one built from this branch. + +> Caveat: The branch builds against `38.0.0-SNAPSHOT`. Druid does **not** guarantee extension `ABI` compatibility across major versions, so the overlay may fail at runtime against an older binary (`37.x` or earlier). Use the most recent stable Druid release available, and prefer Option A for a reliable demo. + +```bash +cd /path/to/druid +JAVA_HOME=$(/usr/libexec/java_home -v 17) mvn package \ + -pl extensions-core/kafka-indexing-service -am \ + -Pskip-static-checks -DskipTests -T1C -q + +# Use the latest stable Druid release available; 37.0.0 is the example below. 
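+# Older Druid releases are moved off the CDN mirrors; if the dlcdn.apache.org URL
+# below returns 404, fetch from https://archive.apache.org/dist/druid/ instead.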
+DRUID_VERSION=37.0.0 +cd /tmp +curl -O "https://dlcdn.apache.org/druid/${DRUID_VERSION}/apache-druid-${DRUID_VERSION}-bin.tar.gz" +tar -xzf "apache-druid-${DRUID_VERSION}-bin.tar.gz" +cd "apache-druid-${DRUID_VERSION}" + +rm extensions/druid-kafka-indexing-service/*.jar +cp /path/to/druid/extensions-core/kafka-indexing-service/target/druid-kafka-indexing-service-*.jar \ + extensions/druid-kafka-indexing-service/ +cp ~/.m2/repository/org/apache/kafka/kafka-clients/4.2.0/kafka-clients-4.2.0.jar \ + extensions/druid-kafka-indexing-service/ + +bin/start-druid +``` + +### Step 5: Submit task via Druid console + +Open `http://localhost:8888`, go to the **Ingestion** tab, click **Submit JSON task**, and paste: + +```json +{ + "type": "index_kafka_share_group", + "dataSchema": { + "dataSource": "share_group_demo", + "timestampSpec": {"column": "__time", "format": "auto"}, + "dimensionsSpec": {"useSchemaDiscovery": true}, + "granularitySpec": {"segmentGranularity": "DAY", "queryGranularity": "NONE"} + }, + "ioConfig": { + "type": "kafka_share_group", + "topic": "druid-share-test", + "groupId": "druid-demo-share-group", + "consumerProperties": {"bootstrap.servers": "localhost:9092"}, + "inputFormat": {"type": "json"}, + "pollTimeout": 2000 + }, + "tuningConfig": {"type": "KafkaTuningConfig"} +} +``` + +### Step 6: Query data + +Go to the **Query** tab and run: + +```sql +SELECT COUNT(*) AS total_rows FROM share_group_demo; +SELECT category, COUNT(*) AS cnt, SUM(value) AS total FROM share_group_demo GROUP BY category; +``` + +## Running tests + +Unit tests: + +```bash +mvn test -pl extensions-core/kafka-indexing-service \ + -Dtest="org.apache.druid.indexing.kafka.ShareGroupIndexTaskIOConfigTest,\ +org.apache.druid.indexing.kafka.KafkaShareGroupRecordSupplierTest,\ +org.apache.druid.indexing.kafka.ShareGroupIndexTaskTest,\ +org.apache.druid.indexing.kafka.ShareGroupIndexTaskRunnerTest,\ +org.apache.druid.indexing.kafka.ShareGroupConsumerPropertiesTest" \ + -Dsurefire.failIfNoSpecifiedTests=false \ + -Pskip-static-checks -Dweb.console.skip=true -T1C +``` + +E2E test (requires Docker; Testcontainers starts an `apache/kafka:4.1.1` broker with `group.share.enable=true`): + +```bash +mvn test -pl embedded-tests -am \ + -Dtest="org.apache.druid.testing.embedded.indexing.EmbeddedShareGroupIngestionTest" \ + -Dsurefire.failIfNoSpecifiedTests=false \ + -Pskip-static-checks -Dweb.console.skip=true -T1C +``` diff --git a/embedded-tests/pom.xml b/embedded-tests/pom.xml index 1c77fe29744b..da985f21856f 100644 --- a/embedded-tests/pom.xml +++ b/embedded-tests/pom.xml @@ -40,6 +40,17 @@ + + + org.apache.kafka + kafka-clients + ${apache.kafka.version} + test + + org.apache.druid @@ -532,12 +543,6 @@ commons-codec test - - org.apache.kafka - kafka-clients - ${apache.kafka.version} - test - org.testcontainers testcontainers diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedShareGroupIngestionTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedShareGroupIngestionTest.java new file mode 100644 index 000000000000..e417fd1cf8b1 --- /dev/null +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedShareGroupIngestionTest.java @@ -0,0 +1,514 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.embedded.indexing; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.impl.CsvInputFormat; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexer.granularity.UniformGranularitySpec; +import org.apache.druid.indexing.kafka.KafkaIndexTaskModule; +import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig; +import org.apache.druid.indexing.kafka.ShareGroupIndexTask; +import org.apache.druid.indexing.kafka.ShareGroupIndexTaskIOConfig; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.DruidMetrics; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.testing.embedded.EmbeddedBroker; +import org.apache.druid.testing.embedded.EmbeddedCoordinator; +import org.apache.druid.testing.embedded.EmbeddedDruidCluster; +import org.apache.druid.testing.embedded.EmbeddedHistorical; +import org.apache.druid.testing.embedded.EmbeddedIndexer; +import org.apache.druid.testing.embedded.EmbeddedOverlord; +import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.joda.time.DateTime; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; + +/** End-to-end IT for Kafka share-group ingestion; requires Docker. */ +public class EmbeddedShareGroupIngestionTest extends EmbeddedClusterTestBase +{ + private static final String COL_TIMESTAMP = "__time"; + private static final String COL_ITEM = "item"; + private static final String COL_VALUE = "value"; + + private static final InputFormat DEFAULT_CSV_FORMAT = + new CsvInputFormat(List.of(COL_TIMESTAMP, COL_ITEM, COL_VALUE), null, null, false, 0, false); + + /** Lets a freshly submitted share-group task reach STABLE before records are produced. 
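+   * Share-group membership has no rebalance barrier, but the first {@code poll()} can
+   * return nothing while the share coordinator initializes its internal state topic.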
*/ + private static final long SHARE_CONSUMER_READY_DELAY_MS = 3_000L; + + private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator(); + private final EmbeddedOverlord overlord = new EmbeddedOverlord(); + private final EmbeddedIndexer indexer = new EmbeddedIndexer(); + private final EmbeddedHistorical historical = new EmbeddedHistorical(); + private final EmbeddedBroker broker = new EmbeddedBroker(); + + private ShareGroupKafkaResource kafkaServer; + + @Override + public EmbeddedDruidCluster createCluster() + { + kafkaServer = new ShareGroupKafkaResource(); + + final EmbeddedDruidCluster cluster = EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper(); + indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s"); + + cluster.addExtension(KafkaIndexTaskModule.class) + .addResource(kafkaServer) + .useLatchableEmitter() + .useDefaultTimeoutForLatchableEmitter(30) + .addCommonProperty("druid.monitoring.emissionPeriod", "PT0.1s") + .addServer(coordinator) + .addServer(overlord) + .addServer(indexer) + .addServer(historical) + .addServer(broker); + + return cluster; + } + + @Test + public void test_shareGroupIngestion_basicEndToEnd() throws InterruptedException + { + final String topic = dataSource + "_topic"; + kafkaServer.createTopicWithPartitions(topic, 2); + + final String taskId = submitShareGroupTask( + topic, + "druid-share-group-test", + kafkaServer.consumerProperties(), + DEFAULT_CSV_FORMAT, + defaultTuningConfig() + ); + + Thread.sleep(SHARE_CONSUMER_READY_DELAY_MS); + + final int numRecords = 10; + kafkaServer.produceRecordsToTopic( + generateRecords(topic, numRecords, DateTimes.of("2025-06-01")) + ); + + waitForRowsProcessed(numRecords); + cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker); + assertRowCountEventually(numRecords); + + cancelAndAwaitTermination(taskId); + } + + /** Verifies that one Kafka record carrying multiple JSON objects yields one row per object. */ + @Test + public void test_shareGroupIngestion_multiRowPerKafkaRecord() throws InterruptedException + { + final String topic = dataSource + "_topic"; + kafkaServer.createTopicWithPartitions(topic, 2); + + final String taskId = submitShareGroupTask( + topic, + "druid-share-group-multirow-test", + kafkaServer.consumerProperties(), + new JsonInputFormat(null, Collections.emptyMap(), null, null, true), + defaultTuningConfig() + ); + + Thread.sleep(SHARE_CONSUMER_READY_DELAY_MS); + + final int numRecords = 5; + final int rowsPerRecord = 4; + kafkaServer.produceRecordsToTopic( + generateMultiObjectJsonRecords(topic, numRecords, rowsPerRecord, DateTimes.of("2025-07-01")) + ); + + final long expectedRows = (long) numRecords * rowsPerRecord; + waitForRowsProcessed(expectedRows); + cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker); + assertRowCountEventually(expectedRows); + + cancelAndAwaitTermination(taskId); + } + + /** Forbidden share-group consumer properties are stripped so the task starts and ingests. 
*/ + @Test + public void test_shareGroupIngestion_unsupportedConsumerProperty_isSanitized() throws InterruptedException + { + final String topic = dataSource + "_topic"; + kafkaServer.createTopicWithPartitions(topic, 2); + + final String taskId = submitShareGroupTask( + topic, + "druid-share-group-sanitize-test", + consumerPropsWithUnsupportedKeys(), + DEFAULT_CSV_FORMAT, + defaultTuningConfig() + ); + + Thread.sleep(SHARE_CONSUMER_READY_DELAY_MS); + + final int numRecords = 10; + kafkaServer.produceRecordsToTopic( + generateRecords(topic, numRecords, DateTimes.of("2025-08-01")) + ); + + waitForRowsProcessed(numRecords); + cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker); + assertRowCountEventually(numRecords); + + cancelAndAwaitTermination(taskId); + } + + /** Cancel must drain the in-flight batch so rows ingested before cancel remain queryable. */ + @Test + public void test_shareGroupIngestion_gracefulStop_publishesInflightBatch() throws InterruptedException + { + final String topic = dataSource + "_topic"; + kafkaServer.createTopicWithPartitions(topic, 2); + + final String taskId = submitShareGroupTask( + topic, + "druid-share-group-graceful-stop", + kafkaServer.consumerProperties(), + DEFAULT_CSV_FORMAT, + defaultTuningConfig() + ); + + Thread.sleep(SHARE_CONSUMER_READY_DELAY_MS); + + final int numRecords = 5; + kafkaServer.produceRecordsToTopic( + generateRecords(topic, numRecords, DateTimes.of("2025-09-01")) + ); + + waitForRowsProcessed(numRecords); + + cancelAndAwaitTermination(taskId); + cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker); + assertRowCountEventually(numRecords); + } + + /** Three tasks sharing one group on a 2-partition topic ingest every record exactly once (KIP-932). */ + @Test + public void test_shareGroupIngestion_multiTaskShareGroup_noDuplicatesNoLoss() throws InterruptedException + { + final String topic = dataSource + "_topic"; + kafkaServer.createTopicWithPartitions(topic, 2); + + final String groupId = "druid-share-group-fanout"; + final String taskA = submitShareGroupTask( + topic, + groupId, + kafkaServer.consumerProperties(), + DEFAULT_CSV_FORMAT, + defaultTuningConfig() + ); + final String taskB = submitShareGroupTask( + topic, + groupId, + kafkaServer.consumerProperties(), + DEFAULT_CSV_FORMAT, + defaultTuningConfig() + ); + final String taskC = submitShareGroupTask( + topic, + groupId, + kafkaServer.consumerProperties(), + DEFAULT_CSV_FORMAT, + defaultTuningConfig() + ); + + Thread.sleep(SHARE_CONSUMER_READY_DELAY_MS); + + final int numRecords = 30; + kafkaServer.produceRecordsToTopic( + generateRecords(topic, numRecords, DateTimes.of("2025-10-01")) + ); + + waitForRowsProcessed(numRecords); + + cancelAndAwaitTermination(taskA); + cancelAndAwaitTermination(taskB); + cancelAndAwaitTermination(taskC); + cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker); + assertRowCountEventually(numRecords); + } + + /** 1000 rows with {@code maxRowsInMemory=100} forces mid-batch persists yet ingests all rows. 
*/ + @Test + public void test_shareGroupIngestion_lowMaxRowsInMemory_dataCorrect() throws InterruptedException + { + final String topic = dataSource + "_topic"; + kafkaServer.createTopicWithPartitions(topic, 2); + + final String taskId = submitShareGroupTask( + topic, + "druid-share-group-persist-test", + kafkaServer.consumerProperties(), + DEFAULT_CSV_FORMAT, + tuningConfigWithMaxRowsInMemory(100) + ); + + Thread.sleep(SHARE_CONSUMER_READY_DELAY_MS); + + final int numRecords = 1000; + kafkaServer.produceRecordsToTopic( + generateRecords(topic, numRecords, DateTimes.of("2025-11-01")) + ); + + waitForRowsProcessed(numRecords); + + cancelAndAwaitTermination(taskId); + cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker); + assertRowCountEventually(numRecords); + } + + private DataSchema buildDataSchema() + { + return DataSchema.builder() + .withDataSource(dataSource) + .withTimestamp(new TimestampSpec(COL_TIMESTAMP, null, null)) + .withDimensions(DimensionsSpec.EMPTY) + .withGranularity(new UniformGranularitySpec( + Granularities.DAY, + Granularities.NONE, + null + )) + .build(); + } + + private static KafkaIndexTaskTuningConfig defaultTuningConfig() + { + return new KafkaIndexTaskTuningConfig( + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + } + + private static KafkaIndexTaskTuningConfig tuningConfigWithMaxRowsInMemory(int maxRowsInMemory) + { + return new KafkaIndexTaskTuningConfig( + null, + maxRowsInMemory, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + } + + private String submitShareGroupTask( + String topic, + String groupId, + Map consumerProperties, + InputFormat inputFormat, + KafkaIndexTaskTuningConfig tuningConfig + ) + { + // Broker default is LATEST; deliver pre-existing records to this group. + kafkaServer.setShareGroupAutoOffsetReset(groupId, "earliest"); + + final ShareGroupIndexTaskIOConfig ioConfig = new ShareGroupIndexTaskIOConfig( + topic, + groupId, + consumerProperties, + inputFormat, + null + ); + + final ObjectMapper mapper = new DefaultObjectMapper(); + final ShareGroupIndexTask task = new ShareGroupIndexTask( + null, + null, + buildDataSchema(), + tuningConfig, + ioConfig, + null, + mapper + ); + + cluster.callApi().submitTask(task); + return task.getId(); + } + + /** + * Cancels and waits for any terminal state. User-initiated cancels are + * always reported as FAILED by the Overlord; only supervisors can drive + * SUCCESS via {@code shutdownWithSuccess}, so data correctness is verified + * separately via SQL row counts. + */ + private void cancelAndAwaitTermination(String taskId) + { + cluster.callApi().onLeaderOverlord(o -> o.cancelTask(taskId)); + cluster.callApi().waitForTaskToFinish(taskId, overlord.latchableEmitter()); + } + + private void waitForRowsProcessed(long expected) + { + indexer.latchableEmitter().waitForEventAggregate( + event -> event.hasMetricName("ingest/events/processed") + .hasDimension(DruidMetrics.DATASOURCE, dataSource), + agg -> agg.hasSumAtLeast(expected) + ); + } + + /** + * Polls SQL for the expected row count, swallowing transient + * "datasource not found" errors that can arise immediately after a task + * cancel before the broker's SQL catalog has refreshed. 
+ */ + private void assertRowCountEventually(long expected) + { + final String expectedStr = String.valueOf(expected); + final long deadlineMillis = System.currentTimeMillis() + 30_000L; + String last = null; + Exception lastException = null; + while (System.currentTimeMillis() < deadlineMillis) { + try { + last = cluster.runSql("SELECT COUNT(*) FROM %s", dataSource); + if (expectedStr.equals(last)) { + return; + } + lastException = null; + } + catch (Exception e) { + lastException = e; + } + try { + Thread.sleep(250L); + } + catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + } + Assertions.fail( + "Expected row count [" + expectedStr + "] but last result was [" + last + + "], lastException=" + lastException + ); + } + + /** Builds a property map containing keys forbidden by {@code SHARE_GROUP_UNSUPPORTED_CONFIGS}. */ + private Map consumerPropsWithUnsupportedKeys() + { + final Map props = new HashMap<>(kafkaServer.consumerProperties()); + props.put("enable.auto.commit", "false"); + props.put("auto.offset.reset", "earliest"); + props.put("group.instance.id", "share-it-instance-0"); + return props; + } + + private List> generateRecords( + String topic, + int numRecords, + DateTime startTime + ) + { + final List> records = new ArrayList<>(); + for (int i = 0; i < numRecords; i++) { + final String csv = StringUtils.format( + "%s,item_%d,%d", + startTime.plusDays(i), + i, + ThreadLocalRandom.current().nextInt(1000) + ); + records.add( + new ProducerRecord<>(topic, i % 2, null, StringUtils.toUtf8(csv)) + ); + } + return records; + } + + private List> generateMultiObjectJsonRecords( + String topic, + int numRecords, + int rowsPerRecord, + DateTime startTime + ) + { + final List> records = new ArrayList<>(); + for (int i = 0; i < numRecords; i++) { + final StringBuilder json = new StringBuilder(); + for (int j = 0; j < rowsPerRecord; j++) { + if (j > 0) { + json.append(' '); + } + json.append(StringUtils.format( + "{\"%s\":\"%s\",\"%s\":\"item_%d_%d\",\"%s\":%d}", + COL_TIMESTAMP, + startTime.plusHours(i * rowsPerRecord + j), + COL_ITEM, + i, + j, + COL_VALUE, + ThreadLocalRandom.current().nextInt(1000) + )); + } + records.add(new ProducerRecord<>(topic, i % 2, null, StringUtils.toUtf8(json.toString()))); + } + return records; + } +} diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/ShareGroupKafkaResource.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/ShareGroupKafkaResource.java new file mode 100644 index 000000000000..64731e850bfb --- /dev/null +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/ShareGroupKafkaResource.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testing.embedded.indexing; + +import org.apache.druid.indexing.kafka.simulate.KafkaResource; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.common.config.ConfigResource; +import org.testcontainers.kafka.KafkaContainer; + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +/** + * A {@link KafkaResource} that starts a Kafka broker with share groups enabled. + * Share groups (KIP-932) require the broker config {@code group.share.enable=true}. + */ +public class ShareGroupKafkaResource extends KafkaResource +{ + @Override + protected KafkaContainer createContainer() + { + final KafkaContainer container = super.createContainer(); + container.withEnv("KAFKA_GROUP_SHARE_ENABLE", "true"); + container.withEnv("KAFKA_GROUP_SHARE_RECORD_LOCK_DURATION_MS", "30000"); + // Single-broker test cluster: override replication for the internal + // __share_group_state topic so the share coordinator can initialize. + container.withEnv("KAFKA_SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR", "1"); + container.withEnv("KAFKA_SHARE_COORDINATOR_STATE_TOPIC_MIN_ISR", "1"); + return container; + } + + /** + * Sets {@code share.auto.offset.reset} for the named share group via + * AdminClient. Broker default is {@code LATEST}; tests that produce + * records before subscribing need {@code earliest}. + */ + public void setShareGroupAutoOffsetReset(String groupId, String value) + { + try (Admin admin = newAdminClient()) { + final ConfigResource resource = new ConfigResource(ConfigResource.Type.GROUP, groupId); + final Collection ops = List.of( + new AlterConfigOp( + new ConfigEntry("share.auto.offset.reset", value), + AlterConfigOp.OpType.SET + ) + ); + admin.incrementalAlterConfigs(Map.of(resource, ops)).all().get(); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/ShareGroupProbeTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/ShareGroupProbeTest.java new file mode 100644 index 000000000000..53160f780a81 --- /dev/null +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/ShareGroupProbeTest.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testing.embedded.indexing; + +import org.apache.druid.testing.embedded.EmbeddedDruidCluster; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.DescribeConfigsResult; +import org.apache.kafka.clients.consumer.AcknowledgeType; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaShareConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Diagnostic probe: bypasses Druid entirely and uses {@link KafkaShareConsumer} + * directly against the testcontainers broker to verify share groups work. + */ +public class ShareGroupProbeTest +{ + private ShareGroupKafkaResource kafkaServer; + + @BeforeEach + public void setUp() + { + kafkaServer = new ShareGroupKafkaResource(); + final EmbeddedDruidCluster cluster = EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper(); + cluster.addResource(kafkaServer); + kafkaServer.beforeStart(cluster); + kafkaServer.start(); + kafkaServer.onStarted(cluster); + } + + @AfterEach + public void tearDown() + { + if (kafkaServer != null) { + kafkaServer.stop(); + } + } + + @Test + public void probe_mimicDruid_pollLoop100ms_areConsumed() throws Exception + { + final String topic = "probe_topic_mimic_" + System.currentTimeMillis(); + final String groupId = "probe_group_mimic_" + System.currentTimeMillis(); + + kafkaServer.createTopicWithPartitions(topic, 2); + kafkaServer.setShareGroupAutoOffsetReset(groupId, "earliest"); + + // Mirror Druid's KafkaShareGroupRecordSupplier props pipeline. 
+ final Map raw = new HashMap<>(kafkaServer.consumerProperties()); + final Map sanitized = + org.apache.druid.indexing.kafka.ShareGroupConsumerProperties.sanitize(raw); + final Properties props = new Properties(); + for (Map.Entry e : sanitized.entrySet()) { + props.setProperty(e.getKey(), String.valueOf(e.getValue())); + } + props.setProperty("group.id", groupId); + props.setProperty("share.acknowledgement.mode", "explicit"); + + final AtomicInteger received = new AtomicInteger(); + + Thread producerThread = new Thread(() -> { + try { + Thread.sleep(3_000); + final java.util.ArrayList> records = + new java.util.ArrayList<>(); + for (int i = 0; i < 10; i++) { + records.add(new org.apache.kafka.clients.producer.ProducerRecord<>(topic, i % 2, null, ("v-" + i).getBytes(StandardCharsets.UTF_8))); + } + kafkaServer.produceRecordsToTopic(records); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }, "probe-producer"); + producerThread.start(); + + try (KafkaShareConsumer consumer = + new KafkaShareConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) { + consumer.subscribe(List.of(topic)); + + final long deadlineMs = System.currentTimeMillis() + 30_000; + while (received.get() < 10 && System.currentTimeMillis() < deadlineMs) { + final ConsumerRecords polled = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord r : polled) { + received.incrementAndGet(); + consumer.acknowledge(r, AcknowledgeType.ACCEPT); + } + consumer.commitSync(); + } + } + + producerThread.join(2_000); + Assertions.assertEquals(10, received.get(), "Expected 10 records via Druid-mimicking probe"); + } + + @Test + public void probe_noWarmup_transactionalProducer_areConsumed() throws Exception + { + final String topic = "probe_topic_nowarmup_" + System.currentTimeMillis(); + final String groupId = "probe_group_nowarmup_" + System.currentTimeMillis(); + + kafkaServer.createTopicWithPartitions(topic, 2); + kafkaServer.setShareGroupAutoOffsetReset(groupId, "earliest"); + + final Properties props = new Properties(); + props.put("bootstrap.servers", kafkaServer.getBootstrapServerUrl()); + props.put("group.id", groupId); + props.put("share.acknowledgement.mode", "explicit"); + + final AtomicInteger received = new AtomicInteger(); + + try (KafkaShareConsumer consumer = + new KafkaShareConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) { + consumer.subscribe(List.of(topic)); + + Thread.sleep(3_000); + + final java.util.ArrayList> records = + new java.util.ArrayList<>(); + for (int i = 0; i < 10; i++) { + records.add(new org.apache.kafka.clients.producer.ProducerRecord<>(topic, i % 2, null, ("value-" + i).getBytes(StandardCharsets.UTF_8))); + } + kafkaServer.produceRecordsToTopic(records); + + final long deadlineMs = System.currentTimeMillis() + 30_000; + while (received.get() < 10 && System.currentTimeMillis() < deadlineMs) { + final ConsumerRecords polled = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord r : polled) { + received.incrementAndGet(); + consumer.acknowledge(r, AcknowledgeType.ACCEPT); + } + consumer.commitSync(); + } + } + + Assertions.assertEquals(10, received.get(), "Expected 10 records via KafkaShareConsumer (no warmup, transactional producer)"); + } + + @Test + public void probe_recordsProducedAfterSubscribe_areConsumed() throws Exception + { + final String topic = "probe_topic_" + System.currentTimeMillis(); + final String groupId = "probe_group_" + System.currentTimeMillis(); + + 
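+    // Before consuming, this probe dumps broker- and group-level share configs (below)
+    // so a failing run shows whether group.share.enable and share.auto.offset.reset
+    // actually took effect.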
kafkaServer.createTopicWithPartitions(topic, 2); + + try (Admin admin = kafkaServer.newAdminClient()) { + final ConfigResource brokerCfg = new ConfigResource(ConfigResource.Type.BROKER, "1"); + final DescribeConfigsResult res = admin.describeConfigs(List.of(brokerCfg)); + for (ConfigEntry e : res.all().get().get(brokerCfg).entries()) { + if (e.name().startsWith("group.share")) { + System.out.println("BROKER_CFG: " + e.name() + "=" + e.value() + " source=" + e.source()); + } + } + } + + kafkaServer.setShareGroupAutoOffsetReset(groupId, "earliest"); + + try (Admin admin = kafkaServer.newAdminClient()) { + final ConfigResource grp = new ConfigResource(ConfigResource.Type.GROUP, groupId); + final DescribeConfigsResult res = admin.describeConfigs(List.of(grp)); + for (ConfigEntry e : res.all().get().get(grp).entries()) { + if (e.name().startsWith("share")) { + System.out.println("GROUP_CFG: " + e.name() + "=" + e.value() + " source=" + e.source()); + } + } + } + + final Properties props = new Properties(); + props.put("bootstrap.servers", kafkaServer.getBootstrapServerUrl()); + props.put("group.id", groupId); + props.put("share.acknowledgement.mode", "explicit"); + + final AtomicInteger received = new AtomicInteger(); + + try (KafkaShareConsumer consumer = + new KafkaShareConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) { + consumer.subscribe(List.of(topic)); + + final long subscribeAt = System.currentTimeMillis(); + while (System.currentTimeMillis() - subscribeAt < 3_000) { + consumer.poll(Duration.ofMillis(200)); + } + + final Map producerProps = new HashMap<>(); + producerProps.put("bootstrap.servers", kafkaServer.getBootstrapServerUrl()); + producerProps.put("key.serializer", ByteArraySerializer.class.getName()); + producerProps.put("value.serializer", ByteArraySerializer.class.getName()); + producerProps.put("acks", "all"); + + try (KafkaProducer producer = new KafkaProducer<>(producerProps)) { + for (int i = 0; i < 10; i++) { + producer.send(new ProducerRecord<>(topic, i % 2, null, ("value-" + i).getBytes(StandardCharsets.UTF_8))); + } + producer.flush(); + } + + final long deadlineMs = System.currentTimeMillis() + 30_000; + while (received.get() < 10 && System.currentTimeMillis() < deadlineMs) { + final ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); + for (ConsumerRecord r : records) { + received.incrementAndGet(); + consumer.acknowledge(r, AcknowledgeType.ACCEPT); + } + consumer.commitSync(); + } + } + + Assertions.assertEquals(10, received.get(), "Expected 10 records via KafkaShareConsumer"); + } +} diff --git a/extensions-core/kafka-indexing-service/pom.xml b/extensions-core/kafka-indexing-service/pom.xml index f86138c8742a..203abb5e31dd 100644 --- a/extensions-core/kafka-indexing-service/pom.xml +++ b/extensions-core/kafka-indexing-service/pom.xml @@ -189,6 +189,11 @@ test-jar test + + org.mockito + mockito-core + test + org.easymock easymock diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskModule.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskModule.java index 6a364fc5f031..559f867f4be4 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskModule.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskModule.java @@ -43,8 +43,10 @@ public List getJacksonModules() new SimpleModule(getClass().getSimpleName()) 
.registerSubtypes( new NamedType(KafkaIndexTask.class, "index_kafka"), + new NamedType(ShareGroupIndexTask.class, "index_kafka_share_group"), new NamedType(KafkaDataSourceMetadata.class, SCHEME), new NamedType(KafkaIndexTaskIOConfig.class, SCHEME), + new NamedType(ShareGroupIndexTaskIOConfig.class, "kafka_share_group"), // "KafkaTuningConfig" is not the ideal name, but is needed for backwards compatibility. // (Older versions of Druid didn't specify a type name and got this one by default.) new NamedType(KafkaIndexTaskTuningConfig.class, "KafkaTuningConfig"), diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaShareGroupRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaShareGroupRecordSupplier.java new file mode 100644 index 000000000000..a6054bd5c061 --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaShareGroupRecordSupplier.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import org.apache.druid.data.input.kafka.KafkaRecordEntity; +import org.apache.druid.data.input.kafka.KafkaTopicPartition; +import org.apache.druid.indexing.seekablestream.common.AcknowledgeType; +import org.apache.druid.indexing.seekablestream.common.AcknowledgingRecordSupplier; +import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaShareConsumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; + +import javax.annotation.Nonnull; +import javax.validation.constraints.NotNull; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; + +/** + * Adapts {@link KafkaShareConsumer} to {@link AcknowledgingRecordSupplier}. + * Delivery state lives on the broker; the supplier sets {@code group.id} to + * the configured share group name. 
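+ * <p>Not thread-safe: {@code deliveredRecords} assumes {@code poll} and
+ * {@code acknowledge} are called from a single thread, matching the runner's
+ * single-threaded ingestion loop.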
+ */ +public class KafkaShareGroupRecordSupplier + implements AcknowledgingRecordSupplier +{ + private static final Logger log = new Logger(KafkaShareGroupRecordSupplier.class); + + private final KafkaShareConsumer consumer; + + /** + * Records returned by the most recent {@link #poll(long)}, retained so + * {@link #acknowledge} can pass the original {@link ConsumerRecord} to the + * Kafka client. The {@code (topic, partition, offset)} ack variant is fragile + * once the share-fetch buffer rolls over (KIP-932). + */ + private final Map> deliveredRecords = new HashMap<>(); + private boolean closed; + + public KafkaShareGroupRecordSupplier( + Map consumerProperties, + ObjectMapper sortingMapper, + String groupId + ) + { + this(createShareConsumer(consumerProperties, sortingMapper, groupId)); + } + + @VisibleForTesting + public KafkaShareGroupRecordSupplier(KafkaShareConsumer consumer) + { + this.consumer = consumer; + } + + @Override + public void subscribe(Set topics) + { + consumer.subscribe(topics); + } + + @Override + public void unsubscribe() + { + consumer.unsubscribe(); + } + + @Override + public Set subscription() + { + return consumer.subscription(); + } + + @NotNull + @Override + public List> poll(long timeoutMs) + { + deliveredRecords.clear(); + final List> polledRecords = + new ArrayList<>(); + for (ConsumerRecord record : consumer.poll(Duration.ofMillis(timeoutMs))) { + deliveredRecords.put(new RecordKey(record.topic(), record.partition(), record.offset()), record); + polledRecords.add(new OrderedPartitionableRecord<>( + record.topic(), + new KafkaTopicPartition(true, record.topic(), record.partition()), + record.offset(), + record.value() == null ? null : ImmutableList.of(new KafkaRecordEntity(record)), + record.timestamp() + )); + } + return polledRecords; + } + + @Override + public void acknowledge(KafkaTopicPartition partitionId, Long offset) + { + acknowledge(partitionId, offset, AcknowledgeType.ACCEPT); + } + + @Override + public void acknowledge(KafkaTopicPartition partitionId, Long offset, AcknowledgeType type) + { + final String topic = partitionId.topic().orElseThrow( + () -> new IllegalArgumentException("Cannot acknowledge record without topic") + ); + final ConsumerRecord record = deliveredRecords.get( + new RecordKey(topic, partitionId.partition(), offset) + ); + if (record == null) { + throw new IllegalStateException(StringUtils.format( + "Cannot acknowledge unknown record at topic[%s] partition[%d] offset[%d]; " + + "either it was not delivered by the most recent poll() or it has already been acknowledged.", + topic, partitionId.partition(), offset + )); + } + consumer.acknowledge(record, toKafkaAcknowledgeType(type)); + } + + @Override + public void acknowledge( + Map> offsets, + AcknowledgeType type + ) + { + final org.apache.kafka.clients.consumer.AcknowledgeType kafkaType = toKafkaAcknowledgeType(type); + for (Map.Entry> entry : offsets.entrySet()) { + final KafkaTopicPartition partition = entry.getKey(); + final String topic = partition.topic().orElseThrow( + () -> new IllegalArgumentException("Cannot acknowledge record without topic") + ); + for (Long offset : entry.getValue()) { + final ConsumerRecord record = deliveredRecords.get( + new RecordKey(topic, partition.partition(), offset) + ); + if (record == null) { + throw new IllegalStateException(StringUtils.format( + "Cannot acknowledge unknown record at topic[%s] partition[%d] offset[%d]; " + + "either it was not delivered by the most recent poll() or it has already been acknowledged.", + topic, 
partition.partition(), offset + )); + } + consumer.acknowledge(record, kafkaType); + } + } + } + + @Override + public Map> commitSync() + { + final Map> result = consumer.commitSync(); + final Map> mapped = new HashMap<>(); + for (Map.Entry> entry : result.entrySet()) { + final TopicIdPartition tip = entry.getKey(); + mapped.put( + new KafkaTopicPartition(true, tip.topic(), tip.partition()), + entry.getValue().map(e -> (Exception) e) + ); + } + return mapped; + } + + @Override + public Set getPartitionIds(String stream) + { + // Share consumer does not expose partition assignment; broker manages it. + return Collections.emptySet(); + } + + @Override + public void wakeup() + { + consumer.wakeup(); + } + + @Override + public Optional acquisitionLockTimeoutMs() + { + return consumer.acquisitionLockTimeoutMs(); + } + + @Override + public void close() + { + if (closed) { + return; + } + closed = true; + try { + consumer.close(); + } + catch (Exception e) { + log.warn(e, "Exception closing KafkaShareConsumer"); + } + } + + @Nonnull + private static org.apache.kafka.clients.consumer.AcknowledgeType toKafkaAcknowledgeType(AcknowledgeType type) + { + switch (type) { + case ACCEPT: + return org.apache.kafka.clients.consumer.AcknowledgeType.ACCEPT; + case RELEASE: + return org.apache.kafka.clients.consumer.AcknowledgeType.RELEASE; + case REJECT: + return org.apache.kafka.clients.consumer.AcknowledgeType.REJECT; + case RENEW: + return org.apache.kafka.clients.consumer.AcknowledgeType.RENEW; + default: + throw new IllegalArgumentException("Unknown acknowledge type: " + type); + } + } + + private static KafkaShareConsumer createShareConsumer( + Map consumerProperties, + ObjectMapper sortingMapper, + String groupId + ) + { + final Map sanitized = ShareGroupConsumerProperties.sanitize(consumerProperties); + + final Properties props = new Properties(); + KafkaRecordSupplier.addConsumerPropertiesFromConfig(props, sortingMapper, sanitized); + ShareGroupConsumerProperties.sanitize(props); + props.setProperty("group.id", groupId); + props.setProperty("share.acknowledgement.mode", "explicit"); + + final ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(KafkaShareGroupRecordSupplier.class.getClassLoader()); + return new KafkaShareConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer()); + } + finally { + Thread.currentThread().setContextClassLoader(currCtxCl); + } + } + + private static final class RecordKey + { + private final String topic; + private final int partition; + private final long offset; + + RecordKey(String topic, int partition, long offset) + { + this.topic = topic; + this.partition = partition; + this.offset = offset; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (!(o instanceof RecordKey)) { + return false; + } + final RecordKey that = (RecordKey) o; + return partition == that.partition && offset == that.offset && topic.equals(that.topic); + } + + @Override + public int hashCode() + { + int result = topic.hashCode(); + result = 31 * result + partition; + result = 31 * result + Long.hashCode(offset); + return result; + } + } +} diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/ShareGroupConsumerProperties.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/ShareGroupConsumerProperties.java new file mode 100644 index 000000000000..cb149171745d --- /dev/null +++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/ShareGroupConsumerProperties.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.kafka; + +import com.google.common.collect.ImmutableSet; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.utils.CollectionUtils; + +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +/** + * Sanitises Kafka consumer properties before they reach + * {@link org.apache.kafka.clients.consumer.KafkaShareConsumer}: strips keys + * forbidden by {@code ShareConsumerConfig.SHARE_GROUP_UNSUPPORTED_CONFIGS} + * (Kafka 4.2.0) so the task starts cleanly when users supply a regular + * consumer config. + */ +public final class ShareGroupConsumerProperties +{ + private static final Logger log = new Logger(ShareGroupConsumerProperties.class); + + /** Mirrors {@code ShareConsumerConfig.SHARE_GROUP_UNSUPPORTED_CONFIGS} (Kafka 4.2.0). */ + static final Set UNSUPPORTED_CONFIGS = ImmutableSet.of( + "auto.offset.reset", + "enable.auto.commit", + "group.instance.id", + "isolation.level", + "partition.assignment.strategy", + "interceptor.classes", + "session.timeout.ms", + "heartbeat.interval.ms", + "group.protocol", + "group.remote.assignor" + ); + + private ShareGroupConsumerProperties() + { + } + + /** + * Returns a copy of {@code consumerProperties} with unsupported keys + * removed; each removed key is logged at WARN. Iteration order is preserved. + */ + public static Map sanitize(Map consumerProperties) + { + final Map sanitized = CollectionUtils.newLinkedHashMapWithExpectedSize(consumerProperties.size()); + for (Map.Entry entry : consumerProperties.entrySet()) { + if (UNSUPPORTED_CONFIGS.contains(entry.getKey())) { + log.warn( + "Stripping unsupported consumer property [%s] for share-group consumer " + + "(see Kafka ShareConsumerConfig).", + entry.getKey() + ); + } else { + sanitized.put(entry.getKey(), entry.getValue()); + } + } + return sanitized; + } + + /** + * Removes unsupported keys from a fully-resolved {@link Properties} object in place. + * Use this after expanding {@code druid.dynamic.config.provider} so that values + * pulled from a secrets vault cannot reintroduce a forbidden key. 
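+   * Removal is idempotent; calling this on an already-sanitized {@link Properties}
+   * is a no-op.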
+ */ + public static void sanitize(Properties properties) + { + for (String key : UNSUPPORTED_CONFIGS) { + if (properties.remove(key) != null) { + log.warn( + "Stripping unsupported consumer property [%s] for share-group consumer " + + "(see Kafka ShareConsumerConfig).", + key + ); + } + } + } +} diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/ShareGroupIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/ShareGroupIndexTask.java new file mode 100644 index 000000000000..ee0f9657cd92 --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/ShareGroupIndexTask.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.kafka; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.common.config.TaskConfig; +import org.apache.druid.indexing.common.task.AbstractTask; +import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask; +import org.apache.druid.indexing.common.task.TaskResource; +import org.apache.druid.indexing.common.task.Tasks; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.server.security.AuthorizationUtils; +import org.apache.druid.server.security.ResourceAction; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Indexing task that consumes from a Kafka topic with share-group semantics + * (KIP-932). The broker tracks delivery state; the task acknowledges records + * after the segments containing them are published. Phase 1: single-threaded, + * no supervisor, no dedup cache. 
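+ *
+ * <p>The at-least-once contract, sketched as pseudocode (names are
+ * illustrative, not the exact call sequence):
+ * <pre>{@code
+ * records = supplier.poll(timeout);   // broker grants acquisition locks
+ * rows    = parse(records);           // every row added before any ack
+ * publish(segments);                  // atomic metadata-store transaction
+ * acknowledge(records, ACCEPT);       // only after the publish succeeds
+ * commitSync();                       // flush acknowledgements to the broker
+ * }</pre>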
+ */ +public class ShareGroupIndexTask extends AbstractTask implements PendingSegmentAllocatingTask +{ + private static final Logger log = new Logger(ShareGroupIndexTask.class); + private static final String TYPE = "index_kafka_share_group"; + + public static final Set INPUT_SOURCE_RESOURCES = Set.of( + AuthorizationUtils.createExternalResourceReadAction(KafkaIndexTaskModule.SCHEME) + ); + + private final DataSchema dataSchema; + private final KafkaIndexTaskTuningConfig tuningConfig; + private final ShareGroupIndexTaskIOConfig ioConfig; + private final ObjectMapper configMapper; + + private final AtomicBoolean stopRequested = new AtomicBoolean(false); + private final AtomicReference activeRunner = new AtomicReference<>(); + + @JsonCreator + public ShareGroupIndexTask( + @JsonProperty("id") @Nullable String id, + @JsonProperty("resource") @Nullable TaskResource taskResource, + @JsonProperty("dataSchema") DataSchema dataSchema, + @JsonProperty("tuningConfig") KafkaIndexTaskTuningConfig tuningConfig, + @JsonProperty("ioConfig") ShareGroupIndexTaskIOConfig ioConfig, + @JsonProperty("context") @Nullable Map context, + @JacksonInject ObjectMapper configMapper + ) + { + super( + getOrMakeId(id, TYPE, dataSchema.getDataSource()), + null, + taskResource, + dataSchema.getDataSource(), + addConcurrentLocksContext(context), + IngestionMode.APPEND + ); + this.dataSchema = dataSchema; + this.tuningConfig = tuningConfig; + this.ioConfig = ioConfig; + this.configMapper = configMapper; + } + + @Override + public String getType() + { + return TYPE; + } + + @Override + public boolean isReady(TaskActionClient taskActionClient) + { + return true; + } + + @Nonnull + @JsonIgnore + @Override + public Set getInputSourceResources() + { + return INPUT_SOURCE_RESOURCES; + } + + @Override + public TaskStatus runTask(TaskToolbox toolbox) throws Exception + { + final ShareGroupIndexTaskRunner runner = new ShareGroupIndexTaskRunner(this, toolbox, configMapper); + activeRunner.set(runner); + try { + return runner.run(); + } + finally { + activeRunner.set(null); + } + } + + @Override + public void stopGracefully(TaskConfig taskConfig) + { + log.info("Graceful stop requested for task[%s].", getId()); + stopRequested.set(true); + final ShareGroupIndexTaskRunner runner = activeRunner.get(); + if (runner != null) { + runner.requestWakeup(); + } + } + + @JsonProperty + public DataSchema getDataSchema() + { + return dataSchema; + } + + @JsonProperty + public KafkaIndexTaskTuningConfig getTuningConfig() + { + return tuningConfig; + } + + @JsonProperty("ioConfig") + public ShareGroupIndexTaskIOConfig getIOConfig() + { + return ioConfig; + } + + @Override + public String getTaskAllocatorId() + { + return getTaskResource().getAvailabilityGroup(); + } + + @Override + public int getPriority() + { + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY); + } + + boolean isStopRequested() + { + return stopRequested.get(); + } + + ObjectMapper getConfigMapper() + { + return configMapper; + } + + private static Map addConcurrentLocksContext(@Nullable Map context) + { + final Map merged = new HashMap<>(); + if (context != null) { + merged.putAll(context); + } + merged.putIfAbsent(Tasks.USE_CONCURRENT_LOCKS, true); + return merged; + } +} diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/ShareGroupIndexTaskIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/ShareGroupIndexTaskIOConfig.java new file mode 100644 index 
000000000000..2b00a5529cbf --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/ShareGroupIndexTaskIOConfig.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.kafka; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig; +import org.apache.druid.segment.indexing.IOConfig; + +import javax.annotation.Nullable; +import java.util.Map; + +/** + * IO configuration for {@link ShareGroupIndexTask}. + * + * Unlike {@link KafkaIndexTaskIOConfig}, this config does not carry start/end + * offsets because the Kafka broker manages offset tracking for share groups. + * The task only needs the topic, group ID, consumer properties, and input format. + */ +public class ShareGroupIndexTaskIOConfig implements IOConfig +{ + private static final long DEFAULT_POLL_TIMEOUT_MILLIS = KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS; + + private final String topic; + private final String groupId; + private final Map consumerProperties; + private final InputFormat inputFormat; + private final long pollTimeout; + + @JsonCreator + public ShareGroupIndexTaskIOConfig( + @JsonProperty("topic") String topic, + @JsonProperty("groupId") String groupId, + @JsonProperty("consumerProperties") Map consumerProperties, + @JsonProperty("inputFormat") @Nullable InputFormat inputFormat, + @JsonProperty("pollTimeout") @Nullable Long pollTimeout + ) + { + this.topic = Preconditions.checkNotNull(topic, "topic"); + this.groupId = Preconditions.checkNotNull(groupId, "groupId"); + this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties"); + this.inputFormat = inputFormat; + this.pollTimeout = pollTimeout != null ? 
pollTimeout : DEFAULT_POLL_TIMEOUT_MILLIS; + } + + @JsonProperty + public String getTopic() + { + return topic; + } + + @JsonProperty + public String getGroupId() + { + return groupId; + } + + @JsonProperty + public Map getConsumerProperties() + { + return consumerProperties; + } + + @Nullable + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public InputFormat getInputFormat() + { + return inputFormat; + } + + @JsonProperty + public long getPollTimeout() + { + return pollTimeout; + } + + @Override + public String toString() + { + return "ShareGroupIndexTaskIOConfig{" + + "topic='" + topic + '\'' + + ", groupId='" + groupId + '\'' + + ", consumerProperties=" + consumerProperties + + ", inputFormat=" + inputFormat + + ", pollTimeout=" + pollTimeout + + '}'; + } +} diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/ShareGroupIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/ShareGroupIndexTaskRunner.java new file mode 100644 index 000000000000..b4fe2df8da5a --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/ShareGroupIndexTaskRunner.java @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.common.config.Configs; +import org.apache.druid.data.input.Committer; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.kafka.KafkaRecordEntity; +import org.apache.druid.data.input.kafka.KafkaTopicPartition; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.indexing.appenderator.ActionBasedPublishedSegmentRetriever; +import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator; +import org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.indexing.common.TaskLockType; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.actions.LockReleaseAction; +import org.apache.druid.indexing.common.actions.SegmentAllocateAction; +import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction; +import org.apache.druid.indexing.common.actions.TaskLocks; +import org.apache.druid.indexing.common.task.InputRowFilter; +import org.apache.druid.indexing.common.task.Tasks; +import org.apache.druid.indexing.input.InputRowSchemas; +import org.apache.druid.indexing.overlord.SegmentPublishResult; +import org.apache.druid.indexing.seekablestream.SeekableStreamAppenderatorConfig; +import org.apache.druid.indexing.seekablestream.StreamChunkReader; +import org.apache.druid.indexing.seekablestream.common.AcknowledgeType; +import org.apache.druid.indexing.seekablestream.common.AcknowledgingRecordSupplier; +import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.segment.SegmentSchemaMapping; +import org.apache.druid.segment.incremental.ParseExceptionHandler; +import org.apache.druid.segment.incremental.RowIngestionMeters; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.realtime.SegmentGenerationMetrics; +import org.apache.druid.segment.realtime.appenderator.Appenderator; +import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; +import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata; +import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver; +import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NumberedPartialShardSpec; +import org.apache.kafka.common.errors.WakeupException; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +/** + * Single-threaded ingestion loop for {@link ShareGroupIndexTask}. 
Records are + * {@code ACCEPT}-acknowledged only after the containing segment is published; + * any earlier failure leaves them unacknowledged so the broker redelivers + * them once the acquisition lock expires. + */ +public class ShareGroupIndexTaskRunner +{ + private static final Logger log = new Logger(ShareGroupIndexTaskRunner.class); + private static final String SEQUENCE_NAME = "share_group_seq_0"; + + static final String METRIC_COMMIT_FAILURES = "ingest/shareGroup/commitFailures"; + + private final ShareGroupIndexTask task; + private final TaskToolbox toolbox; + private final ObjectMapper configMapper; + private final Function> supplierFactory; + + private final AtomicReference> + activeSupplier = new AtomicReference<>(); + + ShareGroupIndexTaskRunner( + ShareGroupIndexTask task, + TaskToolbox toolbox, + ObjectMapper configMapper + ) + { + this(task, toolbox, configMapper, null); + } + + @VisibleForTesting + ShareGroupIndexTaskRunner( + ShareGroupIndexTask task, + TaskToolbox toolbox, + ObjectMapper configMapper, + @Nullable Function> supplierFactory + ) + { + this.task = task; + this.toolbox = toolbox; + this.configMapper = configMapper; + this.supplierFactory = supplierFactory != null ? supplierFactory : this::createDefaultRecordSupplier; + } + + public TaskStatus run() throws Exception + { + final DataSchema dataSchema = task.getDataSchema(); + final KafkaIndexTaskTuningConfig tuningConfig = task.getTuningConfig(); + final ShareGroupIndexTaskIOConfig ioConfig = task.getIOConfig(); + + final RowIngestionMeters rowIngestionMeters = + toolbox.getRowIngestionMetersFactory().createRowIngestionMeters(); + final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler( + rowIngestionMeters, + tuningConfig.isLogParseExceptions(), + tuningConfig.getMaxParseExceptions(), + tuningConfig.getMaxSavedParseExceptions() + ); + final SegmentGenerationMetrics segmentGenerationMetrics = new SegmentGenerationMetrics(); + + final InputRowSchema inputRowSchema = InputRowSchemas.fromDataSchema(dataSchema); + final InputFormat inputFormat = ioConfig.getInputFormat(); + if (inputFormat == null) { + throw new ISE("inputFormat must be specified in ioConfig"); + } + + // Raw type mirrors SeekableStreamIndexTaskRunner; works around + // OrderedPartitionableRecord.getData() returning a wildcard list. + @SuppressWarnings({"rawtypes", "unchecked"}) + final StreamChunkReader chunkReader = new StreamChunkReader( + inputFormat, + inputRowSchema, + dataSchema.getTransformSpec(), + toolbox.getIndexingTmpDir(), + InputRowFilter.allowAll(), + rowIngestionMeters, + parseExceptionHandler + ); + + final LockGranularity lockGranularity = Configs.valueOrDefault( + task.getContextValue(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK), + Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK + ) + ? 
LockGranularity.TIME_CHUNK + : LockGranularity.SEGMENT; + final TaskLockType lockType = TaskLocks.determineLockTypeForAppend(task.getContext()); + + final Appenderator appenderator = toolbox.getAppenderatorsManager().createRealtimeAppenderatorForTask( + toolbox.getSegmentLoaderConfig(), + task.getId(), + dataSchema, + SeekableStreamAppenderatorConfig.fromTuningConfig( + tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()), + toolbox.getProcessingConfig() + ), + toolbox.getConfig(), + segmentGenerationMetrics, + toolbox.getSegmentPusher(), + toolbox.getJsonMapper(), + toolbox.getIndexIO(), + toolbox.getIndexMerger(), + toolbox.getQueryRunnerFactoryConglomerate(), + toolbox.getSegmentAnnouncer(), + toolbox.getEmitter(), + toolbox.getQueryProcessingPool(), + toolbox.getJoinableFactory(), + toolbox.getCache(), + toolbox.getCacheConfig(), + toolbox.getCachePopulatorStats(), + toolbox.getPolicyEnforcer(), + rowIngestionMeters, + parseExceptionHandler, + toolbox.getCentralizedTableSchemaConfig(), + interval -> { + toolbox.getTaskActionClient().submit(new LockReleaseAction(interval)); + } + ); + + final StreamAppenderatorDriver driver = new StreamAppenderatorDriver( + appenderator, + new ActionBasedSegmentAllocator( + toolbox.getTaskActionClient(), + dataSchema, + (schema, row, sequenceName, previousSegmentId, skipSegmentLineageCheck) -> new SegmentAllocateAction( + schema.getDataSource(), + row.getTimestamp(), + schema.getGranularitySpec().getQueryGranularity(), + schema.getGranularitySpec().getSegmentGranularity(), + sequenceName, + previousSegmentId, + skipSegmentLineageCheck, + NumberedPartialShardSpec.instance(), + lockGranularity, + lockType + ) + ), + toolbox.getSegmentHandoffNotifierFactory(), + new ActionBasedPublishedSegmentRetriever(toolbox.getTaskActionClient()), + toolbox.getDataSegmentKiller(), + toolbox.getJsonMapper(), + segmentGenerationMetrics + ); + + final org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor metricsMonitor = + new org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor( + segmentGenerationMetrics, + rowIngestionMeters, + task.getMetricBuilder() + ); + toolbox.addMonitor(metricsMonitor); + + boolean appenderatorClosedNormally = false; + try (final AcknowledgingRecordSupplier recordSupplier = + supplierFactory.apply(ioConfig)) { + activeSupplier.set(recordSupplier); + recordSupplier.subscribe(Collections.singleton(ioConfig.getTopic())); + + driver.startJob(segmentId -> true); + + // Share groups manage delivery state on the broker; the Committer is a + // no-op placeholder required by the appenderator driver contract. 
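+      // Its commit metadata is a marker only; unlike KafkaIndexTask, no offsets
+      // are persisted to the metadata store, because the broker's share-group
+      // state is the sole record of delivery progress.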
+ final Supplier committerSupplier = () -> new Committer() + { + @Override + public Object getMetadata() + { + return ImmutableMap.of("type", "share_group"); + } + + @Override + public void run() + { + } + }; + + log.info("Starting share group ingestion loop for topic[%s], group[%s].", + ioConfig.getTopic(), ioConfig.getGroupId()); + + long totalRowsIngested = 0; + boolean lockTimeoutLogged = false; + + while (!task.isStopRequested()) { + + final List> records; + try { + records = recordSupplier.poll(ioConfig.getPollTimeout()); + } + catch (WakeupException e) { + if (task.isStopRequested()) { + log.info("Wake-up received while stopGracefully is in progress; exiting ingestion loop."); + break; + } + throw e; + } + + if (!lockTimeoutLogged) { + final Optional effectiveLockMs = recordSupplier.acquisitionLockTimeoutMs(); + log.info( + "Effective broker acquisition lock timeout for share-group[%s]: %s ms", + ioConfig.getGroupId(), + effectiveLockMs.map(String::valueOf).orElse("") + ); + lockTimeoutLogged = true; + } + + if (records.isEmpty()) { + continue; + } + + final Map> batchOffsets = new HashMap<>(); + boolean midBatchPersistNeeded = false; + boolean pushThresholdLogged = false; + + for (OrderedPartitionableRecord record : records) { + batchOffsets.computeIfAbsent(record.getPartitionId(), k -> new ArrayList<>()) + .add(record.getSequenceNumber()); + + final List rows = chunkReader.parse(record.getData(), false); + + for (InputRow row : rows) { + final AppenderatorDriverAddResult addResult = driver.add( + row, + SEQUENCE_NAME, + committerSupplier, + true, + false + ); + + if (!addResult.isOk()) { + // Throw without acknowledging so the broker redelivers this batch. + throw new ISE( + "Could not allocate segment for row with timestamp[%s]", + row.getTimestamp() + ); + } + + totalRowsIngested++; + midBatchPersistNeeded |= addResult.isPersistRequired(); + + if (!pushThresholdLogged && addResult.isPushRequired( + tuningConfig.getPartitionsSpec().getMaxRowsPerSegment(), + tuningConfig.getPartitionsSpec() + .getMaxTotalRowsOr(DynamicPartitionsSpec.DEFAULT_MAX_TOTAL_ROWS) + )) { + pushThresholdLogged = true; + log.info( + "Segment row threshold reached during batch (current segment[%s], rows[%d]); " + + "will publish at end-of-batch boundary.", + addResult.getSegmentIdentifier(), + addResult.getNumRowsInSegment() + ); + } + } + } + + if (midBatchPersistNeeded) { + driver.persist(committerSupplier.get()); + } + + driver.persist(committerSupplier.get()); + + final TransactionalSegmentPublisher publisher = new TransactionalSegmentPublisher() + { + @Override + public SegmentPublishResult publishAnnotatedSegments( + @Nullable Set mustBeNullOrEmptyOverwriteSegments, + Set segmentsToPush, + @Nullable Object commitMetadata, + SegmentSchemaMapping segmentSchemaMapping + ) throws IOException + { + if (segmentsToPush.isEmpty()) { + return SegmentPublishResult.ok(segmentsToPush); + } + return toolbox.getTaskActionClient().submit( + SegmentTransactionalAppendAction.forSegments(segmentsToPush, segmentSchemaMapping) + ); + } + + @Override + public boolean supportsEmptyPublish() + { + return false; + } + }; + + final SegmentsAndCommitMetadata published = driver.publish( + publisher, + committerSupplier.get(), + Collections.singletonList(SEQUENCE_NAME) + ).get(); + + final int segmentCount = published.getSegments().size(); + if (segmentCount > 0) { + log.info("Published %d segments with %d total rows.", segmentCount, totalRowsIngested); + } + + // Acknowledge only after the segment is published; any earlier 
failure + // leaves records unacknowledged so the broker redelivers them. + for (Map.Entry> entry : batchOffsets.entrySet()) { + for (Long offset : entry.getValue()) { + recordSupplier.acknowledge(entry.getKey(), offset, AcknowledgeType.ACCEPT); + } + } + + final Map> commitResult = recordSupplier.commitSync(); + long commitFailures = 0L; + for (Map.Entry> entry : commitResult.entrySet()) { + if (entry.getValue().isPresent()) { + commitFailures++; + log.warn( + entry.getValue().get(), + "Commit failed for partition[%s]. Records may be redelivered.", + entry.getKey() + ); + } + } + if (commitFailures > 0) { + task.emitMetric(toolbox.getEmitter(), METRIC_COMMIT_FAILURES, commitFailures); + } + } + + driver.persist(committerSupplier.get()); + + log.info("Share group ingestion complete. Total rows ingested: %d", totalRowsIngested); + appenderator.close(); + appenderatorClosedNormally = true; + } + finally { + activeSupplier.set(null); + try { + toolbox.removeMonitor(metricsMonitor); + } + catch (Exception e) { + log.warn(e, "Exception removing TaskRealtimeMetricsMonitor; continuing teardown."); + } + try { + driver.close(); + } + catch (Exception e) { + log.warn(e, "Exception closing StreamAppenderatorDriver; continuing teardown."); + } + if (!appenderatorClosedNormally) { + try { + appenderator.closeNow(); + } + catch (Exception e) { + log.warn(e, "Exception during emergency closeNow() of Appenderator; continuing teardown."); + } + } + } + + return TaskStatus.success(task.getId()); + } + + /** Interrupts an in-flight poll so the loop can exit on graceful stop. */ + void requestWakeup() + { + final AcknowledgingRecordSupplier supplier = activeSupplier.get(); + if (supplier == null) { + return; + } + try { + supplier.wakeup(); + } + catch (Exception e) { + log.warn(e, "Exception calling wakeup() on the active record supplier; ignoring."); + } + } + + private AcknowledgingRecordSupplier createDefaultRecordSupplier( + ShareGroupIndexTaskIOConfig ioConfig + ) + { + final ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + return new KafkaShareGroupRecordSupplier( + ioConfig.getConsumerProperties(), + configMapper, + ioConfig.getGroupId() + ); + } + finally { + Thread.currentThread().setContextClassLoader(currCtxCl); + } + } + +} diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaShareGroupRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaShareGroupRecordSupplierTest.java new file mode 100644 index 000000000000..5f1bd6608a22 --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaShareGroupRecordSupplierTest.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.kafka; + +import org.apache.druid.data.input.kafka.KafkaRecordEntity; +import org.apache.druid.data.input.kafka.KafkaTopicPartition; +import org.apache.druid.indexing.seekablestream.common.AcknowledgeType; +import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaShareConsumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class KafkaShareGroupRecordSupplierTest +{ + private KafkaShareConsumer mockConsumer; + private KafkaShareGroupRecordSupplier supplier; + + @SuppressWarnings("unchecked") + @Before + public void setUp() + { + mockConsumer = mock(KafkaShareConsumer.class); + supplier = new KafkaShareGroupRecordSupplier(mockConsumer); + } + + @After + public void tearDown() + { + supplier.close(); + } + + @Test + public void testSubscribeAndSubscription() + { + final Set topics = Set.of("topic-a", "topic-b"); + when(mockConsumer.subscription()).thenReturn(topics); + + supplier.subscribe(topics); + verify(mockConsumer).subscribe(topics); + + Assert.assertEquals(topics, supplier.subscription()); + } + + @Test + public void testUnsubscribe() + { + supplier.unsubscribe(); + verify(mockConsumer).unsubscribe(); + } + + @Test + public void testPollWrapsRecords() + { + final ConsumerRecord record1 = new ConsumerRecord<>( + "test-topic", 0, 100L, "key1".getBytes(StandardCharsets.UTF_8), "value1".getBytes(StandardCharsets.UTF_8) + ); + final ConsumerRecord record2 = new ConsumerRecord<>( + "test-topic", 1, 200L, "key2".getBytes(StandardCharsets.UTF_8), "value2".getBytes(StandardCharsets.UTF_8) + ); + + final Map>> recordMap = new HashMap<>(); + recordMap.put(new TopicPartition("test-topic", 0), List.of(record1)); + recordMap.put(new TopicPartition("test-topic", 1), List.of(record2)); + final ConsumerRecords consumerRecords = new ConsumerRecords<>(recordMap); + + when(mockConsumer.poll(any(Duration.class))).thenReturn(consumerRecords); + + final List> result = + supplier.poll(1000); + + Assert.assertEquals(2, result.size()); + + // Verify first record + final OrderedPartitionableRecord polled1 = + result.stream().filter(r -> r.getSequenceNumber() == 100L).findFirst().orElse(null); + Assert.assertNotNull(polled1); + Assert.assertEquals("test-topic", polled1.getStream()); + Assert.assertEquals(0, polled1.getPartitionId().partition()); + Assert.assertNotNull(polled1.getData()); + Assert.assertEquals(1, polled1.getData().size()); + + // Verify second record + final OrderedPartitionableRecord polled2 = + result.stream().filter(r -> r.getSequenceNumber() == 200L).findFirst().orElse(null); + 
Assert.assertNotNull(polled2); + Assert.assertEquals(1, polled2.getPartitionId().partition()); + } + + @Test + public void testPollWithNullValue() + { + final ConsumerRecord record = new ConsumerRecord<>( + "test-topic", 0, 50L, "key".getBytes(StandardCharsets.UTF_8), null + ); + final Map>> recordMap = new HashMap<>(); + recordMap.put(new TopicPartition("test-topic", 0), List.of(record)); + when(mockConsumer.poll(any(Duration.class))).thenReturn(new ConsumerRecords<>(recordMap)); + + final List> result = + supplier.poll(1000); + + Assert.assertEquals(1, result.size()); + Assert.assertTrue(result.get(0).getData().isEmpty()); + } + + @Test + public void testPollReturnsEmptyOnTimeout() + { + when(mockConsumer.poll(any(Duration.class))).thenReturn(ConsumerRecords.empty()); + final List> result = + supplier.poll(100); + Assert.assertTrue(result.isEmpty()); + } + + @Test + public void testAcknowledgeDefaultAccept() + { + final ConsumerRecord record = new ConsumerRecord<>( + "test-topic", 0, 42L, "key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8) + ); + pollSingleRecord(record); + + final KafkaTopicPartition partition = new KafkaTopicPartition(true, "test-topic", 0); + supplier.acknowledge(partition, 42L); + + Mockito.verify(mockConsumer).acknowledge( + Mockito.same(record), + Mockito.eq(org.apache.kafka.clients.consumer.AcknowledgeType.ACCEPT) + ); + } + + @Test + public void testAcknowledgeWithRelease() + { + final ConsumerRecord record = new ConsumerRecord<>( + "test-topic", 0, 10L, "key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8) + ); + pollSingleRecord(record); + + final KafkaTopicPartition partition = new KafkaTopicPartition(true, "test-topic", 0); + supplier.acknowledge(partition, 10L, AcknowledgeType.RELEASE); + + Mockito.verify(mockConsumer).acknowledge( + Mockito.same(record), + Mockito.eq(org.apache.kafka.clients.consumer.AcknowledgeType.RELEASE) + ); + } + + @Test + public void testAcknowledgeWithReject() + { + final ConsumerRecord record = new ConsumerRecord<>( + "test-topic", 0, 10L, "key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8) + ); + pollSingleRecord(record); + + final KafkaTopicPartition partition = new KafkaTopicPartition(true, "test-topic", 0); + supplier.acknowledge(partition, 10L, AcknowledgeType.REJECT); + + Mockito.verify(mockConsumer).acknowledge( + Mockito.same(record), + Mockito.eq(org.apache.kafka.clients.consumer.AcknowledgeType.REJECT) + ); + } + + @Test + public void testAcknowledgeWithRenew() + { + final ConsumerRecord record = new ConsumerRecord<>( + "test-topic", 0, 99L, "key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8) + ); + pollSingleRecord(record); + + final KafkaTopicPartition partition = new KafkaTopicPartition(true, "test-topic", 0); + supplier.acknowledge(partition, 99L, AcknowledgeType.RENEW); + + Mockito.verify(mockConsumer).acknowledge( + Mockito.same(record), + Mockito.eq(org.apache.kafka.clients.consumer.AcknowledgeType.RENEW) + ); + } + + @Test + public void testAcknowledgeBatch() + { + final List> recordsP0 = Arrays.asList( + new ConsumerRecord<>("test-topic", 0, 1L, "k".getBytes(StandardCharsets.UTF_8), "v".getBytes(StandardCharsets.UTF_8)), + new ConsumerRecord<>("test-topic", 0, 2L, "k".getBytes(StandardCharsets.UTF_8), "v".getBytes(StandardCharsets.UTF_8)), + new ConsumerRecord<>("test-topic", 0, 3L, "k".getBytes(StandardCharsets.UTF_8), "v".getBytes(StandardCharsets.UTF_8)) + ); + final List> recordsP1 = 
Arrays.asList( + new ConsumerRecord<>("test-topic", 1, 10L, "k".getBytes(StandardCharsets.UTF_8), "v".getBytes(StandardCharsets.UTF_8)), + new ConsumerRecord<>("test-topic", 1, 11L, "k".getBytes(StandardCharsets.UTF_8), "v".getBytes(StandardCharsets.UTF_8)) + ); + final Map>> recordMap = new HashMap<>(); + recordMap.put(new TopicPartition("test-topic", 0), recordsP0); + recordMap.put(new TopicPartition("test-topic", 1), recordsP1); + Mockito.when(mockConsumer.poll(Mockito.any(Duration.class))) + .thenReturn(new ConsumerRecords<>(recordMap)); + supplier.poll(1000); + + final KafkaTopicPartition p0 = new KafkaTopicPartition(true, "test-topic", 0); + final KafkaTopicPartition p1 = new KafkaTopicPartition(true, "test-topic", 1); + + final Map> offsets = new HashMap<>(); + offsets.put(p0, Arrays.asList(1L, 2L, 3L)); + offsets.put(p1, Arrays.asList(10L, 11L)); + + supplier.acknowledge(offsets, AcknowledgeType.ACCEPT); + + Mockito.verify(mockConsumer, Mockito.times(5)).acknowledge( + Mockito.>any(), + Mockito.eq(org.apache.kafka.clients.consumer.AcknowledgeType.ACCEPT) + ); + } + + private void pollSingleRecord(ConsumerRecord record) + { + final Map>> recordMap = new HashMap<>(); + recordMap.put(new TopicPartition(record.topic(), record.partition()), List.of(record)); + Mockito.when(mockConsumer.poll(Mockito.any(Duration.class))) + .thenReturn(new ConsumerRecords<>(recordMap)); + supplier.poll(1000); + } + + @Test + public void testCommitSync() + { + final TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("test-topic", 0)); + final Map> kafkaResult = new HashMap<>(); + kafkaResult.put(tip, Optional.empty()); + + when(mockConsumer.commitSync()).thenReturn(kafkaResult); + + final Map> result = supplier.commitSync(); + Assert.assertEquals(1, result.size()); + + final Map.Entry> entry = result.entrySet().iterator().next(); + Assert.assertEquals(Optional.of("test-topic"), entry.getKey().topic()); + Assert.assertEquals(0, entry.getKey().partition()); + Assert.assertFalse(entry.getValue().isPresent()); + } + + @Test + public void testCommitSyncWithError() + { + final TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("test-topic", 0)); + final KafkaException error = new KafkaException("commit failed"); + final Map> kafkaResult = new HashMap<>(); + kafkaResult.put(tip, Optional.of(error)); + + when(mockConsumer.commitSync()).thenReturn(kafkaResult); + + final Map> result = supplier.commitSync(); + Assert.assertEquals(1, result.size()); + + final Optional maybeError = result.values().iterator().next(); + Assert.assertTrue(maybeError.isPresent()); + Assert.assertEquals("commit failed", maybeError.get().getMessage()); + } + + @Test + public void testCloseIsIdempotent() + { + supplier.close(); + supplier.close(); + verify(mockConsumer, Mockito.times(1)).close(); + } + + @Test + public void testWakeupForwardsToConsumer() + { + supplier.wakeup(); + Mockito.verify(mockConsumer).wakeup(); + } + + @Test + public void testAcquisitionLockTimeoutMsForwardsToConsumer() + { + Mockito.when(mockConsumer.acquisitionLockTimeoutMs()).thenReturn(Optional.of(45_000)); + final Optional lockMs = supplier.acquisitionLockTimeoutMs(); + Assert.assertTrue(lockMs.isPresent()); + Assert.assertEquals(Integer.valueOf(45_000), lockMs.get()); + } + + @Test + public void testAcquisitionLockTimeoutMsEmpty() + { + Mockito.when(mockConsumer.acquisitionLockTimeoutMs()).thenReturn(Optional.empty()); + Assert.assertTrue(supplier.acquisitionLockTimeoutMs().isEmpty()); + } +} diff 
--git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/ShareGroupConsumerPropertiesTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/ShareGroupConsumerPropertiesTest.java new file mode 100644 index 000000000000..595eaa425af5 --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/ShareGroupConsumerPropertiesTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.kafka; + +import com.google.common.collect.ImmutableMap; +import org.junit.Assert; +import org.junit.Test; + +import java.util.LinkedHashMap; +import java.util.Map; + +public class ShareGroupConsumerPropertiesTest +{ + @Test + public void testSanitizeKeepsSupportedProperties() + { + final Map input = ImmutableMap.of( + "bootstrap.servers", "broker:9092", + "max.poll.records", 100, + "ssl.protocol", "TLSv1.3" + ); + final Map sanitized = ShareGroupConsumerProperties.sanitize(input); + Assert.assertEquals(input, sanitized); + } + + @Test + public void testSanitizeStripsAutoOffsetReset() + { + final Map input = ImmutableMap.of( + "bootstrap.servers", "broker:9092", + "auto.offset.reset", "earliest" + ); + final Map sanitized = ShareGroupConsumerProperties.sanitize(input); + Assert.assertEquals(1, sanitized.size()); + Assert.assertEquals("broker:9092", sanitized.get("bootstrap.servers")); + Assert.assertFalse(sanitized.containsKey("auto.offset.reset")); + } + + @Test + public void testSanitizeStripsAllUnsupportedKeys() + { + final Map input = new LinkedHashMap<>(); + input.put("bootstrap.servers", "broker:9092"); + for (String key : ShareGroupConsumerProperties.UNSUPPORTED_CONFIGS) { + input.put(key, "some-value"); + } + final Map sanitized = ShareGroupConsumerProperties.sanitize(input); + Assert.assertEquals(1, sanitized.size()); + Assert.assertEquals("broker:9092", sanitized.get("bootstrap.servers")); + } + + @Test + public void testSanitizePreservesInsertionOrder() + { + final Map input = new LinkedHashMap<>(); + input.put("a.first", 1); + input.put("group.protocol", "consumer"); + input.put("b.second", 2); + input.put("c.third", 3); + + final Map sanitized = ShareGroupConsumerProperties.sanitize(input); + Assert.assertArrayEquals( + new String[]{"a.first", "b.second", "c.third"}, + sanitized.keySet().toArray(new String[0]) + ); + } + + @Test + public void testSanitizeOnEmptyMap() + { + Assert.assertTrue(ShareGroupConsumerProperties.sanitize(ImmutableMap.of()).isEmpty()); + } + + @Test + public void testUnsupportedConfigsContainsKnownKafka42Keys() + { + Assert.assertTrue(ShareGroupConsumerProperties.UNSUPPORTED_CONFIGS.contains("auto.offset.reset")); + 
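+    // The remaining keys below mirror ShareConsumerConfig.SHARE_GROUP_UNSUPPORTED_CONFIGS.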
Assert.assertTrue(ShareGroupConsumerProperties.UNSUPPORTED_CONFIGS.contains("enable.auto.commit")); + Assert.assertTrue(ShareGroupConsumerProperties.UNSUPPORTED_CONFIGS.contains("group.instance.id")); + Assert.assertTrue(ShareGroupConsumerProperties.UNSUPPORTED_CONFIGS.contains("isolation.level")); + Assert.assertTrue(ShareGroupConsumerProperties.UNSUPPORTED_CONFIGS.contains("partition.assignment.strategy")); + Assert.assertTrue(ShareGroupConsumerProperties.UNSUPPORTED_CONFIGS.contains("interceptor.classes")); + Assert.assertTrue(ShareGroupConsumerProperties.UNSUPPORTED_CONFIGS.contains("session.timeout.ms")); + Assert.assertTrue(ShareGroupConsumerProperties.UNSUPPORTED_CONFIGS.contains("heartbeat.interval.ms")); + Assert.assertTrue(ShareGroupConsumerProperties.UNSUPPORTED_CONFIGS.contains("group.protocol")); + Assert.assertTrue(ShareGroupConsumerProperties.UNSUPPORTED_CONFIGS.contains("group.remote.assignor")); + } +} diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/ShareGroupIndexTaskIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/ShareGroupIndexTaskIOConfigTest.java new file mode 100644 index 000000000000..94e6c518ea77 --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/ShareGroupIndexTaskIOConfigTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Map; + +public class ShareGroupIndexTaskIOConfigTest +{ + private ObjectMapper mapper; + + @Before + public void setUp() + { + mapper = new DefaultObjectMapper(); + mapper.registerSubtypes( + new NamedType(ShareGroupIndexTaskIOConfig.class, "kafka_share_group") + ); + } + + @Test + public void testSerdeWithAllFields() throws IOException + { + final Map consumerProps = ImmutableMap.of( + "bootstrap.servers", "localhost:9092" + ); + final ShareGroupIndexTaskIOConfig config = new ShareGroupIndexTaskIOConfig( + "test-topic", + "my-share-group", + consumerProps, + null, + 5000L + ); + + final String json = mapper.writeValueAsString(config); + final ShareGroupIndexTaskIOConfig deserialized = mapper.readValue(json, ShareGroupIndexTaskIOConfig.class); + + Assert.assertEquals("test-topic", deserialized.getTopic()); + Assert.assertEquals("my-share-group", deserialized.getGroupId()); + Assert.assertEquals(consumerProps, deserialized.getConsumerProperties()); + Assert.assertNull(deserialized.getInputFormat()); + Assert.assertEquals(5000L, deserialized.getPollTimeout()); + } + + @Test + public void testSerdeWithDefaultPollTimeout() throws IOException + { + final Map consumerProps = ImmutableMap.of( + "bootstrap.servers", "localhost:9092" + ); + final ShareGroupIndexTaskIOConfig config = new ShareGroupIndexTaskIOConfig( + "test-topic", + "my-share-group", + consumerProps, + null, + null + ); + + final String json = mapper.writeValueAsString(config); + final ShareGroupIndexTaskIOConfig deserialized = mapper.readValue(json, ShareGroupIndexTaskIOConfig.class); + + Assert.assertEquals("test-topic", deserialized.getTopic()); + Assert.assertEquals("my-share-group", deserialized.getGroupId()); + // Default poll timeout from KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS + Assert.assertTrue(deserialized.getPollTimeout() > 0); + } + + @Test + public void testDeserializationFromJson() throws IOException + { + final String json = "{" + + "\"type\": \"kafka_share_group\"," + + "\"topic\": \"events\"," + + "\"groupId\": \"druid-share\"," + + "\"consumerProperties\": {\"bootstrap.servers\": \"broker:9092\"}," + + "\"pollTimeout\": 2000" + + "}"; + + final ShareGroupIndexTaskIOConfig config = mapper.readValue(json, ShareGroupIndexTaskIOConfig.class); + Assert.assertEquals("events", config.getTopic()); + Assert.assertEquals("druid-share", config.getGroupId()); + Assert.assertEquals("broker:9092", config.getConsumerProperties().get("bootstrap.servers")); + Assert.assertEquals(2000L, config.getPollTimeout()); + } + + @Test(expected = NullPointerException.class) + public void testTopicRequired() + { + new ShareGroupIndexTaskIOConfig( + null, + "my-share-group", + ImmutableMap.of("bootstrap.servers", "localhost:9092"), + null, + null + ); + } + + @Test(expected = NullPointerException.class) + public void testGroupIdRequired() + { + new ShareGroupIndexTaskIOConfig( + "test-topic", + null, + ImmutableMap.of("bootstrap.servers", "localhost:9092"), + null, + null + ); + } + + @Test(expected = NullPointerException.class) + public void testConsumerPropertiesRequired() + { + new ShareGroupIndexTaskIOConfig( + "test-topic", + "my-share-group", 
+ null, + null, + null + ); + } + + @Test + public void testToString() + { + final ShareGroupIndexTaskIOConfig config = new ShareGroupIndexTaskIOConfig( + "test-topic", + "my-share-group", + ImmutableMap.of("bootstrap.servers", "localhost:9092"), + null, + null + ); + final String str = config.toString(); + Assert.assertTrue(str.contains("test-topic")); + Assert.assertTrue(str.contains("my-share-group")); + } +} diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/ShareGroupIndexTaskRunnerTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/ShareGroupIndexTaskRunnerTest.java new file mode 100644 index 000000000000..6ba7690db62e --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/ShareGroupIndexTaskRunnerTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.segment.indexing.DataSchema; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.Map; + +/** + * Lightweight unit tests for {@link ShareGroupIndexTaskRunner}; the run loop + * itself is covered end-to-end by {@code EmbeddedShareGroupIngestionTest}. 
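+ * <p>The tuning config in {@code setUp} is built with all-null arguments so
+ * each field falls back to its default; these tests exercise task/runner
+ * wiring and stop/wakeup behavior, not the ingestion loop itself.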
+ */ +public class ShareGroupIndexTaskRunnerTest +{ + private ObjectMapper mapper; + private ShareGroupIndexTask task; + private TaskToolbox toolbox; + + @Before + public void setUp() + { + mapper = new DefaultObjectMapper(); + toolbox = Mockito.mock(TaskToolbox.class); + + final DataSchema dataSchema = DataSchema.builder() + .withDataSource("test_datasource") + .withTimestamp(new TimestampSpec("__time", null, null)) + .withDimensions(DimensionsSpec.EMPTY) + .build(); + + final Map consumerProps = ImmutableMap.of("bootstrap.servers", "localhost:9092"); + final ShareGroupIndexTaskIOConfig ioConfig = new ShareGroupIndexTaskIOConfig( + "test-topic", + "test-share-group", + consumerProps, + null, + null + ); + final KafkaIndexTaskTuningConfig tuningConfig = new KafkaIndexTaskTuningConfig( + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + task = new ShareGroupIndexTask( + "task_runner_test", + null, + dataSchema, + tuningConfig, + ioConfig, + null, + mapper + ); + } + + @Test + public void testRequestWakeupIsNullSafeWhenNoActiveSupplier() + { + final ShareGroupIndexTaskRunner runner = new ShareGroupIndexTaskRunner(task, toolbox, mapper); + runner.requestWakeup(); + } + + @Test + public void testStopGracefullyBeforeRunTaskIsSafe() + { + Assert.assertFalse(task.isStopRequested()); + task.stopGracefully(null); + Assert.assertTrue(task.isStopRequested()); + } + + @Test + public void testRunnerAcceptsCustomSupplierFactory() + { + final ShareGroupIndexTaskRunner runner = new ShareGroupIndexTaskRunner( + task, + toolbox, + mapper, + ioConfig -> Mockito.mock( + org.apache.druid.indexing.seekablestream.common.AcknowledgingRecordSupplier.class + ) + ); + Assert.assertNotNull(runner); + runner.requestWakeup(); + } + + @Test + public void testCommitFailureMetricName() + { + Assert.assertEquals( + "ingest/shareGroup/commitFailures", + ShareGroupIndexTaskRunner.METRIC_COMMIT_FAILURES + ); + } +} diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/ShareGroupIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/ShareGroupIndexTaskTest.java new file mode 100644 index 000000000000..7120742f3855 --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/ShareGroupIndexTaskTest.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexing.common.task.Tasks; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.segment.indexing.DataSchema; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Map; + +/** + * Unit tests for {@link ShareGroupIndexTask} focusing on task type, + * serialization/deserialization, and basic properties. + */ +public class ShareGroupIndexTaskTest +{ + private ObjectMapper mapper; + + @Before + public void setUp() + { + mapper = new DefaultObjectMapper(); + mapper.registerSubtypes( + new NamedType(ShareGroupIndexTask.class, "index_kafka_share_group"), + new NamedType(ShareGroupIndexTaskIOConfig.class, "kafka_share_group"), + new NamedType(KafkaIndexTaskTuningConfig.class, "KafkaTuningConfig") + ); + // Inject the ObjectMapper itself since ShareGroupIndexTask uses @JacksonInject + mapper.setInjectableValues(new com.fasterxml.jackson.databind.InjectableValues.Std() + .addValue(ObjectMapper.class, mapper)); + } + + @Test + public void testTaskType() + { + final ShareGroupIndexTask task = createTask("task_1"); + Assert.assertEquals("index_kafka_share_group", task.getType()); + } + + @Test + public void testTaskIsAlwaysReady() throws Exception + { + final ShareGroupIndexTask task = createTask("task_2"); + Assert.assertTrue(task.isReady(null)); + } + + @Test + public void testTaskDataSource() + { + final ShareGroupIndexTask task = createTask("task_3"); + Assert.assertEquals("test_datasource", task.getDataSource()); + } + + @Test + public void testTaskIdGeneration() + { + final ShareGroupIndexTask task = createTask(null); + Assert.assertNotNull(task.getId()); + Assert.assertTrue(task.getId().contains("index_kafka_share_group")); + } + + @Test + public void testGracefulStop() + { + final ShareGroupIndexTask task = createTask("task_4"); + Assert.assertFalse(task.isStopRequested()); + task.stopGracefully(null); + Assert.assertTrue(task.isStopRequested()); + } + + @Test + public void testIOConfigAccessor() + { + final ShareGroupIndexTask task = createTask("task_5"); + final ShareGroupIndexTaskIOConfig ioConfig = task.getIOConfig(); + Assert.assertEquals("test-topic", ioConfig.getTopic()); + Assert.assertEquals("test-share-group", ioConfig.getGroupId()); + } + + @Test + public void testSerdeRoundTrip() throws IOException + { + final ShareGroupIndexTask task = createTask("task_serde"); + final String json = mapper.writeValueAsString(task); + + // Verify the type field is present + Assert.assertTrue(json.contains("index_kafka_share_group")); + + final ShareGroupIndexTask deserialized = mapper.readValue(json, ShareGroupIndexTask.class); + Assert.assertEquals(task.getId(), deserialized.getId()); + Assert.assertEquals(task.getDataSource(), deserialized.getDataSource()); + Assert.assertEquals(task.getType(), deserialized.getType()); + Assert.assertEquals(task.getIOConfig().getTopic(), deserialized.getIOConfig().getTopic()); + Assert.assertEquals(task.getIOConfig().getGroupId(), deserialized.getIOConfig().getGroupId()); + } + + @Test + public void testInputSourceResources() + { + final ShareGroupIndexTask task = createTask("task_6"); + 
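+    // Expect the external Kafka read resource declared in INPUT_SOURCE_RESOURCES,
+    // which is used when authorizing the task's input source.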
Assert.assertNotNull(task.getInputSourceResources()); + Assert.assertFalse(task.getInputSourceResources().isEmpty()); + } + + @Test + public void testDefaultPriorityIsRealtime() + { + final ShareGroupIndexTask task = createTask("task_priority_default"); + Assert.assertEquals(Tasks.DEFAULT_REALTIME_TASK_PRIORITY, task.getPriority()); + } + + @Test + public void testContextPriorityOverridesDefault() + { + final ShareGroupIndexTask task = createTask( + "task_priority_override", + ImmutableMap.of(Tasks.PRIORITY_KEY, 99) + ); + Assert.assertEquals(99, task.getPriority()); + } + + private ShareGroupIndexTask createTask(String id) + { + return createTask(id, null); + } + + private ShareGroupIndexTask createTask(String id, Map context) + { + final DataSchema dataSchema = DataSchema.builder() + .withDataSource("test_datasource") + .withTimestamp(new TimestampSpec("__time", null, null)) + .withDimensions(DimensionsSpec.EMPTY) + .build(); + + final Map consumerProps = ImmutableMap.of( + "bootstrap.servers", "localhost:9092" + ); + + final ShareGroupIndexTaskIOConfig ioConfig = new ShareGroupIndexTaskIOConfig( + "test-topic", + "test-share-group", + consumerProps, + null, + null + ); + + final KafkaIndexTaskTuningConfig tuningConfig = new KafkaIndexTaskTuningConfig( + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + + return new ShareGroupIndexTask( + id, + null, + dataSchema, + tuningConfig, + ioConfig, + context, + mapper + ); + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkReader.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkReader.java index a0ac1f01ea5a..0d4a51ff7cee 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkReader.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkReader.java @@ -41,9 +41,13 @@ import java.util.List; /** - * Abstraction for parsing stream data which internally uses {@link org.apache.druid.data.input.InputEntityReader}. + * Parses one stream record's worth of {@link ByteEntity} chunks into zero-or-more + * {@link InputRow}s, honouring the configured {@link InputRowFilter}, + * {@link RowIngestionMeters}, and {@link ParseExceptionHandler}. Public so it + * can be reused by stream-ingestion implementations outside the + * {@code seekablestream} package. */ -class StreamChunkReader +public class StreamChunkReader { private final SettableByteEntityReader byteEntityReader; private final InputRowFilter rowFilter; @@ -53,7 +57,7 @@ class StreamChunkReader /** * Either parser or inputFormat shouldn't be null. 
   */
-  StreamChunkReader(
+  public StreamChunkReader(
       InputFormat inputFormat,
       InputRowSchema inputRowSchema,
       TransformSpec transformSpec,
@@ -89,7 +93,8 @@ class StreamChunkReader
     this.parseExceptionHandler = parseExceptionHandler;
   }
 
-  List<InputRow> parse(@Nullable List<ByteEntity> streamChunk, boolean isEndOfShard) throws IOException
+  public List<InputRow> parse(@Nullable List<ByteEntity> streamChunk, boolean isEndOfShard)
+      throws IOException
   {
     if (streamChunk == null || streamChunk.isEmpty()) {
       if (!isEndOfShard) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/AcknowledgeType.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/AcknowledgeType.java
new file mode 100644
index 000000000000..7b41d8657198
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/AcknowledgeType.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.common;
+
+/**
+ * Acknowledgement types for queue-semantics record suppliers. {@code ACCEPT},
+ * {@code RELEASE}, and {@code REJECT} mirror Kafka's {@code AcknowledgeType}.
+ */
+public enum AcknowledgeType
+{
+  /** Record processed successfully; broker commits it. */
+  ACCEPT,
+
+  /** Release the record back to the broker for redelivery. */
+  RELEASE,
+
+  /** Reject the record permanently (poison pill); never redelivered. */
+  REJECT,
+
+  /** Extend the acquisition lock without accepting or releasing. */
+  RENEW
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/AcknowledgingRecordSupplier.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/AcknowledgingRecordSupplier.java
new file mode 100644
index 000000000000..57a4ef4a7118
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/AcknowledgingRecordSupplier.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.common;
+
+import org.apache.druid.data.input.impl.ByteEntity;
+
+import javax.validation.constraints.NotNull;
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Record supplier for queue-semantics streams (e.g. Kafka share groups,
+ * KIP-932) where the broker owns delivery state and consumers acknowledge
+ * records explicitly. Unlike {@link RecordSupplier}, callers do not assign
+ * or seek partitions; they subscribe to topics and ack individual records.
+ *
+ * @param <PartitionIdType>    partition identifier type
+ * @param <SequenceOffsetType> sequence/offset number type
+ * @param <RecordType>         record entity type
+ */
+public interface AcknowledgingRecordSupplier<PartitionIdType, SequenceOffsetType, RecordType extends ByteEntity>
+    extends Closeable
+{
+  void subscribe(Set<String> topics);
+
+  void unsubscribe();
+
+  Set<String> subscription();
+
+  /**
+   * Poll for records. Records carry acquisition locks and must be
+   * acknowledged before the lock expires.
+   */
+  @NotNull
+  List<OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType, RecordType>> poll(long timeoutMs);
+
+  /** Acknowledge a single record with the default type ({@code ACCEPT}). */
+  void acknowledge(PartitionIdType partitionId, SequenceOffsetType offset);
+
+  /** Acknowledge a single record with the given type. */
+  void acknowledge(PartitionIdType partitionId, SequenceOffsetType offset, AcknowledgeType type);
+
+  /** Acknowledge a batch of records with the given type. */
+  void acknowledge(Map<PartitionIdType, Collection<SequenceOffsetType>> offsets, AcknowledgeType type);
+
+  /**
+   * Commit pending acknowledgements; the returned map holds, per partition,
+   * an exception if the commit failed for that partition.
+   */
+  Map<PartitionIdType, Optional<Exception>> commitSync();
+
+  Set<PartitionIdType> getPartitionIds(String stream);
+
+  /**
+   * Interrupt a blocked {@link #poll(long)} so the loop can exit promptly.
+   * Implementations wrapping {@link org.apache.kafka.clients.consumer.ShareConsumer}
+   * should delegate to its {@code wakeup()}, after which the in-flight or
+   * next {@link #poll(long)} throws an implementation-specific wake-up
+   * exception (e.g. {@link org.apache.kafka.common.errors.WakeupException}).
+   * Default is a no-op.
+   */
+  default void wakeup()
+  {
+  }
+
+  /**
+   * Broker-effective acquisition lock timeout, if known. For Kafka share
+   * groups this surfaces {@code group.share.record.lock.duration.ms} after
+   * the first {@link #poll(long)}; default is {@link Optional#empty()}.
+   */
+  default Optional<Long> acquisitionLockTimeoutMs()
+  {
+    return Optional.empty();
+  }
+
+  @Override
+  void close();
+}
diff --git a/website/.spelling b/website/.spelling
index 28701817f362..cf6abf244670 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -2646,3 +2646,15 @@
 nginx
 - ../docs/development/extensions-core/s3.md
 NIO
+
+- ../docs/ingestion/kafka-share-group-ingestion.md
+KIP
+KIP-932
+appenderator
+isPersistRequired
+ACK
+rebalancing
+deduplication
+Testcontainers
+30-60s
+E2E
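To make the `AcknowledgingRecordSupplier` contract concrete, here is a minimal poll/publish/acknowledge sketch. It is illustrative only: the `AckLoopSketch` class, the `my_topic` name, and the `publishSegments` callback are hypothetical stand-ins, and it assumes `poll` returns the same `OrderedPartitionableRecord` type used by `RecordSupplier`, including its `getPartitionId()` and `getSequenceNumber()` accessors.

```java
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.indexing.seekablestream.common.AcknowledgeType;
import org.apache.druid.indexing.seekablestream.common.AcknowledgingRecordSupplier;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class AckLoopSketch
{
  /**
   * One poll/publish/acknowledge cycle against a queue-semantics supplier.
   * publishSegments is a hypothetical stand-in for the task's publish step.
   */
  static <P, S> void pollOnce(
      AcknowledgingRecordSupplier<P, S, ByteEntity> supplier,
      Runnable publishSegments
  )
  {
    supplier.subscribe(Set.of("my_topic"));

    // Each polled record carries a broker-side acquisition lock and must be
    // acknowledged before that lock expires.
    final List<OrderedPartitionableRecord<P, S, ByteEntity>> batch = supplier.poll(2000L);

    // ... parse the records and add the resulting rows to the appenderator ...

    // Publish before acknowledging, so ACCEPT is only ever sent for data that
    // is already durable. A failure before this point leaves the records
    // unacknowledged; RELEASE would hand them back to the broker immediately.
    publishSegments.run();

    for (OrderedPartitionableRecord<P, S, ByteEntity> record : batch) {
      supplier.acknowledge(record.getPartitionId(), record.getSequenceNumber(), AcknowledgeType.ACCEPT);
    }

    // Flush acknowledgements and surface any per-partition commit failures.
    final Map<P, Optional<Exception>> failures = supplier.commitSync();
    failures.forEach((partition, error) ->
        error.ifPresent(e -> System.err.println("commit failed for partition " + partition + ": " + e)));
  }
}
```

Per-record acknowledgement is used here for clarity; the batch `acknowledge(Map, AcknowledgeType)` overload would do the same in a single call.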