Skip to content

feat: use indexes to accelerate filtered count_rows#6916

Open
westonpace wants to merge 18 commits into
lance-format:mainfrom
westonpace:test-aggregate-pushdown-e2e
Open

feat: use indexes to accelerate filtered count_rows#6916
westonpace wants to merge 18 commits into
lance-format:mainfrom
westonpace:test-aggregate-pushdown-e2e

Conversation

@westonpace
Copy link
Copy Markdown
Member

No description provided.

westonpace and others added 17 commits May 18, 2026 16:42
Lands the plan-time and execute-time halves of aggregate pushdown.
Not yet wired into the scanner.

Plan side (rust/lance-index/src/expression/):
- Moves scalar/expression.rs to expression/scalar.rs, paralleling the
  new expression/aggregate.rs.
- AnyAggregateQuery and AggregateQueryParser traits.
- AggregateIndexSearch leaf with optional index_name, parsed query,
  optional per-aggregate filter, and the original SELECT expression.
- CountQuery (basic / approx / distinct / approx_distinct) and
  CountQueryParser.

Scalar index trait (rust/lance-index/src/scalar.rs):
- ScalarIndex::calculate_aggregate returning a partial-state ArrowScalar;
  default-error stubs added to btree, bitmap, bloomfilter, inverted,
  json, label_list, ngram, rtree, zonemap, and the LogicalScalarIndex
  wrapper.
- Re-exports lance_arrow_scalar::ArrowScalar through scalar::.

Execute side (rust/lance/src/io/exec/aggregate_index.rs):
- AggregateIndexSearchExec emits one partial-state RecordBatch whose
  schema is the concatenation of state_fields() for each paired
  AggregateFunctionExpr, so a downstream AggregateExec(Final) consumes
  it unchanged.
- One optional child input — a ScalarIndexExec — supplies a prefilter
  RowAddrMask. The prefilter load and per-aggregate index loads run in
  parallel.
- Intersects fragment bitmaps across indexed aggregates, materializes
  the allow list as concrete [0..physical_rows) ranges (avoids the
  RoaringBitmap::full() inflation in RowAddrTreeMap::Sub), then composes
  prefilter ∩ fragments_allow − deletion_mask into a single AllowList.
- Calls calculate_aggregate per indexed aggregate; falls back to
  counting the combined mask directly when an aggregate is a
  non-distinct COUNT without an associated index.
- Unit tests cover try_new validation, the Full+Partial count helper,
  and end-to-end execution with no prefilter, an AllowList prefilter, a
  BlockList prefilter, and deletions.

Also includes aggregate-pushdown-research.md surveying how mature query
engines structure aggregate pushdown.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds AggregateIndexPushdown — a PhysicalOptimizerRule that walks the plan
top-down and rewrites COUNT-shaped aggregates into AggregateIndexSearchExec
so they're answered from index metadata + the deletion mask + an optional
scalar-index prefilter, without scanning column data.

Recognized shape:

    AggregateExec(Single, aggs=[COUNT(*)], group_by=[])
      └── FilteredReadExec { no refine_filter, full_filter only when
                              index_input is present, no scan range, no
                              with_deleted_rows, no fragment subset }

Rewritten to:

    AggregateExec(Final, aggs=[COUNT(*)], group_by=[])
      └── AggregateIndexSearchExec { prefilter_input = index_input }

The outer AggregateExec is dropped to AggregateMode::Final because
AggregateIndexSearchExec emits one row of partial state.

is_count_star is intentionally conservative: function name == "count",
not distinct, single non-null Literal argument. Anything else (COUNT(col)
with a column ref, DISTINCT, FILTER (WHERE), GROUP BY, residual filter,
scan range, with_deleted_rows, fragment subset) leaves the existing scan
path untouched.

Registered first in get_physical_optimizer so generic rules don't see
the rewritten subtree.

