diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java index d2027ebf6aaa..e509d91856d2 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java @@ -23,6 +23,7 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaCreate; import org.apache.beam.sdk.schemas.annotations.SchemaFieldName; import org.apache.beam.sdk.schemas.annotations.SchemaIgnore; import org.apache.kafka.common.TopicPartition; @@ -72,4 +73,17 @@ public static KafkaSourceDescriptor of( startReadTime, bootstrapServers); } + + @SchemaCreate + @SuppressWarnings("all") + // TODO(BEAM-10677): Remove this function after AutoValueSchema is fixed. + static KafkaSourceDescriptor create( + String topic, + Integer partition, + Long start_read_offset, + Instant start_read_time, + List bootstrap_servers) { + return new AutoValue_KafkaSourceDescriptor( + topic, partition, start_read_offset, start_read_time, bootstrap_servers); + } }