From 15bb463df0091f4248ab9ac62b3f554c6bedd973 Mon Sep 17 00:00:00 2001 From: Minbo Bae Date: Fri, 26 Sep 2025 01:04:42 -0700 Subject: [PATCH] Enforce deterministic field order in Schema generated from KafkaIO classes. To prevent potential serialization issues, this change adds the `@SchemaFieldNumber` annotation to KafkaIO classes that use `DefaultSchema`. This guarantees a deterministic field order, which is not ensured when schemas are generated with `AutoValueSchema`, `JavaBeanSchema`, or `JavaFieldSchema`. --- .../transforms/providers/ErrorHandling.java | 2 + .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 26 +++++- ...KafkaReadSchemaTransformConfiguration.java | 13 +++ .../sdk/io/kafka/KafkaSourceDescriptor.java | 8 ++ .../KafkaWriteSchemaTransformProvider.java | 9 ++ .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 59 +++++++++++++ .../KafkaReadSchemaTransformProviderTest.java | 87 +++++++++++++++++++ ...KafkaWriteSchemaTransformProviderTest.java | 66 ++++++++++++++ 8 files changed, 267 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/ErrorHandling.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/ErrorHandling.java index 053521dbfb39..111defb85b0e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/ErrorHandling.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/ErrorHandling.java @@ -22,11 +22,13 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber; import org.apache.beam.sdk.values.Row; @AutoValue public abstract class ErrorHandling implements Serializable { @SchemaFieldDescription("The name of the output PCollection containing failed writes.") + @SchemaFieldNumber("0") public abstract String getOutput(); public 
static Builder builder() { diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 045a74a8507e..5514d95d89b0 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -70,6 +70,7 @@ import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.annotations.SchemaCreate; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber; import org.apache.beam.sdk.schemas.transforms.Convert; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -2202,8 +2203,10 @@ public void populateDisplayData(DisplayData.Builder builder) { * generating Rows. */ static class KafkaHeader { - + @SchemaFieldNumber("0") String key; + + @SchemaFieldNumber("1") byte @Nullable [] value; @SchemaCreate @@ -2222,15 +2225,32 @@ public KafkaHeader(String key, byte @Nullable [] value) { * Schema inference supports generics. 
*/ static class ByteArrayKafkaRecord { - + @SchemaFieldNumber("0") String topic; + + @SchemaFieldNumber("1") int partition; + + @SchemaFieldNumber("2") long offset; + + @SchemaFieldNumber("3") long timestamp; + + @SchemaFieldNumber("4") byte @Nullable [] key; + + @SchemaFieldNumber("5") byte @Nullable [] value; - @Nullable List headers; + + @SchemaFieldNumber("6") + @Nullable + List headers; + + @SchemaFieldNumber("7") int timestampTypeId; + + @SchemaFieldNumber("8") String timestampTypeName; @SchemaCreate diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java index 47e0b2a9aca5..50e113f510eb 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber; import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; @@ -98,16 +99,20 @@ public static Builder builder() { + " Kafka cluster. The client will make use of all servers irrespective of which servers are specified" + " here for bootstrapping—this list only impacts the initial hosts used to discover the full set" + " of servers. This list should be in the form `host1:port1,host2:port2,...`") + @SchemaFieldNumber("0") public abstract String getBootstrapServers(); + @SchemaFieldNumber("1") @Nullable public abstract String getConfluentSchemaRegistryUrl(); @SchemaFieldDescription( "The encoding format for the data stored in Kafka. 
Valid options are: " + VALID_FORMATS_STR) + @SchemaFieldNumber("2") @Nullable public abstract String getFormat(); + @SchemaFieldNumber("3") @Nullable public abstract String getConfluentSchemaRegistrySubject(); @@ -118,18 +123,21 @@ public static Builder builder() { + "For JSON data, this is a schema defined with JSON-schema syntax (https://json-schema.org/). " + "If a URL to Confluent Schema Registry is provided, then this field is ignored, and the schema " + "is fetched from Confluent Schema Registry.") + @SchemaFieldNumber("4") @Nullable public abstract String getSchema(); @SchemaFieldDescription( "The path to the Protocol Buffer File Descriptor Set file. This file is used for schema" + " definition and message serialization.") + @SchemaFieldNumber("5") @Nullable public abstract String getFileDescriptorPath(); @SchemaFieldDescription( "The name of the Protocol Buffer message to be used for schema" + " extraction and data conversion.") + @SchemaFieldNumber("6") @Nullable public abstract String getMessageName(); @@ -138,6 +146,7 @@ public static Builder builder() { + " does not exist any more on the server. (1) earliest: automatically reset the offset to the earliest" + " offset. (2) latest: automatically reset the offset to the latest offset" + " (3) none: throw exception to the consumer if no previous offset is found for the consumer’s group") + @SchemaFieldNumber("7") @Nullable public abstract String getAutoOffsetResetConfig(); @@ -146,17 +155,21 @@ public static Builder builder() { + " Most of these configurations will not be needed, but if you need to customize your Kafka consumer," + " you may use this. See a detailed list:" + " https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html") + @SchemaFieldNumber("8") @Nullable public abstract Map getConsumerConfigUpdates(); /** Sets the topic from which to read. 
*/ + @SchemaFieldNumber("9") public abstract String getTopic(); @SchemaFieldDescription("Upper bound of how long to read from Kafka.") + @SchemaFieldNumber("10") @Nullable public abstract Integer getMaxReadTimeSeconds(); @SchemaFieldDescription("This option specifies whether and where to output unwritable rows.") + @SchemaFieldNumber("11") @Nullable public abstract ErrorHandling getErrorHandling(); diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java index d0d411c2fe27..67ee7a657833 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.annotations.SchemaCreate; import org.apache.beam.sdk.schemas.annotations.SchemaFieldName; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber; import org.apache.beam.sdk.schemas.annotations.SchemaIgnore; import org.apache.kafka.common.TopicPartition; import org.checkerframework.checker.nullness.qual.Nullable; @@ -38,30 +39,37 @@ @AutoValue public abstract class KafkaSourceDescriptor implements Serializable { @SchemaFieldName("topic") + @SchemaFieldNumber("0") @Pure abstract String getTopic(); @SchemaFieldName("partition") + @SchemaFieldNumber("1") @Pure abstract Integer getPartition(); @SchemaFieldName("start_read_offset") + @SchemaFieldNumber("2") @Pure abstract @Nullable Long getStartReadOffset(); @SchemaFieldName("start_read_time") + @SchemaFieldNumber("3") @Pure abstract @Nullable Instant getStartReadTime(); @SchemaFieldName("stop_read_offset") + @SchemaFieldNumber("4") @Pure abstract @Nullable Long getStopReadOffset(); @SchemaFieldName("stop_read_time") + @SchemaFieldNumber("5") @Pure abstract @Nullable Instant 
getStopReadTime(); @SchemaFieldName("bootstrap_servers") + @SchemaFieldNumber("6") @Pure abstract @Nullable List getBootStrapServers(); diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java index e2a4f394ccdb..b9c41746240a 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java @@ -44,6 +44,7 @@ import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; @@ -339,8 +340,10 @@ public abstract static class KafkaWriteSchemaTransformConfiguration implements S @SchemaFieldDescription( "The encoding format for the data stored in Kafka. Valid options are: " + SUPPORTED_FORMATS_STR) + @SchemaFieldNumber("0") public abstract String getFormat(); + @SchemaFieldNumber("1") public abstract String getTopic(); @SchemaFieldDescription( @@ -348,6 +351,7 @@ public abstract static class KafkaWriteSchemaTransformConfiguration implements S + " Kafka cluster. The client will make use of all servers irrespective of which servers are specified" + " here for bootstrapping—this list only impacts the initial hosts used to discover the full set" + " of servers. 
| Format: host1:port1,host2:port2,...") + @SchemaFieldNumber("2") public abstract String getBootstrapServers(); @SchemaFieldDescription( @@ -355,25 +359,30 @@ public abstract static class KafkaWriteSchemaTransformConfiguration implements S + " Most of these configurations will not be needed, but if you need to customize your Kafka producer," + " you may use this. See a detailed list:" + " https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html") + @SchemaFieldNumber("3") @Nullable public abstract Map getProducerConfigUpdates(); @SchemaFieldDescription("This option specifies whether and where to output unwritable rows.") + @SchemaFieldNumber("4") @Nullable public abstract ErrorHandling getErrorHandling(); @SchemaFieldDescription( "The path to the Protocol Buffer File Descriptor Set file. This file is used for schema" + " definition and message serialization.") + @SchemaFieldNumber("5") @Nullable public abstract String getFileDescriptorPath(); @SchemaFieldDescription( "The name of the Protocol Buffer message to be used for schema" + " extraction and data conversion.") + @SchemaFieldNumber("6") @Nullable public abstract String getMessageName(); + @SchemaFieldNumber("7") @Nullable public abstract String getSchema(); diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index 83c2e1b38826..65d8d8d024e2 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -91,6 +91,9 @@ import org.apache.beam.sdk.metrics.SourceMetrics; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.StreamingOptions; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaRegistry; import 
org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -2450,6 +2453,62 @@ public void testWithValidConsumerPollingTimeout() { assertEquals(15, reader.getConsumerPollingTimeout()); } + // This test verifies that the schema for KafkaIO.ByteArrayKafkaRecord is correctly generated. + // This schema is used when Kafka records are serialized/deserialized with SchemaCoder. + @Test + public void testByteArrayKafkaRecordSchema() throws NoSuchSchemaException { + Schema schema = SchemaRegistry.createDefault().getSchema(KafkaIO.ByteArrayKafkaRecord.class); + + assertEquals(9, schema.getFieldCount()); + assertEquals(Schema.Field.of("topic", Schema.FieldType.STRING), schema.getField(0)); + assertEquals(Schema.Field.of("partition", Schema.FieldType.INT32), schema.getField(1)); + assertEquals(Schema.Field.of("offset", Schema.FieldType.INT64), schema.getField(2)); + assertEquals(Schema.Field.of("timestamp", Schema.FieldType.INT64), schema.getField(3)); + assertEquals(Schema.Field.nullable("key", Schema.FieldType.BYTES), schema.getField(4)); + assertEquals(Schema.Field.nullable("value", Schema.FieldType.BYTES), schema.getField(5)); + assertEquals( + Schema.Field.nullable( + "headers", + Schema.FieldType.array( + Schema.FieldType.row( + Schema.of( + Schema.Field.of("key", Schema.FieldType.STRING), + Schema.Field.nullable("value", Schema.FieldType.BYTES))))), + schema.getField(6)); + assertEquals(Schema.Field.of("timestampTypeId", Schema.FieldType.INT32), schema.getField(7)); + assertEquals(Schema.Field.of("timestampTypeName", Schema.FieldType.STRING), schema.getField(8)); + } + + // This test verifies that the schema for KafkaSourceDescriptor is correctly generated. 
+ @Test + public void testKafkaSourceDescriptorSchema() throws NoSuchSchemaException { + Schema schema = SchemaRegistry.createDefault().getSchema(KafkaSourceDescriptor.class); + + assertEquals(7, schema.getFieldCount()); + assertEquals(Schema.Field.of("topic", Schema.FieldType.STRING), schema.getField(0)); + assertEquals(Schema.Field.of("partition", Schema.FieldType.INT32), schema.getField(1)); + assertEquals( + Schema.Field.nullable("start_read_offset", Schema.FieldType.INT64), schema.getField(2)); + assertEquals( + Schema.Field.nullable("start_read_time", Schema.FieldType.DATETIME), schema.getField(3)); + assertEquals( + Schema.Field.nullable("stop_read_offset", Schema.FieldType.INT64), schema.getField(4)); + assertEquals( + Schema.Field.nullable("stop_read_time", Schema.FieldType.DATETIME), schema.getField(5)); + assertEquals( + Schema.Field.nullable("bootstrap_servers", Schema.FieldType.array(Schema.FieldType.STRING)), + schema.getField(6)); + } + + @Test + public void testKafkaHeaderSchema() throws NoSuchSchemaException { + Schema schema = SchemaRegistry.createDefault().getSchema(KafkaIO.KafkaHeader.class); + + assertEquals(2, schema.getFieldCount()); + assertEquals(Schema.Field.of("key", Schema.FieldType.STRING), schema.getField(0)); + assertEquals(Schema.Field.nullable("value", Schema.FieldType.BYTES), schema.getField(1)); + } + private static void verifyProducerRecords( MockProducer mockProducer, String topic, diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java index dc97dadf6e92..3470895b5d0d 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java @@ -30,6 +30,9 @@ import java.util.stream.StreamSupport; import 
org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.managed.Managed; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.utils.YamlUtils; @@ -362,4 +365,88 @@ public void testBuildTransformWithManaged() { .expand(PBegin.in(Pipeline.create())); } } + + // This test verifies that the schema for KafkaReadSchemaTransformConfiguration is correctly + // generated. This schema is used when KafkaReadSchemaTransformConfiguration is + // serialized/deserialized with + // SchemaCoder. + @Test + public void testKafkaReadSchemaTransformConfigurationSchema() throws NoSuchSchemaException { + Schema schema = + SchemaRegistry.createDefault().getSchema(KafkaReadSchemaTransformConfiguration.class); + + assertEquals(12, schema.getFieldCount()); + + // Check field name, type, and nullability. Descriptions are not checked as they are not + // critical for serialization. 
+ assertEquals( + Schema.Field.of("bootstrapServers", Schema.FieldType.STRING) + .withDescription(schema.getField(0).getDescription()), + schema.getField(0)); + + assertEquals( + Schema.Field.nullable("confluentSchemaRegistryUrl", Schema.FieldType.STRING) + .withDescription(schema.getField(1).getDescription()), + schema.getField(1)); + + assertEquals( + Schema.Field.nullable("format", Schema.FieldType.STRING) + .withDescription(schema.getField(2).getDescription()), + schema.getField(2)); + + assertEquals( + Schema.Field.nullable("confluentSchemaRegistrySubject", Schema.FieldType.STRING) + .withDescription(schema.getField(3).getDescription()), + schema.getField(3)); + + assertEquals( + Schema.Field.nullable("schema", Schema.FieldType.STRING) + .withDescription(schema.getField(4).getDescription()), + schema.getField(4)); + + assertEquals( + Schema.Field.nullable("fileDescriptorPath", Schema.FieldType.STRING) + .withDescription(schema.getField(5).getDescription()), + schema.getField(5)); + + assertEquals( + Schema.Field.nullable("messageName", Schema.FieldType.STRING) + .withDescription(schema.getField(6).getDescription()), + schema.getField(6)); + + assertEquals( + Schema.Field.nullable("autoOffsetResetConfig", Schema.FieldType.STRING) + .withDescription(schema.getField(7).getDescription()), + schema.getField(7)); + + assertEquals( + Schema.Field.nullable( + "consumerConfigUpdates", + Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.STRING)) + .withDescription(schema.getField(8).getDescription()), + schema.getField(8)); + + assertEquals( + Schema.Field.of("topic", Schema.FieldType.STRING) + .withDescription(schema.getField(9).getDescription()), + schema.getField(9)); + + assertEquals( + Schema.Field.nullable("maxReadTimeSeconds", Schema.FieldType.INT32) + .withDescription(schema.getField(10).getDescription()), + schema.getField(10)); + + Schema actualRowSchemaForErrorHandling = schema.getField(11).getType().getRowSchema(); + + assertEquals( + 
Schema.Field.nullable( + "errorHandling", + Schema.FieldType.row( + Schema.of( + Schema.Field.of("output", Schema.FieldType.STRING) + .withDescription( + actualRowSchemaForErrorHandling.getField(0).getDescription())))) + .withDescription(schema.getField(11).getDescription()), + schema.getField(11)); + } } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProviderTest.java index b63a9334239c..98cdb0636c2f 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProviderTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProviderTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.kafka; import static org.apache.beam.sdk.io.kafka.KafkaWriteSchemaTransformProvider.getRowToRawBytesFunction; +import static org.junit.Assert.assertEquals; import java.io.UnsupportedEncodingException; import java.util.Arrays; @@ -35,7 +36,9 @@ import org.apache.beam.sdk.io.kafka.KafkaWriteSchemaTransformProvider.KafkaWriteSchemaTransform.ErrorCounterFn; import org.apache.beam.sdk.io.kafka.KafkaWriteSchemaTransformProvider.KafkaWriteSchemaTransform.GenericRecordErrorCounterFn; import org.apache.beam.sdk.managed.Managed; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling; import org.apache.beam.sdk.schemas.utils.JsonUtils; import org.apache.beam.sdk.schemas.utils.YamlUtils; @@ -267,4 +270,67 @@ public void testBuildTransformWithManaged() { .apply(Create.empty(Schema.builder().addByteArrayField("bytes").build()))); } } + + @Test + public void testKafkaWriteSchemaTransformConfigurationSchema() throws NoSuchSchemaException { + Schema schema = + 
SchemaRegistry.createDefault() + .getSchema( + KafkaWriteSchemaTransformProvider.KafkaWriteSchemaTransformConfiguration.class); + + System.out.println("schema = " + schema); + + assertEquals(8, schema.getFieldCount()); + + // Check field name, type, and nullability. Descriptions are not checked as they are not + // critical for serialization. + assertEquals( + Schema.Field.of("format", Schema.FieldType.STRING) + .withDescription(schema.getField(0).getDescription()), + schema.getField(0)); + + assertEquals( + Schema.Field.of("topic", Schema.FieldType.STRING) + .withDescription(schema.getField(1).getDescription()), + schema.getField(1)); + + assertEquals( + Schema.Field.of("bootstrapServers", Schema.FieldType.STRING) + .withDescription(schema.getField(2).getDescription()), + schema.getField(2)); + + assertEquals( + Schema.Field.nullable( + "producerConfigUpdates", + Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.STRING)) + .withDescription(schema.getField(3).getDescription()), + schema.getField(3)); + + Schema actualRowSchemaForErrorHandling = schema.getField(4).getType().getRowSchema(); + assertEquals( + Schema.Field.nullable( + "errorHandling", + Schema.FieldType.row( + Schema.of( + Schema.Field.of("output", Schema.FieldType.STRING) + .withDescription( + actualRowSchemaForErrorHandling.getField(0).getDescription())))) + .withDescription(schema.getField(4).getDescription()), + schema.getField(4)); + + assertEquals( + Schema.Field.nullable("fileDescriptorPath", Schema.FieldType.STRING) + .withDescription(schema.getField(5).getDescription()), + schema.getField(5)); + + assertEquals( + Schema.Field.nullable("messageName", Schema.FieldType.STRING) + .withDescription(schema.getField(6).getDescription()), + schema.getField(6)); + + assertEquals( + Schema.Field.nullable("schema", Schema.FieldType.STRING) + .withDescription(schema.getField(7).getDescription()), + schema.getField(7)); + } }