From cc4d3b612c4cd51ae76763ff5b5b158a13a2e667 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Mon, 27 Apr 2020 15:07:52 -0700 Subject: [PATCH 1/3] [SPARK-27340][SS] Alias on TimeWindow expression cause watermark metadata lost Credit to LiangchangZ, this PR reuses the UT as well as the integration test from #24457. Thanks Liangchang for your solid work. Make metadata propagatable between Aliases. In Structured Streaming, we added an Alias for TimeWindow by default. https://github.com/apache/spark/blob/590b9a0132b68d9523e663997def957b2e46dfb1/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L3272-L3273 For some cases like stream join with watermark and window, users need to add an alias for convenience (we also added one in StreamingJoinSuite). The current metadata handling logic for `as` loses the watermark metadata https://github.com/apache/spark/blob/590b9a0132b68d9523e663997def957b2e46dfb1/sql/core/src/main/scala/org/apache/spark/sql/Column.scala#L1049-L1054 and eventually causes the AnalysisException: ``` Stream-stream outer join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition ``` Bugfix for an alias on time window with watermark. New UTs added: one for the functionality and one for explaining the common scenario. Closes #28326 from xuanyuanking/SPARK-27340. 
Authored-by: Yuanjian Li Signed-off-by: Dongjoon Hyun (cherry picked from commit ba7adc494923de8104ab37d412edd78afe540f45) Signed-off-by: Dongjoon Hyun (cherry picked from commit d2724828dfdca9d3e45590078ab525bd73b3c17e) Signed-off-by: Dongjoon Hyun --- .../scala/org/apache/spark/sql/Column.scala | 14 +--------- .../streaming/EventTimeWatermarkSuite.scala | 11 ++++++++ .../sql/streaming/StreamingJoinSuite.scala | 26 +++++++++++++++++++ 3 files changed, 38 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index a046127c3edb4..e046a15d1137d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -948,9 +948,6 @@ class Column(val expr: Expression) extends Logging { * df.select($"colA".as("colB")) * }}} * - * If the current column has metadata associated with it, this metadata will be propagated - * to the new column. If this not desired, use `as` with explicitly empty metadata. - * * @group expr_ops * @since 1.3.0 */ @@ -987,9 +984,6 @@ class Column(val expr: Expression) extends Logging { * df.select($"colA".as('colB)) * }}} * - * If the current column has metadata associated with it, this metadata will be propagated - * to the new column. If this not desired, use `as` with explicitly empty metadata. - * * @group expr_ops * @since 1.3.0 */ @@ -1016,17 +1010,11 @@ class Column(val expr: Expression) extends Logging { * df.select($"colA".name("colB")) * }}} * - * If the current column has metadata associated with it, this metadata will be propagated - * to the new column. If this not desired, use `as` with explicitly empty metadata. 
- * * @group expr_ops * @since 2.0.0 */ def name(alias: String): Column = withExpr { - expr match { - case ne: NamedExpression => Alias(expr, alias)(explicitMetadata = Some(ne.metadata)) - case other => Alias(other, alias)() - } + Alias(expr, alias)() } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index 091b9a10205d6..120d3d38ca9ed 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -590,6 +590,17 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche } } + test("SPARK-27340 Alias on TimeWindow expression cause watermark metadata lost") { + val inputData = MemoryStream[Int] + val aliasWindow = inputData.toDF() + .withColumn("eventTime", $"value".cast("timestamp")) + .withWatermark("eventTime", "10 seconds") + .select(window($"eventTime", "5 seconds") as 'aliasWindow) + // Check the eventTime metadata is kept in the top level alias. 
+ assert(aliasWindow.logicalPlan.output.exists( + _.metadata.contains(EventTimeWatermark.delayKey))) + } + test("test no-data flag") { val flagKey = SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala index 42fe9f34ee3ec..7fab3461a910a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala @@ -712,5 +712,31 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with assertNumStateRows(total = 2, updated = 2) ) } + + test("SPARK-27340 Windowed left out join with Alias on TimeWindow") { + val (leftInput, df1) = setupStream("left", 2) + val (rightInput, df2) = setupStream("right", 3) + val left = df1.select('key, window('leftTime, "10 second") as 'leftWindow, 'leftValue) + val right = df2.select('key, window('rightTime, "10 second") as 'rightWindow, 'rightValue) + val joined = left.join( + right, + left("key") === right("key") && left("leftWindow") === right("rightWindow"), + "left_outer") + .select(left("key"), $"leftWindow.end".cast("long"), 'leftValue, 'rightValue) + + testStream(joined)( + // Test inner part of the join. 
+ MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7), + CheckNewAnswer((3, 10, 6, 9), (4, 10, 8, 12), (5, 10, 10, 15)), + + MultiAddData(leftInput, 21)(rightInput, 22), // watermark = 11, no-data-batch computes nulls + CheckNewAnswer(Row(1, 10, 2, null), Row(2, 10, 4, null)), + assertNumStateRows(total = 2, updated = 12), + + AddData(leftInput, 22), + CheckNewAnswer(Row(22, 30, 44, 66)), + assertNumStateRows(total = 3, updated = 1) + ) + } } From bb0925457c67294eeeffc63563435b48626d3f0d Mon Sep 17 00:00:00 2001 From: zhuliangchang Date: Mon, 27 Apr 2020 16:05:35 -0700 Subject: [PATCH 2/3] Add zhuliangchang From b6e410b68557b46324a3553c0777e9efe4694c98 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 27 Apr 2020 17:27:42 -0700 Subject: [PATCH 3/3] Address comments --- .../src/main/scala/org/apache/spark/sql/Column.scala | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index e046a15d1137d..9c1d4da11ea75 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -948,6 +948,9 @@ class Column(val expr: Expression) extends Logging { * df.select($"colA".as("colB")) * }}} * + * If the current column has metadata associated with it, this metadata will be propagated + * to the new column. If this not desired, use `as` with explicitly empty metadata. + * * @group expr_ops * @since 1.3.0 */ @@ -984,6 +987,9 @@ class Column(val expr: Expression) extends Logging { * df.select($"colA".as('colB)) * }}} * + * If the current column has metadata associated with it, this metadata will be propagated + * to the new column. If this not desired, use `as` with explicitly empty metadata. 
+ * * @group expr_ops * @since 1.3.0 */ @@ -1010,6 +1016,9 @@ class Column(val expr: Expression) extends Logging { * df.select($"colA".name("colB")) * }}} * + * If the current column has metadata associated with it, this metadata will be propagated + * to the new column. If this not desired, use `as` with explicitly empty metadata. + * * @group expr_ops * @since 2.0.0 */