datafusion/core/src/physical_optimizer/optimize_sorts.rs (2 changes: 1 addition & 1 deletion)

@@ -677,7 +677,7 @@ mod tests {
         vec![
             "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]",
             " FilterExec: NOT non_nullable_col@1",
-            " SortExec: [non_nullable_col@2 ASC NULLS LAST]",
+            " SortExec: [non_nullable_col@1 ASC NULLS LAST]",
             " WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]",
             " SortExec: [non_nullable_col@1 DESC]",
             " MemoryExec: partitions=0, partition_sizes=[]",
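The only functional change above is the column index in the middle SortExec: window columns are now appended after the input columns rather than prepended, so non_nullable_col keeps its input-schema index. A minimal sketch of the index arithmetic, assuming a hypothetical two-column input and a single window expression (illustrative values, not the test's actual schema):

    fn main() {
        let window_expr_len = 1; // one window expression: count
        let input_index = 1;     // non_nullable_col's index in the input schema
        // Old layout (window columns prepended): every input index shifts up.
        assert_eq!(window_expr_len + input_index, 2); // non_nullable_col@2
        // New layout (window columns appended): input indices are unchanged.
        assert_eq!(input_index, 1); // non_nullable_col@1
    }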
datafusion/core/src/physical_plan/windows/mod.rs (12 changes: 7 additions & 5 deletions)

@@ -265,17 +265,18 @@ mod tests {
                 schema.as_ref(),
             )?],
             input,
-            schema,
+            schema.clone(),
             vec![],
             None,
         )?);

         let result: Vec<RecordBatch> = collect(window_exec, task_ctx).await?;
         assert_eq!(result.len(), 1);

+        let n_schema_fields = schema.fields().len();
         let columns = result[0].columns();

-        let count: &Int64Array = as_primitive_array(&columns[0])?;
+        let count: &Int64Array = as_primitive_array(&columns[n_schema_fields])?;
         assert_eq!(count.value(0), 100);
         assert_eq!(count.value(99), 100);
         Ok(())
@@ -326,19 +327,20 @@ mod tests {
         let result: Vec<RecordBatch> = collect(window_exec, task_ctx).await?;
         assert_eq!(result.len(), 1);

+        let n_schema_fields = schema.fields().len();
         let columns = result[0].columns();

         // c3 is small int

-        let count: &Int64Array = as_primitive_array(&columns[0])?;
+        let count: &Int64Array = as_primitive_array(&columns[n_schema_fields])?;
         assert_eq!(count.value(0), 100);
         assert_eq!(count.value(99), 100);

-        let max: &Int8Array = as_primitive_array(&columns[1])?;
+        let max: &Int8Array = as_primitive_array(&columns[n_schema_fields + 1])?;
         assert_eq!(max.value(0), 125);
         assert_eq!(max.value(99), 125);

-        let min: &Int8Array = as_primitive_array(&columns[2])?;
+        let min: &Int8Array = as_primitive_array(&columns[n_schema_fields + 2])?;
         assert_eq!(min.value(0), -117);
         assert_eq!(min.value(99), -117);

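These tests now read window results from the tail of the output batch. A small sketch of the indexing rule they rely on (the helper name here is ours, not DataFusion's):

    // With window columns appended, the i-th window expression's output lives
    // directly after all input columns in the result batch.
    fn window_col_index(n_input_fields: usize, window_expr_idx: usize) -> usize {
        n_input_fields + window_expr_idx
    }

    fn main() {
        let n_schema_fields = 3; // width of some input schema (illustrative)
        assert_eq!(window_col_index(n_schema_fields, 0), 3); // count
        assert_eq!(window_col_index(n_schema_fields, 1), 4); // max
        assert_eq!(window_col_index(n_schema_fields, 2), 5); // min
    }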
datafusion/core/src/physical_plan/windows/window_agg_exec.rs (109 changes: 17 additions & 92 deletions)

@@ -25,7 +25,7 @@ use crate::physical_plan::metrics::{
     BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
 };
 use crate::physical_plan::{
-    Column, ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
+    ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
     ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
     SendableRecordBatchStream, Statistics, WindowExpr,
 };
@@ -39,8 +39,6 @@ use arrow::{
     record_batch::RecordBatch,
 };
 use datafusion_common::DataFusionError;
-use datafusion_physical_expr::rewrite::TreeNodeRewritable;
-use datafusion_physical_expr::EquivalentClass;
 use futures::stream::Stream;
 use futures::{ready, StreamExt};
 use log::debug;
@@ -65,8 +63,6 @@ pub struct WindowAggExec {
     pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
     /// Sort Keys
     pub sort_keys: Option<Vec<PhysicalSortExpr>>,
-    /// The output ordering
-    output_ordering: Option<Vec<PhysicalSortExpr>>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
 }
@@ -82,33 +78,6 @@ impl WindowAggExec {
     ) -> Result<Self> {
         let schema = create_schema(&input_schema, &window_expr)?;
         let schema = Arc::new(schema);
-        let window_expr_len = window_expr.len();
-        // Although WindowAggExec does not change the output ordering from the input, but can not return the output ordering
-        // from the input directly, need to adjust the column index to align with the new schema.
-        let output_ordering = input
-            .output_ordering()
-            .map(|sort_exprs| {
-                let new_sort_exprs: Result<Vec<PhysicalSortExpr>> = sort_exprs
-                    .iter()
-                    .map(|e| {
-                        let new_expr = e.expr.clone().transform_down(&|e| {
-                            Ok(e.as_any().downcast_ref::<Column>().map(|col| {
-                                Arc::new(Column::new(
-                                    col.name(),
-                                    window_expr_len + col.index(),
-                                ))
-                                    as Arc<dyn PhysicalExpr>
-                            }))
-                        })?;
-                        Ok(PhysicalSortExpr {
-                            expr: new_expr,
-                            options: e.options,
-                        })
-                    })
-                    .collect();
-                new_sort_exprs
-            })
-            .map_or(Ok(None), |v| v.map(Some))?;

         Ok(Self {
             input,
@@ -117,7 +86,6 @@ impl WindowAggExec {
             input_schema,
             partition_keys,
             sort_keys,
-            output_ordering,
             metrics: ExecutionPlanMetricsSet::new(),
         })
     }
@@ -176,34 +144,10 @@ impl ExecutionPlan for WindowAggExec {

     /// Get the output partitioning of this plan
     fn output_partitioning(&self) -> Partitioning {
-        // Although WindowAggExec does not change the output partitioning from the input, but can not return the output partitioning
-        // from the input directly, need to adjust the column index to align with the new schema.
-        let window_expr_len = self.window_expr.len();
-        let input_partitioning = self.input.output_partitioning();
-        match input_partitioning {
-            Partitioning::RoundRobinBatch(size) => Partitioning::RoundRobinBatch(size),
-            Partitioning::UnknownPartitioning(size) => {
-                Partitioning::UnknownPartitioning(size)
-            }
-            Partitioning::Hash(exprs, size) => {
-                let new_exprs = exprs
-                    .into_iter()
-                    .map(|expr| {
-                        expr.transform_down(&|e| {
-                            Ok(e.as_any().downcast_ref::<Column>().map(|col| {
-                                Arc::new(Column::new(
-                                    col.name(),
-                                    window_expr_len + col.index(),
-                                ))
-                                    as Arc<dyn PhysicalExpr>
-                            }))
-                        })
-                        .unwrap()
-                    })
-                    .collect::<Vec<_>>();
-                Partitioning::Hash(new_exprs, size)
-            }
-        }
+        // Because we can have repartitioning using the partition keys, this
+        // would be either 1 or more than 1 depending on the presence of
+        // repartitioning.
+        self.input.output_partitioning()
     }

     /// Specifies whether this plan generates an infinite stream of records.
@@ -221,7 +165,7 @@
     }

     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        self.output_ordering.as_deref()
+        self.input().output_ordering()
     }

     fn maintains_input_order(&self) -> bool {
@@ -244,30 +188,7 @@
     }

     fn equivalence_properties(&self) -> EquivalenceProperties {
-        // Although WindowAggExec does not change the equivalence properties from the input, but can not return the equivalence properties
-        // from the input directly, need to adjust the column index to align with the new schema.
-        let window_expr_len = self.window_expr.len();
-        let mut new_properties = EquivalenceProperties::new(self.schema());
-        let new_eq_classes = self
-            .input
-            .equivalence_properties()
-            .classes()
-            .iter()
-            .map(|prop| {
-                let new_head = Column::new(
-                    prop.head().name(),
-                    window_expr_len + prop.head().index(),
-                );
-                let new_others = prop
-                    .others()
-                    .iter()
-                    .map(|col| Column::new(col.name(), window_expr_len + col.index()))
-                    .collect::<Vec<_>>();
-                EquivalentClass::new(new_head, new_others)
-            })
-            .collect::<Vec<_>>();
-        new_properties.extend(new_eq_classes);
-        new_properties
+        self.input().equivalence_properties()
     }

     fn with_new_children(
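With input columns keeping their indices, output_partitioning, output_ordering, and equivalence_properties can all be forwarded from the input verbatim; the deleted blocks existed only to shift every Column index by window_expr_len under the old prepend layout. A toy model of that invariant (our own struct, not DataFusion's Column type):

    #[derive(Clone, Debug, PartialEq)]
    struct Col {
        name: String,
        index: usize,
    }

    // Old prepend layout: every input column reference had to be rewritten.
    fn remap_prepended(col: Col, window_expr_len: usize) -> Col {
        Col { index: col.index + window_expr_len, ..col }
    }

    // New append layout: the remapping is the identity, so it can be dropped.
    fn remap_appended(col: Col) -> Col {
        col
    }

    fn main() {
        let c = Col { name: "non_nullable_col".into(), index: 1 };
        assert_eq!(remap_prepended(c.clone(), 1).index, 2); // old: shifted to @2
        assert_eq!(remap_appended(c.clone()), c);           // new: no-op
    }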
@@ -334,12 +255,13 @@
         let win_cols = self.window_expr.len();
         let input_cols = self.input_schema.fields().len();
         // TODO stats: some windowing function will maintain invariants such as min, max...
-        let mut column_statistics = vec![ColumnStatistics::default(); win_cols];
+        let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
         if let Some(input_col_stats) = input_stat.column_statistics {
             column_statistics.extend(input_col_stats);
         } else {
             column_statistics.extend(vec![ColumnStatistics::default(); input_cols]);
         }
+        column_statistics.extend(vec![ColumnStatistics::default(); win_cols]);
         Statistics {
             is_exact: input_stat.is_exact,
             num_rows: input_stat.num_rows,
@@ -354,10 +276,11 @@ fn create_schema(
     window_expr: &[Arc<dyn WindowExpr>],
 ) -> Result<Schema> {
     let mut fields = Vec::with_capacity(input_schema.fields().len() + window_expr.len());
+    fields.extend_from_slice(input_schema.fields());
     // append results to the schema
     for expr in window_expr {
         fields.push(expr.field()?);
     }
-    fields.extend_from_slice(input_schema.fields());
     Ok(Schema::new(fields))
 }

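A self-contained sketch of the new field order using arrow directly (assuming the arrow Schema/Field API this codebase uses; the window field below stands in for an expr.field()? result):

    use arrow::datatypes::{DataType, Field, Schema};

    fn output_schema(input_schema: &Schema, window_fields: Vec<Field>) -> Schema {
        let mut fields = Vec::with_capacity(input_schema.fields().len() + window_fields.len());
        fields.extend_from_slice(input_schema.fields()); // input columns keep indices 0..n
        fields.extend(window_fields); // window columns land at n, n+1, ...
        Schema::new(fields)
    }

    fn main() {
        let input = Schema::new(vec![Field::new("c3", DataType::Int8, true)]);
        let out = output_schema(&input, vec![Field::new("count", DataType::Int64, true)]);
        assert_eq!(out.field(0).name(), "c3");    // input first
        assert_eq!(out.field(1).name(), "count"); // window column appended
    }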
Expand Down Expand Up @@ -433,7 +356,7 @@ impl WindowAggStream {
                 .map_err(|e| ArrowError::ExternalError(Box::new(e)))?,
             )
         }
-        let mut columns = transpose(partition_results)
+        let columns = transpose(partition_results)
             .iter()
             .map(|elems| concat(&elems.iter().map(|x| x.as_ref()).collect::<Vec<_>>()))
             .collect::<Vec<_>>()
@@ -442,9 +365,11 @@

         // combine with the original cols
         // note the setup of window aggregates is that they newly calculated window
-        // expressions are always prepended to the columns
-        columns.extend_from_slice(batch.columns());
-        RecordBatch::try_new(self.schema.clone(), columns)
+        // expression results are always appended to the columns
+        let mut batch_columns = batch.columns().to_vec();
+        // calculate window cols
+        batch_columns.extend_from_slice(&columns);
+        RecordBatch::try_new(self.schema.clone(), batch_columns)
     }

     /// Evaluates the partition points given the sort columns. If the sort columns are
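The stream side mirrors create_schema: original input columns first, then the freshly computed window columns, so RecordBatch::try_new sees arrays in the same order as the schema fields. A sketch with hypothetical data (assuming the arrow array/record-batch API used above):

    use std::sync::Arc;
    use arrow::array::{ArrayRef, Int64Array, Int8Array};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;

    fn main() -> Result<(), arrow::error::ArrowError> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("c3", DataType::Int8, true),
            Field::new("count", DataType::Int64, true),
        ]));
        // Original input columns...
        let mut batch_columns: Vec<ArrayRef> = vec![Arc::new(Int8Array::from(vec![1i8, 2]))];
        // ...then the computed window columns appended.
        let window_columns: Vec<ArrayRef> = vec![Arc::new(Int64Array::from(vec![2i64, 2]))];
        batch_columns.extend_from_slice(&window_columns);
        let batch = RecordBatch::try_new(schema, batch_columns)?;
        assert_eq!(batch.num_columns(), 2); // columns line up with the schema
        Ok(())
    }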