From 71e76ba4c792cacf46fb2db69c7d1e1862d7827a Mon Sep 17 00:00:00 2001 From: Kriti-dev07 Date: Sat, 23 May 2026 12:32:50 +0530 Subject: [PATCH 1/3] Add metrics for Kafka offset commit failures --- .../beam/sdk/io/kafka/KafkaCommitOffset.java | 40 ++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java index ac6650c354d4..1ffcc32da335 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java @@ -25,6 +25,8 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.MapElements; @@ -62,7 +64,15 @@ public class KafkaCommitOffset static class CommitOffsetDoFn extends DoFn, Void> { private static final Logger LOG = LoggerFactory.getLogger(CommitOffsetDoFn.class); + + private final Counter commitFailures = + Metrics.counter(CommitOffsetDoFn.class, "commit-failures"); + + private final Counter retriesExhausted = + Metrics.counter(CommitOffsetDoFn.class, "retries-exhausted"); + private final Map consumerConfig; + private final SerializableFunction, Consumer> consumerFactoryFn; @@ -76,16 +86,24 @@ static class CommitOffsetDoFn extends DoFn, Void @RequiresStableInput @ProcessElement public void processElement(@Element KV element) { + Map updatedConsumerConfig = overrideBootstrapServersConfig(consumerConfig, element.getKey()); + try (Consumer consumer = consumerFactoryFn.apply(updatedConsumerConfig)) { + try { consumer.commitSync( Collections.singletonMap( element.getKey().getTopicPartition(), new OffsetAndMetadata(element.getValue() + 1))); + } catch (Exception e) { - // TODO: consider retrying. + + commitFailures.inc(); + retriesExhausted.inc(); + + // TODO: consider retrying and increment retry-attempt metrics. LOG.warn("Getting exception when committing offset: {}", e.getMessage()); } } @@ -93,23 +111,30 @@ public void processElement(@Element KV element) { private Map overrideBootstrapServersConfig( Map currentConfig, KafkaSourceDescriptor description) { + checkState( currentConfig.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) || description.getBootStrapServers() != null); + Map config = new HashMap<>(currentConfig); + if (description.getBootStrapServers() != null && !description.getBootStrapServers().isEmpty()) { + config.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", description.getBootStrapServers())); } + return config; } } private static final class MaxOffsetFn extends DoFn>, KV> { + private static class OffsetAndTimestamp { + OffsetAndTimestamp(long offset, Instant timestamp) { this.offset = offset; this.timestamp = timestamp; @@ -130,6 +155,7 @@ void merge(long offset, Instant timestamp) { @StartBundle public void startBundle() { + if (maxObserved == null) { maxObserved = new HashMap<>(); } else { @@ -143,13 +169,16 @@ public void startBundle() { public void processElement( @Element KV> element, @Timestamp Instant timestamp) { + maxObserved.compute( element.getKey(), (k, v) -> { long offset = element.getValue().getOffset(); + if (v == null) { return new OffsetAndTimestamp(offset, timestamp); } + v.merge(offset, timestamp); return v; }); @@ -158,6 +187,7 @@ public void processElement( @FinishBundle @SuppressWarnings("nullness") // startBundle guaranteed to initialize public void finishBundle(FinishBundleContext context) { + maxObserved.forEach( (k, v) -> context.output(KV.of(k, v.offset), v.timestamp, GlobalWindow.INSTANCE)); } @@ -165,19 +195,26 @@ public void finishBundle(FinishBundleContext context) { @Override public PCollection expand(PCollection>> input) { + try { + PCollection> offsets; + if (use259implementation) { + offsets = input.apply( MapElements.into(new TypeDescriptor>() {}) .via(element -> KV.of(element.getKey(), element.getValue().getOffset()))); + } else { + // Reduce the amount of data to combine by calculating a max within the generally dense // bundles of reading // from a Kafka partition. offsets = input.apply(ParDo.of(new MaxOffsetFn<>())); } + return offsets .setCoder( KvCoder.of( @@ -190,6 +227,7 @@ public PCollection expand(PCollection Date: Sat, 23 May 2026 12:49:06 +0530 Subject: [PATCH 2/3] Remove redundant retry exhaustion metric --- .../beam/sdk/io/kafka/KafkaCommitOffset.java | 43 ++++++++++++------- 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java index 1ffcc32da335..ad3fafa5d339 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java @@ -53,30 +53,32 @@ public class KafkaCommitOffset extends PTransform< PCollection>>, PCollection> { + private final KafkaIO.ReadSourceDescriptors readSourceDescriptors; private final boolean use259implementation; KafkaCommitOffset( - KafkaIO.ReadSourceDescriptors readSourceDescriptors, boolean use259implementation) { + KafkaIO.ReadSourceDescriptors readSourceDescriptors, + boolean use259implementation) { + this.readSourceDescriptors = readSourceDescriptors; this.use259implementation = use259implementation; } static class CommitOffsetDoFn extends DoFn, Void> { + private static final Logger LOG = LoggerFactory.getLogger(CommitOffsetDoFn.class); private final Counter commitFailures = Metrics.counter(CommitOffsetDoFn.class, "commit-failures"); - private final Counter retriesExhausted = - Metrics.counter(CommitOffsetDoFn.class, "retries-exhausted"); - private final Map consumerConfig; private final SerializableFunction, Consumer> consumerFactoryFn; CommitOffsetDoFn(KafkaIO.ReadSourceDescriptors readSourceDescriptors) { + consumerConfig = readSourceDescriptors.getConsumerConfig(); consumerFactoryFn = readSourceDescriptors.getConsumerFactoryFn(); } @@ -93,6 +95,7 @@ public void processElement(@Element KV element) { try (Consumer consumer = consumerFactoryFn.apply(updatedConsumerConfig)) { try { + consumer.commitSync( Collections.singletonMap( element.getKey().getTopicPartition(), @@ -101,16 +104,16 @@ public void processElement(@Element KV element) { } catch (Exception e) { commitFailures.inc(); - retriesExhausted.inc(); - // TODO: consider retrying and increment retry-attempt metrics. + // TODO: consider retrying. LOG.warn("Getting exception when committing offset: {}", e.getMessage()); } } } private Map overrideBootstrapServersConfig( - Map currentConfig, KafkaSourceDescriptor description) { + Map currentConfig, + KafkaSourceDescriptor description) { checkState( currentConfig.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) @@ -131,7 +134,9 @@ private Map overrideBootstrapServersConfig( } private static final class MaxOffsetFn - extends DoFn>, KV> { + extends DoFn< + KV>, + KV> { private static class OffsetAndTimestamp { @@ -141,6 +146,7 @@ private static class OffsetAndTimestamp { } void merge(long offset, Instant timestamp) { + if (this.offset < offset) { this.offset = offset; this.timestamp = timestamp; @@ -151,7 +157,8 @@ void merge(long offset, Instant timestamp) { Instant timestamp; } - private transient @MonotonicNonNull Map maxObserved; + private transient @MonotonicNonNull + Map maxObserved; @StartBundle public void startBundle() { @@ -194,7 +201,8 @@ public void finishBundle(FinishBundleContext context) { } @Override - public PCollection expand(PCollection>> input) { + public PCollection expand( + PCollection>> input) { try { @@ -204,14 +212,18 @@ public PCollection expand(PCollection>() {}) - .via(element -> KV.of(element.getKey(), element.getValue().getOffset()))); + MapElements.into( + new TypeDescriptor>() {}) + .via( + element -> + KV.of( + element.getKey(), + element.getValue().getOffset()))); } else { // Reduce the amount of data to combine by calculating a max within the generally dense - // bundles of reading - // from a Kafka partition. + // bundles of reading from a Kafka partition. offsets = input.apply(ParDo.of(new MaxOffsetFn<>())); } @@ -229,7 +241,8 @@ public PCollection expand(PCollection Date: Sat, 23 May 2026 13:40:09 +0530 Subject: [PATCH 3/3] apply spotless formatting --- .../apache/beam/sdk/io/delta/DeltaIOTest.java | 11 ++++---- .../beam/sdk/io/kafka/KafkaCommitOffset.java | 27 ++++++------------- 2 files changed, 14 insertions(+), 24 deletions(-) diff --git a/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java index b09fa8f69b3e..4ab932fc5d83 100644 --- a/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java +++ b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java @@ -37,11 +37,12 @@ public void testReadRowsBuilderAndGetters() { Map hadoopConfig = new HashMap<>(); hadoopConfig.put("fs.defaultFS", "file:///"); - ReadRows readRows = DeltaIO.readRows() - .from(tablePath) - .withVersion(version) - .withTimestamp(timestamp) - .withConfig(hadoopConfig); + ReadRows readRows = + DeltaIO.readRows() + .from(tablePath) + .withVersion(version) + .withTimestamp(timestamp) + .withConfig(hadoopConfig); Assert.assertEquals(tablePath, readRows.getTablePath()); Assert.assertEquals(Long.valueOf(version), readRows.getVersion()); diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java index ad3fafa5d339..50f6a671aa80 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java @@ -58,8 +58,7 @@ public class KafkaCommitOffset private final boolean use259implementation; KafkaCommitOffset( - KafkaIO.ReadSourceDescriptors readSourceDescriptors, - boolean use259implementation) { + KafkaIO.ReadSourceDescriptors readSourceDescriptors, boolean use259implementation) { this.readSourceDescriptors = readSourceDescriptors; this.use259implementation = use259implementation; @@ -112,8 +111,7 @@ public void processElement(@Element KV element) { } private Map overrideBootstrapServersConfig( - Map currentConfig, - KafkaSourceDescriptor description) { + Map currentConfig, KafkaSourceDescriptor description) { checkState( currentConfig.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) @@ -134,9 +132,7 @@ private Map overrideBootstrapServersConfig( } private static final class MaxOffsetFn - extends DoFn< - KV>, - KV> { + extends DoFn>, KV> { private static class OffsetAndTimestamp { @@ -157,8 +153,7 @@ void merge(long offset, Instant timestamp) { Instant timestamp; } - private transient @MonotonicNonNull - Map maxObserved; + private transient @MonotonicNonNull Map maxObserved; @StartBundle public void startBundle() { @@ -201,8 +196,7 @@ public void finishBundle(FinishBundleContext context) { } @Override - public PCollection expand( - PCollection>> input) { + public PCollection expand(PCollection>> input) { try { @@ -212,13 +206,8 @@ public PCollection expand( offsets = input.apply( - MapElements.into( - new TypeDescriptor>() {}) - .via( - element -> - KV.of( - element.getKey(), - element.getValue().getOffset()))); + MapElements.into(new TypeDescriptor>() {}) + .via(element -> KV.of(element.getKey(), element.getValue().getOffset()))); } else { @@ -245,4 +234,4 @@ public PCollection expand( throw new RuntimeException(e.getMessage()); } } -} \ No newline at end of file +}