
Improve internal worker parallelism support #23174

Description

@2010YOUY01

Is your feature request related to a problem or challenge?

Original discussion from @alamb: #23026 (comment)

Summary: DataFusion mostly uses repartition-based parallelism today, but at some point we need to introduce intra-partition parallelism, and we have to do that carefully for performance.

The existing SortExec already exposes some internal worker parallelism: within a single partition, it can run a large number of concurrent workers for local sorting.

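```rust
// From SortExec's external sorter: each buffered in-memory batch is sorted by
// its own stream, and `spawn_buffered` drives that stream on a separate Tokio
// task (on a multi-threaded runtime), so one partition can have many
// concurrent local-sort workers.
let streams = std::mem::take(&mut self.in_mem_batches)
    .into_iter()
    .map(|batch| {
        let metrics = self.metrics.baseline.intermediate();
        // Give each sort task its own slice of the partition's memory reservation.
        let reservation = self
            .reservation
            .split(get_reserved_bytes_for_record_batch(&batch)?);
        let input = self.sort_batch_stream(batch, &metrics, reservation)?;
        Ok(spawn_buffered(input, 1))
    })
    .collect::<Result<Vec<_>>>()?;
```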
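The snippet above sorts each in-memory batch in its own spawned stream, and the resulting sorted streams are later merged. The same two-phase pattern can be sketched using only the standard library. The sketch assumes `Vec<i64>` stands in for a `RecordBatch` and scoped OS threads stand in for Tokio tasks. `sort_batches_in_parallel` is a hypothetical name, not a DataFusion API.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::thread;

/// Hypothetical helper (not a DataFusion API): sort each batch on its own
/// worker thread, then combine the sorted runs with a k-way merge.
pub fn sort_batches_in_parallel(batches: Vec<Vec<i64>>) -> Vec<i64> {
    // Phase 1: one worker per batch, each producing one sorted run.
    let runs: Vec<Vec<i64>> = thread::scope(|s| {
        let handles: Vec<_> = batches
            .into_iter()
            .map(|mut batch| {
                s.spawn(move || {
                    batch.sort_unstable();
                    batch
                })
            })
            .collect();
        handles
            .into_iter()
            .map(|h| h.join().expect("sort worker panicked"))
            .collect()
    });

    // Phase 2: sort-preserving merge. The min-heap holds the smallest unread
    // value of each run as (value, run index, position within the run).
    let mut heap = BinaryHeap::new();
    for (run_idx, run) in runs.iter().enumerate() {
        if let Some(&first) = run.first() {
            heap.push(Reverse((first, run_idx, 0usize)));
        }
    }
    let mut merged = Vec::with_capacity(runs.iter().map(Vec::len).sum());
    while let Some(Reverse((value, run_idx, pos))) = heap.pop() {
        merged.push(value);
        if let Some(&next) = runs[run_idx].get(pos + 1) {
            heap.push(Reverse((next, run_idx, pos + 1)));
        }
    }
    merged
}
```

Phase 1 is where the intra-partition parallelism comes from. Phase 2 is a single-threaded k-way merge in this sketch.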

This issue explains the background and proposes some ideas for improving this support.

What is internal worker parallelism

DataFusion mostly uses repartition-based parallelism: each partition holds independent data, and one CPU core processes one partition. Here is a parallel aggregation query example:

[Figure: parallel aggregation query example]
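A minimal sketch of this model, using only the standard library, follows. It assumes an in-memory `(String, i64)` row layout and OS threads in place of DataFusion's Tokio tasks. `hash_repartition` and `parallel_sum_by_key` are hypothetical names. Hash repartitioning sends every row with a given key to the same partition, so each worker can aggregate without any coordination. This is simplified: a real DataFusion plan typically also runs a partial aggregate before the repartition.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::thread;

/// Hypothetical helper: route each (key, value) row to a partition by hashing
/// its key, so every key lives in exactly one partition.
fn hash_repartition(rows: &[(String, i64)], num_partitions: usize) -> Vec<Vec<(String, i64)>> {
    assert!(num_partitions > 0, "need at least one partition");
    let mut partitions = vec![Vec::new(); num_partitions];
    for (key, value) in rows {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        let p = (hasher.finish() % num_partitions as u64) as usize;
        partitions[p].push((key.clone(), *value));
    }
    partitions
}

/// Hypothetical SUM(value) GROUP BY key: one worker per partition, no shared state.
pub fn parallel_sum_by_key(rows: &[(String, i64)], num_partitions: usize) -> HashMap<String, i64> {
    let partitions = hash_repartition(rows, num_partitions);
    let partials: Vec<HashMap<String, i64>> = thread::scope(|s| {
        let handles: Vec<_> = partitions
            .into_iter()
            .map(|part| {
                s.spawn(move || {
                    let mut sums = HashMap::new();
                    for (key, value) in part {
                        *sums.entry(key).or_insert(0) += value;
                    }
                    sums
                })
            })
            .collect();
        handles
            .into_iter()
            .map(|h| h.join().expect("aggregation worker panicked"))
            .collect()
    });
    // Key sets are disjoint across partitions, so the partial results can be unioned.
    partials.into_iter().flatten().collect()
}
```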

For certain workloads, the assumptions behind repartitioning are not ideal. Here are three motivating examples.

Motivating Example 1: memory pressure case

Suppose we run a large sort (data size >> memory) on a machine with 32 cores and 64GB of memory. The default settings execute it with 32 global partitions, and each partition runs a classic external sort algorithm (local sort, spill to disk, and finally read back and sort-preserving merge).
The issue is that the per-partition memory budget is low (roughly 64GB / 32 = 2GB if memory is split evenly). Spilling therefore tends to create smaller sorted runs, and end-to-end execution requires extra spills, read-backs, and merges of smaller files.
A better plan shape is:

  • the scan keeps 32 partitions, since it is not memory-intensive and scales with the number of CPU cores
  • SortExec shrinks to 8 partitions, with 4 internal workers per partition. Total concurrency stays at 8 * 4 = 32, but each partition gets a larger memory budget (roughly 8GB instead of 2GB). There are also efficient algorithms to parallelize sorting and sort-preserving merging within a partition.
[Figure: proposed plan shape for the large sort]
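To make the trade-off concrete, here is a back-of-envelope model built on loud assumptions. Memory is split evenly across partitions, each spill writes one sorted run about the size of its partition's budget, and the input is a hypothetical 640GB. `SortShape` is an illustrative type, not a DataFusion struct.

```rust
/// One binary gigabyte, used for all sizes in this model.
pub const GB: u64 = 1 << 30;

/// Hypothetical description of a sort stage's parallelism.
#[derive(Debug, Clone, Copy)]
pub struct SortShape {
    pub partitions: u64,
    pub workers_per_partition: u64,
}

impl SortShape {
    /// partition_count * internal_workers_per_partition
    pub fn total_concurrency(&self) -> u64 {
        self.partitions * self.workers_per_partition
    }

    /// Assumes the memory pool is divided evenly between partitions; the
    /// internal workers of one partition share that partition's budget.
    pub fn memory_budget_per_partition(&self, total_memory: u64) -> u64 {
        total_memory / self.partitions
    }

    /// Spilled sorted runs across all partitions, assuming each run is as
    /// large as the per-partition budget.
    pub fn total_sorted_runs(&self, data_size: u64, total_memory: u64) -> u64 {
        let data_per_partition = data_size.div_ceil(self.partitions);
        let budget = self.memory_budget_per_partition(total_memory);
        data_per_partition.div_ceil(budget) * self.partitions
    }
}
```

With 64GB of memory, both the default shape (32 partitions with 1 worker each) and the proposed shape (8 partitions with 4 workers each) keep a total concurrency of 32. The proposed shape, however, raises the budget from 2GB to 8GB per partition. It also cuts the number of spilled runs from 320 to 80: each partition still writes 10 runs, but each run is four times larger.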

Motivating Example 2: segment-tree based parallelism in window functions

The window query in the figure cannot be parallelized with repartitioning. Repartitioning assumes data independence among partitions, but this query has only one global partition and its window frame changes on every row.
Meanwhile, there is a highly parallel algorithm, based on segment trees, if we allow memory to be shared across parallel workers.

The ideal query shape then becomes:

(any downstream exec)
-- RepartitionExec(round-robin on batch, input_partitions=1, output_partitions=32)
---- WindowExec(partition=1, internal_parallelism=32)
------ CoalescePartitionsExec(input_partitions=32, output_partitions=1)
-------- CsvExec(partition=32, internal_parallelism=1)
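The segment-tree approach can be sketched for the `WindowExec(partition=1, internal_parallelism=32)` stage above. The sketch assumes a SUM aggregate over a `ROWS BETWEEN k PRECEDING AND CURRENT ROW` frame, plain `i64` values instead of Arrow arrays, and scoped std threads as the internal workers. `SumSegmentTree` and `parallel_window_sum` are hypothetical names, not DataFusion APIs. The tree is built once, sequentially in this sketch. After that, every row's frame is an independent O(log n) query against shared, read-only memory, so the rows of a single partition can be split across any number of workers.

```rust
use std::thread;

/// Hypothetical bottom-up segment tree for SUM: leaves live in tree[n..2n].
pub struct SumSegmentTree {
    n: usize,
    tree: Vec<i64>,
}

impl SumSegmentTree {
    pub fn new(values: &[i64]) -> Self {
        let n = values.len();
        let mut tree = vec![0; 2 * n];
        tree[n..].copy_from_slice(values);
        for i in (1..n).rev() {
            tree[i] = tree[2 * i] + tree[2 * i + 1];
        }
        Self { n, tree }
    }

    /// Sum of values[lo..hi] (half-open range) in O(log n).
    pub fn query(&self, lo: usize, hi: usize) -> i64 {
        let (mut l, mut r) = (lo + self.n, hi + self.n);
        let mut sum = 0;
        while l < r {
            if (l & 1) == 1 {
                sum += self.tree[l];
                l += 1;
            }
            if (r & 1) == 1 {
                r -= 1;
                sum += self.tree[r];
            }
            l >>= 1;
            r >>= 1;
        }
        sum
    }
}

/// Hypothetical SUM(v) OVER (ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW)
/// on one partition, with output rows split across `workers` threads.
pub fn parallel_window_sum(values: &[i64], preceding: usize, workers: usize) -> Vec<i64> {
    let tree = SumSegmentTree::new(values);
    let mut out = vec![0; values.len()];
    let chunk = values.len().div_ceil(workers.max(1)).max(1);
    thread::scope(|s| {
        for (c, slot) in out.chunks_mut(chunk).enumerate() {
            let tree = &tree; // shared, read-only across workers
            s.spawn(move || {
                for (j, dst) in slot.iter_mut().enumerate() {
                    let row = c * chunk + j;
                    *dst = tree.query(row.saturating_sub(preceding), row + 1);
                }
            });
        }
    });
    out
}
```

For SUM with a fixed frame, a prefix-sum array would be enough. The segment tree earns its keep for aggregates such as MIN and MAX, or for frames whose bounds vary per row.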

Motivating Example 3

This PR from @Dandandan also seems to introduce intra-partition parallelism.

Describe the solution you'd like

  • Establish conventions for intra-partition parallelism. For example, each execution plan stage may want to maintain a similar total concurrency level: partition_count * internal_workers_per_partition. See the CsvExec + WindowExec example above.
  • Improve Explain output for internal parallelism. The existing single-partition SortExec can still use internal parallelism, but the plan currently looks serial, which makes it hard to inspect potential performance issues.
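The two proposals above can be sketched together with a hypothetical per-operator descriptor. `OperatorParallelism` is an illustrative type, not part of DataFusion. It derives the internal worker count from a target total concurrency, following the `partition_count * internal_workers_per_partition` convention. It then renders the result as an EXPLAIN-like line, so a single-partition operator with internal workers no longer looks serial. The target value of 32 and the output format are assumptions.

```rust
use std::fmt;

/// Hypothetical per-operator parallelism descriptor (not a DataFusion type).
#[derive(Debug, Clone, PartialEq)]
pub struct OperatorParallelism {
    pub name: String,
    pub partitions: usize,
    pub internal_workers: usize,
}

impl OperatorParallelism {
    /// Convention sketch: keep partitions * internal_workers close to a target
    /// (e.g. the number of CPU cores), with at least one worker per partition.
    pub fn with_target(name: &str, partitions: usize, target_concurrency: usize) -> Self {
        let internal_workers = (target_concurrency / partitions.max(1)).max(1);
        Self { name: name.to_string(), partitions, internal_workers }
    }

    pub fn total_concurrency(&self) -> usize {
        self.partitions * self.internal_workers
    }
}

/// EXPLAIN-style rendering that surfaces intra-partition parallelism.
impl fmt::Display for OperatorParallelism {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "{}: partitions={}, internal_workers={}",
            self.name, self.partitions, self.internal_workers
        )
    }
}
```

With a target of 32, each of the following stages comes out at a total concurrency of 32: the CsvExec stage (32 partitions * 1 worker) and the WindowExec stage (1 partition * 32 workers) from Example 2, and the SortExec stage (8 partitions * 4 workers) from Example 1.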

Describe alternatives you've considered

No response

Additional context

No response

Labels: enhancement (New feature or request)