[SPARK-56896][SQL] Add bulk read paths for timestamp/date Parquet vector updaters#55923
[SPARK-56896][SQL] Add bulk read paths for timestamp/date Parquet vector updaters#55923iemejia wants to merge 21 commits into
Conversation
b00fa47 to
895332d
Compare
895332d to
6f5847c
Compare
MaxGekk
left a comment
There was a problem hiding this comment.
1 blocking, 0 non-blocking, 2 nits.
The benchmark additions are correct, but the PR description claims a factory implementation that isn't in the diff.
Design / architecture (1)
- The PR title and description claim that
ParquetVectorUpdaterFactory.javawas changed to replace per-elementreadValueloops with two-pass bulk reads in five updaters (LongAsMicrosUpdater,LongAsNanosUpdater,LongAsMicrosRebaseUpdater,DateToTimestampNTZUpdater,DateToTimestampNTZWithRebaseUpdater), with before/after throughput numbers. The diff contains only changes toParquetVectorUpdaterBenchmark.scala; the factory implementations are unchanged — all four of the cited updaters still use per-element loops. Either the implementation was not committed, or the PR description was generated inaccurately. Please either add the implementation or update the description to reflect that this PR adds benchmark coverage only.
Nits: 2 minor items (see inline comments).
| TimestampNTZType, | ||
| descriptor(PrimitiveTypeName.INT32, LogicalTypeAnnotation.dateType()), | ||
| longVec, intBytes) | ||
| addReadValuesCase(benchmark, "LongAsNanosUpdater (TimeType)", |
There was a problem hiding this comment.
The class-level Scaladoc for Group B (IntegerToLong, IntegerToDouble, FloatToDouble, DateToTimestampNTZ, DowncastLong on lines 47–48) doesn't include LongAsNanosUpdater. Please add it to the Group B listing.
There was a problem hiding this comment.
Done. Added LongAsNanos to the Group B listing.
| longVec, intBytes) | ||
| addReadValuesCase(benchmark, "LongAsNanosUpdater (TimeType)", | ||
| TimeType(), | ||
| descriptor(PrimitiveTypeName.INT64), |
There was a problem hiding this comment.
In production, TimeType columns are written as INT64 with timeType(false, TimeUnit.MICROS) (ParquetSchemaConverter.scala:695–697). The class doc says the benchmark exercises "the full configuration matrix (logical-type annotation, rebase mode, timezone) the production decoder uses". Suggest using the production annotation:
| descriptor(PrimitiveTypeName.INT64), | |
| descriptor(PrimitiveTypeName.INT64, LogicalTypeAnnotation.timeType(false, LogicalTypeAnnotation.TimeUnit.MICROS)), |
There was a problem hiding this comment.
Good catch, fixed. Now using timeType(false, TimeUnit.MICROS) to match the production annotation in ParquetSchemaConverter.
|
Thanks for catching this @MaxGekk ! You were right -- the implementation was indeed missing. It turns out the factory changes were accidentally dropped during a rebase that conflicted with SPARK-56804 (which landed the I was able to recover the original implementation from the reflog and re-applied it (excluding |
…, DateToTimestampNTZWithRebase updaters
6cf0c31 to
d2a6a31
Compare
|
Apologies for the force-push — I accidentally rebased onto latest master while updating my other branches. No code changes, just a base update. Sorry for the noise. |
….13, split 1 of 1)
….13, split 1 of 1)
….13, split 1 of 1)
….13, split 1 of 1)
….13, split 1 of 1)
….13, split 1 of 1)
|
@MaxGekk Updated the benchmark results -- all three JDK runs (17, 21, 25) now ran on AMD EPYC 7763 matching the upstream baseline. Everything should be ready for another round of review or merge if you are happy with the current state. Let me know if there is anything else I should address. |
MaxGekk
left a comment
There was a problem hiding this comment.
3 addressed, 0 remaining, 2 new. (2 newly introduced, 0 late catches.)
The factory implementation is now in place and verified equivalent to the per-element loops; what remains is a scalastyle fix and an optional consolidation follow-up.
Design / architecture (1)
- ParquetVectorUpdaterFactory.java:473: two coexisting bulk-read optimization patterns (SPARK-56804's reader-side method vs this PR's two-pass) — consolidation follow-up suggested, see inline
Nits: 1 minor item (see inline comment).
Verification
Traced all four bulk rewrites against their per-element readValue counterparts: identical per-element conversions, identical overflow/rebase-exception behavior, and exact reader-consumption parity (the bulk reads consume the same total values, so page/skip positions are unaffected). The in-place getLong-after-putLong round-trip is safe because the non-dictionary readBatch path nulls the vector's dictionary first (VectorizedColumnReader.java:290), and readIntegersAsLongs sign-extends, so the (int) cast in DateToTimestampNTZWithRebaseUpdater round-trips exactly. Non-PLAIN encodings stay correct via the mandatory readLongs implementations and the readIntegersAsLongs per-row default.
| VectorizedValuesReader valuesReader) { | ||
| for (int i = 0; i < total; ++i) { | ||
| readValue(offset + i, values, valuesReader); | ||
| valuesReader.readIntegersAsLongs(total, values, offset); |
There was a problem hiding this comment.
Now that this two-pass pattern is in place, DateToTimestampNTZUpdater (optimized in SPARK-56804 via the reader-side readIntegersAsTimestampMicros) is the odd one out — worth a follow-up migrating it to the same two-pass form and deleting readIntegersAsTimestampMicros from VectorizedValuesReader. The two-pass form is the stronger pattern: the bulk reads are mandatory interface methods (or have safe per-row defaults), so every encoding gets the speedup rather than only readers that override, and it adds no per-conversion interface methods. Not asking for it in this PR.
There was a problem hiding this comment.
Good idea -- done in this PR. Migrated DateToTimestampNTZUpdater to the same two-pass pattern (readIntegersAsLongs + in-place daysToMicros) and removed readIntegersAsTimestampMicros from VectorizedValuesReader and VectorizedPlainValuesReader. The two-pass form is indeed the stronger pattern since it uses the mandatory readIntegersAsLongs that every encoding already implements.
| longVec, intBytes) | ||
| addReadValuesCase(benchmark, "LongAsNanosUpdater (TimeType)", | ||
| TimeType(), | ||
| descriptor(PrimitiveTypeName.INT64, LogicalTypeAnnotation.timeType(false, LogicalTypeAnnotation.TimeUnit.MICROS)), |
There was a problem hiding this comment.
This line is 120 chars, over scalastyle's 100-char limit, so dev/lint-scala will fail — my earlier single-line suggestion was over-long, sorry. Wrapped to match the timestampType cases below:
| descriptor(PrimitiveTypeName.INT64, LogicalTypeAnnotation.timeType(false, LogicalTypeAnnotation.TimeUnit.MICROS)), | |
| descriptor(PrimitiveTypeName.INT64, | |
| LogicalTypeAnnotation.timeType(false, LogicalTypeAnnotation.TimeUnit.MICROS)), |
There was a problem hiding this comment.
Fixed, thanks for catching that.
Assisted-by: OpenCode:claude-opus-4.6
…IntegersAsTimestampMicros Consolidate DateToTimestampNTZUpdater (CORRECTED mode, from SPARK-56804) to the same two-pass bulk-read pattern used by the other updaters in this PR: readIntegersAsLongs + in-place daysToMicros conversion. This eliminates the per-conversion interface method readIntegersAsTimestampMicros from VectorizedValuesReader, since the two-pass form uses the mandatory readIntegersAsLongs that every encoding already implements. All encodings now benefit from the bulk-read optimization without needing per-method overrides. Assisted-by: OpenCode:claude-opus-4.6
….13, split 1 of 1)
….13, split 1 of 1)
….13, split 1 of 1)
….13, split 1 of 1)
….13, split 1 of 1)
….13, split 1 of 1)
….13, split 1 of 1)
….13, split 1 of 1)
….13, split 1 of 1)
….13, split 1 of 1)
|
+1, LGTM. Merging to master/4.x. |
…tor updaters ### What changes were proposed in this pull request? Replace per-element `readValue` loops with two-pass bulk read + in-place conversion for four `ParquetVectorUpdater` implementations in `ParquetVectorUpdaterFactory`: | Updater | Bulk read | In-place transform | |---|---|---| | `LongAsMicrosUpdater` | `readLongs` | `millisToMicros` | | `LongAsNanosUpdater` | `readLongs` | `microsToNanos` | | `LongAsMicrosRebaseUpdater` | `readLongs` | `millisToMicros` + `rebaseMicros` | | `DateToTimestampNTZWithRebaseUpdater` | `readIntegersAsLongs` | `rebaseDays` + `daysToMicros` | Each updater now: 1. Bulk-reads raw values into the column vector via `readLongs` or `readIntegersAsLongs` (backed by `System.arraycopy`). 2. Applies the conversion in a tight in-place loop over the column vector. This avoids per-element virtual dispatch through `VectorizedValuesReader` in the hot loop. The `getLong`/`putLong` calls on `final OnHeapColumnVector` are devirtualized by C2 into direct array access. Note: `DateToTimestampNTZUpdater` (CORRECTED mode) was already optimized via SPARK-56804. Also adds three missing benchmark cases to `ParquetVectorUpdaterBenchmark`: `LongAsNanosUpdater`, `DateToTimestampNTZWithRebaseUpdater`, `LongAsMicrosRebaseUpdater`. ### Why are the changes needed? The per-element `readValue` loop issues a virtual call to `VectorizedValuesReader.readLong()` / `readInteger()` on every row, which C2 cannot always devirtualize because the reader type varies (PLAIN, RLE, DELTA, etc.). The two-pass approach replaces N virtual calls with a single bulk read (already optimized per reader implementation) followed by a tight scalar loop that C2 can fully inline and optimize. #### Benchmark Results (AMD EPYC 7763, JDK 17/21/25) Results from GHA benchmark workflow runs committed to this branch. Both baseline (upstream master) and this PR ran on AMD EPYC 7763 64-Core Processor. Reference updaters (`IntegerWithRebase`, `LongWithRebase`) are stable within noise, confirming valid comparisons. **LongAsMicrosUpdater** (only updater with a direct baseline comparison): | JDK | Baseline (M/s) | This PR (M/s) | Speedup | |---|---|---|---| | 17 | 454.8 | 942.1 | **2.1x** | | 21 | 420.5 | 1,225.1 | **2.9x** | | 25 | 371.3 | 1,228.4 | **3.3x** | **New updater benchmarks** (no upstream baseline -- these benchmark entries are added by this PR): | Updater | JDK 17 (M/s) | JDK 21 (M/s) | JDK 25 (M/s) | |---|---|---|---| | `LongAsNanosUpdater` | 1,002.5 | 1,138.0 | 1,147.8 | | `DateToTimestampNTZWithRebaseUpdater` | 435.1 | 796.7 | 719.9 | | `LongAsMicrosRebaseUpdater` | 887.7 | 1,096.7 | 1,094.1 | The improvement increases with newer JDKs (2.1x -> 2.9x -> 3.3x), likely because newer JIT compilers optimize the tight in-place loop pattern better. Full committed results: [JDK 17](https://github.com/iemejia/spark/actions/runs/27403977287), [JDK 21](https://github.com/iemejia/spark/actions/runs/27403977646), [JDK 25](https://github.com/iemejia/spark/actions/runs/27405232869) ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - Existing test suites: `ParquetVectorUpdaterFactorySuite`, `ParquetQuerySuite`, `ParquetIOSuite`, `ParquetRebaseDatetimeSuite`. - Benchmark: `ParquetVectorUpdaterBenchmark` run via GHA on JDK 17/21/25 with results committed to the branch. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: OpenCode with Claude claude-opus-4.6 Closes #55923 from iemejia/SPARK-updater-bulk-paths. Authored-by: Ismaël Mejía <iemejia@gmail.com> Signed-off-by: Max Gekk <max.gekk@gmail.com> (cherry picked from commit 67eafbd) Signed-off-by: Max Gekk <max.gekk@gmail.com>
|
Thanks for the review @MaxGekk ! |
…tor updaters ### What changes were proposed in this pull request? Replace per-element `readValue` loops with two-pass bulk read + in-place conversion for four `ParquetVectorUpdater` implementations in `ParquetVectorUpdaterFactory`: | Updater | Bulk read | In-place transform | |---|---|---| | `LongAsMicrosUpdater` | `readLongs` | `millisToMicros` | | `LongAsNanosUpdater` | `readLongs` | `microsToNanos` | | `LongAsMicrosRebaseUpdater` | `readLongs` | `millisToMicros` + `rebaseMicros` | | `DateToTimestampNTZWithRebaseUpdater` | `readIntegersAsLongs` | `rebaseDays` + `daysToMicros` | Each updater now: 1. Bulk-reads raw values into the column vector via `readLongs` or `readIntegersAsLongs` (backed by `System.arraycopy`). 2. Applies the conversion in a tight in-place loop over the column vector. This avoids per-element virtual dispatch through `VectorizedValuesReader` in the hot loop. The `getLong`/`putLong` calls on `final OnHeapColumnVector` are devirtualized by C2 into direct array access. Note: `DateToTimestampNTZUpdater` (CORRECTED mode) was already optimized via SPARK-56804. Also adds three missing benchmark cases to `ParquetVectorUpdaterBenchmark`: `LongAsNanosUpdater`, `DateToTimestampNTZWithRebaseUpdater`, `LongAsMicrosRebaseUpdater`. ### Why are the changes needed? The per-element `readValue` loop issues a virtual call to `VectorizedValuesReader.readLong()` / `readInteger()` on every row, which C2 cannot always devirtualize because the reader type varies (PLAIN, RLE, DELTA, etc.). The two-pass approach replaces N virtual calls with a single bulk read (already optimized per reader implementation) followed by a tight scalar loop that C2 can fully inline and optimize. #### Benchmark Results (AMD EPYC 7763, JDK 17/21/25) Results from GHA benchmark workflow runs committed to this branch. Both baseline (upstream master) and this PR ran on AMD EPYC 7763 64-Core Processor. Reference updaters (`IntegerWithRebase`, `LongWithRebase`) are stable within noise, confirming valid comparisons. **LongAsMicrosUpdater** (only updater with a direct baseline comparison): | JDK | Baseline (M/s) | This PR (M/s) | Speedup | |---|---|---|---| | 17 | 454.8 | 942.1 | **2.1x** | | 21 | 420.5 | 1,225.1 | **2.9x** | | 25 | 371.3 | 1,228.4 | **3.3x** | **New updater benchmarks** (no upstream baseline -- these benchmark entries are added by this PR): | Updater | JDK 17 (M/s) | JDK 21 (M/s) | JDK 25 (M/s) | |---|---|---|---| | `LongAsNanosUpdater` | 1,002.5 | 1,138.0 | 1,147.8 | | `DateToTimestampNTZWithRebaseUpdater` | 435.1 | 796.7 | 719.9 | | `LongAsMicrosRebaseUpdater` | 887.7 | 1,096.7 | 1,094.1 | The improvement increases with newer JDKs (2.1x -> 2.9x -> 3.3x), likely because newer JIT compilers optimize the tight in-place loop pattern better. Full committed results: [JDK 17](https://github.com/iemejia/spark/actions/runs/27403977287), [JDK 21](https://github.com/iemejia/spark/actions/runs/27403977646), [JDK 25](https://github.com/iemejia/spark/actions/runs/27405232869) ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - Existing test suites: `ParquetVectorUpdaterFactorySuite`, `ParquetQuerySuite`, `ParquetIOSuite`, `ParquetRebaseDatetimeSuite`. - Benchmark: `ParquetVectorUpdaterBenchmark` run via GHA on JDK 17/21/25 with results committed to the branch. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: OpenCode with Claude claude-opus-4.6 Closes apache#55923 from iemejia/SPARK-updater-bulk-paths. Authored-by: Ismaël Mejía <iemejia@gmail.com> Signed-off-by: Max Gekk <max.gekk@gmail.com>
What changes were proposed in this pull request?
Replace per-element
readValueloops with two-pass bulk read + in-place conversion for fourParquetVectorUpdaterimplementations inParquetVectorUpdaterFactory:LongAsMicrosUpdaterreadLongsmillisToMicrosLongAsNanosUpdaterreadLongsmicrosToNanosLongAsMicrosRebaseUpdaterreadLongsmillisToMicros+rebaseMicrosDateToTimestampNTZWithRebaseUpdaterreadIntegersAsLongsrebaseDays+daysToMicrosEach updater now:
readLongsorreadIntegersAsLongs(backed bySystem.arraycopy).This avoids per-element virtual dispatch through
VectorizedValuesReaderin the hot loop. ThegetLong/putLongcalls onfinal OnHeapColumnVectorare devirtualized by C2 into direct array access.Note:
DateToTimestampNTZUpdater(CORRECTED mode) was already optimized via SPARK-56804.Also adds three missing benchmark cases to
ParquetVectorUpdaterBenchmark:LongAsNanosUpdater,DateToTimestampNTZWithRebaseUpdater,LongAsMicrosRebaseUpdater.Why are the changes needed?
The per-element
readValueloop issues a virtual call toVectorizedValuesReader.readLong()/readInteger()on every row, which C2 cannot always devirtualize because the reader type varies (PLAIN, RLE, DELTA, etc.). The two-pass approach replaces N virtual calls with a single bulk read (already optimized per reader implementation) followed by a tight scalar loop that C2 can fully inline and optimize.Benchmark Results (AMD EPYC 7763, JDK 17/21/25)
Results from GHA benchmark workflow runs committed to this branch. Both baseline (upstream master) and this PR ran on AMD EPYC 7763 64-Core Processor. Reference updaters (
IntegerWithRebase,LongWithRebase) are stable within noise, confirming valid comparisons.LongAsMicrosUpdater (only updater with a direct baseline comparison):
New updater benchmarks (no upstream baseline -- these benchmark entries are added by this PR):
LongAsNanosUpdaterDateToTimestampNTZWithRebaseUpdaterLongAsMicrosRebaseUpdaterThe improvement increases with newer JDKs (2.1x -> 2.9x -> 3.3x), likely because newer JIT compilers optimize the tight in-place loop pattern better.
Full committed results: JDK 17, JDK 21, JDK 25
Does this PR introduce any user-facing change?
No.
How was this patch tested?
ParquetVectorUpdaterFactorySuite,ParquetQuerySuite,ParquetIOSuite,ParquetRebaseDatetimeSuite.ParquetVectorUpdaterBenchmarkrun via GHA on JDK 17/21/25 with results committed to the branch.Was this patch authored or co-authored using generative AI tooling?
Generated-by: OpenCode with Claude claude-opus-4.6