Tests (4, driving the rule end-to-end through Scanner::create_plan):
- rule_fires_on_unfiltered_count_star
- rule_fires_when_filter_fully_indexed (BTree filter pushdown)
- rule_skips_when_filter_needs_refine (unindexed column residual)
- rule_skips_count_with_group_by

Existing count_rows tests (3) and aggregate_index exec tests (7) all
continue to pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
CI fixes:
- Add SPDX license headers to expression.rs and scalar/expression.rs.
- cargo fmt the rule file (3 spots).
- Update test_count_star_single_fragment and test_scanner_count_rows
  in dataset_aggregate.rs to expect the new AggregateExec(Final) →
  AggregateIndexSearchExec shape now that the rule fires by default.

Correctness fixes (both pointed out by automated review on lance-format#6831):

- Stable row IDs: DatasetPreFilter::create_deletion_mask returns an
  AllowList in stable-id space when the dataset uses stable row IDs,
  but AggregateIndexSearchExec builds its fragments-allow list in
  row-address space. ANDing across mismatched id spaces undercounts
  silently. Gate the rule on !manifest.uses_stable_row_ids() until
  the exec can reconcile the two id spaces.

- Partial index coverage: when an index is built and then a fragment
  is appended, the index's fragment bitmap no longer covers the whole
  dataset. The original rule fired anyway and silently dropped rows
  in the unindexed fragments. The proper fix needs an async coverage
  check that's not expressible in a sync PhysicalOptimizerRule; until
  we plumb that through, narrow the rule to only fire when there is
  no filter at all (no full_filter, no refine_filter, no index_input).
  Unfiltered counts remain correct and still benefit from the rewrite.

Both narrowings are documented in the module-level doc and the
inline `try_rewrite` comments so a follow-up can lift them once the
underlying machinery is in place.

Repo hygiene:
- Drop aggregate-pushdown-research.md from the repo root. It was a
  one-off survey not referenced by any code or doc.

New regression tests in aggregate_index_pushdown.rs:
- rule_skips_with_stable_row_ids — toggles enable_stable_row_ids +
  delete, asserts the rule does not fire and the count is correct.
- rule_skips_partial_index_coverage — builds index over 4 fragments,
  appends a 5th, runs COUNT(*) WHERE indexed_col < N, asserts the
  rule does not fire and the count includes the appended fragment.
- rule_skips_when_filter_present_even_if_indexed — replaces the old
  rule_fires_when_filter_fully_indexed; documents that the indexed-
  filter case is deferred.

All 13 aggregate_index* tests pass; cargo check --workspace and
cargo clippy -p lance -p lance-index --tests --benches -- -D warnings
are clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The foundation commit added `lance-arrow-scalar` as a workspace
dependency of `lance-index`. `python/Cargo.lock` was not regenerated,
so CI's `--locked` build failed:

    error: cannot update the lock file
    /home/runner/work/lance/lance/python/Cargo.lock because --locked
    was passed to prevent this

Run `cargo update` from the python crate dir to add the missing entry.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Four tests against ``count_rows(filter=...)`` over a 4-fragment dataset
with a BTREE index on the filter column:

- test_filtered_count_with_scalar_index  — happy path: run twice, second
  call must do zero column-data I/O (no ``LanceRead`` in the plan).
- test_filtered_count_with_deleted_rows  — some matching rows deleted.
- test_filtered_count_with_updated_rows  — updates flip rows in/out of
  the matched set.
- test_filtered_count_with_whole_fragment_deleted  — every row in one
  indexed fragment is deleted.

Each test asserts the count is correct AND that the optimizer routes
through ``AggregateIndexSearchExec`` (the rule fired). All four fail
today on the rule-firing assertion: the rule was narrowed in the prior
PR to only fire when the filter is absent. Lifting that gate requires
a synchronous index-coverage check so the rule can safely refuse on
partial coverage (the bug the foundation-PR review flagged). The tests
are committed first as a spec; the rule lift is the next step.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Lifts the v1 "no filter" gate by emitting a UnionExec split plan when
the scalar index has partial coverage of the dataset:

    AggregateExec(Final, [count(*)])
      └── CoalescePartitionsExec
            └── UnionExec
                  ├── AggregateIndexSearchExec(restrict_to_fragments=indexed)
                  └── AggregateExec(Partial)
                        └── FilteredReadExec(fragments=unindexed, full_filter=…)

The two partial-state streams sum to the correct count. When the index
covers every dataset fragment (or there's no filter at all), the rule
keeps the original single-pushdown shape — no union.

The decision needs the index's fragment_bitmap synchronously inside a
PhysicalOptimizerRule. To make that possible, the bitmap is now plumbed
onto ScalarIndexSearch as Option<RoaringBitmap>:

  - New field on ScalarIndexSearch, defaults to None (and is excluded
    from PartialEq — it's dataset state, not query identity).
  - IndexInformationProvider grows fragment_bitmap(col, name) with a
    default-None impl, so existing providers are unaffected.
  - ScalarIndexInfo on the Dataset side stashes per-(column, index_name)
    bitmaps; entries are dropped when any contributing segment is
    missing a bitmap, so the rule treats coverage as unknown rather
    than partial.
  - apply_scalar_indices walks the produced ScalarIndexExpr after
    parsing and injects bitmaps from the provider.
  - ScalarIndexExec exposes its expr() so the rule can walk it.

AggregateIndexSearchExec gains try_new_restricted(... restrict_to_fragments)
so the pushdown branch of a split plan can be scoped to indexed
fragments only.

Optimizer rule (aggregate_index_pushdown.rs):
  - No prefilter → single pushdown branch (every fragment).
  - Prefilter with index ⊇ dataset → single pushdown branch.
  - Prefilter with index ⊊ dataset → split UnionExec + Coalesce + Final.
  - Coverage unknown (any leaf bitmap missing) → refuse to fire.
  - Stable-row-ids, with_deleted_rows, scan ranges, fragment subsets,
    refine_filter, full_filter without index_input → all still refuse.

Tests:
  - All 13 existing aggregate_index*/aggregate_index_pushdown tests pass.
  - rule_emits_split_plan_for_partial_index_coverage exercises the new
    split path: index built over 4 frags, 5th appended, COUNT(*) WHERE
    indexed_col < N — asserts AggregateIndexSearch AND UnionExec are
    both in the plan and the count is correct.
  - test_aggregate_pushdown.py (4 e2e tests in Python, all green):
      * test_filtered_count_with_scalar_index — happy path, twice, no
        column-data I/O on the second call.
      * test_filtered_count_with_deleted_rows
      * test_filtered_count_with_updated_rows — exercises split plan
        (updates push rows into unindexed fragments).
      * test_filtered_count_with_whole_fragment_deleted — index
        superset of dataset is safe; single pushdown branch.

To make Python e2e plan inspection possible, added
Scanner::analyze_count_plan in Rust (and LanceScanner.analyze_count_plan
in Python) — auto-applies count_star to a cloned scanner then runs
analyze_plan, so callers without a hand-built Substrait aggregate can
see the plan count_rows() actually executes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Switch the happy-path test from parsing ``bytes_read=N``/``iops=N`` out
of the plan text to ``dataset.io_stats_incremental()`` so the assertion
reflects every object-store read the dataset actually performed during
the second ``count_rows()`` call, not just what the plan happens to
surface in its formatted metrics.

Drops the now-unused ``_io_bytes_read``/``_io_iops`` helpers and the
``re`` import.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two new tests in dataset_aggregate.rs pinning the rewritten plan
displays for the indexed-filter case and the partial-coverage split:

- test_scanner_count_rows_with_indexed_filter — 2 fragments, BTree on
  ``x``, ``COUNT(*) WHERE x < 50``. Asserts the plan is
  ``AggregateExec(Final) → AggregateIndexSearch → ScalarIndexQuery``
  with no LanceRead, and that the executed count is 50.

- test_scanner_count_rows_with_partial_index_coverage — 2 indexed
  fragments + 1 appended (unindexed). ``COUNT(*) WHERE x < 1000``
  matches every row. Asserts the plan is
  ``AggregateExec(Final) → CoalescePartitions → Union(AggregateIndexSearch+ScalarIndexQuery,
  AggregateExec(Partial)+LanceRead{num_fragments=1})``, and that the
  executed count is 150.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Add struct-level doc on ScalarIndexInfo explaining the planning-time
  snapshot intent.
- Document the indexed_columns field: keyed by full dotted column path,
  the same string used in filter refs; explains why this map is keyed
  only by column (parser dispatch is via MultiQueryParser, so the
  specific index is an output of parsing, not an input) and contrasts
  with fragment_bitmaps which is keyed by (column, index_name).
- Replace "COUNT-shaped aggregates" wording in places where the rule
  itself is described — optimizer registration comment, the rule's
  module doc, the AggregateIndexPushdown struct doc, and the v1 gate
  comment in try_rewrite. The rule, exec, and trait are designed to
  serve other aggregates (MIN/MAX from zone maps, exact COUNT(DISTINCT)
  from bitmap dictionaries, …) once the relevant
  ScalarIndex::calculate_aggregate impls land; v1 just only recognizes
  COUNT(<literal>). Each call-out makes the v1 restriction explicit so
  future readers know what to broaden.

Concrete COUNT-specific comments left as-is: is_count_star and its
"COUNT(NULL) would return 0" note, CountQuery's per-variant docs, the
exec's "index_name=None means non-distinct COUNT" path, and the Python
tests' COUNT(*) examples (those reflect what count_rows actually does).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The previous comment justified the gate by appealing to "the Lance
scanner emits AggregateMode::Single"; that's the wrong framing — the
gate is a property of the optimizer rule itself, not of any particular
producer. Restate it that way: Single is the only mode this rule
handles today, Partial could be added if a use case arises, and Final
will never be accelerated (the rule's output IS a Final node).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three gates in AggregateIndexPushdown::try_rewrite refuse to fire on
shapes that should be rare or impossible in practice:

- stable row ids,
- FilteredReadExec built with with_deleted_rows,
- FilteredReadExec scoped to an explicit fragment subset.

Hitting any of those silently means we left an optimization on the
table that nobody asked us to give up. Log a warning at each site
instead, so a careful operator can spot lost pushdowns. The other
gates (GROUP BY, refine_filter, scan range, non-count aggregate) are
expected to fire in normal workloads, so those stay silent.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`null_filters` sounded like "filters on null values"; it's actually the
required-shape Vec<Option<Arc<dyn PhysicalExpr>>> argument to
AggregateExec::try_new representing each aggregate's per-aggregate
FILTER (WHERE …) clause. We've already rejected any aggregate carrying
a filter at the gate, so every slot is None. Rename to `filters` and
explain in a comment.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
AggregateIndexSearchExec::load_prefilter reads only the row-address
mask out of the prefilter batch; it ignores the Exact/AtMost/AtLeast
discriminant. If the scalar index returns AtMost (e.g. a zonemap,
which marks needs_recheck=true), the exec treats the over-set as the
exact answer and silently overcounts. The symmetric case (AtLeast)
would silently undercount.

Gate the optimizer rule on `ScalarIndexExpr::needs_recheck()` — read
synchronously off the same prefilter expression we were already
walking for fragment_bitmap coverage. The scan path with its existing
recheck still answers correctly.

Adds rule_skips_when_index_is_inexact, which creates a zonemap on
`ordered`, runs `COUNT(*) WHERE ordered < 25`, and asserts the rule
does not fire and the count is correct.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…al mode

DataFusion's SQL planner produces a different aggregate shape than the
Lance scanner — `AggregateExec(Final) → CoalescePartitions →
AggregateExec(Partial) → ...` instead of a single `AggregateExec(Single)`,
with `RepartitionExec` + empty `ProjectionExec` wrappers inserted above
the `FilteredReadExec`. To make the rule actually exercise against
DataFusion's plans:

- Extend `try_rewrite` to accept `AggregateMode::Partial` in addition
  to `Single`. In Partial mode we emit the rewritten subtree directly
  (no outer Final wrap, no Coalesce); the caller's existing Final
  consumes our partial-state output unchanged. Single still gets the
  Final wrap as before.
- Add `strip_row_preserving_wrappers` that walks through
  `RepartitionExec`, `CoalesceBatchesExec`, `CoalescePartitionsExec`,
  and identity/empty `ProjectionExec` to reach the underlying
  `FilteredReadExec`. None of those layers change the row population
  the aggregate sees, so it's safe to look past them.
- Re-export `aggregate_index_pushdown` from `lance::io::exec` so tests
  outside the crate can refer to `AggregateIndexPushdown`.

New integration tests in `tests/aggregate_pushdown/mod.rs`, hooked into
the existing `integration_tests` umbrella binary so we don't grow a
new test binary:

- `sql_count_star_with_indexed_filter` — `SELECT COUNT(*) FROM t WHERE
  x < 25` against a BTree-indexed dataset. Builds a SessionContext
  with `AggregateIndexPushdown` registered, registers a
  `LanceTableProvider`, runs SQL, asserts `AggregateIndexSearchExec` is
  in the plan and the count is correct.
- `sql_unfiltered_count_star_uses_statistics` — pins DataFusion's
  static-statistics optimization for the unfiltered case (the rule
  does NOT fire; the count comes from the table provider's row-count
  stat).
- `sql_count_distinct_does_not_fire_yet` — scaffold for the future
  `COUNT(DISTINCT)` path. Today `is_count_star` rejects distinct, so
  the rule must not fire; once a bitmap-style `calculate_aggregate`
  lands this assertion will need to flip.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Strips the speculative `AggregateQueryParser` / `AnyAggregateQuery` /
`ScalarIndex::calculate_aggregate` trait scaffolding that was added in
anticipation of the other three aggregate-acceleration categories
(mask-to-answer, zone-aware, dimension-keyed). The rule and exec node
are renamed to reflect that they only handle category 1 today:

* `AggregateIndexSearchExec` -> `CountFromMaskExec`
* `AggregateIndexPushdown`   -> `CountPushdown`
* `expression::aggregate`    -> removed (was unused after the strip)
* `lance-arrow-scalar`       -> dropped from `lance-index` deps

Files, tests, and python e2e were renamed in lockstep; comments referencing
the trait surface were rewritten to point at the four-category plan
without claiming any of the other three are implemented.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`Scanner.analyze_count_plan()` is a one-flag variant of `analyze_plan()`
— two separate methods don't earn their keep. Replaced with
`analyze_plan(count_rows=False)`; passing `True` auto-applies a
`COUNT(*)` aggregate so callers can inspect the plan `count_rows()`
actually executes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The earlier WIP relocated the scalar-expression parser to a
top-level `expression::scalar` namespace in anticipation of an
`expression::aggregate` sibling. With the aggregate work scoped down
to a single category and moved out of `lance-index`, the move no
longer earns its keep — restore the original
`lance_index::scalar::expression` path. No call sites change (they
were never updated to the new path).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown

@claude claude Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Claude Code Review

This pull request is from a fork — automated review is disabled. A repository maintainer can comment @claude review to run a one-time review.

@github-actions github-actions Bot added enhancement New feature or request python labels May 22, 2026
Drop unrelated edits picked up earlier: a blank-line insertion in
bloomfilter.rs and ngram.rs, and an import-order reshuffle in
label_list.rs. None of these were tied to the count-pushdown work.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Collaborator

@Xuanwo Xuanwo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you! The only thing left is the conflict.s

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request python

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants