Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 182 additions & 1 deletion datafusion/physical-optimizer/src/pushdown_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use datafusion_common::Result;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::ExecutionPlanProperties;
use datafusion_physical_plan::SortOrderPushdownResult;
use datafusion_physical_plan::sorts::sort::SortExec;
use std::sync::Arc;
Expand Down Expand Up @@ -91,9 +92,20 @@ impl PhysicalOptimizerRule for PushdownSort {
let sort_input = Arc::clone(sort_exec.input());
let required_ordering = sort_exec.expr();

let eq_properties = sort_input.equivalence_properties();
let new_required_ordering: Vec<_> = required_ordering
.iter()
.filter(|sort_expr| {
let is_const =
eq_properties.is_expr_constant(&sort_expr.expr).is_some();
!is_const
})
.cloned()
.collect();

// Try to push the sort requirement down through the plan tree
// Each node type defines its own pushdown behavior via try_pushdown_sort()
match sort_input.try_pushdown_sort(required_ordering)? {
match sort_input.try_pushdown_sort(&new_required_ordering)? {
SortOrderPushdownResult::Exact { inner } => {
// Data source guarantees perfect ordering - remove the Sort operator
Ok(Transformed::yes(inner))
Expand Down Expand Up @@ -127,3 +139,172 @@ impl PhysicalOptimizerRule for PushdownSort {
true
}
}

#[cfg(test)]
mod tests {
use super::*;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::Result;
use datafusion_physical_expr::{
AcrossPartitions, ConstExpr, EquivalenceProperties, LexOrdering, Partitioning,
PhysicalExpr,
expressions::{Column, col},
};
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
SortOrderPushdownResult,
};
use std::any::Any;
use std::sync::Arc;

/// Test-only execution plan whose plan properties declare a set of
/// expressions as constant, used to exercise sort pushdown.
#[derive(Debug)]
struct MockPlanWithConstants {
    /// Schema returned from `schema()`.
    schema: SchemaRef,
    /// Plan properties built once in `new()` and returned from `properties()`.
    cache: PlanProperties,
    /// Expressions registered as constants in `new()`; kept so that
    /// `try_pushdown_sort` can construct a fresh instance with the same set.
    constant_exprs: Vec<Arc<dyn PhysicalExpr>>,
}

impl MockPlanWithConstants {
    /// Creates a mock plan over `schema` in which every expression in
    /// `constant_exprs` is registered as a constant
    /// (`AcrossPartitions::Uniform(None)`) in the equivalence properties.
    ///
    /// The plan reports a single unknown partition, incremental emission and
    /// bounded input.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `EquivalenceProperties::add_constants`.
    fn new(
        schema: SchemaRef,
        constant_exprs: Vec<Arc<dyn PhysicalExpr>>,
    ) -> Result<Self> {
        // Wrap each expression as a constant before handing the batch over.
        let mut constants = Vec::with_capacity(constant_exprs.len());
        for expr in &constant_exprs {
            constants.push(ConstExpr::new(
                Arc::clone(expr),
                AcrossPartitions::Uniform(None),
            ));
        }

        let mut properties = EquivalenceProperties::new(Arc::clone(&schema));
        properties.add_constants(constants)?;

        let partitioning = Partitioning::UnknownPartitioning(1);
        let emission =
            datafusion_physical_plan::execution_plan::EmissionType::Incremental;
        let boundedness =
            datafusion_physical_plan::execution_plan::Boundedness::Bounded;
        let cache = PlanProperties::new(properties, partitioning, emission, boundedness);

        Ok(Self {
            schema,
            cache,
            constant_exprs,
        })
    }
}

impl DisplayAs for MockPlanWithConstants {
    /// Writes the fixed plan name; the requested display format is ignored.
    fn fmt_as(
        &self,
        _t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        f.write_str("MockPlanWithConstants")
    }
}

impl ExecutionPlan for MockPlanWithConstants {
    /// Static name of this mock plan.
    fn name(&self) -> &str {
        "MockPlanWithConstants"
    }

    /// Exposes the plan as `Any` so tests can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns a shared handle to the schema supplied at construction.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// Returns the plan properties computed in `MockPlanWithConstants::new`.
    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    /// This mock has no inputs.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        Vec::new()
    }

    /// Ignores the supplied children and returns `self` unchanged.
    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    /// Never expected to be called.
    ///
    /// # Panics
    ///
    /// Always panics via `unreachable!`.
    fn execute(
        &self,
        _: usize,
        _: Arc<datafusion_execution::TaskContext>,
    ) -> Result<datafusion_physical_plan::SendableRecordBatchStream> {
        unreachable!("This test is optimizer only")
    }

    /// Accepts the pushdown only when the requirement is exactly one sort
    /// expression that is a `Column` named `"b"`; the sort options are not
    /// inspected.
    ///
    /// On acceptance returns `SortOrderPushdownResult::Exact` wrapping a
    /// freshly built `MockPlanWithConstants` with the same schema and
    /// constants. Every other requirement (empty, multi-column, non-column,
    /// or a different column) yields `SortOrderPushdownResult::Unsupported`.
    ///
    /// # Errors
    ///
    /// Propagates any error from `MockPlanWithConstants::new`.
    fn try_pushdown_sort(
        &self,
        sort_exprs: &[datafusion_physical_expr::PhysicalSortExpr],
    ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
        let sorts_only_by_b = match sort_exprs {
            [single] => {
                let expr = &single.expr;
                expr.as_any()
                    .downcast_ref::<Column>()
                    .is_some_and(|column| column.name() == "b")
            }
            _ => false,
        };
        if !sorts_only_by_b {
            return Ok(SortOrderPushdownResult::Unsupported);
        }

        // `Arc::clone` makes the cheap refcount bump explicit, matching the
        // rest of this file.
        let inner = MockPlanWithConstants::new(
            Arc::clone(&self.schema),
            self.constant_exprs.clone(),
        )?;
        Ok(SortOrderPushdownResult::Exact {
            inner: Arc::new(inner),
        })
    }
}

/// A `SortExec` on `[a, b]` over an input where `a` is constant should be
/// eliminated: the optimizer is expected to push only `b` down, which the
/// mock plan reports as an exact match.
#[test]
fn test_sort_pushdown_prefix_removal() -> Result<()> {
    let fields = vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ];
    let schema = Arc::new(Schema::new(fields));

    // Input plan that declares column `a` constant.
    let col_a = col("a", &schema)?;
    let constants = vec![Arc::clone(&col_a)];
    let mock_plan = Arc::new(MockPlanWithConstants::new(
        Arc::clone(&schema),
        constants,
    )?);

    // Sanity-check the fixture before exercising the optimizer.
    let a_is_constant = mock_plan
        .properties()
        .equivalence_properties()
        .is_expr_constant(&col_a)
        .is_some();
    assert!(
        a_is_constant,
        "Test setup failed: column 'a' should be constant in MockPlanWithConstants"
    );

    // Required ordering: a, b (default sort options for both).
    let sort_exprs = ["a", "b"]
        .into_iter()
        .map(|name| {
            col(name, &schema).map(|expr| {
                datafusion_physical_expr::PhysicalSortExpr {
                    expr,
                    options: Default::default(),
                }
            })
        })
        .collect::<Result<Vec<_>>>()?;
    let ordering = LexOrdering::new(sort_exprs).unwrap();
    let sort = Arc::new(SortExec::new(ordering, mock_plan));

    let config = ConfigOptions::default();
    let optimized_plan = PushdownSort::new().optimize(sort, &config)?;

    let optimized_any = optimized_plan.as_any();
    assert!(
        !optimized_any.is::<SortExec>(),
        "The top-level SortExec should have been removed by the optimizer"
    );
    assert!(
        optimized_any.is::<MockPlanWithConstants>(),
        "The resulting plan should be MockPlanWithConstants"
    );

    Ok(())
}
}
Loading