Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,36 +43,51 @@ import org.apache.spark.sql.internal.connector.SimpleTableProvider
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

object MemoryStream extends LowPriorityMemoryStreamImplicits {
object MemoryStream {
protected val currentBlockId = new AtomicInteger(0)
protected val memoryStreamId = new AtomicInteger(0)

def apply[A : Encoder](implicit sparkSession: SparkSession): MemoryStream[A] =
new MemoryStream[A](memoryStreamId.getAndIncrement(), sparkSession)

def apply[A : Encoder](numPartitions: Int)(implicit sparkSession: SparkSession): MemoryStream[A] =
new MemoryStream[A](memoryStreamId.getAndIncrement(), sparkSession, Some(numPartitions))
}

/**
* Provides lower-priority implicits for MemoryStream to prevent ambiguity when both
* SparkSession and SQLContext are in scope. The implicits in the companion object,
* which use SparkSession, take higher precedence.
*/
trait LowPriorityMemoryStreamImplicits {
this: MemoryStream.type =>

// Deprecated: Used when an implicit SQLContext is in scope
@deprecated("Use MemoryStream.apply with an implicit SparkSession instead of SQLContext", "4.1.0")
def apply[A: Encoder]()(implicit sqlContext: SQLContext): MemoryStream[A] =

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the problem. This is not backward compatible, as the previous API is def apply[A: Encoder](implicit sqlContext: SQLContext): MemoryStream[A] (no parentheses).

There is no way to keep both implicits. So the proposal here is to only keep implicit SQLContext, and require to pass SparkSession implicitly.

/**
* Creates a MemoryStream with an implicit SQLContext (backward compatible).
* Usage: `MemoryStream[Int]`
*/
def apply[A: Encoder](implicit sqlContext: SQLContext): MemoryStream[A] =
new MemoryStream[A](memoryStreamId.getAndIncrement(), sqlContext.sparkSession)

@deprecated("Use MemoryStream.apply with an implicit SparkSession instead of SQLContext", "4.1.0")
def apply[A: Encoder](numPartitions: Int)(implicit sqlContext: SQLContext): MemoryStream[A] =
/**
* Creates a MemoryStream with specified partitions using implicit SQLContext.
* Usage: `MemoryStream[Int](numPartitions)`
*/
def apply[A: Encoder](numPartitions: Int)(
implicit sqlContext: SQLContext): MemoryStream[A] =
new MemoryStream[A](
memoryStreamId.getAndIncrement(),
sqlContext.sparkSession,
Some(numPartitions))

/**
* Creates a MemoryStream with explicit SparkSession.
* Usage: `MemoryStream[Int](spark)`
*/
def apply[A: Encoder](sparkSession: SparkSession): MemoryStream[A] =
new MemoryStream[A](memoryStreamId.getAndIncrement(), sparkSession)

/**
* Creates a MemoryStream with specified partitions using explicit SparkSession.
* Usage: `MemoryStream[Int](spark, numPartitions)`
*/
def apply[A: Encoder](sparkSession: SparkSession, numPartitions: Int): MemoryStream[A] =
new MemoryStream[A](
memoryStreamId.getAndIncrement(),
sparkSession,
Some(numPartitions))

/**
* Creates a MemoryStream with explicit encoder and SparkSession.
* Usage: `MemoryStream(Encoders.scalaInt, spark)`
*/
def apply[A](encoder: Encoder[A], sparkSession: SparkSession): MemoryStream[A] =

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I roughly remember the intention was to discourage the usage of SQLContext - if that's the case, we probably want to have the way to pass numPartitions parameter.

That said, looks like this method (explicit encoder instance) is newly added. Is there any usage of this? We don't seem to add the same in ContinuousMemoryStream and LowLatencyMemoryStream.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used in quite some places like StreamingQueryManagerSuite

