From 54ee990cef2cf9626b4ba42dca40d012db91c1ab Mon Sep 17 00:00:00 2001 From: jackwener Date: Thu, 17 Nov 2022 18:31:35 +0800 Subject: [PATCH 1/2] support cross_join in `limit_push_down` --- datafusion/optimizer/src/limit_push_down.rs | 76 +++++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/datafusion/optimizer/src/limit_push_down.rs b/datafusion/optimizer/src/limit_push_down.rs index 2f121b8814326..50ee2e0da9ab7 100644 --- a/datafusion/optimizer/src/limit_push_down.rs +++ b/datafusion/optimizer/src/limit_push_down.rs @@ -24,6 +24,7 @@ use datafusion_expr::{ Join, JoinType, Limit, LogicalPlan, Projection, Sort, TableScan, Union, }, utils::from_plan, + CrossJoin, }; use std::sync::Arc; @@ -204,6 +205,38 @@ fn limit_push_down( schema: schema.clone(), })) } + ( + LogicalPlan::CrossJoin(cross_join), + Ancestor::FromLimit { + skip: ancestor_skip, + fetch: Some(ancestor_fetch), + .. + }, + ) => { + let left = &*cross_join.left; + let right = &*cross_join.right; + Ok(LogicalPlan::CrossJoin(CrossJoin { + left: Arc::new(limit_push_down( + _optimizer, + Ancestor::FromLimit { + skip: 0, + fetch: Some(ancestor_fetch + ancestor_skip), + }, + left, + _optimizer_config, + )?), + right: Arc::new(limit_push_down( + _optimizer, + Ancestor::FromLimit { + skip: 0, + fetch: Some(ancestor_fetch + ancestor_skip), + }, + right, + _optimizer_config, + )?), + schema: plan.schema().clone(), + })) + } ( LogicalPlan::Join(Join { join_type, .. }), Ancestor::FromLimit { @@ -394,6 +427,7 @@ mod test { Ok(()) } + #[test] fn limit_push_down_take_smaller_limit() -> Result<()> { let table_scan = test_table_scan()?; @@ -872,4 +906,46 @@ mod test { Ok(()) } + + #[test] + fn limit_push_down_cross_join() -> Result<()> { + let table_scan_1 = test_table_scan()?; + let table_scan_2 = test_table_scan_with_name("test2")?; + + let plan = LogicalPlanBuilder::from(table_scan_1) + .cross_join(&LogicalPlanBuilder::from(table_scan_2).build()?)? + .limit(0, Some(1000))? + .build()?; + + // Limit pushdown Not supported in Join + let expected = "Limit: skip=0, fetch=1000\ + \n CrossJoin:\ + \n TableScan: test, fetch=1000\ + \n TableScan: test2, fetch=1000"; + + assert_optimized_plan_eq(&plan, expected); + + Ok(()) + } + + #[test] + fn skip_limit_push_down_cross_join() -> Result<()> { + let table_scan_1 = test_table_scan()?; + let table_scan_2 = test_table_scan_with_name("test2")?; + + let plan = LogicalPlanBuilder::from(table_scan_1) + .cross_join(&LogicalPlanBuilder::from(table_scan_2).build()?)? + .limit(1000, Some(1000))? + .build()?; + + // Limit pushdown Not supported in Join + let expected = "Limit: skip=1000, fetch=1000\ + \n CrossJoin:\ + \n TableScan: test, fetch=2000\ + \n TableScan: test2, fetch=2000"; + + assert_optimized_plan_eq(&plan, expected); + + Ok(()) + } } From ab68dfebb5f0e4234e0aa4b431f191bd6457ff7e Mon Sep 17 00:00:00 2001 From: jackwener Date: Thu, 17 Nov 2022 22:25:23 +0800 Subject: [PATCH 2/2] remove comment --- datafusion/optimizer/src/limit_push_down.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/datafusion/optimizer/src/limit_push_down.rs b/datafusion/optimizer/src/limit_push_down.rs index 50ee2e0da9ab7..51b2bb4f0d371 100644 --- a/datafusion/optimizer/src/limit_push_down.rs +++ b/datafusion/optimizer/src/limit_push_down.rs @@ -917,7 +917,6 @@ mod test { .limit(0, Some(1000))? .build()?; - // Limit pushdown Not supported in Join let expected = "Limit: skip=0, fetch=1000\ \n CrossJoin:\ \n TableScan: test, fetch=1000\ @@ -938,7 +937,6 @@ mod test { .limit(1000, Some(1000))? .build()?; - // Limit pushdown Not supported in Join let expected = "Limit: skip=1000, fetch=1000\ \n CrossJoin:\ \n TableScan: test, fetch=2000\