Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ impl LogicalPlanBuilder {
/// Wrap a plan in a window
pub fn window_plan(
input: LogicalPlan,
window_exprs: Vec<Expr>,
window_exprs: impl IntoIterator<Item = Expr>,

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this allows feeding in a HashSet

) -> Result<LogicalPlan> {
let mut plan = input;
let mut groups = group_window_expr_by_sort_keys(window_exprs)?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ pub fn compare_sort_expr(

/// Group a slice of window expression expr by their order by expressions
pub fn group_window_expr_by_sort_keys(
window_expr: Vec<Expr>,
window_expr: impl IntoIterator<Item = Expr>,
) -> Result<Vec<(WindowSortKey, Vec<Expr>)>> {
let mut result = vec![];
window_expr.into_iter().try_for_each(|expr| match &expr {
Expand Down
16 changes: 13 additions & 3 deletions datafusion/substrait/src/logical_plan/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1059,7 +1059,7 @@ pub async fn from_project_rel(
p: &ProjectRel,
) -> Result<LogicalPlan> {
if let Some(input) = p.input.as_ref() {
let mut input = LogicalPlanBuilder::from(consumer.consume_rel(input).await?);
let input = consumer.consume_rel(input).await?;
let original_schema = Arc::clone(input.schema());

// Ensure that all expressions have a unique display name, so that
Expand All @@ -1075,6 +1075,10 @@ pub async fn from_project_rel(
// leaving only explicit expressions.

let mut explicit_exprs: Vec<Expr> = vec![];
// For WindowFunctions, we need to wrap them in a Window relation. If there are duplicates,
// we can do the window'ing only once, then the project will duplicate the result.
// Order here doesn't matter since LPB::window_plan sorts the expressions.
let mut window_exprs: HashSet<Expr> = HashSet::new();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't order of the expressions matter? If so, I think you could use an IndexSet rather than an HashSet to preserve the input order too

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like an improvement over what is on main, so I think we could merge this PR as is, but this seems the potential reordering might cause potentially confusing intermittently variable output

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I don't think it matters per se, since the project below then puts things into right places based on the names. However, for consistency it might be nice for the order to stay. Let me see if I can quickly do that!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually no, the ordering we use here doesn't matter, since the LogicalPlanBuilder::window_plan calls group_window_expr_by_sort_keys which sorts the expressions anyways. At least I think so. So I just added a comment to note that: 696bf5d

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @Blizzara

for expr in &p.expressions {
let e = consumer
.consume_expression(expr, input.clone().schema())
Expand All @@ -1084,18 +1088,24 @@ pub async fn from_project_rel(
// Adding the same expression here and in the project below
// works because the project's builder uses columnize_expr(..)
// to transform it into a column reference
input = input.window(vec![e.clone()])?
window_exprs.insert(e.clone());
}
explicit_exprs.push(name_tracker.get_uniquely_named_expr(e)?);
}

let input = if !window_exprs.is_empty() {
LogicalPlanBuilder::window_plan(input, window_exprs)?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has logic built in to separate the expressions by their windows, so it's kinda nice.

} else {
input
};

let mut final_exprs: Vec<Expr> = vec![];
for index in 0..original_schema.fields().len() {
let e = Expr::Column(Column::from(original_schema.qualified_field(index)));
final_exprs.push(name_tracker.get_uniquely_named_expr(e)?);
}
final_exprs.append(&mut explicit_exprs);
input.project(final_exprs)?.build()
project(input, final_exprs)
} else {
not_impl_err!("Projection without an input is not supported")
}
Expand Down
66 changes: 65 additions & 1 deletion datafusion/substrait/tests/cases/logical_plans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ mod tests {
"Projection: NOT DATA.D AS EXPR$0\
\n TableScan: DATA"
);

// Trigger execution to ensure plan validity
DataFrame::new(ctx.state(), plan).show().await?;

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just added these for all of the cases, since some of my iterations passed logical plan creation but failed at physical planning stage.


Ok(())
}

Expand All @@ -71,6 +75,63 @@ mod tests {
\n WindowAggr: windowExpr=[[sum(DATA.D) PARTITION BY [DATA.PART] ORDER BY [DATA.ORD ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING]]\
\n TableScan: DATA"
);

// Trigger execution to ensure plan validity
DataFrame::new(ctx.state(), plan).show().await?;

Ok(())
}

#[tokio::test]
async fn double_window_function() -> Result<()> {
// Confirms a WindowExpr can be repeated in the same project.
// This wouldn't normally happen with DF-created plans since CSE would eliminate the duplicate.

// File generated with substrait-java's Isthmus:
// ./isthmus-cli/build/graal/isthmus --create "create table data (a int)" "select ROW_NUMBER() OVER (), ROW_NUMBER() OVER () AS aliased from data";
let proto_plan =
read_json("tests/testdata/test_plans/double_window.substrait.json");
let ctx = add_plan_schemas_to_ctx(SessionContext::new(), &proto_plan)?;
let plan = from_substrait_plan(&ctx.state(), &proto_plan).await?;

assert_eq!(
format!("{}", plan),
"Projection: row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS EXPR$0, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW__temp__0 AS ALIASED\
\n WindowAggr: windowExpr=[[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: DATA"
);

// Trigger execution to ensure plan validity
DataFrame::new(ctx.state(), plan).show().await?;

Ok(())
}

#[tokio::test]
async fn double_window_function_distinct_windows() -> Result<()> {
// Confirms a single project can have multiple window functions with separate windows in it.
// This wouldn't normally happen with DF-created plans since logical optimizer would
// separate them out.

// File generated with substrait-java's Isthmus:
// ./isthmus-cli/build/graal/isthmus --create "create table data (a int)" "select ROW_NUMBER() OVER (), ROW_NUMBER() OVER (PARTITION BY a) from data";
let proto_plan = read_json(
"tests/testdata/test_plans/double_window_distinct_windows.substrait.json",
);
let ctx = add_plan_schemas_to_ctx(SessionContext::new(), &proto_plan)?;
let plan = from_substrait_plan(&ctx.state(), &proto_plan).await?;

assert_eq!(
format!("{}", plan),
"Projection: row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS EXPR$0, row_number() PARTITION BY [DATA.A] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS EXPR$1\
\n WindowAggr: windowExpr=[[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n WindowAggr: windowExpr=[[row_number() PARTITION BY [DATA.A] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: DATA"
);

// Trigger execution to ensure plan validity
DataFrame::new(ctx.state(), plan).show().await?;

Ok(())
}

Expand All @@ -86,7 +147,7 @@ mod tests {

assert_eq!(format!("{}", &plan), "Values: (List([1, 2]))");

// Need to trigger execution to ensure that Arrow has validated the plan
// Trigger execution to ensure plan validity
DataFrame::new(ctx.state(), plan).show().await?;

Ok(())
Expand All @@ -107,6 +168,9 @@ mod tests {
\n TableScan: sales"
);

// Trigger execution to ensure plan validity
DataFrame::new(ctx.state(), plan).show().await?;

Ok(())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
{
"extensionUris": [
{
"extensionUriAnchor": 1,
"uri": "/functions_arithmetic.yaml"
}
],
"extensions": [
{
"extensionFunction": {
"extensionUriReference": 1,
"functionAnchor": 0,
"name": "row_number:"
}
}
],
"relations": [
{
"root": {
"input": {
"project": {
"common": {
"emit": {
"outputMapping": [
1,
2
]
}
},
"input": {
"read": {
"common": {
"direct": {
}
},
"baseSchema": {
"names": [
"A"
],
"struct": {
"types": [
{
"i32": {
"typeVariationReference": 0,
"nullability": "NULLABILITY_NULLABLE"
}
}
],
"typeVariationReference": 0,
"nullability": "NULLABILITY_REQUIRED"
}
},
"namedTable": {
"names": [
"DATA"
]
}
}
},
"expressions": [
{
"windowFunction": {
"functionReference": 0,
"partitions": [],
"sorts": [],
"upperBound": {
"currentRow": {
}
},
"lowerBound": {
"unbounded": {
}
},
"phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT",
"outputType": {
"i64": {
"typeVariationReference": 0,
"nullability": "NULLABILITY_REQUIRED"
}
},
"args": [],
"arguments": [],
"invocation": "AGGREGATION_INVOCATION_ALL",
"options": [],
"boundsType": "BOUNDS_TYPE_ROWS"
}
},
{
"windowFunction": {
"functionReference": 0,
"partitions": [],
"sorts": [],
"upperBound": {
"currentRow": {
}
},
"lowerBound": {
"unbounded": {
}
},
"phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT",
"outputType": {
"i64": {
"typeVariationReference": 0,
"nullability": "NULLABILITY_REQUIRED"
}
},
"args": [],
"arguments": [],
"invocation": "AGGREGATION_INVOCATION_ALL",
"options": [],
"boundsType": "BOUNDS_TYPE_ROWS"
}
}
]
}
},
"names": [
"EXPR$0",
"ALIASED"
]
}
}
],
"expectedTypeUrls": []
}
Loading