  testQuietly("can start a streaming query with the same name in a different session") {
    val session2 = spark.cloneSession()

    val ds1 = MemoryStream(Encoders.INT, spark).toDS()
    val ds2 = MemoryStream(Encoders.INT, session2).toDS()

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've add a new overload to specific numPartitions with SparkSession.

new MemoryStream[A](memoryStreamId.getAndIncrement(), sparkSession)(encoder)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,47 +112,36 @@ class ContinuousMemoryStream[A : Encoder](
override def commit(end: Offset): Unit = {}
}

object ContinuousMemoryStream extends LowPriorityContinuousMemoryStreamImplicits {
object ContinuousMemoryStream {
protected val memoryStreamId = new AtomicInteger(0)

def apply[A : Encoder](implicit sparkSession: SparkSession): ContinuousMemoryStream[A] =
new ContinuousMemoryStream[A](memoryStreamId.getAndIncrement(), sparkSession)

def apply[A : Encoder](numPartitions: Int)(implicit sparkSession: SparkSession):
ContinuousMemoryStream[A] =
new ContinuousMemoryStream[A](memoryStreamId.getAndIncrement(), sparkSession, numPartitions)

def singlePartition[A : Encoder](implicit sparkSession: SparkSession): ContinuousMemoryStream[A] =
new ContinuousMemoryStream[A](memoryStreamId.getAndIncrement(), sparkSession, 1)
}

/**
* Provides lower-priority implicits for ContinuousMemoryStream to prevent ambiguity when both
* SparkSession and SQLContext are in scope. The implicits in the companion object,
* which use SparkSession, take higher precedence.
*/
trait LowPriorityContinuousMemoryStreamImplicits {
this: ContinuousMemoryStream.type =>

// Deprecated: Used when an implicit SQLContext is in scope
@deprecated("Use ContinuousMemoryStream with an implicit SparkSession " +
"instead of SQLContext", "4.1.0")
def apply[A: Encoder]()(implicit sqlContext: SQLContext): ContinuousMemoryStream[A] =
/** Creates a ContinuousMemoryStream with an implicit SQLContext (backward compatible). */
def apply[A: Encoder](implicit sqlContext: SQLContext): ContinuousMemoryStream[A] =
new ContinuousMemoryStream[A](memoryStreamId.getAndIncrement(), sqlContext.sparkSession)

@deprecated("Use ContinuousMemoryStream with an implicit SparkSession " +
"instead of SQLContext", "4.1.0")
def apply[A: Encoder](numPartitions: Int)(implicit sqlContext: SQLContext):
ContinuousMemoryStream[A] =
/** Creates a ContinuousMemoryStream with specified partitions (SQLContext). */
def apply[A: Encoder](numPartitions: Int)(
implicit sqlContext: SQLContext): ContinuousMemoryStream[A] =
new ContinuousMemoryStream[A](
memoryStreamId.getAndIncrement(),
sqlContext.sparkSession,
numPartitions)

@deprecated("Use ContinuousMemoryStream.singlePartition with an implicit SparkSession " +
"instead of SQLContext", "4.1.0")
def singlePartition[A: Encoder]()(implicit sqlContext: SQLContext): ContinuousMemoryStream[A] =
/** Creates a ContinuousMemoryStream with explicit SparkSession. */
def apply[A: Encoder](sparkSession: SparkSession): ContinuousMemoryStream[A] =
new ContinuousMemoryStream[A](memoryStreamId.getAndIncrement(), sparkSession)

/** Creates a ContinuousMemoryStream with specified partitions (SparkSession). */
def apply[A: Encoder](sparkSession: SparkSession, numPartitions: Int): ContinuousMemoryStream[A] =
new ContinuousMemoryStream[A](memoryStreamId.getAndIncrement(), sparkSession, numPartitions)

/** Creates a single partition ContinuousMemoryStream (SQLContext). */
def singlePartition[A: Encoder](implicit sqlContext: SQLContext): ContinuousMemoryStream[A] =
new ContinuousMemoryStream[A](memoryStreamId.getAndIncrement(), sqlContext.sparkSession, 1)

/** Creates a single partition ContinuousMemoryStream (SparkSession). */
def singlePartition[A: Encoder](sparkSession: SparkSession): ContinuousMemoryStream[A] =
new ContinuousMemoryStream[A](memoryStreamId.getAndIncrement(), sparkSession, 1)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,53 +183,39 @@ class LowLatencyMemoryStream[A: Encoder](
}
}

object LowLatencyMemoryStream extends LowPriorityLowLatencyMemoryStreamImplicits {
object LowLatencyMemoryStream {
protected val memoryStreamId = new AtomicInteger(0)

def apply[A: Encoder](implicit sparkSession: SparkSession): LowLatencyMemoryStream[A] =
new LowLatencyMemoryStream[A](memoryStreamId.getAndIncrement(), sparkSession)
/** Creates a LowLatencyMemoryStream with an implicit SQLContext (backward compatible). */
def apply[A: Encoder](implicit sqlContext: SQLContext): LowLatencyMemoryStream[A] =
new LowLatencyMemoryStream[A](memoryStreamId.getAndIncrement(), sqlContext.sparkSession)

/** Creates a LowLatencyMemoryStream with specified partitions (SQLContext). */
def apply[A: Encoder](numPartitions: Int)(
implicit
sparkSession: SparkSession): LowLatencyMemoryStream[A] =
implicit sqlContext: SQLContext): LowLatencyMemoryStream[A] =
new LowLatencyMemoryStream[A](
memoryStreamId.getAndIncrement(),
sparkSession,
numPartitions = numPartitions
)

def singlePartition[A: Encoder](implicit sparkSession: SparkSession): LowLatencyMemoryStream[A] =
new LowLatencyMemoryStream[A](memoryStreamId.getAndIncrement(), sparkSession, 1)
}

/**
* Provides lower-priority implicits for LowLatencyMemoryStream to prevent ambiguity when both
* SparkSession and SQLContext are in scope. The implicits in the companion object,
* which use SparkSession, take higher precedence.
*/
trait LowPriorityLowLatencyMemoryStreamImplicits {
this: LowLatencyMemoryStream.type =>
sqlContext.sparkSession,
numPartitions = numPartitions)

// Deprecated: Used when an implicit SQLContext is in scope
@deprecated("Use LowLatencyMemoryStream with an implicit SparkSession " +
"instead of SQLContext", "4.1.0")
def apply[A: Encoder]()(implicit sqlContext: SQLContext): LowLatencyMemoryStream[A] =
new LowLatencyMemoryStream[A](memoryStreamId.getAndIncrement(), sqlContext.sparkSession)
/** Creates a LowLatencyMemoryStream with explicit SparkSession. */
def apply[A: Encoder](sparkSession: SparkSession): LowLatencyMemoryStream[A] =
new LowLatencyMemoryStream[A](memoryStreamId.getAndIncrement(), sparkSession)

@deprecated("Use LowLatencyMemoryStream with an implicit SparkSession " +
"instead of SQLContext", "4.1.0")
def apply[A: Encoder](numPartitions: Int)(implicit sqlContext: SQLContext):
LowLatencyMemoryStream[A] =
/** Creates a LowLatencyMemoryStream with specified partitions (SparkSession). */
def apply[A: Encoder](sparkSession: SparkSession, numPartitions: Int): LowLatencyMemoryStream[A] =
new LowLatencyMemoryStream[A](
memoryStreamId.getAndIncrement(),
sqlContext.sparkSession,
numPartitions = numPartitions
)
sparkSession,
numPartitions = numPartitions)

@deprecated("Use LowLatencyMemoryStream.singlePartition with an implicit SparkSession " +
"instead of SQLContext", "4.1.0")
def singlePartition[A: Encoder]()(implicit sqlContext: SQLContext): LowLatencyMemoryStream[A] =
/** Creates a single partition LowLatencyMemoryStream (SQLContext). */
def singlePartition[A: Encoder](implicit sqlContext: SQLContext): LowLatencyMemoryStream[A] =
new LowLatencyMemoryStream[A](memoryStreamId.getAndIncrement(), sqlContext.sparkSession, 1)

/** Creates a single partition LowLatencyMemoryStream (SparkSession). */
def singlePartition[A: Encoder](sparkSession: SparkSession): LowLatencyMemoryStream[A] =
new LowLatencyMemoryStream[A](memoryStreamId.getAndIncrement(), sparkSession, 1)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -914,7 +914,7 @@ class PythonStreamingDataSourceWriteSuite extends PythonDataSourceSuiteBase {
val dataSource =
createUserDefinedPythonDataSource(dataSourceName, simpleDataStreamWriterScript)
spark.dataSource.registerPython(dataSourceName, dataSource)
val inputData = MemoryStream[Int](numPartitions = 3)
val inputData = MemoryStream[Int](spark, numPartitions = 3)
val df = inputData.toDF()
withTempDir { dir =>
val path = dir.getAbsolutePath
Expand Down Expand Up @@ -998,7 +998,7 @@ class PythonStreamingDataSourceWriteSuite extends PythonDataSourceSuiteBase {
|""".stripMargin
val dataSource = createUserDefinedPythonDataSource(dataSourceName, dataSourceScript)
spark.dataSource.registerPython(dataSourceName, dataSource)
val inputData = MemoryStream[Int](numPartitions = 3)
val inputData = MemoryStream[Int](spark, numPartitions = 3)
withTempDir { dir =>
val path = dir.getAbsolutePath
val checkpointDir = new File(path, "checkpoint")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,84 +343,6 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
intsToDF(expected)(schema))
}

test("LowPriorityMemoryStreamImplicits works with implicit sqlContext") {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol we added tests in wrong place... the file seems to be for memory "sink", not memory "source".

// Test that MemoryStream can be created using implicit sqlContext
implicit val sqlContext: SQLContext = spark.sqlContext

// Test MemoryStream[A]() with implicit sqlContext
val stream1 = MemoryStream[Int]()
assert(stream1 != null)

// Test MemoryStream[A](numPartitions) with implicit sqlContext
val stream2 = MemoryStream[String](3)
assert(stream2 != null)

// Verify the streams work correctly
stream1.addData(1, 2, 3)
val df1 = stream1.toDF()
assert(df1.schema.fieldNames.contains("value"))

stream2.addData("a", "b", "c")
val df2 = stream2.toDF()
assert(df2.schema.fieldNames.contains("value"))
}

test("LowPriorityContinuousMemoryStreamImplicits works with implicit sqlContext") {
import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream
// Test that ContinuousMemoryStream can be created using implicit sqlContext
implicit val sqlContext: SQLContext = spark.sqlContext

// Test ContinuousMemoryStream[A]() with implicit sqlContext
val stream1 = ContinuousMemoryStream[Int]()
assert(stream1 != null)

// Test ContinuousMemoryStream[A](numPartitions) with implicit sqlContext
val stream2 = ContinuousMemoryStream[String](3)
assert(stream2 != null)

// Test ContinuousMemoryStream.singlePartition with implicit sqlContext
val stream3 = ContinuousMemoryStream.singlePartition[Int]()
assert(stream3 != null)

// Verify the streams work correctly
stream1.addData(Seq(1, 2, 3))
stream2.addData(Seq("a", "b", "c"))
stream3.addData(Seq(10, 20))

// Basic verification that streams are functional
assert(stream1.initialOffset() != null)
assert(stream2.initialOffset() != null)
assert(stream3.initialOffset() != null)
}

test("LowPriorityLowLatencyMemoryStreamImplicits works with implicit sqlContext") {
import org.apache.spark.sql.execution.streaming.LowLatencyMemoryStream
// Test that LowLatencyMemoryStream can be created using implicit sqlContext
implicit val sqlContext: SQLContext = spark.sqlContext

// Test LowLatencyMemoryStream[A]() with implicit sqlContext
val stream1 = LowLatencyMemoryStream[Int]()
assert(stream1 != null)

// Test LowLatencyMemoryStream[A](numPartitions) with implicit sqlContext
val stream2 = LowLatencyMemoryStream[String](3)
assert(stream2 != null)

// Test LowLatencyMemoryStream.singlePartition with implicit sqlContext
val stream3 = LowLatencyMemoryStream.singlePartition[Int]()
assert(stream3 != null)

// Verify the streams work correctly
stream1.addData(Seq(1, 2, 3))
stream2.addData(Seq("a", "b", "c"))
stream3.addData(Seq(10, 20))

// Basic verification that streams are functional
assert(stream1.initialOffset() != null)
assert(stream2.initialOffset() != null)
assert(stream3.initialOffset() != null)
}

private implicit def intsToDF(seq: Seq[Int])(implicit schema: StructType): DataFrame = {
require(schema.fields.length === 1)
sqlContext.createDataset(seq).toDF(schema.fieldNames.head)
Expand Down
Loading