From 8cc5e835b209d5796b044978ec4221ee22a8b9d2 Mon Sep 17 00:00:00 2001 From: frreiss Date: Thu, 8 Sep 2016 12:59:15 -0700 Subject: [PATCH 1/5] Added purge() call to scheduler --- .../apache/spark/sql/execution/streaming/MetadataLog.scala | 1 + .../spark/sql/execution/streaming/StreamExecution.scala | 6 ++++++ 2 files changed, 7 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala index 78d6be17df05a..8de77025b1736 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala @@ -24,6 +24,7 @@ package org.apache.spark.sql.execution.streaming * - Allow the user to query the latest batch id. * - Allow the user to query the metadata object of a specified batch id. * - Allow the user to query metadata objects in a range of batch ids. + * - Allow the user to remove obsolete metdata */ trait MetadataLog[T] { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 5e1e5eeb50936..1c3c342187208 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -290,6 +290,12 @@ class StreamExecution( assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)), s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId") logInfo(s"Committed offsets for batch $currentBatchId.") + + // Now that we have logged the new batch, no further processing will happen for + // the previous batch, and it is safe to discard the old metadata. + // NOTE: If StreamExecution implements pipeline parallelism (multiple batches in + // flight at the same time), this cleanup logic will need to change. + offsetLog.purge(currentBatchId - 1) } else { awaitBatchLock.lock() try { From d71366d958334ebbc81e45c7f469bad2a68d0a2d Mon Sep 17 00:00:00 2001 From: frreiss Date: Fri, 9 Sep 2016 21:23:58 -0700 Subject: [PATCH 2/5] Added test case and corrected off-by-one error. --- .../execution/streaming/StreamExecution.scala | 2 +- .../sql/streaming/StreamingQuerySuite.scala | 26 +++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 1c3c342187208..c7f8cef409462 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -295,7 +295,7 @@ class StreamExecution( // the previous batch, and it is safe to discard the old metadata. // NOTE: If StreamExecution implements pipeline parallelism (multiple batches in // flight at the same time), this cleanup logic will need to change. - offsetLog.purge(currentBatchId - 1) + offsetLog.purge(currentBatchId) } else { awaitBatchLock.lock() try { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 9d58315c20031..879f993cf06f2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -125,6 +125,32 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter { ) } + testQuietly("StreamExecution metadata garbarge collection") { + val inputData = MemoryStream[Int] + val mapped = inputData.toDS().map(6 / _) + + // Run a few batches through the application + testStream(mapped)( + AddData(inputData, 1, 2), + CheckAnswer(6, 3), + AddData(inputData, 1, 2), + CheckAnswer(6, 3, 6, 3), + AddData(inputData, 4, 6), + CheckAnswer(6, 3, 6, 3, 1, 1), + + // Three batches have run, but only one set of metadata should be present + AssertOnQuery( + q => { + val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString) + val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName()) + val toTest = logFileNames.filter(! _.endsWith(".crc")) // Workaround for SPARK-17475 + toTest.size == 1 && toTest.head == "2" + true + } + ) + ) + } + /** * A [[StreamAction]] to test the behavior of `StreamingQuery.awaitTermination()`. * From 6b64d5cd222c70071faa7aebd8db191d5e1c0185 Mon Sep 17 00:00:00 2001 From: petermaxlee Date: Sat, 17 Sep 2016 00:16:28 -0700 Subject: [PATCH 3/5] Fix test case. --- .../sql/execution/streaming/MetadataLog.scala | 2 +- .../sql/streaming/StreamingQuerySuite.scala | 18 ++++++++---------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala index 8de77025b1736..9e2604c9c069f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala @@ -24,7 +24,7 @@ package org.apache.spark.sql.execution.streaming * - Allow the user to query the latest batch id. * - Allow the user to query the metadata object of a specified batch id. * - Allow the user to query metadata objects in a range of batch ids. - * - Allow the user to remove obsolete metdata + * - Allow the user to remove obsolete metadata */ trait MetadataLog[T] { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 879f993cf06f2..ffa01a28351b6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -125,7 +125,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter { ) } - testQuietly("StreamExecution metadata garbarge collection") { + testQuietly("StreamExecution metadata garbage collection") { val inputData = MemoryStream[Int] val mapped = inputData.toDS().map(6 / _) @@ -139,15 +139,13 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter { CheckAnswer(6, 3, 6, 3, 1, 1), // Three batches have run, but only one set of metadata should be present - AssertOnQuery( - q => { - val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString) - val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName()) - val toTest = logFileNames.filter(! _.endsWith(".crc")) // Workaround for SPARK-17475 - toTest.size == 1 && toTest.head == "2" - true - } - ) + AssertOnQuery("metadata log should contain only one file") { streamExecution => + val metadataLogDir = new java.io.File(streamExecution.offsetLog.metadataPath.toString) + val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName()) + val toTest = logFileNames.filter(!_.endsWith(".crc")) // Workaround for SPARK-17475 + assert(toTest.size == 1 && toTest.head == "2") + true + } ) } From f7113030551d02719b4f5b681770ae2536b3aed5 Mon Sep 17 00:00:00 2001 From: petermaxlee Date: Sat, 17 Sep 2016 00:22:39 -0700 Subject: [PATCH 4/5] Made comment more clear --- .../spark/sql/streaming/StreamingQuerySuite.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index ffa01a28351b6..d3e2cab1b8bd3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -129,7 +129,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter { val inputData = MemoryStream[Int] val mapped = inputData.toDS().map(6 / _) - // Run a few batches through the application + // Run 3 batches, and then assert that only 1 metadata file is left at the end + // since the first 2 should have been purged. testStream(mapped)( AddData(inputData, 1, 2), CheckAnswer(6, 3), @@ -138,11 +139,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter { AddData(inputData, 4, 6), CheckAnswer(6, 3, 6, 3, 1, 1), - // Three batches have run, but only one set of metadata should be present - AssertOnQuery("metadata log should contain only one file") { streamExecution => - val metadataLogDir = new java.io.File(streamExecution.offsetLog.metadataPath.toString) + AssertOnQuery("metadata log should contain only one file") { q => + val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString) val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName()) - val toTest = logFileNames.filter(!_.endsWith(".crc")) // Workaround for SPARK-17475 + val toTest = logFileNames // Workaround for SPARK-17475 assert(toTest.size == 1 && toTest.head == "2") true } From 458ed6f11376510dae63fb0e3bf7455f72de0337 Mon Sep 17 00:00:00 2001 From: petermaxlee Date: Sat, 17 Sep 2016 00:24:29 -0700 Subject: [PATCH 5/5] More comment. --- .../apache/spark/sql/execution/streaming/StreamExecution.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index a3128f10aa02e..220f77dc24ce0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -293,6 +293,7 @@ class StreamExecution( // Now that we have logged the new batch, no further processing will happen for // the previous batch, and it is safe to discard the old metadata. + // Note that purge is exclusive, i.e. it purges everything before currentBatchId. // NOTE: If StreamExecution implements pipeline parallelism (multiple batches in // flight at the same time), this cleanup logic will need to change. offsetLog.purge(currentBatchId)