Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion benchmarks/src/sort_tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ pub struct RunOpt {
/// Append a `LIMIT n` clause to the query
#[arg(short = 'l', long = "limit")]
limit: Option<usize>,

/// Override `datafusion.optimizer.enable_parallel_sort_merge`.
/// When unset, the config default is used. Pass `--parallel-merge false`
/// to benchmark the single-threaded `SortPreservingMergeExec` baseline.
#[arg(long = "parallel-merge")]
parallel_merge: Option<bool>,
}

pub const SORT_TPCH_QUERY_START_ID: usize = 1;
Expand Down Expand Up @@ -208,7 +214,10 @@ impl RunOpt {

/// Benchmark query `query_id` in `SORT_QUERIES`
async fn benchmark_query(&self, query_id: usize) -> Result<Vec<QueryResult>> {
let config = self.common.config()?;
let mut config = self.common.config()?;
if let Some(parallel_merge) = self.parallel_merge {
config.options_mut().optimizer.enable_parallel_sort_merge = parallel_merge;
}
let rt = self.common.build_runtime()?;
let state = SessionStateBuilder::new()
.with_config(config)
Expand Down
13 changes: 13 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1636,6 +1636,19 @@ config_namespace! {
/// Default: true
pub enable_sort_pushdown: bool, default = true

/// When set to true, eligible `SortPreservingMergeExec` operators are
/// replaced with a parallel merge that splits the locally sorted inputs
/// into key ranges (via regular sampling of the sorted runs) and merges
/// those ranges concurrently across `target_partitions` threads, before
/// concatenating them in order into a single sorted stream.
///
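/// This trades extra memory (the sorted inputs are materialized) for
/// parallelism in the otherwise single-threaded merge. It only applies
/// to merges without a fetch/limit over bounded, multi-partition inputs
/// whose reported row count is at least `batch_size * target_partitions`,
/// and it has no effect when `target_partitions` is 1.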
///
/// Default: true
pub enable_parallel_sort_merge: bool, default = true

/// When set to true, the optimizer will extract leaf expressions
/// (such as `get_field`) from filter/sort/join nodes into projections
/// closer to the leaf table scans, and push those projections down
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-optimizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub mod limit_pushdown_past_window;
pub mod limited_distinct_aggregation;
pub mod optimizer;
pub mod output_requirements;
pub mod parallel_sort_merge;
pub mod projection_pushdown;
pub use datafusion_pruning as pruning;
pub mod hash_join_buffering;
Expand Down
6 changes: 6 additions & 0 deletions datafusion/physical-optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::update_aggr_exprs::OptimizeAggregateOrder;

use crate::hash_join_buffering::HashJoinBuffering;
use crate::limit_pushdown_past_window::LimitPushPastWindows;
use crate::parallel_sort_merge::ParallelSortMerge;
use crate::pushdown_sort::PushdownSort;
use crate::window_topn::WindowTopN;
use datafusion_common::Result;
Expand Down Expand Up @@ -233,6 +234,11 @@ impl PhysicalOptimizer {
Arc::new(ProjectionPushdown::new()),
// PushdownSort: Detect sorts that can be pushed down to data sources.
Arc::new(PushdownSort::new()),
// ParallelSortMerge: optionally replace single-threaded
// `SortPreservingMergeExec` with a parallel merge. Gated by
// `datafusion.optimizer.enable_parallel_sort_merge`. Runs after sort
// placement is finalized so the merges it targets already exist.
Arc::new(ParallelSortMerge::new()),
Arc::new(EnsureCooperative::new()),
// This FilterPushdown handles dynamic filters that may have references to the source ExecutionPlan.
// Therefore, it should be run at the end of the optimization process since any changes to the plan may break the dynamic filter's references.
Expand Down
121 changes: 121 additions & 0 deletions datafusion/physical-optimizer/src/parallel_sort_merge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Replace eligible [`SortPreservingMergeExec`] operators with the parallel
//! [`ParallelSortPreservingMergeExec`].
//!
//! Gated by the `datafusion.optimizer.enable_parallel_sort_merge` config flag.
//! See [`ParallelSortPreservingMergeExec`] for the algorithm.

use std::sync::Arc;

use crate::PhysicalOptimizerRule;
use datafusion_common::Result;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_physical_plan::execution_plan::Boundedness;
use datafusion_physical_plan::sorts::parallel_merge::ParallelSortPreservingMergeExec;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::statistics::StatisticsArgs;
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};

/// Optimizer rule that swaps [`SortPreservingMergeExec`] for the parallel
/// [`ParallelSortPreservingMergeExec`] when it is both safe and beneficial.
///
/// The rule is a no-op unless `datafusion.optimizer.enable_parallel_sort_merge`
/// is set and `datafusion.execution.target_partitions` is greater than one.
/// A merge is only replaced when it has no fetch, its input is bounded with
/// more than one output partition, and the input reports a row count of at
/// least `batch_size * target_partitions`.
///
/// The struct has no fields; everything the rule consults comes from the
/// [`ConfigOptions`] handed to `optimize`.
#[derive(Debug, Clone, Default)]
pub struct ParallelSortMerge {}

impl ParallelSortMerge {
    /// Creates the rule. It holds no state, so this is the same value as
    /// [`ParallelSortMerge::default`].
    pub fn new() -> Self {
        Self::default()
    }
}

impl PhysicalOptimizerRule for ParallelSortMerge {
    /// Walks `plan` bottom-up and replaces each eligible
    /// [`SortPreservingMergeExec`] with a [`ParallelSortPreservingMergeExec`]
    /// built from the same sort expressions and input.
    ///
    /// The plan is returned unchanged when
    /// `datafusion.optimizer.enable_parallel_sort_merge` is off or when
    /// `target_partitions` is at most one.
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Bail out when the rule is switched off, or when a single target
        // partition leaves nothing to parallelize.
        if !config.optimizer.enable_parallel_sort_merge
            || config.execution.target_partitions <= 1
        {
            return Ok(plan);
        }

        // Smallest input worth splitting into more than one bucket. Below
        // this, materialization and sampling cost more than the parallel
        // merge saves, so the single-threaded merge is kept.
        let row_threshold = config
            .execution
            .batch_size
            .get()
            .saturating_mul(config.execution.target_partitions);

        plan.transform_up(|node: Arc<dyn ExecutionPlan>| {
            let Some(merge) = node.downcast_ref::<SortPreservingMergeExec>() else {
                return Ok(Transformed::no(node));
            };
            let child = merge.input();

            // The parallel merge materializes its input and emits nothing
            // until everything is merged, so it cannot honor a fetch/limit
            // that `SortPreservingMergeExec` would stream and stop early on.
            if merge.fetch().is_some() {
                return Ok(Transformed::no(node));
            }
            // For the same reason it must never see unbounded input.
            if !matches!(child.boundedness(), Boundedness::Bounded) {
                return Ok(Transformed::no(node));
            }
            // A single input partition gives nothing to merge in parallel.
            if child.output_partitioning().partition_count() <= 1 {
                return Ok(Transformed::no(node));
            }

            // Statistics are consulted last, after the cheap checks above.
            // A failed lookup or a missing row count is treated like a small
            // input: the serial merge is kept.
            let reported_rows = child
                .statistics_with_args(&StatisticsArgs::new())
                .ok()
                .and_then(|stats| stats.num_rows.get_value().copied());

            match reported_rows {
                Some(rows) if rows >= row_threshold => {
                    let parallel = ParallelSortPreservingMergeExec::new(
                        merge.expr().clone(),
                        Arc::clone(child),
                    );
                    Ok(Transformed::yes(Arc::new(parallel)))
                }
                _ => Ok(Transformed::no(node)),
            }
        })
        .data()
    }

    /// Name under which this rule is reported.
    fn name(&self) -> &str {
        "ParallelSortMerge"
    }

    /// Returns `true`, asking for the schema check to be applied to this rule.
    fn schema_check(&self) -> bool {
        true
    }
}
23 changes: 22 additions & 1 deletion datafusion/physical-plan/benches/sort_preserving_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use datafusion_execution::TaskContext;
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr, expressions::col};
use datafusion_physical_plan::test::TestMemoryExec;
use datafusion_physical_plan::{
collect, sorts::sort_preserving_merge::SortPreservingMergeExec,
collect, sorts::parallel_merge::ParallelSortPreservingMergeExec,
sorts::sort_preserving_merge::SortPreservingMergeExec,
};

use std::sync::Arc;
Expand Down Expand Up @@ -190,6 +191,26 @@ fn bench_merge_sorted_preserving(c: &mut Criterion) {
)
},
);
c.bench_function(&format!("bench_merge_sorted_parallel/{bench_name}"), |b| {
b.iter_batched(
|| {
let exec =
TestMemoryExec::try_new_exec(&partitions, schema.clone(), None)
.unwrap();
Arc::new(ParallelSortPreservingMergeExec::new(
sort_order.clone(),
exec,
))
},
|merge_exec| {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
collect(merge_exec, task_ctx.clone()).await.unwrap();
});
},
BatchSize::LargeInput,
)
});
}
}

Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/sorts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod builder;
mod cursor;
mod merge;
mod multi_level_merge;
pub mod parallel_merge;
pub mod partial_sort;
pub mod partitioned_topk;
pub mod sort;
Expand Down
Loading
Loading