Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import javax.annotation.Nullable;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber;
import org.apache.beam.sdk.values.Row;

@AutoValue
public abstract class ErrorHandling implements Serializable {
@SchemaFieldDescription("The name of the output PCollection containing failed writes.")
@SchemaFieldNumber("0")
public abstract String getOutput();

public static Builder builder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
Expand Down Expand Up @@ -2202,8 +2203,10 @@ public void populateDisplayData(DisplayData.Builder builder) {
* generating Rows.
*/
static class KafkaHeader {

@SchemaFieldNumber("0")
String key;

@SchemaFieldNumber("1")
byte @Nullable [] value;

@SchemaCreate
Expand All @@ -2222,15 +2225,32 @@ public KafkaHeader(String key, byte @Nullable [] value) {
* Schema inference supports generics.
*/
static class ByteArrayKafkaRecord {

@SchemaFieldNumber("0")
String topic;

@SchemaFieldNumber("1")
int partition;

@SchemaFieldNumber("2")
long offset;

@SchemaFieldNumber("3")
long timestamp;

@SchemaFieldNumber("4")
byte @Nullable [] key;

@SchemaFieldNumber("5")
byte @Nullable [] value;
@Nullable List<KafkaHeader> headers;

@SchemaFieldNumber("6")
@Nullable
List<KafkaHeader> headers;

@SchemaFieldNumber("7")
int timestampTypeId;

@SchemaFieldNumber("8")
String timestampTypeName;

@SchemaCreate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber;
import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;

Expand Down Expand Up @@ -98,16 +99,20 @@ public static Builder builder() {
+ " Kafka cluster. The client will make use of all servers irrespective of which servers are specified"
+ " here for bootstrapping—this list only impacts the initial hosts used to discover the full set"
+ " of servers. This list should be in the form `host1:port1,host2:port2,...`")
@SchemaFieldNumber("0")
public abstract String getBootstrapServers();

@SchemaFieldNumber("1")
@Nullable
public abstract String getConfluentSchemaRegistryUrl();

@SchemaFieldDescription(
"The encoding format for the data stored in Kafka. Valid options are: " + VALID_FORMATS_STR)
@SchemaFieldNumber("2")
@Nullable
public abstract String getFormat();

@SchemaFieldNumber("3")
@Nullable
public abstract String getConfluentSchemaRegistrySubject();

Expand All @@ -118,18 +123,21 @@ public static Builder builder() {
+ "For JSON data, this is a schema defined with JSON-schema syntax (https://json-schema.org/). "
+ "If a URL to Confluent Schema Registry is provided, then this field is ignored, and the schema "
+ "is fetched from Confluent Schema Registry.")
@SchemaFieldNumber("4")
@Nullable
public abstract String getSchema();

@SchemaFieldDescription(
"The path to the Protocol Buffer File Descriptor Set file. This file is used for schema"
+ " definition and message serialization.")
@SchemaFieldNumber("5")
@Nullable
public abstract String getFileDescriptorPath();

@SchemaFieldDescription(
"The name of the Protocol Buffer message to be used for schema"
+ " extraction and data conversion.")
@SchemaFieldNumber("6")
@Nullable
public abstract String getMessageName();

Expand All @@ -138,6 +146,7 @@ public static Builder builder() {
+ " does not exist any more on the server. (1) earliest: automatically reset the offset to the earliest"
+ " offset. (2) latest: automatically reset the offset to the latest offset"
+ " (3) none: throw exception to the consumer if no previous offset is found for the consumer’s group")
@SchemaFieldNumber("7")
@Nullable
public abstract String getAutoOffsetResetConfig();

Expand All @@ -146,17 +155,21 @@ public static Builder builder() {
+ " Most of these configurations will not be needed, but if you need to customize your Kafka consumer,"
+ " you may use this. See a detailed list:"
+ " https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html")
@SchemaFieldNumber("8")
@Nullable
public abstract Map<String, String> getConsumerConfigUpdates();

/** Sets the topic from which to read. */
@SchemaFieldNumber("9")
public abstract String getTopic();

@SchemaFieldDescription("Upper bound of how long to read from Kafka.")
@SchemaFieldNumber("10")
@Nullable
public abstract Integer getMaxReadTimeSeconds();

@SchemaFieldDescription("This option specifies whether and where to output unwritable rows.")
@SchemaFieldNumber("11")
@Nullable
public abstract ErrorHandling getErrorHandling();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber;
import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
import org.apache.kafka.common.TopicPartition;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand All @@ -38,30 +39,37 @@
@AutoValue
public abstract class KafkaSourceDescriptor implements Serializable {
@SchemaFieldName("topic")
@SchemaFieldNumber("0")
@Pure
abstract String getTopic();

@SchemaFieldName("partition")
@SchemaFieldNumber("1")
@Pure
abstract Integer getPartition();

@SchemaFieldName("start_read_offset")
@SchemaFieldNumber("2")
@Pure
abstract @Nullable Long getStartReadOffset();

@SchemaFieldName("start_read_time")
@SchemaFieldNumber("3")
@Pure
abstract @Nullable Instant getStartReadTime();

@SchemaFieldName("stop_read_offset")
@SchemaFieldNumber("4")
@Pure
abstract @Nullable Long getStopReadOffset();

@SchemaFieldName("stop_read_time")
@SchemaFieldNumber("5")
@Pure
abstract @Nullable Instant getStopReadTime();

@SchemaFieldName("bootstrap_servers")
@SchemaFieldNumber("6")
@Pure
abstract @Nullable List<String> getBootStrapServers();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
Expand Down Expand Up @@ -339,41 +340,49 @@ public abstract static class KafkaWriteSchemaTransformConfiguration implements S
@SchemaFieldDescription(
"The encoding format for the data stored in Kafka. Valid options are: "
+ SUPPORTED_FORMATS_STR)
@SchemaFieldNumber("0")
public abstract String getFormat();

@SchemaFieldNumber("1")
public abstract String getTopic();

@SchemaFieldDescription(
"A list of host/port pairs to use for establishing the initial connection to the"
+ " Kafka cluster. The client will make use of all servers irrespective of which servers are specified"
+ " here for bootstrapping—this list only impacts the initial hosts used to discover the full set"
+ " of servers. | Format: host1:port1,host2:port2,...")
@SchemaFieldNumber("2")
public abstract String getBootstrapServers();

@SchemaFieldDescription(
"A list of key-value pairs that act as configuration parameters for Kafka producers."
+ " Most of these configurations will not be needed, but if you need to customize your Kafka producer,"
+ " you may use this. See a detailed list:"
+ " https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html")
@SchemaFieldNumber("3")
@Nullable
public abstract Map<String, String> getProducerConfigUpdates();

@SchemaFieldDescription("This option specifies whether and where to output unwritable rows.")
@SchemaFieldNumber("4")
@Nullable
public abstract ErrorHandling getErrorHandling();

@SchemaFieldDescription(
"The path to the Protocol Buffer File Descriptor Set file. This file is used for schema"
+ " definition and message serialization.")
@SchemaFieldNumber("5")
@Nullable
public abstract String getFileDescriptorPath();

@SchemaFieldDescription(
"The name of the Protocol Buffer message to be used for schema"
+ " extraction and data conversion.")
@SchemaFieldNumber("6")
@Nullable
public abstract String getMessageName();

@SchemaFieldNumber("7")
@Nullable
public abstract String getSchema();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@
import org.apache.beam.sdk.metrics.SourceMetrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
Expand Down Expand Up @@ -2450,6 +2453,62 @@ public void testWithValidConsumerPollingTimeout() {
assertEquals(15, reader.getConsumerPollingTimeout());
}

// Verifies the Beam schema inferred for KafkaIO.ByteArrayKafkaRecord. This schema backs
// SchemaCoder-based serialization/deserialization of Kafka records, so the field order
// pinned by @SchemaFieldNumber must stay stable.
@Test
public void testByteArrayKafkaRecordSchema() throws NoSuchSchemaException {
  SchemaRegistry registry = SchemaRegistry.createDefault();
  Schema recordSchema = registry.getSchema(KafkaIO.ByteArrayKafkaRecord.class);

  assertEquals(9, recordSchema.getFieldCount());

  // Required record-coordinate fields.
  assertEquals(Schema.Field.of("topic", Schema.FieldType.STRING), recordSchema.getField(0));
  assertEquals(Schema.Field.of("partition", Schema.FieldType.INT32), recordSchema.getField(1));
  assertEquals(Schema.Field.of("offset", Schema.FieldType.INT64), recordSchema.getField(2));
  assertEquals(Schema.Field.of("timestamp", Schema.FieldType.INT64), recordSchema.getField(3));

  // Payload fields may be null.
  assertEquals(Schema.Field.nullable("key", Schema.FieldType.BYTES), recordSchema.getField(4));
  assertEquals(Schema.Field.nullable("value", Schema.FieldType.BYTES), recordSchema.getField(5));

  // Headers are an optional array of KafkaHeader rows (string key, nullable bytes value).
  Schema headerRowSchema =
      Schema.of(
          Schema.Field.of("key", Schema.FieldType.STRING),
          Schema.Field.nullable("value", Schema.FieldType.BYTES));
  assertEquals(
      Schema.Field.nullable("headers", Schema.FieldType.array(Schema.FieldType.row(headerRowSchema))),
      recordSchema.getField(6));

  assertEquals(
      Schema.Field.of("timestampTypeId", Schema.FieldType.INT32), recordSchema.getField(7));
  assertEquals(
      Schema.Field.of("timestampTypeName", Schema.FieldType.STRING), recordSchema.getField(8));
}

// Verifies the Beam schema inferred for KafkaSourceDescriptor, including the snake_case
// field names fixed by @SchemaFieldName and the ordering fixed by @SchemaFieldNumber.
@Test
public void testKafkaSourceDescriptorSchema() throws NoSuchSchemaException {
  SchemaRegistry registry = SchemaRegistry.createDefault();
  Schema descriptorSchema = registry.getSchema(KafkaSourceDescriptor.class);

  assertEquals(7, descriptorSchema.getFieldCount());

  // Topic and partition are required.
  assertEquals(Schema.Field.of("topic", Schema.FieldType.STRING), descriptorSchema.getField(0));
  assertEquals(
      Schema.Field.of("partition", Schema.FieldType.INT32), descriptorSchema.getField(1));

  // Read-range bounds are optional.
  assertEquals(
      Schema.Field.nullable("start_read_offset", Schema.FieldType.INT64),
      descriptorSchema.getField(2));
  assertEquals(
      Schema.Field.nullable("start_read_time", Schema.FieldType.DATETIME),
      descriptorSchema.getField(3));
  assertEquals(
      Schema.Field.nullable("stop_read_offset", Schema.FieldType.INT64),
      descriptorSchema.getField(4));
  assertEquals(
      Schema.Field.nullable("stop_read_time", Schema.FieldType.DATETIME),
      descriptorSchema.getField(5));

  // Bootstrap servers are an optional list of host:port strings.
  assertEquals(
      Schema.Field.nullable("bootstrap_servers", Schema.FieldType.array(Schema.FieldType.STRING)),
      descriptorSchema.getField(6));
}

// Verifies the Beam schema inferred for KafkaIO.KafkaHeader: a required string key and a
// nullable bytes value, in the order pinned by @SchemaFieldNumber.
@Test
public void testKafkaHeaderSchema() throws NoSuchSchemaException {
  SchemaRegistry registry = SchemaRegistry.createDefault();
  Schema headerSchema = registry.getSchema(KafkaIO.KafkaHeader.class);

  assertEquals(2, headerSchema.getFieldCount());
  assertEquals(Schema.Field.of("key", Schema.FieldType.STRING), headerSchema.getField(0));
  assertEquals(Schema.Field.nullable("value", Schema.FieldType.BYTES), headerSchema.getField(1));
}

private static void verifyProducerRecords(
MockProducer<Integer, Long> mockProducer,
String topic,
Expand Down
Loading
Loading