Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,103 +35,85 @@ import org.apache.spark.sql.types.StructType
@Experimental
final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
/**
* :: Experimental ::
* Specifies the input data source format.
*
* @since 2.0.0
*/
@Experimental
def format(source: String): DataStreamReader = {
// Store the requested source name in this reader's `source` field.
this.source = source
// Hand back this same reader so further builder calls can be chained.
this
}

/**
* :: Experimental ::
* Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
* automatically from data. By specifying the schema here, the underlying data source can
* skip the schema inference step, and thus speed up data loading.
*
* @since 2.0.0
*/
@Experimental
def schema(schema: StructType): DataStreamReader = {
  // Option(...) maps a null argument to None instead of Some(null).
  val wrapped = Option(schema)
  this.userSpecifiedSchema = wrapped
  this
}

/**
* :: Experimental ::
* Adds an input option for the underlying data source.
*
* @since 2.0.0
*/
@Experimental
def option(key: String, value: String): DataStreamReader = {
  // Build the key/value pair first, then add it to the accumulated options.
  val entry = key -> value
  this.extraOptions += entry
  this
}

/**
* :: Experimental ::
* Adds an input option for the underlying data source.
*
* @since 2.0.0
*/
@Experimental
def option(key: String, value: Boolean): DataStreamReader = {
  // Render the Boolean as text and delegate to the String overload.
  val rendered = value.toString
  option(key, rendered)
}

/**
* :: Experimental ::
* Adds an input option for the underlying data source.
*
* @since 2.0.0
*/
@Experimental
def option(key: String, value: Long): DataStreamReader = {
  // Render the Long as text and delegate to the String overload.
  val rendered = value.toString
  option(key, rendered)
}

/**
* :: Experimental ::
* Adds an input option for the underlying data source.
*
* @since 2.0.0
*/
@Experimental
def option(key: String, value: Double): DataStreamReader = {
  // Render the Double as text and delegate to the String overload.
  val rendered = value.toString
  option(key, rendered)
}

/**
* :: Experimental ::
* (Scala-specific) Adds input options for the underlying data source.
*
* @since 2.0.0
*/
@Experimental
def options(options: scala.collection.Map[String, String]): DataStreamReader = {
// Add every entry of the supplied map to this reader's accumulated options.
this.extraOptions ++= options
// Hand back this same reader so further builder calls can be chained.
this
}

/**
* :: Experimental ::
* Adds input options for the underlying data source.
*
* @since 2.0.0
*/
@Experimental
def options(options: java.util.Map[String, String]): DataStreamReader = {
  // Convert the Java map with asScala and delegate to the Scala-map overload.
  val scalaOptions = options.asScala
  this.options(scalaOptions)
  this
}


