row-spine: per-column arrangement compression (alpha, default off) #37111
frankmcsherry wants to merge 10 commits into
…37113)

## What

`ArrangementFlavor::flat_map` and `flat_map_ok` constructed a fresh `RowArena::new()` *inside* the per-row `logic` closure, so an arena was allocated and dropped on every row processed. This hoists the arena to the enclosing scope and `clear()`s it once per row instead, reusing its allocation spine across invocations.

## Why

Addresses post-merge review feedback from @antiguru on #37110 (the `ExtendDatums` refactor): the arena is the caller-provided decode target a compressed row representation decodes into. With per-column arrangement codecs (follow-up #37111) each row actually populates the arena, so reallocating it per row would churn; clearing-and-reusing keeps the spine. Note that `RowArena::clear()` today drops the inner per-value `Vec<u8>` buffers (it clears the outer `Vec<Vec<u8>>`), so this reuses the outer spine but not the individual buffers; recycling those is a separate, larger `RowArena` change.

## Tests

No behavior change; existing `compute` tests cover these paths.

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
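The hoist described above can be illustrated with a plain `Vec<u8>` standing in for the arena. This is a minimal sketch, not the PR's code: `decode_into` and `for_each_row` are hypothetical names, and the only property relied on is that `Vec::clear` keeps the allocation.

```rust
/// Stand-in for decoding a row into a caller-provided buffer (hypothetical).
fn decode_into(encoded: &[u8], out: &mut Vec<u8>) {
    out.extend_from_slice(encoded);
}

/// Runs `logic` on every decoded row, reusing one buffer across rows.
/// Returns how many times the buffer's capacity changed.
pub fn for_each_row(rows: &[&[u8]], mut logic: impl FnMut(&[u8])) -> usize {
    // Hoisted out of the per-row loop instead of being created per row.
    let mut scratch: Vec<u8> = Vec::new();
    let mut capacity_changes = 0;
    for row in rows {
        // `clear` drops the contents but keeps the allocation.
        scratch.clear();
        let before = scratch.capacity();
        decode_into(row, &mut scratch);
        if scratch.capacity() != before {
            capacity_changes += 1;
        }
        logic(&scratch[..]);
    }
    capacity_changes
}
```

With equally sized rows the buffer is allocated once, on the first row, and every later row reuses it.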
## What

Reworks `RowArena` from "one `Vec<u8>` allocation per `push_bytes`, all freed on `clear`" into a bump allocator over a stack of byte regions:

- `push_bytes` **copies** bytes into the active (last) region. When that region lacks spare capacity it allocates a **new, larger** region (doubling) rather than growing the current one, so a region that already holds data is never reallocated and references returned earlier stay valid. The outer `Vec` may reallocate as regions are added, but that only moves the `Vec<u8>` headers, not the heap buffers they own.
- `push_bytes` now accepts any `B: Deref<Target = [u8]>` (e.g. `Vec<u8>`, `&[u8]`). Existing `Vec<u8>` callers are unchanged; borrowed sources no longer need a throwaway allocation just to hand bytes over.
- `clear` retains only the **single largest region** (emptied) and drops the rest, right-sizing the arena. An arena reused across `clear` cycles (e.g. decoding arrangement rows one at a time) becomes allocation-free in steady state.
- `reserve` now reserves *bytes* in the active region (was: slots in the outer vector); `with_capacity` sizes the initial region. Neither has load-bearing callers (the `reserve` users in `expr` pass size hints).

## Why

Follows up the `ExtendDatums` arena work (#37110) and the per-row arena hoist (#37113). Once the arena is the decode target for compressed arrangement rows (per-column codecs, #37111), a fresh allocation per value per row is the dominant cost; a reused doubling region removes it. The pattern mirrors columnation's region allocator.

The trade-off is the standard bump-allocator one: a single very large row leaves the arena holding a large region until the next `clear`. For the per-row-cleared decode/eval paths the high-water mark is just the largest single row, so it stays bounded.

## Tests

Adds `mz_ore::test`s in `repr::row` covering:

- references from `push_bytes` remaining valid after later pushes force new regions,
- `push_unary_row` reading a row back from a non-zero region offset (confirms decoding is position/alignment independent),
- reuse across `clear`.

No behavior change for callers; no release note.

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
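The region scheme described in this commit message can be sketched in safe Rust. This is an illustration only: `RegionArena` and `Handle` are hypothetical names, and the sketch hands out index handles rather than the references the real `RowArena` returns, which avoids `unsafe` at the cost of fidelity.

```rust
/// Hypothetical stand-in for a region-based byte arena; not the real `RowArena`.
#[derive(Default)]
pub struct RegionArena {
    /// Stack of byte regions; only the last one receives new bytes.
    regions: Vec<Vec<u8>>,
}

/// Location of a pushed slice: region index, start offset, length.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct Handle {
    region: usize,
    start: usize,
    len: usize,
}

impl RegionArena {
    /// Copies `bytes` into the active region, opening a larger region if they do not fit.
    pub fn push_bytes(&mut self, bytes: &[u8]) -> Handle {
        let fits = self
            .regions
            .last()
            .map_or(false, |r| r.capacity() - r.len() >= bytes.len());
        if !fits {
            // Never grow a region that holds data: open a new one that is at
            // least double the previous capacity and large enough for `bytes`.
            let prev = self.regions.last().map_or(0, |r| r.capacity());
            let cap = (prev * 2).max(bytes.len()).max(16);
            self.regions.push(Vec::with_capacity(cap));
        }
        let region = self.regions.len() - 1;
        let active = &mut self.regions[region];
        let start = active.len();
        // Within capacity, so this append does not reallocate the region.
        active.extend_from_slice(bytes);
        Handle { region, start, len: bytes.len() }
    }

    /// Reads back the bytes stored under `h`.
    pub fn get(&self, h: Handle) -> &[u8] {
        &self.regions[h.region][h.start..h.start + h.len]
    }

    /// Keeps only the largest region (emptied) and drops the rest.
    pub fn clear(&mut self) {
        let largest = self.regions.drain(..).max_by_key(|r| r.capacity());
        if let Some(mut largest) = largest {
            largest.clear();
            self.regions.push(largest);
        }
    }

    /// Number of regions currently held.
    pub fn region_count(&self) -> usize {
        self.regions.len()
    }
}
```

After one `clear`, a workload whose rows fit in the retained region pushes without opening further regions, which is the steady-state behavior the message describes.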
Addresses review feedback on MaterializeInc#37111: scope `enable_arrangement_column_compression_alpha` to the replica (`ParameterScope::Replica`) so it can carry per-replica / per-size-family overrides, like the other replica-local physical flags (lgalloc, persist pager, column-paged batcher). This also replaces the manual capture chain the flag previously borrowed from dictionary compression — InstanceConfig/ReplicaConfig fields, the controller's per-replica capture, and the `handle_create_instance` store — with the replica-scoped idiom: the per-replica-resolved value arrives in the replica's `worker_config`, and `apply_worker_config` mirrors it into the process-global `mz_row_spine::COLUMN_COMPRESSION` flag (next to the lgalloc-region store). The controller-level capture read only the environment value and would have missed per-replica overrides. Re-applying on each config tick is safe: every batch records its own codec, so a flip only affects batches sealed afterwards. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
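The "mirror into a process-global flag, read it at seal time" idiom can be sketched as follows. Everything here is a simplified assumption: the static only borrows the name `COLUMN_COMPRESSION`, and `apply_worker_config`, `Batch`, and `seal` are hypothetical stand-ins with invented signatures, not the functions in the PR.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Process-global switch (a stand-in borrowing the flag's name from the PR).
pub static COLUMN_COMPRESSION: AtomicBool = AtomicBool::new(false);

/// Hypothetical config hook: called on every config tick with the resolved value.
pub fn apply_worker_config(enabled: bool) {
    COLUMN_COMPRESSION.store(enabled, Ordering::Relaxed);
}

/// A sealed batch remembers the codec it was written with.
pub struct Batch {
    pub codec: Option<&'static str>,
}

/// Hypothetical seal step: consults the flag once, when the batch is sealed.
pub fn seal() -> Batch {
    let codec = if COLUMN_COMPRESSION.load(Ordering::Relaxed) {
        Some("per-column")
    } else {
        None
    };
    Batch { codec }
}
```

Because each batch carries its own codec, flipping the flag later changes only what subsequent `seal` calls produce.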
6d74d99 to 5ea5db5
Edit: I (@antiguru) actually requested the review, but Claude didn't know and got confused.

Review: row-spine per-column arrangement compression
TL;DR
The feature code (

❌ Blocker 1: 134,008 lines of unrelated junk
The final commit
None are referenced by the feature. The real diff is just: plus the two Python flag-registration files (
Action: rebuild the last commit so it carries only the flag-scoping change (

Correctness
2. Variable / mixed arity is silent corruption in release (robustness gap).
3.
4. FoR /
5. Dictionary codes are insertion-ordered, not sorted.

Performance (flag is off, so non-blocking, but this is whether it can ship)
6. The comparison path fully decodes both operands.
7. Per-row allocations on the read/scan path.
8.
9. Merge re-encodes everything (inherent, fine).

Test coverage gaps

Minor nits

Suggested benchmarks

Net: solid, well-isolated feature behind an off-by-default flag. Strip the junk commit (blocker), add the arity guard, and fill the Huffman-selection + merge-path test gaps; treat the comparison-path decode cost as the key thing to measure before promoting past alpha.

Generated by Claude Code
First piece of the entropy-coded row work. A self-contained, tested canonical Huffman coder over the byte alphabet, seeded from batch-wide frequencies. Not yet wired into the containers (allowed unused for now); integration into the codec slot follows.

- `HuffmanCode` is described entirely by its 256 per-byte code lengths; canonical codes and decode tables derive from the lengths, so a stored model costs only the lengths. Built from frequencies via standard Huffman; declines (returns `None`) when <2 symbols appear or the optimal code would exceed a 24-bit bound (caller then leaves the data uncompressed).
- Decoding is forward-streaming (read bits MSB-first, emit one symbol at a time with O(1) state) so that comparisons can decode operands without materializing them. This is the property that lets `ReadItem::Ord` avoid a per-item allocation.
- `FrequencyCounter` accumulates the model; `BitWriter`/`BitReader` do MSB-first bit packing.

Tests round-trip empty, single/two-symbol, skewed, uniform (verifying exactly 8-bit codes, i.e. no expansion), and pseudo-random inputs, and check that a code rebuilt from its lengths matches the original.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
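To illustrate why storing only the code lengths is enough, here is a minimal sketch of the textbook canonical-code assignment. It is not taken from `huffman.rs`: `canonical_codes` is a hypothetical helper, and it assumes every length is at most 24 so the `u32` shifts stay in range.

```rust
/// Assigns canonical Huffman codes from per-symbol code lengths.
/// A length of 0 means the symbol does not occur.
/// Returns `(code, length)` per symbol, with the code in the low `length` bits.
pub fn canonical_codes(lengths: &[u8]) -> Vec<(u32, u8)> {
    // Canonical order: shorter codes first, ties broken by symbol value.
    let mut order: Vec<usize> = (0..lengths.len()).filter(|&s| lengths[s] > 0).collect();
    order.sort_by_key(|&s| (lengths[s], s));

    let mut codes = vec![(0u32, 0u8); lengths.len()];
    let mut code = 0u32;
    let mut prev_len = 0u8;
    for s in order {
        let len = lengths[s];
        // Moving to a longer length appends zero bits to the running code.
        code <<= len - prev_len;
        codes[s] = (code, len);
        code += 1;
        prev_len = len;
    }
    codes
}
```

For lengths `[1, 2, 3, 3]` this yields the codes `0`, `10`, `110`, `111`. Encoder and decoder can both rebuild the same table from the lengths alone.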
Foundation for per-column compression of arrangements: each column is compressed independently with a codec chosen from batch-wide statistics, instead of one scheme for the whole row. Standalone + tested; container wiring follows.

- `ColumnCodec` (enum, one variant per scheme): `Raw`, `Constant` (column is one value batch-wide -> zero bytes/row), `Huffman`. Codecs are self-delimiting in the encoded stream, so they compose: `RowCodec` decodes a row by walking the column codecs in order, each consuming exactly its bytes, with no central length-prefix block. New schemes (FoR = subtract-min then bit-pack, dictionary) slot in as a variant plus a size estimate.
- `RowStats` selects per column the smallest-estimated codec (including `Raw`), and `build()` returns `None` when every column is best left `Raw`. This structurally avoids the overhead trap the experiments exposed (uniform per-column Huffman and dictionary both blew up small many-column arrangements): cheap columns stay `Raw`/`Constant`, expensive tables are installed only where they pay.

Built on the ExtendDatums/arena iterator cleanup; reuses the canonical Huffman coder. Tests cover mixed/constant/text-heavy round-trips and that an all-raw batch yields no codec.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
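The "self-delimiting codecs compose" idea can be sketched with two variants. The types below reuse the names `ColumnCodec` and `RowCodec` for readability but are hypothetical simplifications: the one-byte length prefix for `Raw` (values under 256 bytes) is an assumption of this sketch, not the PR's wire format.

```rust
/// Hypothetical two-variant codec; the PR's `ColumnCodec` has more schemes.
pub enum ColumnCodec {
    /// Value stored as a one-byte length followed by its bytes.
    Raw,
    /// Column holds one value batch-wide, so rows store nothing for it.
    Constant(Vec<u8>),
}

impl ColumnCodec {
    fn encode(&self, value: &[u8], out: &mut Vec<u8>) {
        match self {
            ColumnCodec::Raw => {
                assert!(value.len() <= 255, "sketch limits values to 255 bytes");
                out.push(value.len() as u8);
                out.extend_from_slice(value);
            }
            ColumnCodec::Constant(_) => {}
        }
    }

    /// Decodes one value from the front of `input` and returns the unread tail.
    fn decode<'a>(&self, input: &'a [u8], row: &mut Vec<Vec<u8>>) -> &'a [u8] {
        match self {
            ColumnCodec::Raw => {
                let (&len, rest) = input.split_first().expect("truncated record");
                let (value, rest) = rest.split_at(usize::from(len));
                row.push(value.to_vec());
                rest
            }
            ColumnCodec::Constant(value) => {
                row.push(value.clone());
                input
            }
        }
    }
}

pub struct RowCodec {
    pub columns: Vec<ColumnCodec>,
}

impl RowCodec {
    pub fn encode(&self, row: &[&[u8]]) -> Vec<u8> {
        assert_eq!(row.len(), self.columns.len(), "arity mismatch");
        let mut out = Vec::new();
        for (codec, value) in self.columns.iter().zip(row) {
            codec.encode(value, &mut out);
        }
        out
    }

    /// Walks the column codecs in order; each consumes exactly its own bytes.
    pub fn decode(&self, mut input: &[u8]) -> Vec<Vec<u8>> {
        let mut row = Vec::with_capacity(self.columns.len());
        for codec in &self.columns {
            input = codec.decode(input, &mut row);
        }
        assert!(input.is_empty(), "trailing bytes after last column");
        row
    }
}
```

Each codec returns the unread tail, so no central block of column lengths is needed and a `Constant` column contributes zero bytes per row.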
Wires the per-column codec framework end-to-end behind a default-off COLUMN_COMPRESSION flag, reusing the decode-on-read pattern: a container holds an optional RowCodec, records are stored RowCodec-encoded, and DatumSeq carries the compressed bytes + codec ref and decodes on demand.

- DatumContainer gains `column_codec: Option<RowCodec>` + a staging buffer, threaded through with_capacity/merge_capacity/clear/heap_size; merge drops the codec (rebuilt at next seal). push_into encodes the raw row columns per-column (implies no dictionary codec); index defers decode.
- DatumSeq gains `column_codec: Option<&RowCodec>`. to_row/into_owned/clone_onto decode the record to raw row bytes; cmp/eq decode each operand into a thread-local scratch (two, non-aliasing); extend_datums decodes into the arena; next/bytes_iter keep entropy.is_none()-style tripwires.
- seal() builds + installs a RowCodec per key/value container via RowStats over the raw columns (all four builders); column compression layers over raw, so it is skipped when a dictionary codec is present.

Flag off => no codec built => unchanged behavior. New container round-trip test covers encode/decode, compare/seek, and extend_datums for codec-encoded items.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
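The comparison path (decode both operands into thread-local scratch, then compare the raw bytes) might look roughly like this. It is a sketch under assumptions: the "codec" is a toy that stores each byte inverted, and `CMP_SCRATCH`, `encode`, `decode_into`, and `cmp_encoded` are hypothetical here, chosen only to show the two non-aliasing buffers.

```rust
use std::cell::RefCell;
use std::cmp::Ordering;

thread_local! {
    /// Two scratch buffers, so both operands can be decoded at the same time.
    static CMP_SCRATCH: RefCell<(Vec<u8>, Vec<u8>)> =
        RefCell::new((Vec::new(), Vec::new()));
}

/// Toy codec: stores each byte inverted, so encoded order differs from raw order.
pub fn encode(raw: &[u8]) -> Vec<u8> {
    raw.iter().map(|&b| !b).collect()
}

fn decode_into(encoded: &[u8], out: &mut Vec<u8>) {
    out.extend(encoded.iter().map(|&b| !b));
}

/// Orders two encoded records by their decoded bytes, without a fresh allocation per call.
pub fn cmp_encoded(a: &[u8], b: &[u8]) -> Ordering {
    CMP_SCRATCH.with(|scratch| {
        let mut scratch = scratch.borrow_mut();
        let (left, right) = &mut *scratch;
        // Reuse the buffers; they keep their capacity between calls.
        left.clear();
        right.clear();
        decode_into(a, left);
        decode_into(b, right);
        left.as_slice().cmp(right.as_slice())
    })
}
```

The buffers grow to the largest record seen on the thread and are not shrunk, which trades memory for the absence of per-comparison allocation.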
…lection

Fixes the per-column overhead the catalog benchmark exposed: `ColumnCodec` was an enum sized to its largest variant (`Huffman`'s ~1.3 KB inline tables), so every column, even `Raw`/`Constant`, cost ~1.3 KB in the per-column `Vec`, blowing up small many-column arrangements (e.g. a 7-row catalog index 750 B -> ~9 KB).

- `Huffman(Box<HuffmanCode>)`: the enum is now pointer-sized, so `Raw`/`Constant` columns cost only a discriminant + pointer; the table is heap-allocated only for columns that actually use Huffman.
- The selector now charges the model's storage (its inline tables) against the Huffman estimate, so Huffman is adopted only when per-row savings outweigh the one-time table, keeping it off small columns where the table would dominate. `heap_size` accounts the boxed table.

Test now uses enough rows that Huffman is selected (exercising its decode path); small batches correctly fall back to Raw.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
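The size effect of boxing the large variant is easy to reproduce in isolation. The types below are hypothetical stand-ins: `BigTable` only approximates a model with 256 lengths and 256 codes, and the variants carry no other payload.

```rust
use std::mem::size_of;

/// Stand-in for a large inline model (256 lengths + 256 codes = 1280 bytes).
pub struct BigTable {
    pub lengths: [u8; 256],
    pub codes: [u32; 256],
}

/// Every value of this enum is as large as its largest variant.
pub enum InlineCodec {
    Raw,
    Constant,
    Huffman(BigTable),
}

/// Boxing the table shrinks the enum to roughly a tag plus a pointer.
pub enum BoxedCodec {
    Raw,
    Constant,
    Huffman(Box<BigTable>),
}

/// Bytes a `Vec<T>` with one entry per column spends on its elements.
pub fn per_column_bytes<T>(columns: usize) -> usize {
    columns * size_of::<T>()
}
```

With the inline layout, a seven-column codec vector spends at least 7 × 1280 bytes on the enum values alone, consistent in magnitude with the ~9 KB blow-up reported above. With the boxed layout the same vector needs at most two pointer widths per column.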
Fills out the per-column codec menu so the selector can fit numeric and low-cardinality columns, not just text.

- `For` (frame-of-reference): for fixed-width (<= 8 byte) columns, interpret the bytes as a big-endian integer and store `value - min` in the fewest bytes that hold `max - min`. It is pure arithmetic on the byte pattern (no datum-type knowledge), so it always reconstructs exactly; it simply isn't *chosen* when the byte order leaves residuals large, so it's safe to offer everywhere.
- `Dictionary` (per-column): map each distinct value to a fixed-width code into a value table, for columns up to 2^16 distinct values.

`ColumnStats` now also tracks fixed length, min/max integer, and distinct values (bounded); `choose` estimates FoR and dictionary sizes (table storage charged) alongside Huffman/Constant/Raw and picks the smallest. New round-trip tests cover FoR-numeric and dictionary-low-cardinality selection.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
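The frame-of-reference arithmetic described above can be sketched directly from that description. `ForCodec` and its methods are hypothetical names; the sketch assumes `encode` is only called on values that were present when the codec was built, so `value - min` cannot underflow.

```rust
/// Interprets up to 8 bytes as a big-endian unsigned integer.
fn be_to_u64(bytes: &[u8]) -> u64 {
    assert!(bytes.len() <= 8);
    bytes.iter().fold(0u64, |acc, &b| (acc << 8) | u64::from(b))
}

/// Fewest whole bytes able to hold `range` (zero bytes when the range is 0).
fn bytes_needed(range: u64) -> usize {
    ((64 - range.leading_zeros() as usize) + 7) / 8
}

/// Hypothetical frame-of-reference codec for one fixed-width column.
pub struct ForCodec {
    min: u64,
    width: usize,
    residual_width: usize,
}

impl ForCodec {
    /// Returns `None` for an empty batch, differing widths, or widths above 8 bytes.
    pub fn build(values: &[&[u8]]) -> Option<Self> {
        let width = values.first()?.len();
        if width > 8 || values.iter().any(|v| v.len() != width) {
            return None;
        }
        let ints = values.iter().map(|v| be_to_u64(v));
        let min = ints.clone().min()?;
        let max = ints.max()?;
        Some(ForCodec { min, width, residual_width: bytes_needed(max - min) })
    }

    /// Appends `value - min` in `residual_width` big-endian bytes.
    pub fn encode(&self, value: &[u8], out: &mut Vec<u8>) {
        let residual = be_to_u64(value) - self.min;
        out.extend_from_slice(&residual.to_be_bytes()[8 - self.residual_width..]);
    }

    /// Decodes one value from the front of `input` and returns the unread tail.
    pub fn decode<'a>(&self, input: &'a [u8], out: &mut Vec<u8>) -> &'a [u8] {
        let (head, rest) = input.split_at(self.residual_width);
        let value = be_to_u64(head) + self.min;
        out.extend_from_slice(&value.to_be_bytes()[8 - self.width..]);
        rest
    }
}
```

For the 4-byte values 1000, 1003, and 1255 the range is 255, so each row stores a single residual byte (0, 3, 255) instead of four.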
…_compression_alpha Add an `enable_arrangement_column_compression_alpha` dyncfg (default off) that controls per-column arrangement compression, mirroring the existing `enable_arrangement_dictionary_compression_alpha` wiring end to end: captured once at replica creation, threaded through InstanceConfig/ReplicaConfig, and applied as a one-shot store into the process-global COLUMN_COMPRESSION flag at instance creation. The two flags are independent and the codecs are mutually exclusive at runtime. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… lists bin/lint-test-flags requires every system flag to be known to parallel-workload's FlipFlagsAction and to the mzcompose system-parameter lists. Mirror the dictionary-compression flag's entries for the new per-column compression flag. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Addresses review feedback on MaterializeInc#37111: scope `enable_arrangement_column_compression_alpha` to the replica (`ParameterScope::Replica`) so it can carry per-replica / per-size-family overrides, like the other replica-local physical flags (lgalloc, persist pager, column-paged batcher). This also replaces the manual capture chain the flag previously borrowed from dictionary compression — InstanceConfig/ReplicaConfig fields, the controller's per-replica capture, and the `handle_create_instance` store — with the replica-scoped idiom: the per-replica-resolved value arrives in the replica's `worker_config`, and `apply_worker_config` mirrors it into the process-global `mz_row_spine::COLUMN_COMPRESSION` flag (next to the lgalloc-region store). The controller-level capture read only the environment value and would have missed per-replica overrides. Re-applying on each config tick is safe: every batch records its own codec, so a flip only affects batches sealed afterwards. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
5ea5db5 to c360dd8
Review follow-ups on MaterializeInc#37111:

- Correctness: `RowCodec::decode_into` walks a fixed column count, so a row whose arity differs from the codec's would read past its record (a panic, or silent corruption in release, caught before only by a debug assert). `RowStats` now tracks the first observed row's arity and `build()` declines on any mismatch. `build_column_codec` no longer skips empty rows: an empty (arity-0) row is a real arity signal, so a mixed-arity container correctly gets no codec.
- Docs/comments: fix `HuffmanCode::from_frequencies` doc (a single distinct byte yields a 1-bit code; `None` only for empty input or codes exceeding MAX_BITS); note dictionary codes are insertion-ordered (not order-preserving); explain the intentionally-unread final `best_bytes` write and that CMP_SCRATCH buffers don't shrink.
- Tests: Huffman selected end-to-end (high-cardinality skewed column), the merge re-encode path (push an encoded item into another codec'd container), mixed / empty-row arity declines, `code_lengths` MAX_BITS decline, and the `DICT_MAX_CARDINALITY` retain/drop boundary.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
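The arity guard described in the first bullet can be sketched as a small tracker. `ArityTracker` and its methods are hypothetical names; only the `arity` / `mixed_arity` bookkeeping follows the description above.

```rust
/// Hypothetical tracker for the column count of the rows seen so far.
#[derive(Default)]
pub struct ArityTracker {
    arity: Option<usize>,
    mixed_arity: bool,
}

impl ArityTracker {
    /// Records one row's column count; an empty row counts as arity 0.
    pub fn observe(&mut self, column_count: usize) {
        match self.arity {
            None => self.arity = Some(column_count),
            Some(arity) if arity != column_count => self.mixed_arity = true,
            Some(_) => {}
        }
    }

    /// The arity a codec may assume, or `None` when no codec should be built.
    pub fn codec_arity(&self) -> Option<usize> {
        if self.mixed_arity {
            None
        } else {
            self.arity
        }
    }
}
```

Declining at build time means the fixed-count decode loop never runs against a record with a different number of columns.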
    #[allow(dead_code)]
    mod codec;
    #[allow(dead_code)]
    mod huffman;
Not actually dead anymore I think?
    if let Some(codec) = self.column_codec {
        // `iter.data` is RowCodec-encoded; decode to raw row bytes.
        let mut buf = Vec::new();
        codec.decode_into(self.iter.data, &mut buf);
        return unsafe { Row::from_bytes_unchecked(&buf) };
    }
Probably better to decode into an arena here if one is available than allocate per row
Yup. We don't have that until #37115 lands, I think.
aef6edc to 7a0f6ea
    let mut best = ColumnCodec::Raw;
    let mut best_bytes = self.raw_bytes;

    if let Some(code) = self.freq.build() {
Probably pretty minor in the grand scheme of things, but we do the expensive check first. Calling `build` on the Huffman code calls `code_lengths`, which is slightly expensive (`BinaryHeap` over >= 256 symbols plus allocs). One could optimize this by running the cheap codecs first to lower `best_bytes` and then gating the Huffman build behind a cheap lower bound like `let huffman_floor = self.count.saturating_mul(2).saturating_add(size_of::<HuffmanCode>() + 256);`
Again, pretty minor, and it might need some stats/profile to prove whether it's really a problem.
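A rough sketch of the gating idea, with the expensive estimate passed in as a closure so it can be skipped. All names here (`Choice`, `ColumnSummary`, `choose`, `model_bytes`) are hypothetical, and the floor reuses the formula suggested in the comment rather than a bound derived from the actual coder.

```rust
#[derive(Debug, PartialEq)]
pub enum Choice {
    Raw,
    Constant,
    Huffman,
}

/// Hypothetical per-column summary; only what this sketch needs.
pub struct ColumnSummary {
    pub count: usize,
    pub raw_bytes: usize,
    pub is_constant: bool,
}

/// Picks a codec, running the costly Huffman estimate only when it could win.
/// `model_bytes` stands in for the one-time table cost.
pub fn choose(
    stats: &ColumnSummary,
    model_bytes: usize,
    huffman_estimate: impl FnOnce() -> Option<usize>,
) -> Choice {
    // Cheap candidates first, so `best_bytes` is as low as possible.
    let mut best = Choice::Raw;
    let mut best_bytes = stats.raw_bytes;
    if stats.is_constant {
        best = Choice::Constant;
        best_bytes = 0;
    }

    // Cheap lower bound on any Huffman encoding, as suggested in the review.
    let huffman_floor = stats.count.saturating_mul(2).saturating_add(model_bytes);
    if huffman_floor < best_bytes {
        if let Some(bytes) = huffman_estimate() {
            if bytes < best_bytes {
                best = Choice::Huffman;
            }
        }
    }
    best
}
```

For a 7-row column the floor already exceeds the raw size, so the closure is never invoked. Whether this saves measurable time is, as the comment says, something a profile would have to show.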
antiguru
left a comment
Seems good, but I'd like to see a test/nightly run with the flag enabled. At the moment it seems mostly untested apart from the unit tests (thanks for adding!)
I'll approve, don't see a blocker here.
    // NB: per-column compression is replica-scoped and applied in `apply_worker_config`
    // (called just above), not captured on `InstanceConfig`.
Nit: Comment reads like a note-to-self.
    // off the end of an empty record at decode time.
    match self.arity {
        None => self.arity = Some(count),
        Some(arity) if arity != count => self.mixed_arity = true,
In what situation would we expect mixed-arity rows?
        "true" if version >= MzVersion.parse_mz("v0.132.0-dev") else "false"
    ),
    "enable_alter_swap": "true",
    "enable_arrangement_column_compression_alpha": "false",
I'd like to see a test/nightly run with compression enabled.
Motivation
Row-spine arrangements currently compress at the whole-row level via the
just-shipped dictionary codec (`enable_arrangement_dictionary_compression_alpha`).
That works well when whole rows repeat, but leaves a lot on the table for wide
rows where individual columns have exploitable structure (low-cardinality enums,
clustered integers, repeated strings) but the rows as a whole are nearly unique.
This PR adds a per-column compression framework: each column in a row gets
its own codec, chosen independently from a menu, based on batch-wide statistics
collected at seal time.
What's here
A new `mz_row_spine::codec` module:

- `ColumnCodec`: a self-delimiting per-column codec enum: `Raw`, `Constant`, `Huffman` (boxed canonical Huffman, entropy), `For` (Frame-of-Reference = subtract-min then bitpack to a fixed byte width), and `Dictionary`. Because each codec is self-delimiting, they compose freely down a row.
- `RowCodec`: encodes/decodes a whole row by dispatching column-by-column.
- `RowStats`/`ColumnStats`: accumulate per-column statistics over a batch and pick the smallest-estimated codec per column, charging the per-codec table storage (dictionary maps, Huffman tables) against the estimate so fixed overhead can't lose us bytes on small or high-cardinality columns.
- […] `huffman.rs`) with forward-streaming decode.

Decode is deferred to the operator that needs datums: `ReadItem` carries the
compressed bytes plus a reference to the codec, and decode happens into a
thread-local scratch for comparisons or into the caller's `RowArena` for
`extend_datums`/`to_row`.

Relationship to dictionary compression
This is purely additive and orthogonal to the existing dictionary codec:

- […] `ColumnsCodec`/dictionary builder / `row_codec` module).
- […] `column_codec`, alongside the existing `codec`) and a separate flag (`enable_arrangement_column_compression_alpha`, default off).
- […] `build_column_codec` returns `None` whenever a dictionary codec is installed.

So the recently-shipped dictionary feature is untouched and independently
controllable; either flag can be flipped or walked back with no effect on the
other.
Flag wiring
`enable_arrangement_column_compression_alpha` mirrors the dictionary flag end to
end: captured once at replica creation, threaded through
`InstanceConfig`/`ReplicaConfig`, and applied as a one-shot store into the
process-global `COLUMN_COMPRESSION` flag at instance creation (deliberately not
re-applied on config update, so flipping it never retroactively rewrites a live
replica's arrangements).
Testing
Unit tests in `codec.rs`/`huffman.rs` cover round-trip encode/decode for each
codec and the selector. Manual validation against a TPCH load generator showed
per-column selection reducing arrangement footprint substantially on wide tables
(e.g. ~-56% on `lineitem`, ~-43% across a mixed TPCH set) versus uncompressed,
competitive with or better than whole-row dictionary while decoding more cheaply.
This release will add an alpha `enable_arrangement_column_compression_alpha`
feature flag (default off) for per-column arrangement compression.
🤖 Generated with Claude Code