From 611030bbdd05f79fb13cf7fdd3f079073c97da59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 23 Jun 2026 14:57:46 +0200 Subject: [PATCH] feat: parallel sort-preserving merge (PSRS) for >2x merge speedup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add `ParallelSortPreservingMergeExec` (`sorts/parallel_merge.rs`), a parallel order-preserving merge of sorted partitions using Parallel Sorting by Regular Sampling (PSRS): materialize each locally-sorted input, pick `P-1` pivots by regular sampling, cut every run by the same pivots (binary search on byte-comparable row encodings), then merge the `P` key-range buckets concurrently (one `SpawnedTask` each, reusing the loser-tree `StreamingMergeBuilder`) and concatenate the buckets in order. Output is a single globally-sorted partition — same contract as `SortPreservingMergeExec`, computed with `target_partitions`-way parallelism. A new `JoinSelection`-style physical-optimizer rule `ParallelSortMerge` (`parallel_sort_merge.rs`, run after `PushdownSort`) replaces an eligible `SortPreservingMergeExec` with the parallel exec, gated by `datafusion.optimizer.enable_parallel_sort_merge` (default true). Eligibility: no fetch/limit, bounded input, >1 input partition, and a *known* row count >= `batch_size * target_partitions` (unknown-size inputs keep the serial merge to avoid regressing small data). It is pipeline-breaking (materializes all input, no early-stop), so it never honors a pushed-down limit and is restricted to bounded inputs. Measured: ~2.36x on the `sort_preserving_merge` bench (1M rows x 3 partitions, u64); ~2-2.5x on sort_tpch (Q1 2.54x, Q2 2.31x, Q10 1.93x). A `--parallel-merge` A/B flag is added to the sort_tpch benchmark. Co-Authored-By: Claude Opus 4.8 (1M context) --- benchmarks/src/sort_tpch.rs | 11 +- datafusion/common/src/config.rs | 13 + datafusion/physical-optimizer/src/lib.rs | 1 + .../physical-optimizer/src/optimizer.rs | 6 + .../src/parallel_sort_merge.rs | 121 +++ .../benches/sort_preserving_merge.rs | 23 +- datafusion/physical-plan/src/sorts/mod.rs | 1 + .../physical-plan/src/sorts/parallel_merge.rs | 931 ++++++++++++++++++ .../test_files/array_agg_sliding_window.slt | 2 +- .../sqllogictest/test_files/explain.slt | 4 + .../test_files/information_schema.slt | 2 + datafusion/sqllogictest/test_files/joins.slt | 16 +- datafusion/sqllogictest/test_files/window.slt | 18 +- docs/source/user-guide/configs.md | 1 + 14 files changed, 1130 insertions(+), 20 deletions(-) create mode 100644 datafusion/physical-optimizer/src/parallel_sort_merge.rs create mode 100644 datafusion/physical-plan/src/sorts/parallel_merge.rs diff --git a/benchmarks/src/sort_tpch.rs b/benchmarks/src/sort_tpch.rs index 2182d1a383633..b6499a5bbeb82 100644 --- a/benchmarks/src/sort_tpch.rs +++ b/benchmarks/src/sort_tpch.rs @@ -72,6 +72,12 @@ pub struct RunOpt { /// Append a `LIMIT n` clause to the query #[arg(short = 'l', long = "limit")] limit: Option, + + /// Override `datafusion.optimizer.enable_parallel_sort_merge`. + /// When unset, the config default is used. Pass `--parallel-merge false` + /// to benchmark the single-threaded `SortPreservingMergeExec` baseline. + #[arg(long = "parallel-merge")] + parallel_merge: Option, } pub const SORT_TPCH_QUERY_START_ID: usize = 1; @@ -208,7 +214,10 @@ impl RunOpt { /// Benchmark query `query_id` in `SORT_QUERIES` async fn benchmark_query(&self, query_id: usize) -> Result> { - let config = self.common.config()?; + let mut config = self.common.config()?; + if let Some(parallel_merge) = self.parallel_merge { + config.options_mut().optimizer.enable_parallel_sort_merge = parallel_merge; + } let rt = self.common.build_runtime()?; let state = SessionStateBuilder::new() .with_config(config) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index cc263dfe3e619..2eaa9d8fb6bdb 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1636,6 +1636,19 @@ config_namespace! { /// Default: true pub enable_sort_pushdown: bool, default = true + /// When set to true, eligible `SortPreservingMergeExec` operators are + /// replaced with a parallel merge that splits the locally sorted inputs + /// into key ranges (via regular sampling of the sorted runs) and merges + /// those ranges concurrently across `target_partitions` threads, before + /// concatenating them in order into a single sorted stream. + /// + /// This trades extra memory (the sorted inputs are materialized) for + /// parallelism in the otherwise single-threaded merge. It only applies + /// to merges without a fetch/limit over bounded, multi-partition inputs. + /// + /// Default: true + pub enable_parallel_sort_merge: bool, default = true + /// When set to true, the optimizer will extract leaf expressions /// (such as `get_field`) from filter/sort/join nodes into projections /// closer to the leaf table scans, and push those projections down diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs index b9eb248f6e843..ad23df1c9dabf 100644 --- a/datafusion/physical-optimizer/src/lib.rs +++ b/datafusion/physical-optimizer/src/lib.rs @@ -40,6 +40,7 @@ pub mod limit_pushdown_past_window; pub mod limited_distinct_aggregation; pub mod optimizer; pub mod output_requirements; +pub mod parallel_sort_merge; pub mod projection_pushdown; pub use datafusion_pruning as pruning; pub mod hash_join_buffering; diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs index 0f81512b61c8e..fcde538252c57 100644 --- a/datafusion/physical-optimizer/src/optimizer.rs +++ b/datafusion/physical-optimizer/src/optimizer.rs @@ -37,6 +37,7 @@ use crate::update_aggr_exprs::OptimizeAggregateOrder; use crate::hash_join_buffering::HashJoinBuffering; use crate::limit_pushdown_past_window::LimitPushPastWindows; +use crate::parallel_sort_merge::ParallelSortMerge; use crate::pushdown_sort::PushdownSort; use crate::window_topn::WindowTopN; use datafusion_common::Result; @@ -233,6 +234,11 @@ impl PhysicalOptimizer { Arc::new(ProjectionPushdown::new()), // PushdownSort: Detect sorts that can be pushed down to data sources. Arc::new(PushdownSort::new()), + // ParallelSortMerge: optionally replace single-threaded + // `SortPreservingMergeExec` with a parallel merge. Gated by + // `datafusion.optimizer.enable_parallel_sort_merge`. Runs after sort + // placement is finalized so the merges it targets already exist. + Arc::new(ParallelSortMerge::new()), Arc::new(EnsureCooperative::new()), // This FilterPushdown handles dynamic filters that may have references to the source ExecutionPlan. // Therefore, it should be run at the end of the optimization process since any changes to the plan may break the dynamic filter's references. diff --git a/datafusion/physical-optimizer/src/parallel_sort_merge.rs b/datafusion/physical-optimizer/src/parallel_sort_merge.rs new file mode 100644 index 0000000000000..225de086cccb1 --- /dev/null +++ b/datafusion/physical-optimizer/src/parallel_sort_merge.rs @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Replace eligible [`SortPreservingMergeExec`] operators with the parallel +//! [`ParallelSortPreservingMergeExec`]. +//! +//! Gated by the `datafusion.optimizer.enable_parallel_sort_merge` config flag. +//! See [`ParallelSortPreservingMergeExec`] for the algorithm. + +use std::sync::Arc; + +use crate::PhysicalOptimizerRule; +use datafusion_common::Result; +use datafusion_common::config::ConfigOptions; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_physical_plan::execution_plan::Boundedness; +use datafusion_physical_plan::sorts::parallel_merge::ParallelSortPreservingMergeExec; +use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; +use datafusion_physical_plan::statistics::StatisticsArgs; +use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; + +/// Optimizer rule that swaps [`SortPreservingMergeExec`] for the parallel +/// [`ParallelSortPreservingMergeExec`] when it is both safe and beneficial. +#[derive(Debug, Clone, Default)] +pub struct ParallelSortMerge {} + +impl ParallelSortMerge { + pub fn new() -> Self { + Self {} + } +} + +impl PhysicalOptimizerRule for ParallelSortMerge { + fn optimize( + &self, + plan: Arc, + config: &ConfigOptions, + ) -> Result> { + if !config.optimizer.enable_parallel_sort_merge { + return Ok(plan); + } + // With a single target partition there is nothing to parallelize. + if config.execution.target_partitions <= 1 { + return Ok(plan); + } + + // Only parallelize merges whose input is large enough to split into + // more than one bucket; below this the materialization + sampling + // overhead outweighs the benefit and a single-threaded merge is faster. + let min_rows = config + .execution + .batch_size + .get() + .saturating_mul(config.execution.target_partitions); + + plan.transform_up(|plan: Arc| { + let Some(spm) = plan.downcast_ref::() else { + return Ok(Transformed::no(plan)); + }; + let input = spm.input(); + + // The parallel merge materializes its input and emits no rows until + // it has merged everything, so it cannot satisfy a fetch/limit + // (which `SortPreservingMergeExec` streams and stops early on) and + // must not be used on unbounded input. It also needs more than one + // input partition to be worth parallelizing. + let eligible = spm.fetch().is_none() + && matches!(input.boundedness(), Boundedness::Bounded) + && input.output_partitioning().partition_count() > 1; + + if !eligible { + return Ok(Transformed::no(plan)); + } + + // Only parallelize when the input is *known* to be large enough to + // benefit. The parallel merge materializes its input and pays a + // sampling + task-spawn overhead, so for small (or unknown-size) + // inputs the single-threaded merge is faster; when the row count is + // unavailable we conservatively keep the serial merge. + let Some(rows) = input + .statistics_with_args(&StatisticsArgs::new()) + .ok() + .and_then(|stats| stats.num_rows.get_value().copied()) + else { + return Ok(Transformed::no(plan)); + }; + if rows < min_rows { + return Ok(Transformed::no(plan)); + } + + let parallel = ParallelSortPreservingMergeExec::new( + spm.expr().clone(), + Arc::clone(input), + ); + Ok(Transformed::yes(Arc::new(parallel))) + }) + .data() + } + + fn name(&self) -> &str { + "ParallelSortMerge" + } + + fn schema_check(&self) -> bool { + true + } +} diff --git a/datafusion/physical-plan/benches/sort_preserving_merge.rs b/datafusion/physical-plan/benches/sort_preserving_merge.rs index 76ebf230a30e0..04909b8bca897 100644 --- a/datafusion/physical-plan/benches/sort_preserving_merge.rs +++ b/datafusion/physical-plan/benches/sort_preserving_merge.rs @@ -25,7 +25,8 @@ use datafusion_execution::TaskContext; use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr, expressions::col}; use datafusion_physical_plan::test::TestMemoryExec; use datafusion_physical_plan::{ - collect, sorts::sort_preserving_merge::SortPreservingMergeExec, + collect, sorts::parallel_merge::ParallelSortPreservingMergeExec, + sorts::sort_preserving_merge::SortPreservingMergeExec, }; use std::sync::Arc; @@ -190,6 +191,26 @@ fn bench_merge_sorted_preserving(c: &mut Criterion) { ) }, ); + c.bench_function(&format!("bench_merge_sorted_parallel/{bench_name}"), |b| { + b.iter_batched( + || { + let exec = + TestMemoryExec::try_new_exec(&partitions, schema.clone(), None) + .unwrap(); + Arc::new(ParallelSortPreservingMergeExec::new( + sort_order.clone(), + exec, + )) + }, + |merge_exec| { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + collect(merge_exec, task_ctx.clone()).await.unwrap(); + }); + }, + BatchSize::LargeInput, + ) + }); } } diff --git a/datafusion/physical-plan/src/sorts/mod.rs b/datafusion/physical-plan/src/sorts/mod.rs index ca8d4a4400c49..8f01e6e9b8a42 100644 --- a/datafusion/physical-plan/src/sorts/mod.rs +++ b/datafusion/physical-plan/src/sorts/mod.rs @@ -21,6 +21,7 @@ mod builder; mod cursor; mod merge; mod multi_level_merge; +pub mod parallel_merge; pub mod partial_sort; pub mod partitioned_topk; pub mod sort; diff --git a/datafusion/physical-plan/src/sorts/parallel_merge.rs b/datafusion/physical-plan/src/sorts/parallel_merge.rs new file mode 100644 index 0000000000000..a2e8ad69342ad --- /dev/null +++ b/datafusion/physical-plan/src/sorts/parallel_merge.rs @@ -0,0 +1,931 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`ParallelSortPreservingMergeExec`] merges sorted partitions in parallel. +//! +//! Unlike [`SortPreservingMergeExec`], which performs a single-threaded k-way +//! merge of all input partitions on one thread, this operator splits the merge +//! work across multiple threads using the **Parallel Sorting by Regular +//! Sampling (PSRS)** strategy of Shi & Schaeffer (1992), combined with the +//! merge-path / co-rank splitter idea (Green, McColl & Bader, 2012): +//! +//! 1. Each input partition is already locally sorted (by an upstream +//! `SortExec`). We materialize the runs and encode their sort keys with a +//! single shared [`RowConverter`] so all keys are byte-comparable. +//! 2. We draw a regular sample of keys across all runs, sort the sample, and +//! pick `P - 1` *pivots* (split points) that partition the key space into +//! `P` ranges. +//! 3. Every run is partitioned by the *same* pivots via binary search +//! (`lower_bound`). Range `s` is the `s`-th slice taken from each run. +//! 4. The `P` ranges ("buckets") are merged independently and concurrently, +//! each by the existing optimized [`StreamingMergeBuilder`] k-way merge. +//! 5. Because every run is cut by the same pivot values, bucket `s` contains +//! exactly the keys in `[pivot_{s-1}, pivot_s)` across all runs, so the +//! buckets are totally ordered. Emitting bucket 0, then 1, ... then `P - 1` +//! yields a single globally sorted stream — the same output contract as +//! [`SortPreservingMergeExec`]. +//! +//! Correctness does not depend on the pivots being balanced: any pivots produce +//! correctly sorted output. Balance only affects how evenly work is spread +//! across threads. Regular sampling gives each bucket at most `~2 * R / P` rows +//! (the classic PSRS load-balance bound) for high-cardinality keys; very +//! low-cardinality keys may yield empty buckets and limited parallelism. +//! +//! [`SortPreservingMergeExec`]: crate::sorts::sort_preserving_merge::SortPreservingMergeExec +//! [`StreamingMergeBuilder`]: crate::sorts::streaming_merge::StreamingMergeBuilder + +use std::sync::Arc; + +use crate::common::spawn_buffered; +use crate::execution_plan::{CardinalityEffect, EmissionType}; +use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; +use crate::sorts::streaming_merge::StreamingMergeBuilder; +use crate::statistics::StatisticsArgs; +use crate::stream::RecordBatchStreamAdapter; +use crate::{ + DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, + Partitioning, PhysicalExpr, PlanProperties, SendableRecordBatchStream, Statistics, + check_if_same_properties, +}; + +use crate::execution_plan::{EvaluationType, SchedulingType}; +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use arrow::row::{RowConverter, Rows, SortField}; +use datafusion_common::utils::memory::get_record_batch_memory_size; +use datafusion_common::{ + DataFusionError, Result, assert_eq_or_internal_err, internal_err, +}; +use datafusion_common_runtime::SpawnedTask; +use datafusion_execution::TaskContext; +use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation}; +use datafusion_physical_expr_common::sort_expr::{LexOrdering, OrderingRequirements}; +use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays; + +use futures::stream::{self, StreamExt, TryStreamExt}; +use log::trace; + +/// Number of key samples drawn per output bucket when selecting pivots. +/// +/// Oversampling (drawing more samples than strictly necessary) tightens the +/// load balance of the resulting buckets. The total number of samples is +/// `OVERSAMPLE_PER_BUCKET * num_buckets`, bounded by the total row count. +const OVERSAMPLE_PER_BUCKET: usize = 50; + +/// Parallel order-preserving merge of sorted input partitions. +/// +/// See the [module-level documentation](self) for the algorithm. This operator +/// produces a single, globally sorted output partition, exactly like +/// [`SortPreservingMergeExec`](crate::sorts::sort_preserving_merge::SortPreservingMergeExec), +/// but computes it with `target_partitions`-way parallelism. It materializes +/// all of its input (it is pipeline-breaking) and therefore does not support a +/// `fetch`/limit; callers needing early termination should use +/// `SortPreservingMergeExec`. +#[derive(Debug, Clone)] +pub struct ParallelSortPreservingMergeExec { + /// Input plan with sorted partitions + input: Arc, + /// Sort expressions + expr: LexOrdering, + /// Execution metrics + metrics: ExecutionPlanMetricsSet, + /// Cache holding plan properties like equivalences, output partitioning etc. + cache: Arc, + /// Use round-robin selection of tied winners of the loser tree in each + /// bucket merge. See + /// [`SortPreservingMergeExec::with_round_robin_repartition`]. + /// + /// [`SortPreservingMergeExec::with_round_robin_repartition`]: crate::sorts::sort_preserving_merge::SortPreservingMergeExec::with_round_robin_repartition + enable_round_robin_repartition: bool, + /// Target number of parallel merge buckets. `None` means use the session's + /// `target_partitions`. + target_buckets: Option, +} + +impl ParallelSortPreservingMergeExec { + /// Create a new parallel sort preserving merge execution plan + pub fn new(expr: LexOrdering, input: Arc) -> Self { + let cache = Self::compute_properties(&input, expr.clone()); + Self { + input, + expr, + metrics: ExecutionPlanMetricsSet::new(), + cache: Arc::new(cache), + enable_round_robin_repartition: true, + target_buckets: None, + } + } + + /// Sets the selection strategy of tied winners of the loser tree algorithm, + /// applied within each bucket merge. See + /// [`SortPreservingMergeExec::with_round_robin_repartition`]. + /// + /// [`SortPreservingMergeExec::with_round_robin_repartition`]: crate::sorts::sort_preserving_merge::SortPreservingMergeExec::with_round_robin_repartition + pub fn with_round_robin_repartition( + mut self, + enable_round_robin_repartition: bool, + ) -> Self { + self.enable_round_robin_repartition = enable_round_robin_repartition; + self + } + + /// Override the target number of parallel merge buckets. By default the + /// session's `target_partitions` is used. + pub fn with_target_buckets(mut self, target_buckets: usize) -> Self { + self.target_buckets = Some(target_buckets); + self + } + + /// Input plan + pub fn input(&self) -> &Arc { + &self.input + } + + /// Sort expressions + pub fn expr(&self) -> &LexOrdering { + &self.expr + } + + /// Fast-path child replacement used by [`check_if_same_properties!`] when + /// the new child preserves this operator's properties. + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } + + /// Creates the cache object that stores the plan properties such as schema, + /// equivalence properties, ordering, partitioning, etc. + fn compute_properties( + input: &Arc, + ordering: LexOrdering, + ) -> PlanProperties { + // With a single input partition `execute` passes the input through + // unchanged, so we inherit its pipeline behavior and scheduling. With + // multiple partitions we materialize all input before emitting, so the + // operator is pipeline-breaking (`Final`) and eagerly driven. + let input_partitions = input.output_partitioning().partition_count(); + let (emission, evaluation, scheduling) = if input_partitions > 1 { + ( + EmissionType::Final, + EvaluationType::Eager, + SchedulingType::Cooperative, + ) + } else { + ( + input.pipeline_behavior(), + input.properties().evaluation_type, + input.properties().scheduling_type, + ) + }; + + let mut eq_properties = input.equivalence_properties().clone(); + eq_properties.clear_per_partition_constants(); + eq_properties.add_ordering(ordering); + PlanProperties::new( + eq_properties, // Equivalence Properties + Partitioning::UnknownPartitioning(1), // Output Partitioning + emission, // Pipeline behavior + input.boundedness(), // Boundedness + ) + .with_evaluation_type(evaluation) + .with_scheduling_type(scheduling) + } +} + +impl DisplayAs for ParallelSortPreservingMergeExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "ParallelSortPreservingMergeExec: [{}]", self.expr) + } + DisplayFormatType::TreeRender => { + for (i, e) in self.expr().iter().enumerate() { + e.fmt_sql(f)?; + if i != self.expr().len() - 1 { + write!(f, ", ")?; + } + } + Ok(()) + } + } + } +} + +impl ExecutionPlan for ParallelSortPreservingMergeExec { + fn name(&self) -> &'static str { + "ParallelSortPreservingMergeExec" + } + + fn properties(&self) -> &Arc { + &self.cache + } + + fn required_input_distribution(&self) -> Vec { + vec![Distribution::UnspecifiedDistribution] + } + + fn benefits_from_input_partitioning(&self) -> Vec { + vec![false] + } + + fn required_input_ordering(&self) -> Vec> { + vec![Some(OrderingRequirements::from(self.expr.clone()))] + } + + fn maintains_input_order(&self) -> Vec { + vec![true] + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.input] + } + + fn with_new_children( + self: Arc, + mut children: Vec>, + ) -> Result> { + check_if_same_properties!(self, children); + let mut exec = ParallelSortPreservingMergeExec::new( + self.expr.clone(), + children.swap_remove(0), + ) + .with_round_robin_repartition(self.enable_round_robin_repartition); + exec.target_buckets = self.target_buckets; + Ok(Arc::new(exec)) + } + + /// The parallel merge materializes all of its input and cannot stop early, + /// so it never accepts a pushed-down fetch/limit. Returning `None` lets the + /// limit-pushdown rule keep a `LimitExec` above this operator. + fn with_fetch(&self, _limit: Option) -> Option> { + None + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + trace!( + "Start ParallelSortPreservingMergeExec::execute for partition: {partition}" + ); + assert_eq_or_internal_err!( + partition, + 0, + "ParallelSortPreservingMergeExec invalid partition {partition}" + ); + + let input_partitions = self.input.output_partitioning().partition_count(); + let schema = self.schema(); + + match input_partitions { + 0 => { + return internal_err!( + "ParallelSortPreservingMergeExec requires at least one input partition" + ); + } + // A single sorted input partition is already globally sorted. + 1 => return self.input.execute(0, context), + _ => {} + } + + // Run each input partition on its own task so the upstream sorts proceed + // in parallel while we drain them. + let input_streams = (0..input_partitions) + .map(|p| { + Ok(spawn_buffered( + self.input.execute(p, Arc::clone(&context))?, + 2, + )) + }) + .collect::>>()?; + + let target_buckets = self + .target_buckets + .unwrap_or_else(|| context.session_config().target_partitions()); + + let cfg = MergeConfig { + schema: Arc::clone(&schema), + expr: self.expr.clone(), + metrics: self.metrics.clone(), + batch_size: context.session_config().batch_size(), + target_buckets, + enable_round_robin_repartition: self.enable_round_robin_repartition, + memory_pool: Arc::clone(&context.runtime_env().memory_pool), + partition, + }; + + // Materialization is async; build the merge lazily on first poll and + // flatten the resulting stream into the output. + let fut = async move { parallel_merge(input_streams, cfg).await }; + let stream = stream::once(fut).try_flatten(); + Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))) + } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + + fn statistics_with_args(&self, _args: &StatisticsArgs) -> Result> { + // The parallel merge materializes all input rows into a single output + // partition, so its statistics equal the input's overall statistics + // regardless of the requested partition. + self.input.statistics_with_args(&StatisticsArgs::new()) + } + + fn cardinality_effect(&self) -> CardinalityEffect { + CardinalityEffect::Equal + } +} + +/// Configuration captured from `execute` and used to drive the parallel merge. +struct MergeConfig { + schema: SchemaRef, + expr: LexOrdering, + metrics: ExecutionPlanMetricsSet, + batch_size: usize, + target_buckets: usize, + enable_round_robin_repartition: bool, + memory_pool: Arc, + partition: usize, +} + +/// A locally sorted input partition, fully materialized in memory. +struct Run { + /// The sorted record batches of this run (payload kept zero-copy). + batches: Vec, + /// Prefix sums of row counts: `offsets[k]` is the number of rows in + /// `batches[0..k]`. `offsets` has length `batches.len() + 1` and is strictly + /// increasing (empty batches are dropped). + offsets: Vec, + /// Sort-key expressions, used to encode individual rows on demand. + key_exprs: Arc<[Arc]>, + /// Shared converter producing byte-comparable key encodings. + converter: Arc, + /// Keeps the memory of `batches` accounted for the lifetime of the run. + _reservation: MemoryReservation, +} + +impl Run { + fn len(&self) -> usize { + self.offsets.last().copied().unwrap_or(0) + } + + /// Map a global row index to `(batch index, offset within batch)`. + fn locate(&self, pos: usize) -> (usize, usize) { + // `offsets` is strictly increasing and starts at 0, so the batch holding + // `pos` is the one starting at the last offset that is `<= pos`. + let batch = self.offsets.partition_point(|&o| o <= pos) - 1; + (batch, pos - self.offsets[batch]) + } + + /// Append the encoded sort key at global row index `pos` to `rows`. + /// + /// Keys are encoded on demand: pivot selection only probes + /// `O(samples + buckets * runs * log n)` rows, so we never encode the whole + /// run (the per-bucket merge encodes the rows it actually consumes). The + /// caller controls `rows` so encodings can either accumulate (sampling) or + /// be cleared and reused per probe (binary search). + fn append_key(&self, pos: usize, rows: &mut Rows) -> Result<()> { + let (batch, offset) = self.locate(pos); + let row = self.batches[batch].slice(offset, 1); + let cols = evaluate_expressions_to_arrays(self.key_exprs.iter(), &row)?; + self.converter.append(rows, &cols)?; + Ok(()) + } + + /// A reusable single-row buffer for [`Self::append_key`]. + fn key_scratch(&self) -> Rows { + self.converter.empty_rows(1, 0) + } + + /// First index `k` in `[0, len)` whose encoded key is `>= pivot`. + /// + /// Encoded rows are byte-comparable, so a plain `&[u8]` comparison matches + /// the lexicographic order defined by the [`RowConverter`]. `scratch` is a + /// reused single-row buffer so the search allocates nothing per probe. + fn lower_bound(&self, pivot: &[u8], scratch: &mut Rows) -> Result { + let mut lo = 0; + let mut hi = self.len(); + while lo < hi { + let mid = lo + (hi - lo) / 2; + scratch.clear(); + self.append_key(mid, scratch)?; + if scratch.row(0).as_ref() < pivot { + lo = mid + 1; + } else { + hi = mid; + } + } + Ok(lo) + } + + /// Zero-copy slices of this run covering the global row range `[lo, hi)`. + fn slice(&self, lo: usize, hi: usize) -> Vec { + let mut out = Vec::new(); + for (k, batch) in self.batches.iter().enumerate() { + let b_start = self.offsets[k]; + let b_end = self.offsets[k + 1]; + if b_end <= lo { + continue; + } + if b_start >= hi { + break; + } + let start = lo.max(b_start) - b_start; + let end = hi.min(b_end) - b_start; + if end > start { + out.push(batch.slice(start, end - start)); + } + } + out + } +} + +/// Drive the full parallel merge: materialize the inputs, pick pivots, cut each +/// run, and spawn one concurrent merge per bucket. Returns a single sorted +/// output stream. +async fn parallel_merge( + input_streams: Vec, + cfg: MergeConfig, +) -> Result { + let MergeConfig { + schema, + expr, + metrics, + batch_size, + target_buckets, + enable_round_robin_repartition, + memory_pool, + partition, + } = cfg; + + // A single shared converter makes encoded keys comparable across all runs + // and against the pivots. + let sort_fields = expr + .iter() + .map(|sort| { + let data_type = sort.expr.data_type(schema.as_ref())?; + Ok(SortField::new_with_options(data_type, sort.options)) + }) + .collect::>>()?; + let converter = Arc::new(RowConverter::new(sort_fields)?); + let key_exprs: Arc<[Arc]> = + expr.iter().map(|sort| Arc::clone(&sort.expr)).collect(); + + // Materialize the runs. Each input stream is already on its own task, so + // polling them concurrently drives the upstream sorts in parallel. + let collect_futures = input_streams.into_iter().map(|stream| { + let converter = Arc::clone(&converter); + let key_exprs = Arc::clone(&key_exprs); + let reservation = MemoryConsumer::new(format!( + "ParallelSortPreservingMergeExec[{partition}] run" + )) + .register(&memory_pool); + async move { collect_run(stream, converter, key_exprs, reservation).await } + }); + let runs: Vec = futures::future::try_join_all(collect_futures).await?; + + let total_rows: usize = runs.iter().map(Run::len).sum(); + if total_rows == 0 { + return Ok(Box::pin(RecordBatchStreamAdapter::new( + schema, + stream::empty(), + ))); + } + + // Choose the number of buckets: never more than there are rows to fill a + // batch, never more than the target. + let max_useful_buckets = total_rows.div_ceil(batch_size.max(1)).max(1); + let num_buckets = target_buckets.clamp(1, max_useful_buckets); + + let pivots = choose_pivots(&runs, num_buckets, &converter)?; + + // Cut each run by the pivots: `cuts[run][b]` for b in 0..=num_buckets, with + // the first cut at 0 and the last at the run length. + let cuts: Vec> = runs + .iter() + .map(|run| { + let mut scratch = run.key_scratch(); + let mut c = Vec::with_capacity(num_buckets + 1); + c.push(0); + for pivot in &pivots { + c.push(run.lower_bound(pivot, &mut scratch)?); + } + c.push(run.len()); + Ok(c) + }) + .collect::>>()?; + + let runs = Arc::new(runs); + + // Merge each bucket on its own task so the buckets run concurrently across + // worker threads (a single k-way merge per bucket, reusing the optimized + // loser-tree merge). Each task fully merges its bucket into a `Vec`. + let mut handles: Vec>>> = + Vec::with_capacity(num_buckets); + for bucket in 0..num_buckets { + let mut sub_streams: Vec = + Vec::with_capacity(runs.len()); + for (run_idx, run) in runs.iter().enumerate() { + let lo = cuts[run_idx][bucket]; + let hi = cuts[run_idx][bucket + 1]; + if hi <= lo { + continue; + } + let slices = run.slice(lo, hi); + let sub = stream::iter(slices.into_iter().map(Ok)); + sub_streams.push(Box::pin(RecordBatchStreamAdapter::new( + Arc::clone(&schema), + sub, + ))); + } + + // Empty buckets contribute nothing to the ordered output. + if sub_streams.is_empty() { + continue; + } + + let reservation = MemoryConsumer::new(format!( + "ParallelSortPreservingMergeExec[{partition}] bucket {bucket}" + )) + .register(&memory_pool); + let merge_stream = StreamingMergeBuilder::new() + .with_streams(sub_streams) + .with_schema(Arc::clone(&schema)) + .with_expressions(&expr) + .with_metrics(BaselineMetrics::new(&metrics, partition)) + .with_batch_size(batch_size) + .with_reservation(reservation) + .with_round_robin_tie_breaker(enable_round_robin_repartition) + .build()?; + + handles.push(SpawnedTask::spawn(async move { + merge_stream.try_collect::>().await + })); + } + + // Collect the buckets in order. The tasks were spawned up front and run + // concurrently, so awaiting them sequentially preserves the global order + // without serializing the merge work. Concatenating the bucket outputs in + // order yields a single globally sorted stream. + let mut output = Vec::new(); + for handle in handles { + let batches = handle.join_unwind().await.map_err(|e| { + DataFusionError::Execution(format!( + "ParallelSortPreservingMergeExec bucket task failed: {e}" + )) + })??; + output.extend(batches); + } + // The merged output owns freshly interleaved arrays, so the input runs (and + // their key encodings) can be released now. + drop(runs); + + Ok(Box::pin(RecordBatchStreamAdapter::new( + schema, + stream::iter(output.into_iter().map(Ok)), + ))) +} + +/// Fully drain a sorted input stream into a [`Run`]. +/// +/// Only the payload batches are buffered; sort keys are encoded lazily by +/// [`Run::append_key`] during pivot selection. +async fn collect_run( + mut stream: SendableRecordBatchStream, + converter: Arc, + key_exprs: Arc<[Arc]>, + reservation: MemoryReservation, +) -> Result { + let mut batches = Vec::new(); + let mut offsets = vec![0usize]; + + while let Some(batch) = stream.next().await { + let batch = batch?; + let num_rows = batch.num_rows(); + if num_rows == 0 { + continue; + } + reservation.try_grow(get_record_batch_memory_size(&batch))?; + offsets.push(offsets.last().unwrap() + num_rows); + batches.push(batch); + } + + Ok(Run { + batches, + offsets, + key_exprs, + converter, + _reservation: reservation, + }) +} + +/// Select `num_buckets - 1` pivots (split points) by regular sampling. +/// +/// Samples are drawn at a fixed global stride so that each run contributes a +/// number of samples proportional to its length. The merged sample is sorted +/// and `num_buckets - 1` evenly spaced quantiles are returned as pivots. Each +/// pivot is the raw byte encoding of a sampled key (comparable to run keys). +fn choose_pivots( + runs: &[Run], + num_buckets: usize, + converter: &RowConverter, +) -> Result>> { + if num_buckets <= 1 { + return Ok(Vec::new()); + } + + let total_rows: usize = runs.iter().map(Run::len).sum(); + let target_samples = (num_buckets * OVERSAMPLE_PER_BUCKET).min(total_rows).max(1); + let stride = (total_rows / target_samples).max(1); + + // Accumulate every sample into a single row buffer instead of one `Vec` + // per sample. + let mut samples = converter.empty_rows(target_samples, 0); + for run in runs { + let len = run.len(); + let mut pos = stride / 2; + while pos < len { + run.append_key(pos, &mut samples)?; + pos += stride; + } + } + + let m = samples.num_rows(); + if m == 0 { + return Ok(Vec::new()); + } + // Sort indices into the buffer by their (byte-comparable) encoded key; the + // byte order of the encoding equals the sort order. + let mut order: Vec = (0..m).collect(); + order.sort_unstable_by(|&a, &b| samples.row(a).cmp(&samples.row(b))); + + // Pick `num_buckets - 1` evenly spaced quantiles as pivots, owning only + // those few bytes for use during binary search. + Ok((1..num_buckets) + .map(|j| { + let idx = ((j * m) / num_buckets).min(m - 1); + samples.row(order[idx]).as_ref().to_vec() + }) + .collect()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::collect; + use crate::expressions::col; + use crate::sorts::sort_preserving_merge::SortPreservingMergeExec; + use crate::test::TestMemoryExec; + + use arrow::array::{Array, ArrayRef, Int64Array}; + use arrow::compute::SortOptions; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_execution::TaskContext; + use datafusion_execution::config::SessionConfig; + use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; + + /// Build a two-column (`k` key, `v` payload == key) schema. + fn test_schema() -> Arc { + Arc::new(Schema::new(vec![ + Field::new("k", DataType::Int64, true), + Field::new("v", DataType::Int64, true), + ])) + } + + /// One sorted partition from key values. The payload column `v` mirrors the + /// key so we can detect rows whose payload got separated from their key. + fn sorted_partition( + schema: &Arc, + mut keys: Vec>, + opts: SortOptions, + rows_per_batch: usize, + ) -> Vec { + keys.sort_by(|a, b| match (a, b) { + (None, None) => std::cmp::Ordering::Equal, + (None, _) => { + if opts.nulls_first { + std::cmp::Ordering::Less + } else { + std::cmp::Ordering::Greater + } + } + (_, None) => { + if opts.nulls_first { + std::cmp::Ordering::Greater + } else { + std::cmp::Ordering::Less + } + } + (Some(a), Some(b)) => { + if opts.descending { + b.cmp(a) + } else { + a.cmp(b) + } + } + }); + keys.chunks(rows_per_batch.max(1)) + .map(|chunk| { + let k: ArrayRef = Arc::new(Int64Array::from(chunk.to_vec())); + let v: ArrayRef = Arc::new(Int64Array::from(chunk.to_vec())); + RecordBatch::try_new(Arc::clone(schema), vec![k, v]).unwrap() + }) + .collect() + } + + fn key_sequence(batches: &[RecordBatch]) -> Vec> { + let mut out = Vec::new(); + for batch in batches { + let arr = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..arr.len() { + out.push(if arr.is_null(i) { + None + } else { + Some(arr.value(i)) + }); + } + } + out + } + + /// The payload `v` must equal the key `k` in every output row, otherwise the + /// merge separated a row's columns. + fn assert_payload_matches_key(batches: &[RecordBatch]) { + for batch in batches { + let k = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let v = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(k, v, "payload column diverged from key column"); + } + } + + async fn run_and_compare( + partitions: Vec>>, + opts: SortOptions, + target_buckets: usize, + batch_size: usize, + ) { + let schema = test_schema(); + let rows_per_batch = 3; + let data: Vec> = partitions + .into_iter() + .map(|keys| sorted_partition(&schema, keys, opts, rows_per_batch)) + .collect(); + + let sort: LexOrdering = [PhysicalSortExpr { + expr: col("k", &schema).unwrap(), + options: opts, + }] + .into(); + + let config = SessionConfig::new().with_batch_size(batch_size); + let ctx = Arc::new(TaskContext::default().with_session_config(config)); + + // Reference: single-threaded sort preserving merge. + let spm_input = + TestMemoryExec::try_new_exec(&data, Arc::clone(&schema), None).unwrap(); + let spm = Arc::new(SortPreservingMergeExec::new(sort.clone(), spm_input)); + let expected = collect(spm, Arc::clone(&ctx)).await.unwrap(); + + // Parallel merge. + let par_input = + TestMemoryExec::try_new_exec(&data, Arc::clone(&schema), None).unwrap(); + let par = Arc::new( + ParallelSortPreservingMergeExec::new(sort, par_input) + .with_target_buckets(target_buckets), + ); + let actual = collect(par, ctx).await.unwrap(); + + let expected_keys = key_sequence(&expected); + let actual_keys = key_sequence(&actual); + + // Same number of rows and identical key sequence (ties keep the same key, + // so the key sequence is deterministic regardless of tie-break order). + assert_eq!( + expected_keys.len(), + actual_keys.len(), + "row count mismatch: expected {} got {}", + expected_keys.len(), + actual_keys.len() + ); + assert_eq!(expected_keys, actual_keys, "key sequence diverged from SPM"); + assert_payload_matches_key(&actual); + } + + fn asc() -> SortOptions { + SortOptions { + descending: false, + nulls_first: true, + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn unique_keys_match_spm() { + // Four interleaved sorted runs with globally unique keys. + let partitions = (0..4) + .map(|p| (0..25).map(|i| Some(p + 4 * i)).collect()) + .collect(); + run_and_compare(partitions, asc(), 4, 4).await; + } + + #[tokio::test(flavor = "multi_thread")] + async fn low_cardinality_keys_match_spm() { + // Heavy ties: only three distinct keys spread across runs. Pivots will + // collapse; buckets become uneven but output must stay correct. + let partitions = vec![ + vec![Some(0); 30] + .into_iter() + .chain(vec![Some(1); 10]) + .collect(), + vec![Some(1); 20] + .into_iter() + .chain(vec![Some(2); 20]) + .collect(), + vec![Some(0); 5] + .into_iter() + .chain(vec![Some(2); 35]) + .collect(), + ]; + run_and_compare(partitions, asc(), 8, 4).await; + } + + #[tokio::test(flavor = "multi_thread")] + async fn descending_with_nulls_match_spm() { + let opts = SortOptions { + descending: true, + nulls_first: false, + }; + let partitions = vec![ + vec![Some(5), Some(3), None, Some(1)], + vec![Some(9), Some(3), Some(3), None, None], + vec![Some(8), Some(2), Some(0)], + ]; + run_and_compare(partitions, opts, 4, 2).await; + } + + #[tokio::test(flavor = "multi_thread")] + async fn uneven_partition_sizes_match_spm() { + let partitions = vec![ + (0..100).map(Some).collect(), + vec![Some(50)], + vec![], + (0..7).map(|i| Some(i * 13)).collect(), + ]; + run_and_compare(partitions, asc(), 8, 5).await; + } + + #[tokio::test(flavor = "multi_thread")] + async fn more_buckets_than_rows_match_spm() { + let partitions = vec![vec![Some(1), Some(4)], vec![Some(2), Some(3)]]; + run_and_compare(partitions, asc(), 64, 1).await; + } + + #[tokio::test(flavor = "multi_thread")] + async fn single_partition_passthrough() { + let partitions = vec![(0..20).map(Some).collect()]; + run_and_compare(partitions, asc(), 4, 4).await; + } + + #[tokio::test(flavor = "multi_thread")] + async fn all_empty_partitions() { + let partitions = vec![vec![], vec![], vec![]]; + run_and_compare(partitions, asc(), 4, 4).await; + } +} diff --git a/datafusion/sqllogictest/test_files/array_agg_sliding_window.slt b/datafusion/sqllogictest/test_files/array_agg_sliding_window.slt index 6f0712e2a6929..c828794b1dcb7 100644 --- a/datafusion/sqllogictest/test_files/array_agg_sliding_window.slt +++ b/datafusion/sqllogictest/test_files/array_agg_sliding_window.slt @@ -430,4 +430,4 @@ statement ok DROP TABLE t_dist_parts; statement ok -DROP TABLE t_dist_int; \ No newline at end of file +DROP TABLE t_dist_int; diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 24b1262e026f4..5cce925d10f13 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -250,6 +250,7 @@ physical_plan after LimitPushdown SAME TEXT AS ABOVE physical_plan after TopKRepartition SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after PushdownSort SAME TEXT AS ABOVE +physical_plan after ParallelSortMerge SAME TEXT AS ABOVE physical_plan after EnsureCooperative SAME TEXT AS ABOVE physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE physical_plan after SanityCheckPlan SAME TEXT AS ABOVE @@ -332,6 +333,7 @@ physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSP physical_plan after TopKRepartition SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after PushdownSort SAME TEXT AS ABOVE +physical_plan after ParallelSortMerge SAME TEXT AS ABOVE physical_plan after EnsureCooperative SAME TEXT AS ABOVE physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE physical_plan after SanityCheckPlan SAME TEXT AS ABOVE @@ -378,6 +380,7 @@ physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSP physical_plan after TopKRepartition SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after PushdownSort SAME TEXT AS ABOVE +physical_plan after ParallelSortMerge SAME TEXT AS ABOVE physical_plan after EnsureCooperative SAME TEXT AS ABOVE physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE physical_plan after SanityCheckPlan SAME TEXT AS ABOVE @@ -624,6 +627,7 @@ physical_plan after LimitPushdown SAME TEXT AS ABOVE physical_plan after TopKRepartition SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after PushdownSort SAME TEXT AS ABOVE +physical_plan after ParallelSortMerge SAME TEXT AS ABOVE physical_plan after EnsureCooperative SAME TEXT AS ABOVE physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE physical_plan after SanityCheckPlan SAME TEXT AS ABOVE diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 0932f58a7c03f..d4665163a0608 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -308,6 +308,7 @@ datafusion.optimizer.enable_distinct_aggregation_soft_limit true datafusion.optimizer.enable_dynamic_filter_pushdown true datafusion.optimizer.enable_join_dynamic_filter_pushdown true datafusion.optimizer.enable_leaf_expression_pushdown true +datafusion.optimizer.enable_parallel_sort_merge true datafusion.optimizer.enable_physical_uncorrelated_scalar_subquery true datafusion.optimizer.enable_piecewise_merge_join false datafusion.optimizer.enable_round_robin_repartition true @@ -465,6 +466,7 @@ datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to tru datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown` So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden. datafusion.optimizer.enable_join_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase. datafusion.optimizer.enable_leaf_expression_pushdown true When set to true, the optimizer will extract leaf expressions (such as `get_field`) from filter/sort/join nodes into projections closer to the leaf table scans, and push those projections down towards the leaf nodes. +datafusion.optimizer.enable_parallel_sort_merge true When set to true, eligible `SortPreservingMergeExec` operators are replaced with a parallel merge that splits the locally sorted inputs into key ranges (via regular sampling of the sorted runs) and merges those ranges concurrently across `target_partitions` threads, before concatenating them in order into a single sorted stream. This trades extra memory (the sorted inputs are materialized) for parallelism in the otherwise single-threaded merge. It only applies to merges without a fetch/limit over bounded, multi-partition inputs. Default: true datafusion.optimizer.enable_physical_uncorrelated_scalar_subquery true When set to true, uncorrelated scalar subqueries are left in the logical plan and executed by `ScalarSubqueryExec` during physical execution. When set to false, all scalar subqueries (including uncorrelated ones) are rewritten to left joins by the `ScalarSubqueryToJoin` optimizer rule. Note disabling this option is not recommended. It restores pre behavior, which silently produces incorrect results for multi-row subqueries and does not support scalar subqueries in ORDER BY / JOIN ON / aggregate-function arguments. This option is intended as a temporary escape hatch for distributed execution frameworks and is planned to be removed in a future DataFusion release. datafusion.optimizer.enable_piecewise_merge_join false When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently experimental. Physical planner will opt for PiecewiseMergeJoin when there is only one range filter. datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 082b10167274c..4da7230cd149c 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -2898,7 +2898,7 @@ query TT explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id IN (SELECT t2_id FROM left_semi_anti_join_table_t2 t2) ORDER BY t1_id ---- physical_plan -01)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] +01)ParallelSortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] 02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)] 03)----DataSourceExec: partitions=1, partition_sizes=[1] 04)----SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true] @@ -2934,7 +2934,7 @@ query TT explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 LEFT SEMI JOIN left_semi_anti_join_table_t2 t2 ON (t1_id = t2_id) ORDER BY t1_id ---- physical_plan -01)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] +01)ParallelSortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] 02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)] 03)----DataSourceExec: partitions=1, partition_sizes=[1] 04)----SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true] @@ -2991,7 +2991,7 @@ query TT explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id IN (SELECT t2_id FROM left_semi_anti_join_table_t2 t2) ORDER BY t1_id ---- physical_plan -01)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] +01)ParallelSortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] 02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)] 03)----DataSourceExec: partitions=1, partition_sizes=[1] 04)----SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true] @@ -3027,7 +3027,7 @@ query TT explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 LEFT SEMI JOIN left_semi_anti_join_table_t2 t2 ON (t1_id = t2_id) ORDER BY t1_id ---- physical_plan -01)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] +01)ParallelSortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] 02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)] 03)----DataSourceExec: partitions=1, partition_sizes=[1] 04)----SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true] @@ -3085,7 +3085,7 @@ query TT explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHERE EXISTS (SELECT * FROM right_semi_anti_join_table_t2 t2 where t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id ---- physical_plan -01)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] +01)ParallelSortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] 02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@1 != t1_name@0 03)----DataSourceExec: partitions=1, partition_sizes=[1] 04)----SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true] @@ -3102,7 +3102,7 @@ query TT explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t2 t2 RIGHT SEMI JOIN right_semi_anti_join_table_t1 t1 on (t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id ---- physical_plan -01)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] +01)ParallelSortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] 02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@0 != t1_name@1 03)----DataSourceExec: partitions=1, partition_sizes=[1] 04)----SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true] @@ -3157,7 +3157,7 @@ query TT explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHERE EXISTS (SELECT * FROM right_semi_anti_join_table_t2 t2 where t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id ---- physical_plan -01)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] +01)ParallelSortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] 02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@1 != t1_name@0 03)----DataSourceExec: partitions=1, partition_sizes=[1] 04)----SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true] @@ -3174,7 +3174,7 @@ query TT explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t2 t2 RIGHT SEMI JOIN right_semi_anti_join_table_t1 t1 on (t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id ---- physical_plan -01)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] +01)ParallelSortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] 02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@0 != t1_name@1 03)----DataSourceExec: partitions=1, partition_sizes=[1] 04)----SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true] diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 22aaf09dff31f..3e103c7057c70 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -5342,7 +5342,7 @@ logical_plan 04)------Filter: t1.c1 = Int32(2) OR t1.c1 = Int32(3) 05)--------TableScan: t1 projection=[c1, c2] physical_plan -01)SortPreservingMergeExec: [c1@0 ASC NULLS LAST, c2@1 ASC NULLS LAST, rank@2 ASC NULLS LAST] +01)ParallelSortPreservingMergeExec: [c1@0 ASC NULLS LAST, c2@1 ASC NULLS LAST, rank@2 ASC NULLS LAST] 02)--ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, rank() PARTITION BY [t1.c1] ORDER BY [t1.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as rank] 03)----BoundedWindowAggExec: wdw=[rank() PARTITION BY [t1.c1] ORDER BY [t1.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "rank() PARTITION BY [t1.c1] ORDER BY [t1.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 04)------SortExec: expr=[c1@0 ASC NULLS LAST, c2@1 ASC NULLS LAST], preserve_partitioning=[true] @@ -5382,7 +5382,7 @@ logical_plan 04)------WindowAggr: windowExpr=[[rank() PARTITION BY [t1.c1] ORDER BY [t1.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] 05)--------TableScan: t1 projection=[c1, c2] physical_plan -01)SortPreservingMergeExec: [c1@0 ASC NULLS LAST, c2@1 ASC NULLS LAST, rank@2 ASC NULLS LAST] +01)ParallelSortPreservingMergeExec: [c1@0 ASC NULLS LAST, c2@1 ASC NULLS LAST, rank@2 ASC NULLS LAST] 02)--ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, rank() PARTITION BY [t1.c1] ORDER BY [t1.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as rank] 03)----FilterExec: c2@1 >= 10 04)------BoundedWindowAggExec: wdw=[rank() PARTITION BY [t1.c1] ORDER BY [t1.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "rank() PARTITION BY [t1.c1] ORDER BY [t1.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] @@ -5421,7 +5421,7 @@ logical_plan 05)--------Filter: t1.c1 = Int32(1) 06)----------TableScan: t1 projection=[c1, c2] physical_plan -01)SortPreservingMergeExec: [c1@0 ASC NULLS LAST, c2@1 ASC NULLS LAST, rank@2 ASC NULLS LAST] +01)ParallelSortPreservingMergeExec: [c1@0 ASC NULLS LAST, c2@1 ASC NULLS LAST, rank@2 ASC NULLS LAST] 02)--ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, rank() PARTITION BY [t1.c1] ORDER BY [t1.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as rank] 03)----FilterExec: c2@1 = 10 04)------BoundedWindowAggExec: wdw=[rank() PARTITION BY [t1.c1] ORDER BY [t1.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "rank() PARTITION BY [t1.c1] ORDER BY [t1.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] @@ -5459,7 +5459,7 @@ logical_plan 04)------WindowAggr: windowExpr=[[rank() PARTITION BY [t1.c1] ORDER BY [t1.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] 05)--------TableScan: t1 projection=[c1, c2] physical_plan -01)SortPreservingMergeExec: [c1@0 ASC NULLS LAST, c2@1 ASC NULLS LAST, rank@2 ASC NULLS LAST] +01)ParallelSortPreservingMergeExec: [c1@0 ASC NULLS LAST, c2@1 ASC NULLS LAST, rank@2 ASC NULLS LAST] 02)--ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, rank() PARTITION BY [t1.c1] ORDER BY [t1.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as rank] 03)----FilterExec: c1@0 = 1 OR c2@1 = 10 04)------BoundedWindowAggExec: wdw=[rank() PARTITION BY [t1.c1] ORDER BY [t1.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "rank() PARTITION BY [t1.c1] ORDER BY [t1.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] @@ -5500,7 +5500,7 @@ logical_plan 05)--------Filter: t1.c1 > Int32(1) 06)----------TableScan: t1 projection=[c1, c2] physical_plan -01)SortPreservingMergeExec: [c1@0 ASC NULLS LAST, c2@1 ASC NULLS LAST, rank1@2 ASC NULLS LAST, rank2@3 ASC NULLS LAST] +01)ParallelSortPreservingMergeExec: [c1@0 ASC NULLS LAST, c2@1 ASC NULLS LAST, rank1@2 ASC NULLS LAST, rank2@3 ASC NULLS LAST] 02)--SortExec: expr=[c1@0 ASC NULLS LAST, c2@1 ASC NULLS LAST, rank1@2 ASC NULLS LAST, rank2@3 ASC NULLS LAST], preserve_partitioning=[true] 03)----ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, rank() PARTITION BY [t1.c1] ORDER BY [t1.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as rank1, rank() PARTITION BY [t1.c2, t1.c1] ORDER BY [t1.c1 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rank2] 04)------BoundedWindowAggExec: wdw=[rank() PARTITION BY [t1.c2, t1.c1] ORDER BY [t1.c1 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "rank() PARTITION BY [t1.c2, t1.c1] ORDER BY [t1.c1 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] @@ -5548,7 +5548,7 @@ logical_plan 05)--------WindowAggr: windowExpr=[[rank() PARTITION BY [t1.c1] ORDER BY [t1.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] 06)----------TableScan: t1 projection=[c1, c2] physical_plan -01)SortPreservingMergeExec: [c1@0 ASC NULLS LAST, c2@1 ASC NULLS LAST, rank1@2 ASC NULLS LAST, rank2@3 ASC NULLS LAST] +01)ParallelSortPreservingMergeExec: [c1@0 ASC NULLS LAST, c2@1 ASC NULLS LAST, rank1@2 ASC NULLS LAST, rank2@3 ASC NULLS LAST] 02)--SortExec: expr=[c1@0 ASC NULLS LAST, c2@1 ASC NULLS LAST, rank1@2 ASC NULLS LAST, rank2@3 ASC NULLS LAST], preserve_partitioning=[true] 03)----ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, rank() PARTITION BY [t1.c1] ORDER BY [t1.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as rank1, rank() PARTITION BY [t1.c2, t1.c1] ORDER BY [t1.c1 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rank2] 04)------BoundedWindowAggExec: wdw=[rank() PARTITION BY [t1.c2, t1.c1] ORDER BY [t1.c1 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "rank() PARTITION BY [t1.c2, t1.c1] ORDER BY [t1.c1 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] @@ -5888,7 +5888,7 @@ logical_plan 03)----WindowAggr: windowExpr=[[count(table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW AS count(table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW, count(DISTINCT table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW AS count(DISTINCT table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW]] 04)------TableScan: table_test_distinct_count projection=[k, v, time] physical_plan -01)SortPreservingMergeExec: [k@0 ASC NULLS LAST, time@1 ASC NULLS LAST] +01)ParallelSortPreservingMergeExec: [k@0 ASC NULLS LAST, time@1 ASC NULLS LAST] 02)--ProjectionExec: expr=[k@0 as k, time@2 as time, count(table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW@3 as normal_count, count(DISTINCT table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW@4 as distinct_count] 03)----BoundedWindowAggExec: wdw=[count(table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW: Field { "count(table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW": Int64 }, frame: RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW, count(DISTINCT table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW: Field { "count(DISTINCT table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW": Int64 }, frame: RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW], mode=[Sorted] 04)------SortExec: expr=[k@0 ASC NULLS LAST, time@2 ASC NULLS LAST], preserve_partitioning=[true] @@ -5950,7 +5950,7 @@ logical_plan 04)------Projection: CAST(table_test_distinct_count.v AS Int64) AS __common_expr_1, table_test_distinct_count.k, table_test_distinct_count.time 05)--------TableScan: table_test_distinct_count projection=[k, v, time] physical_plan -01)SortPreservingMergeExec: [k@0 ASC NULLS LAST, time@1 ASC NULLS LAST] +01)ParallelSortPreservingMergeExec: [k@0 ASC NULLS LAST, time@1 ASC NULLS LAST] 02)--ProjectionExec: expr=[k@1 as k, time@2 as time, sum(table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW@3 as sum_v, sum(DISTINCT table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW@4 as sum_distinct_v] 03)----BoundedWindowAggExec: wdw=[sum(table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW: Field { "sum(table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW, sum(DISTINCT table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW: Field { "sum(DISTINCT table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW], mode=[Sorted] 04)------SortExec: expr=[k@1 ASC NULLS LAST, time@2 ASC NULLS LAST], preserve_partitioning=[true] @@ -6816,7 +6816,7 @@ ORDER BY i; statement ok reset datafusion.execution.batch_size; -# The SLT runner sets `target_partitions` to 4 instead of using the default, so +# The SLT runner sets `target_partitions` to 4 instead of using the default, so # reset it explicitly. statement ok set datafusion.execution.target_partitions = 4; diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index f70daef317216..777ccd134d281 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -179,6 +179,7 @@ The following configuration settings are available: | datafusion.optimizer.prefer_existing_union | false | When set to true, the optimizer will not attempt to convert Union to Interleave | | datafusion.optimizer.expand_views_at_output | false | When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. | | datafusion.optimizer.enable_sort_pushdown | true | Enable sort pushdown optimization. When enabled, attempts to push sort requirements down to data sources that can natively handle them (e.g., by reversing file/row group read order). Returns **inexact ordering**: Sort operator is kept for correctness, but optimized input enables early termination for TopK queries (ORDER BY ... LIMIT N), providing significant speedup. Memory: No additional overhead (only changes read order). Future: Will add option to detect perfectly sorted data and eliminate Sort completely. Default: true | +| datafusion.optimizer.enable_parallel_sort_merge | true | When set to true, eligible `SortPreservingMergeExec` operators are replaced with a parallel merge that splits the locally sorted inputs into key ranges (via regular sampling of the sorted runs) and merges those ranges concurrently across `target_partitions` threads, before concatenating them in order into a single sorted stream. This trades extra memory (the sorted inputs are materialized) for parallelism in the otherwise single-threaded merge. It only applies to merges without a fetch/limit over bounded, multi-partition inputs. Default: true | | datafusion.optimizer.enable_leaf_expression_pushdown | true | When set to true, the optimizer will extract leaf expressions (such as `get_field`) from filter/sort/join nodes into projections closer to the leaf table scans, and push those projections down towards the leaf nodes. | | datafusion.optimizer.enable_unions_to_filter | false | When set to true, the logical optimizer will rewrite `UNION DISTINCT` branches that read from the same source and differ only by filter predicates into a single branch with a combined filter. This optimization is conservative and only applies when the branches share the same source and compatible wrapper nodes such as identical projections or aliases. | | datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans |