[SPARK-52223][CONNECT] Add SDP Spark Connect Protos #50942
sryza
left a comment
Awesome. A few comments – mostly cosmetic.

    // An unresolved relation that defines the dataset's flow.
    spark.connect.Relation plan = 4;

    // Default SQL configurations set when running this flow.

Nitpick: is the word "Default" relevant here? There's nothing more specific, right?
How is this related to the session in which the flow is defined? Is this an additional way to set configurations? I assume this takes precedence over what the session has configured?
Yeah, no need to say default - there is no more specific mechanism to set confs.
> How is this related to the session in which the flow is defined? Is this an additional way to set configurations? I assume this takes precedence over what the session has configured?
For now, this is not supported. Users have to set confs directly in the table / flow decorators for them to be applied to the pipeline.

    message DefineSqlGraphElements {
      optional string dataflow_graph_id = 1;
      optional string sql_file_name = 2;

Something that occurred to me recently is that there could be SQL files with the same name in different subdirs. Should this be sql_file_path?
I think this is a filepath in implementation, actually. Let me confirm.
Where is this path pointing to?
Changed to file_path. We'll rename this in the implementation too.
> Where is this path pointing to?
@hvanhovell this path is the local path to the SQL file. It's mostly used for disambiguation in our observability.
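To illustrate the ambiguity raised in this thread, here is a minimal Python sketch. The directory and file names are made up for illustration and do not come from the PR. It shows that keying SQL files by bare name collides across subdirectories, while keying by path does not.

```python
from pathlib import PurePosixPath

# Hypothetical SQL files in one pipeline project; names are illustrative only.
sql_files = [
    PurePosixPath("bronze/ingest.sql"),
    PurePosixPath("silver/ingest.sql"),
    PurePosixPath("silver/clean.sql"),
]

# Keying by bare file name: the second "ingest.sql" overwrites the first.
by_name = {p.name: p for p in sql_files}

# Keying by relative path: every file keeps a distinct key.
by_path = {str(p): p for p in sql_files}

print(len(by_name))  # 2
print(len(by_path))  # 3
```

With only the name, an event mentioning `ingest.sql` could refer to either file; with the path, it cannot.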

    map<string, string> sql_conf = 5;

    // If true, this flow will only be run once per execution.
    bool once = 6;

Care to elaborate? Is this a synonym for "this is batch"?
This corresponds to Trigger.Once in Spark - the flow runs once per update. This is similar to batch in triggered updates, but not in continuous ones (which we will add eventually).

    // A response containing events emitted during the run of a pipeline.
    message PipelineEventsResult {
      repeated PipelineEvent events = 1;

Batching events should not be needed. gRPC server side streaming can return multiple 'events' at the same time, provided it can fit them in a single window (~30k).
That's fair. But I think the repeated field adds more flexibility in general. We can group events logically, rather than just to avoid network latency.
Per further feedback from @grundprinzip and @hvanhovell, I'm going to take this batching out. We can always add it back in the future if we come up with a use case for logical grouping.

      repeated PipelineEvent events = 1;
    }

    message PipelineEvent {

Is this also supposed to include errors? If so, it'd be nice to understand what has failed. In that case, adding the flow/dataset name would be nice.
Yeah, I can see the value in adding dataset and flow name. But two things:
- OTOH, we wanted to keep PipelineEvents as a generic event bus rather than a structured logging format.
- It's possible an error happens that isn't scoped to a dataset/flow, making this field unpredictably empty.
But at the very least, the dataset/flow name will be in the error message.
To add on to what @aakash-db said, our main use case for these events is to print out to the console, and the string messages will include all the context that's needed for that. Once we have a use case that involves consuming the dataset/flow name programmatically, I'd be supportive of adding more structure to this.
Btw, errors should flow the regular way through the exception process and the error details. If we were to do it differently it would just create issues later.
@grundprinzip I actually agree with you. If the pipeline fails we should fail in the normal way. However, that failure can originate from multiple places. As a user, I would like to be able to figure out what failed. We could embed that failure information in these events.
sryza
left a comment
Just a few remaining comments

    // Parses the SQL file and registers all datasets and flows.
    message DefineSqlGraphElements {
      // The graph to attach this dataset to.
      optional string dataflow_graph_id = 1;

I noticed that this is marked optional, but that the corresponding field in DefineDataset is not. How should we decide when to use optional?
cc @hvanhovell if there's a general recommendation on this.
What optional does is generate a has<FIELD> method in Java. We can use that to throw an exception when a field isn't present. Else, the field always has an empty string value.
So really, all of our primitives should have an optional designation. I will change that.
made all of these optional.
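The presence semantics described in this thread can be modeled in plain Python. This is a sketch and not the generated protobuf API: the dataclass is a stand-in for the real message, `None` stands in for "field not set" (what the generated has-method reports in Java), and `require_graph_id` is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DefineSqlGraphElements:
    # None models "field not set"; this is a stand-in, not the generated class.
    dataflow_graph_id: Optional[str] = None

    def has_dataflow_graph_id(self) -> bool:
        # Models the has-method that `optional` generates.
        return self.dataflow_graph_id is not None

    def dataflow_graph_id_or_default(self) -> str:
        # Without presence tracking, a reader only ever sees the default "".
        return self.dataflow_graph_id if self.dataflow_graph_id is not None else ""


def require_graph_id(cmd: DefineSqlGraphElements) -> str:
    """Hypothetical server-side check: reject a command whose graph id was never set."""
    if not cmd.has_dataflow_graph_id():
        raise ValueError("dataflow_graph_id must be set")
    return cmd.dataflow_graph_id


unset = DefineSqlGraphElements()
explicitly_empty = DefineSqlGraphElements(dataflow_graph_id="")

# Both read back as "", so the value alone cannot tell them apart.
same_value = unset.dataflow_graph_id_or_default() == explicitly_empty.dataflow_graph_id_or_default()

# Presence is what distinguishes them.
presence = (unset.has_dataflow_graph_id(), explicitly_empty.has_dataflow_graph_id())
```

Here `same_value` is true while `presence` differs between the two commands, which is the distinction the has-method makes available to the server.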
sryza
left a comment
LGTM! Will of course defer to @hvanhovell on any Spark Connect / proto / gRPC conventions.
grundprinzip
left a comment
Mostly nits, but looks good.

    // A response containing events emitted during the run of a pipeline.
    message PipelineEventsResult {
      repeated PipelineEvent events = 1;

The doc should be more explicit about how "complete" the set of events is that you receive here. Are these all events or just some? How do you know if more are coming or not?
Generally, I'd stand with Herman that if you don't expect to emit thousands of events per second, your code will be easier and simpler if you don't use a repeated field here and simply emit one event per message.
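A minimal Python sketch of the one-event-per-message shape suggested here, with a plain generator standing in for a gRPC server-streaming response. `PipelineEvent` and `PipelineEventResult` below are simplified stand-in dataclasses, not the generated proto classes, and the `message` and `event` field names are assumptions.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, List


@dataclass(frozen=True)
class PipelineEvent:
    # Stand-in for the proto message; `message` is an assumed field name.
    message: str


@dataclass(frozen=True)
class PipelineEventResult:
    # One event per response, instead of `repeated PipelineEvent events`.
    event: PipelineEvent


def stream_events(events: Iterable[PipelineEvent]) -> Iterator[PipelineEventResult]:
    """Yield one response per event, as a server-streaming handler would."""
    for event in events:
        yield PipelineEventResult(event=event)


def print_events(responses: Iterable[PipelineEventResult]) -> List[str]:
    """Client side: handle each response as it arrives, with no inner loop over a batch."""
    printed = []
    for response in responses:
        print(response.event.message)
        printed.append(response.event.message)
    return printed


events = [PipelineEvent("flow started"), PipelineEvent("flow completed")]
printed = print_events(stream_events(events))
```

The client loop has a single level of iteration, and the end of the stream itself signals that no more events are coming.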

    // The type of dataset.
    enum DatasetType {
      // Safe default value. Should not be used.
      DATASET_UNSPECIFIED = 0;

Linter rule should say: DATASET_TYPE_UNSPECIFIED
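The naming convention behind this comment can be sketched in Python by deriving the expected zero-value name from the enum's type name. The helper name `expected_zero_value` is made up for illustration, and the rule as written is inferred from the comment above, not taken from any specific linter's implementation.

```python
import re


def expected_zero_value(enum_name: str) -> str:
    """Convert a CamelCase enum name to UPPER_SNAKE_CASE and append _UNSPECIFIED."""
    # Insert an underscore at each boundary between a lowercase letter or digit
    # and the capital letter that follows it.
    snake = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", enum_name)
    return f"{snake.upper()}_UNSPECIFIED"


print(expected_zero_value("DatasetType"))  # DATASET_TYPE_UNSPECIFIED
```

Under this rule, `DATASET_UNSPECIFIED` does not match the name derived from `DatasetType`, which is why the comment asks for the rename.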
One more thing - should we regenerate the Python stubs as part of this PR?
yes please run dev/generate-connect-protos.sh (or similar ;) )
c1f1f49 to 4f86648
### What changes were proposed in this pull request?

Adds the Spark Connect API for Spark Declarative Pipelines: https://issues.apache.org/jira/browse/SPARK-51727.

This adds the following protos:

1. `CreateDataflowGraph` creates a new graph in the registry.
2. `DefineDataset` and `DefineFlow` register elements to the created graph. Datasets are the nodes of the dataflow graph, and are either tables or views, and flows are the edges connecting them.
3. `StartRun` starts a run, which is a single execution of a graph.
4. `StopRun` stops an existing run, while `DropPipeline` stops any current runs and drops the pipeline.

It also adds the new `PipelineCommand` object to the `ExecutePlanRequest` and the `PipelineCommand.Response` to the `ExecutePlanResponse` object.

### Why are the changes needed?

Base API of Spark Declarative Pipelines. Implementation coming in future PRs.

### Does this PR introduce _any_ user-facing change?

Yes - creates new proto API within Spark Connect.

### How was this patch tested?

N/A

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#50942 from aakash-db/pipeline-spark-connect-api.

Lead-authored-by: Aakash Japi <aakash.japi@databricks.com>
Co-authored-by: Sandy Ryza <sandy.ryza@databricks.com>
Signed-off-by: Sandy Ryza <sandyryza@gmail.com>
dongjoon-hyun
left a comment
Is there any reason for us to have an exception here? Otherwise, do you think we can rename DefineFlow.plan to DefineFlow.relation for consistency, @aakash-db , @sryza , @HyukjinKwon ?

    optional string target_dataset_name = 3;

    // An unresolved relation that defines the dataset's flow.
    optional spark.connect.Relation plan = 4;

This looks like a typo. To be consistent with other Apache Spark code, can we rename plan to relation? This instance seems to be the only exception.
$ git grep 'plan: Spark_Connect_Plan' | wc -l
16
$ git grep 'relation: Spark_Connect_Relation' | wc -l
10
Thanks for catching this @dongjoon-hyun – I filed an issue to track: https://issues.apache.org/jira/browse/SPARK-52757.
### What changes were proposed in this pull request?

This is a minor follow-up change to #50942 (comment)

### Why are the changes needed?

Naming consistency.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

N/A

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51442 from peter-toth/SPARK-52757-rename-plan-to-relation.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
