[EPIC] Improve window function performance for large windows #23197

@alamb

Is your feature request related to a problem or challenge?

This ticket tracks ideas for making window function evaluation faster when a single logical window partition is large. This has come up several times recently with @avantgardnerio, @2010YOUY01, and @wirybeaver, among others.

The core use case is to make evaluating a query like this fast:

SELECT
    -- moving average over the current row and the 5 previous rows
    AVG(cpu_usage) OVER (ORDER BY time ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) as avg_cpu
FROM metrics;

The key property is that the query has one large logical window partition because there is no PARTITION BY clause in the window specification. Even though each frame contains at most the current row and the 5 preceding rows, the operator must process one global ordered window partition.

A similar problem can happen when there is a PARTITION BY clause but the logical window partitions are imbalanced due to skew or a small number of partition keys.

Today, each logical window partition is executed in a single DataFusion execution partition, which means:

  1. It is limited to a single core
  2. It is limited to a single machine in distributed environments

Background

DataFusion has support for many window functions (see documentation). Window functions are used like this:

SELECT
    customer, time, cpu_usage,
    -- moving average over the current row and the 5 previous rows
    AVG(cpu_usage) OVER (PARTITION BY customer ORDER BY time ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) as avg_cpu
FROM
    metrics;

Which results in something like:

+----------+---------------------+-----------+---------+
| customer | time                | cpu_usage | avg_cpu |
+----------+---------------------+-----------+---------+
| acme     | 2026-01-01T00:00:00 | 10.0      | 10.0    |
| acme     | 2026-01-01T00:01:00 | 20.0      | 15.0    |
...
| globex   | 2026-01-01T00:03:00 | 45.0      | 30.0    |
| globex   | 2026-01-01T00:04:00 | 55.0      | 35.0    |
+----------+---------------------+-----------+---------+

The plan for such a query looks like the following, and typically keeps all cores fully occupied:

BoundedWindowAggExec(...)
  SortExec(customer, time)
    RepartitionExec(Hash(customer))  <-- divides work among partitions and thus cores
      Scan

Here is an example from the tests:

01)ProjectionExec: expr=[depname@0 as depname, running_total@1 as running_total]
02)--SortPreservingMergeExec: [depname@0 ASC NULLS LAST, empno@2 ASC NULLS LAST], fetch=3
03)----ProjectionExec: expr=[depname@0 as depname, sum(Int64(1)) PARTITION BY [employees.depname] ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as running_total, empno@1 as empno]
04)------BoundedWindowAggExec: wdw=[sum(Int64(1)) PARTITION BY [employees.depname] ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(Int64(1)) PARTITION BY [employees.depname] ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
05)--------SortExec: TopK(fetch=3), expr=[depname@0 ASC NULLS LAST, empno@1 ASC NULLS LAST], preserve_partitioning=[true]
06)----------RepartitionExec: partitioning=Hash([depname@0], 4), input_partitions=1
07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[depname, empno], file_type=csv, has_header=true

However, for the case in question, when we remove the PARTITION BY customer from the OVER clause:

SELECT
    customer, time, cpu_usage,
    -- moving average over the current row and the 5 previous rows
    AVG(cpu_usage) OVER (ORDER BY time ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) as avg_cpu
FROM
    metrics;

The plan looks like this (no RepartitionExec, and it executes on a single core):

BoundedWindowAggExec(...)  <-- single partition, single core
  SortExec(time)
    Scan

Here is an example:

01)ProjectionExec: expr=[sn@0 as sn, ts@1 as ts, currency@2 as currency, amount@3 as amount, sum(table_with_pk.amount) ORDER BY [table_with_pk.sn ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum1]
02)--BoundedWindowAggExec: wdw=[sum(table_with_pk.amount) ORDER BY [table_with_pk.sn ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(table_with_pk.amount) ORDER BY [table_with_pk.sn ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Float64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
03)----SortExec: expr=[sn@0 ASC NULLS LAST], preserve_partitioning=[false]
04)------DataSourceExec: partitions=1, partition_sizes=[1]

Table definition

CREATE TABLE metrics (
  time TIMESTAMP,
  customer VARCHAR,
  cpu_usage DOUBLE
);

INSERT INTO metrics VALUES
  (TIMESTAMP '2026-01-01 00:00:00', 'acme', 10.0),
  (TIMESTAMP '2026-01-01 00:01:00', 'acme', 20.0),
  (TIMESTAMP '2026-01-01 00:02:00', 'acme', 30.0),
  (TIMESTAMP '2026-01-01 00:03:00', 'acme', 40.0),
  (TIMESTAMP '2026-01-01 00:04:00', 'acme', 50.0),
  (TIMESTAMP '2026-01-01 00:00:00', 'globex', 15.0),
  (TIMESTAMP '2026-01-01 00:01:00', 'globex', 25.0),
  (TIMESTAMP '2026-01-01 00:02:00', 'globex', 35.0),
  (TIMESTAMP '2026-01-01 00:03:00', 'globex', 45.0),
  (TIMESTAMP '2026-01-01 00:04:00', 'globex', 55.0);
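
For reference, here is a rough Python sketch of what the two queries compute over this table. It is not DataFusion code. The helper names (`moving_avg`, `window_avg`) are made up for illustration, and the tie-break on `customer` is an assumption added only to make the global ordering deterministic.

```python
from collections import defaultdict

# Rows mirroring the INSERT statement above: (time, customer, cpu_usage).
ROWS = [
    (f"2026-01-01T00:0{minute}:00", customer, base + 10.0 * minute)
    for customer, base in (("acme", 10.0), ("globex", 15.0))
    for minute in range(5)
]


def moving_avg(values, preceding=5):
    """AVG over ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW."""
    out = []
    for i in range(len(values)):
        frame = values[max(0, i - preceding): i + 1]
        out.append(sum(frame) / len(frame))
    return out


def window_avg(rows, partition_by_customer):
    """Return {(customer, time): avg_cpu} with or without PARTITION BY customer."""
    partitions = defaultdict(list)
    for time, customer, cpu in rows:
        # Without PARTITION BY, every row lands in one global partition.
        key = customer if partition_by_customer else None
        partitions[key].append((time, customer, cpu))

    result = {}
    for part in partitions.values():
        # ORDER BY time. Ties are broken by customer here only to keep the
        # sketch deterministic; SQL leaves the order of tied rows unspecified.
        part.sort(key=lambda r: (r[0], r[1]))
        avgs = moving_avg([cpu for _, _, cpu in part])
        for (time, customer, _), avg in zip(part, avgs):
            result[(customer, time)] = avg
    return result


partitioned = window_avg(ROWS, partition_by_customer=True)
global_window = window_avg(ROWS, partition_by_customer=False)
```

With `partition_by_customer=True` the two customers are independent units of work, which is what lets the plan hash-repartition on `customer`. With `False` there is a single ordered sequence of rows, which is the case this ticket is about.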

Describe the solution you'd like

  • DataFusion can use multiple cores to evaluate window functions for large logical window partitions
  • Distributed systems such as Ballista can use multiple machines to compute window function results in parallel
  • DataFusion can complete window function queries even when the logical window partitions are larger than available memory

Describe alternatives you've considered

There are several related approaches, listed below. They are complementary rather than mutually exclusive.

Additional context

Prefix Sums / Prefix Scan

Used for cumulative windows such as ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
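
A minimal sketch of the idea for a running SUM, assuming the input is already sorted and can be cut into contiguous chunks. This is plain Python, not DataFusion code; `parallel_running_sum` and `chunked` are hypothetical names, and the thread pool is only there to show which steps are independent (Python threads will not actually speed this up).

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import accumulate


def chunked(values, n_chunks):
    """Split `values` into at most `n_chunks` contiguous chunks."""
    size = max(1, -(-len(values) // n_chunks))  # ceiling division
    return [values[i:i + size] for i in range(0, len(values), size)]


def parallel_running_sum(values, n_chunks=4):
    """SUM(x) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)."""
    chunks = chunked(values, n_chunks)

    with ThreadPoolExecutor() as pool:
        # Pass 1 (independent per chunk): running sum local to each chunk.
        local = list(pool.map(lambda c: list(accumulate(c)), chunks))

    # Sequential step over one value per chunk: an exclusive scan of the
    # chunk totals gives the amount each chunk must add to its local sums.
    offsets = [0]
    for sums in local[:-1]:
        offsets.append(offsets[-1] + sums[-1])

    with ThreadPoolExecutor() as pool:
        # Pass 2 (independent per chunk): shift local sums by the chunk offset.
        shifted = pool.map(
            lambda pair: [v + pair[1] for v in pair[0]], zip(local, offsets)
        )
        return [v for chunk in shifted for v in chunk]
```

Only the middle step is sequential, and it touches one value per chunk rather than one per row. For floating point inputs the result can differ from a strictly sequential sum in the last bits, because the additions are grouped differently.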

"Halo" Rows / Parallel Bounded Windows

Used for bounded window frames where each output partition needs some rows from neighboring ranges. Note: I don't think "halo rows" is a standard database term; here it means overlapping partitions with replicated boundary rows.
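
To make the overlapping-partition idea concrete, here is a small Python sketch for the `ROWS BETWEEN 5 PRECEDING AND CURRENT ROW` average. It assumes sorted input split into contiguous chunks; `halo_chunks` and `parallel_moving_avg` are hypothetical names, not DataFusion APIs, and the thread pool only marks the work that is independent per chunk.

```python
from concurrent.futures import ThreadPoolExecutor


def moving_avg(values, preceding=5):
    """Sequential reference: AVG over `preceding` PRECEDING AND CURRENT ROW."""
    out = []
    for i in range(len(values)):
        frame = values[max(0, i - preceding): i + 1]
        out.append(sum(frame) / len(frame))
    return out


def halo_chunks(values, n_chunks, preceding):
    """Yield (chunk, halo_len): each chunk is prefixed with up to `preceding`
    rows replicated from the end of the previous range."""
    size = max(1, -(-len(values) // n_chunks))  # ceiling division
    for start in range(0, len(values), size):
        halo_start = max(0, start - preceding)
        yield values[halo_start:start + size], start - halo_start


def parallel_moving_avg(values, n_chunks=4, preceding=5):
    def work(item):
        chunk, halo_len = item
        # Evaluate over halo + owned rows, then drop the outputs that belong
        # to halo rows; those are produced by the chunk that owns them.
        return moving_avg(chunk, preceding)[halo_len:]

    with ThreadPoolExecutor() as pool:
        parts = list(pool.map(work, halo_chunks(values, n_chunks, preceding)))
    return [v for part in parts for v in part]
```

Each chunk sees exactly the rows its frames need, so the output matches the sequential result. The replicated data is at most `preceding` rows per chunk boundary, which is small when frames are short relative to the chunk size.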

WindowAggExec Memory / Spilling

Avoid WindowAggExec OOMs by processing partitions incrementally and spilling when needed.
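
As a rough illustration of the "incremental" half of this, the Python sketch below evaluates a bounded frame batch by batch while keeping only the rows of the current frame as state. It does not model spilling and is not how WindowAggExec is implemented; `streaming_moving_avg` is a hypothetical name.

```python
from collections import deque


def streaming_moving_avg(batches, preceding=5):
    """Yield one output batch per input batch for
    AVG(x) OVER (ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW).

    State carried across batches is at most `preceding` + 1 values, no matter
    how large the logical window partition is.
    """
    frame = deque(maxlen=preceding + 1)  # the oldest row falls out automatically
    for batch in batches:
        out = []
        for value in batch:
            frame.append(value)
            out.append(sum(frame) / len(frame))
        yield out


# Usage: batches arrive one at a time, already sorted by the ORDER BY key.
batches = [[10.0, 15.0, 20.0], [25.0, 30.0, 35.0, 40.0]]
results = [v for out in streaming_moving_avg(batches) for v in out]
```

This only works because the frame ends at the current row. Frames that reach to the end of the partition cannot emit output until all input has been seen, which is where buffering or spilling would come in.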

Window Frame Evaluation / Vectorization

Improve CPU and memory efficiency for window frame boundary calculations.

Intra-Operator Parallelism

Alternative/complementary approach: keep the logical window as one partition, but parallelize execution inside the operator.

Adaptive Query Execution / Runtime-Informed Planning

Relevant if DataFusion wants a general framework for runtime stats, dynamic split points, repartition choices, skew handling, and similar optimizations.

Related Sort / Merge Parallelism

Relevant because single-partition windows often sit downstream of sort / merge bottlenecks.