/**
* :: Experimental ::
* Loads input data stream in as a [[DataFrame]], for data streams that don't require a path
* (e.g. external key-value stores).
*
* @since 2.0.0
*/
@Experimental
def load(): DataFrame = {
val dataSource =
DataSource(
Expand All @@ -143,18 +125,15 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
}

/**
* :: Experimental ::
* Loads input in as a [[DataFrame]], for data streams that read from some path.
*
* @since 2.0.0
*/
@Experimental
def load(path: String): DataFrame = {
  // Register the path under the "path" option, then use the no-argument load().
  val withPath = option("path", path)
  withPath.load()
}

/**
* :: Experimental ::
* Loads a JSON file stream (one object per line) and returns the result as a [[DataFrame]].
*
* This function goes through the input once to determine the input schema. If you know the
Expand Down Expand Up @@ -198,11 +177,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
*
* @since 2.0.0
*/
@Experimental
def json(path: String): DataFrame = {
  // Select the "json" format, then load from the given path.
  val jsonReader = format("json")
  jsonReader.load(path)
}

/**
* :: Experimental ::
* Loads a CSV file stream and returns the result as a [[DataFrame]].
*
* This function will go through the input once to determine the input schema if `inferSchema`
Expand Down Expand Up @@ -262,11 +239,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
*
* @since 2.0.0
*/
@Experimental
def csv(path: String): DataFrame = {
  // Select the "csv" format, then load from the given path.
  val csvReader = format("csv")
  csvReader.load(path)
}

/**
* :: Experimental ::
* Loads a Parquet file stream, returning the result as a [[DataFrame]].
*
* You can set the following Parquet-specific option(s) for reading Parquet files:
Expand All @@ -281,13 +256,11 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
*
* @since 2.0.0
*/
@Experimental
def parquet(path: String): DataFrame = {
  // Select the "parquet" format, then load from the given path.
  val parquetReader = format("parquet")
  parquetReader.load(path)
}

/**
* :: Experimental ::
* Loads text files and returns a [[DataFrame]] whose schema starts with a string column named
* "value", and followed by partitioned columns if there are any.
*
Expand All @@ -308,7 +281,6 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
*
* @since 2.0.0
*/
@Experimental
def text(path: String): DataFrame = {
  // Select the "text" format, then load from the given path.
  val textReader = format("text")
  textReader.load(path)
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
private val df = ds.toDF()

/**
* :: Experimental ::
* Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
* - `OutputMode.Append()`: only the new rows in the streaming DataFrame/Dataset will be
* written to the sink
Expand All @@ -46,15 +45,12 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
*
* @since 2.0.0
*/
@Experimental
def outputMode(outputMode: OutputMode): DataStreamWriter[T] = {
// Store the requested mode in this writer's `outputMode` field.
this.outputMode = outputMode
// Hand back this same writer so further builder calls can be chained.
this
}


/**
* :: Experimental ::
* Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
* - `append`: only the new rows in the streaming DataFrame/Dataset will be written to
* the sink
Expand All @@ -63,7 +59,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
*
* @since 2.0.0
*/
@Experimental
def outputMode(outputMode: String): DataStreamWriter[T] = {
this.outputMode = outputMode.toLowerCase match {
case "append" =>
Expand All @@ -78,7 +73,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
}

/**
* :: Experimental ::
* Set the trigger for the stream query. The default value is `ProcessingTime(0)` and it will run
* the query as fast as possible.
*
Expand All @@ -100,33 +94,28 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
*
* @since 2.0.0
*/
@Experimental
def trigger(trigger: Trigger): DataStreamWriter[T] = {
// Store the requested trigger in this writer's `trigger` field.
this.trigger = trigger
// Hand back this same writer so further builder calls can be chained.
this
}


/**
* :: Experimental ::
* Specifies the name of the [[StreamingQuery]] that can be started with `start()`.
* This name must be unique among all the currently active queries in the associated SQLContext.
*
* @since 2.0.0
*/
@Experimental
def queryName(queryName: String): DataStreamWriter[T] = {
  // The query name is carried as an ordinary option under the "queryName" key.
  val entry = "queryName" -> queryName
  this.extraOptions += entry
  this
}

/**
* :: Experimental ::
* Specifies the underlying output data source. Built-in options include "parquet" for now.
*
* @since 2.0.0
*/
@Experimental
def format(source: String): DataStreamWriter[T] = {
this.source = source
this
Expand Down Expand Up @@ -156,90 +145,74 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
}

/**
* :: Experimental ::
* Adds an output option for the underlying data source.
*
* @since 2.0.0
*/
@Experimental
def option(key: String, value: String): DataStreamWriter[T] = {
  // Build the key/value pair first, then add it to the accumulated options.
  val entry = key -> value
  this.extraOptions += entry
  this
}

/**
* :: Experimental ::
* Adds an output option for the underlying data source.
*
* @since 2.0.0
*/
@Experimental
def option(key: String, value: Boolean): DataStreamWriter[T] = {
  // Render the Boolean as text and delegate to the String overload.
  val rendered = value.toString
  option(key, rendered)
}

/**
* :: Experimental ::
* Adds an output option for the underlying data source.
*
* @since 2.0.0
*/
@Experimental
def option(key: String, value: Long): DataStreamWriter[T] = {
  // Render the Long as text and delegate to the String overload.
  val rendered = value.toString
  option(key, rendered)
}

/**
* :: Experimental ::
* Adds an output option for the underlying data source.
*
* @since 2.0.0
*/
@Experimental
def option(key: String, value: Double): DataStreamWriter[T] = {
  // Render the Double as text and delegate to the String overload.
  val rendered = value.toString
  option(key, rendered)
}

/**
* :: Experimental ::
* (Scala-specific) Adds output options for the underlying data source.
*
* @since 2.0.0
*/
@Experimental
def options(options: scala.collection.Map[String, String]): DataStreamWriter[T] = {
// Add every entry of the supplied map to this writer's accumulated options.
this.extraOptions ++= options
// Hand back this same writer so further builder calls can be chained.
this
}

/**
* :: Experimental ::
* Adds output options for the underlying data source.
*
* @since 2.0.0
*/
@Experimental
def options(options: java.util.Map[String, String]): DataStreamWriter[T] = {
  // Convert the Java map with asScala and delegate to the Scala-map overload.
  val scalaOptions = options.asScala
  this.options(scalaOptions)
  this
}

/**
* :: Experimental ::
* Starts the execution of the streaming query, which will continually output results to the given
* path as new data arrives. The returned [[StreamingQuery]] object can be used to interact with
* the stream.
*
* @since 2.0.0
*/
@Experimental
def start(path: String): StreamingQuery = {
  // Register the path under the "path" option, then use the no-argument start().
  val withPath = option("path", path)
  withPath.start()
}

/**
* :: Experimental ::
* Starts the execution of the streaming query, which will continually output results to the given
* path as new data arrives. The returned [[StreamingQuery]] object can be used to interact with
* the stream.
*
* @since 2.0.0
*/
@Experimental
def start(): StreamingQuery = {
if (source == "memory") {
assertNotPartitioned("memory")
Expand Down Expand Up @@ -297,7 +270,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
}

/**
* :: Experimental ::
* Starts the execution of the streaming query, which will continually send results to the given
* [[ForeachWriter]] as new data arrives. The [[ForeachWriter]] can be used to send the data
* generated by the [[DataFrame]]/[[Dataset]] to an external system.
Expand Down Expand Up @@ -343,7 +315,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
*
* @since 2.0.0
*/
@Experimental
def foreach(writer: ForeachWriter[T]): DataStreamWriter[T] = {
this.source = "foreach"
this.foreachWriter = if (writer != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ abstract class StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.DataStreamWriter `DataStreamWriter.start()`]],
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]],
* that is, `onQueryStart` will be called on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]]. Please
* don't block this method as it will block your query.
Expand Down Expand Up @@ -101,8 +101,6 @@ object StreamingQueryListener {
* @param queryInfo Information about the status of the query.
* @param exception The exception message of the [[StreamingQuery]] if the query was terminated
* with an exception. Otherwise, it will be `None`.
* @param stackTrace The stack trace of the exception if the query was terminated with an
* exception. It will be empty if there was no error.
* @since 2.0.0
*/
@Experimental
Expand Down