From a6000f96f6306ce97649c74825c6f69d2a6cccf0 Mon Sep 17 00:00:00 2001 From: Warren Zhu Date: Sun, 21 Jun 2020 11:14:11 -0700 Subject: [PATCH] [SPARK-32044][SS] Kakfa continuous processing print mislead initial offsets log --- .../sql/kafka010/KafkaContinuousReader.scala | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala index 561d501359321..0fdb44dcee279 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} import java.util.concurrent.TimeoutException +import java.util.function.Supplier import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetOutOfRangeException} import org.apache.kafka.common.TopicPartition @@ -70,15 +71,17 @@ class KafkaContinuousReader( private var offset: Offset = _ override def setStartOffset(start: ju.Optional[Offset]): Unit = { - offset = start.orElse { - val offsets = initialOffsets match { - case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets()) - case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets(None)) - case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss) + offset = start.orElseGet(new Supplier[Offset] { + override def get(): Offset = { + val offsets = initialOffsets match { + case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets()) + case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets(None)) + case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss) + } + logInfo(s"Initial offsets: $offsets") + offsets } - logInfo(s"Initial offsets: $offsets") - offsets - } + }) } override def getStartOffset(): Offset = offset