diff --git a/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java index b09fa8f69b3e..4ab932fc5d83 100644 --- a/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java +++ b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java @@ -37,11 +37,12 @@ public void testReadRowsBuilderAndGetters() { Map hadoopConfig = new HashMap<>(); hadoopConfig.put("fs.defaultFS", "file:///"); - ReadRows readRows = DeltaIO.readRows() - .from(tablePath) - .withVersion(version) - .withTimestamp(timestamp) - .withConfig(hadoopConfig); + ReadRows readRows = + DeltaIO.readRows() + .from(tablePath) + .withVersion(version) + .withTimestamp(timestamp) + .withConfig(hadoopConfig); Assert.assertEquals(tablePath, readRows.getTablePath()); Assert.assertEquals(Long.valueOf(version), readRows.getVersion()); diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java index ac6650c354d4..50f6a671aa80 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java @@ -25,6 +25,8 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.MapElements; @@ -51,22 +53,31 @@ public class KafkaCommitOffset extends PTransform< PCollection>>, PCollection> { + private final KafkaIO.ReadSourceDescriptors readSourceDescriptors; private final boolean use259implementation; KafkaCommitOffset( KafkaIO.ReadSourceDescriptors readSourceDescriptors, boolean use259implementation) { + this.readSourceDescriptors = readSourceDescriptors; this.use259implementation = use259implementation; } static class CommitOffsetDoFn extends DoFn, Void> { + private static final Logger LOG = LoggerFactory.getLogger(CommitOffsetDoFn.class); + + private final Counter commitFailures = + Metrics.counter(CommitOffsetDoFn.class, "commit-failures"); + private final Map consumerConfig; + private final SerializableFunction, Consumer> consumerFactoryFn; CommitOffsetDoFn(KafkaIO.ReadSourceDescriptors readSourceDescriptors) { + consumerConfig = readSourceDescriptors.getConsumerConfig(); consumerFactoryFn = readSourceDescriptors.getConsumerFactoryFn(); } @@ -76,15 +87,23 @@ static class CommitOffsetDoFn extends DoFn, Void @RequiresStableInput @ProcessElement public void processElement(@Element KV element) { + Map updatedConsumerConfig = overrideBootstrapServersConfig(consumerConfig, element.getKey()); + try (Consumer consumer = consumerFactoryFn.apply(updatedConsumerConfig)) { + try { + consumer.commitSync( Collections.singletonMap( element.getKey().getTopicPartition(), new OffsetAndMetadata(element.getValue() + 1))); + } catch (Exception e) { + + commitFailures.inc(); + // TODO: consider retrying. LOG.warn("Getting exception when committing offset: {}", e.getMessage()); } @@ -93,29 +112,37 @@ public void processElement(@Element KV element) { private Map overrideBootstrapServersConfig( Map currentConfig, KafkaSourceDescriptor description) { + checkState( currentConfig.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) || description.getBootStrapServers() != null); + Map config = new HashMap<>(currentConfig); + if (description.getBootStrapServers() != null && !description.getBootStrapServers().isEmpty()) { + config.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", description.getBootStrapServers())); } + return config; } } private static final class MaxOffsetFn extends DoFn>, KV> { + private static class OffsetAndTimestamp { + OffsetAndTimestamp(long offset, Instant timestamp) { this.offset = offset; this.timestamp = timestamp; } void merge(long offset, Instant timestamp) { + if (this.offset < offset) { this.offset = offset; this.timestamp = timestamp; @@ -130,6 +157,7 @@ void merge(long offset, Instant timestamp) { @StartBundle public void startBundle() { + if (maxObserved == null) { maxObserved = new HashMap<>(); } else { @@ -143,13 +171,16 @@ public void startBundle() { public void processElement( @Element KV> element, @Timestamp Instant timestamp) { + maxObserved.compute( element.getKey(), (k, v) -> { long offset = element.getValue().getOffset(); + if (v == null) { return new OffsetAndTimestamp(offset, timestamp); } + v.merge(offset, timestamp); return v; }); @@ -158,6 +189,7 @@ public void processElement( @FinishBundle @SuppressWarnings("nullness") // startBundle guaranteed to initialize public void finishBundle(FinishBundleContext context) { + maxObserved.forEach( (k, v) -> context.output(KV.of(k, v.offset), v.timestamp, GlobalWindow.INSTANCE)); } @@ -165,19 +197,25 @@ public void finishBundle(FinishBundleContext context) { @Override public PCollection expand(PCollection>> input) { + try { + PCollection> offsets; + if (use259implementation) { + offsets = input.apply( MapElements.into(new TypeDescriptor>() {}) .via(element -> KV.of(element.getKey(), element.getValue().getOffset()))); + } else { + // Reduce the amount of data to combine by calculating a max within the generally dense - // bundles of reading - // from a Kafka partition. + // bundles of reading from a Kafka partition. offsets = input.apply(ParDo.of(new MaxOffsetFn<>())); } + return offsets .setCoder( KvCoder.of( @@ -190,7 +228,9 @@ public PCollection expand(PCollection