Spark: Spark tests cache rewrite input#16740
Conversation
a7c5d80 to
1ed1cb6
Compare
TestRewriteDataFilesAction materializes a large (SCALE-row) input table via a Spark write before exercising the rewrite under test, and many tests reuse the same input shape. Cache the written input data files keyed by table shape (formatVersion, spec, files, rows, partitions, properties) and reuse them by re-appending the cached DataFiles to a fresh table, so the Spark write runs once per JVM fork instead of once per test. The rewrite under test still runs per test on its own table, and the generated data is deterministic so reuse is byte-identical. Applied to Spark 3.5, 4.0 and 4.1.
Replace inline string concatenation with String.format when building the input-file cache keys in createTable and createTablePartitioned, for readability. The produced key strings are unchanged.
Replace new ConcurrentHashMap<>() with Maps.newConcurrentMap() to satisfy the Iceberg checkstyle rule that forbids direct ConcurrentHashMap construction.
Replace the computeIfAbsent build with explicit per-key locking and double-checked caching so concurrent callers requesting the same table shape block on the first build and reuse its result instead of materializing identical input twice. The heavy Spark write now happens outside any map lock, so distinct shapes can still build in parallel.
Encode formatVersion and PartitionSpec explicitly in the partitioned cache key so distinct table shapes can never collide on the same key through implicit option dependencies, mirroring the unpartitioned key. Collect cached input files into an immutable list so callers cannot mutate the shared, per-fork cached instance.
f489961 to
10510d8
Compare
|
Small update on the times: Core-job wall time, before = baseline main run, after = mean of 3 green
The per-run overall core average is stable across the three runs (39.4 / 39.3 / |
laskoviymishka
left a comment
There was a problem hiding this comment.
Nice, this is right improvement. SCALE write dominates these tests, and caching the golden files per table shape is exactly where the ~8% win should come from. Re-appending the same physical files is spec-legal too, since sequence numbers and firstRowId are assigned on each append.
I’d hold before merge, mostly around the new static cache.
Main issue: cachedInputFiles passes planFiles() into Streams.stream, so the CloseableIterable is never closed. Each cache miss leaks manifest readers. On a long-lived JVM that’s FD pressure, and on Windows it can leave manifest files locked. A try-with-resources around the scan should fix it.
The other one is the unpartitioned cache key. The partitioned path includes table properties, but the unpartitioned key drops them, even though createTable() sets row-group size and that shapes the cached files. Dormant today, but easy future collision.
Before merge I’d fix:
- close the
planFiles()scan - include table properties in the unpartitioned key
- make the static cache safe with static
@TempDirlifecycle: clear in@AfterAll, or key by cache dir - add a note/assertion on weaker orphan-scan coverage, since cached inputs live outside
table.location()
DCL-vs-PER_METHOD and virtual dispatch are smaller; fine to fold in here.
Once those are addressed, happy to do one more pass.
| // includeColumnStats() is required: a plain scan drops lower/upper bounds and | ||
| // value counts, and re-appending stat-less files breaks tests that read bounds. | ||
| List<DataFile> built = | ||
| Streams.stream(golden.newScan().includeColumnStats().planFiles()) |
There was a problem hiding this comment.
planFiles() returns a CloseableIterable, and handing it to Streams.stream(Iterable) means it never gets closed — so every cache miss leaks the manifest readers it holds open. One leak per distinct key per fork; on a long-lived JVM that's FD pressure, on Windows it leaves the manifest files locked, and @TempDir cleanup can warn at teardown.
I'd wrap the scan in try-with-resources and collect inside the try:
List<DataFile> built;
try (CloseableIterable<FileScanTask> tasks = golden.newScan().includeColumnStats().planFiles()) {
built =
Streams.stream(tasks)
.map(FileScanTask::file)
.map(DataFile::copy)
.collect(ImmutableList.toImmutableList());
}
INPUT_FILE_CACHE.put(key, built);
return built;(The pre-existing planFiles() calls elsewhere in the file have the same shape but are out of scope here.) Same in v4.0 and v4.1.
There was a problem hiding this comment.
Okay! Wrapped the scan in try-with-resources and collect inside, mapping IOException to UncheckedIOException. No more reader leak per cache miss.
| * @return the created table | ||
| */ | ||
| protected Table createTable(int files) { | ||
| String key = String.format("unpartitioned|fv=%d|files=%d|rows=%d", formatVersion, files, SCALE); |
There was a problem hiding this comment.
The unpartitioned key drops the table-properties dimension that the partitioned path includes. The partitioned key folds in opts via new TreeMap<>(options), but here the key is just unpartitioned|fv|files|rows while createTable() sets PARQUET_ROW_GROUP_SIZE_BYTES=20*1024, which shapes split_offsets on every cached DataFile.
It's dormant today because the value is hardcoded, but it's a silent shape collision waiting to happen: if any subclass or future test overrides createTable()'s properties, the cache serves files with mismatched split offsets, and tests like testBinPackSplitLargeFile that lean on accurate offsets fail in a way that's very hard to trace back to here.
I'd mirror the partitioned path and fold the relevant properties into the unpartitioned key. wdyt? Same in v4.0 and v4.1.
There was a problem hiding this comment.
I think this is very speculative about future commits, but fine. I pulled the row-group size into a constant (INPUT_PARQUET_ROW_GROUP_SIZE_BYTES) and folded it into the key (rowGroup=%d), mirroring the partitioned path. A future createTable() property override now changes the key instead of silently colliding.
| // fork and reused by every test that asks for the same shape. The Spark write of SCALE | ||
| // rows dominates these tests; the rewrite under test still runs per test on a fresh table. | ||
| @TempDir private static Path inputCacheDir; | ||
| private static final Map<String, List<DataFile>> INPUT_FILE_CACHE = Maps.newConcurrentMap(); |
There was a problem hiding this comment.
I think there's a subtle lifecycle issue when the class runs twice in one JVM. INPUT_FILE_CACHE is static and never cleared, but inputCacheDir is a static @TempDir, so JUnit deletes and recreates it on a second run in the same JVM (IDE re-run, forkCount=0, suite aggregation). On that second run the cache still holds DataFiles whose location() points into the first, now-deleted directory, so a cache hit hands back dangling paths and you get a FileNotFoundException deep in the Spark Parquet reader.
Invisible in normal Gradle CI since each fork is a fresh JVM, but it bites IntelliJ re-runs.
Either clear the maps and reset the seq in @AfterAll, or key the cache by inputCacheDir.toAbsolutePath() so stale entries can't match a new directory. Either is fine, but one of them should be in. Same in v4.0 and v4.1.
There was a problem hiding this comment.
Okay, accordingly added an @AfterAll that clears the cache + lock map and resets the seq. @AfterAll runs once after all tests, so within-run cross-test caching is unchanged; it only stops a second in-JVM run (IDE re-run) from returning DataFiles pointing into the recreated @TempDir.
| String savedLocation = this.tableLocation; | ||
| try { | ||
| this.tableLocation = | ||
| inputCacheDir.resolve("input-" + INPUT_CACHE_SEQ.incrementAndGet()).toUri().toString(); |
There was a problem hiding this comment.
Caching the input files outside the per-test table location quietly weakens the orphan-file coverage. shouldHaveNoOrphans(table) runs deleteOrphanFiles, which by default only scans table.location() (the per-test tableDir), but the cached inputs live under this static inputCacheDir, outside that prefix — so the startsWith(location) filter in DeleteOrphanFilesSparkAction never sees them.
The tests still pass, but if a bug ever leaked a cached input path into a post-rewrite snapshot, the no-orphans assertions wouldn't catch it — we've lost diagnostic value, not gained a false failure.
I'd at minimum drop a comment near shouldHaveNoOrphans noting the cached inputs are out of scan scope by design; or, if we want to keep the invariant strong, add a manifest-scan assertion that the fresh table's current snapshot contains no inputCacheDir-prefixed paths. Same in v4.0 and v4.1.
There was a problem hiding this comment.
Since the cache explicitly saves the files in a different folder, and actively copies them over for the individual tests it does not invalidate any of the shouldHaveNoOrphans calls, but to be defensive, sure we can add it.
Therefore, accordingly I a comment at shouldHaveNoOrphans noting cached inputs live under the static inputCacheDir, outside table.location(), so deleteOrphanFiles intentionally doesn't scan them.
- Close planFiles() via try-with-resources in cachedInputFiles so each cache miss no longer leaks the manifest readers the scan holds open. - Encode the Parquet row group size (via a shared constant) in the unpartitioned cache key so a future createTable() property override cannot collide with cached files of a different shape. - Clear the static input-file cache and locks and reset the sequence in @afterall, so a second in-JVM run (IDE re-run, forkCount=0) cannot return DataFiles pointing into a recreated @tempdir. - Document that cached inputs live outside table.location() and are intentionally outside deleteOrphanFiles scan scope.
Thanks for the review @laskoviymishka ! I have addressed all the four blockers! One Nit: I left the DCL-vs- |
laskoviymishka
left a comment
There was a problem hiding this comment.
Nice, thanks! Look good to me.
|
@kevinjqliu Could you take a look, thanks! |
|
Thanks for the improvement @Baunsgaard, the changes looks good to me. I had codex generate something similar if you would like to take a look, https://github.com/apache/iceberg/compare/main...kevinjqliu:iceberg:kevinjqliu/codex-cache-rewrite-data-files-input?expand=1 Again, its mostly for readability and maintainability -- the effect is the same 😄 |
|
Sure, I took a look at your suggestion and it makes sense. Up to you which version you like best. The pr is open to contributor modification so you can edit directly if you wish. Otherwise I can change the commit to yours next week and set you as coauthor. |
|
Thanks for taking a look @Baunsgaard! I can open a PR with the edits. I mainly want this part of the tests to be more maintainable, the coauthor part is not a concern 😄 |
|
opened #16882 let me know what you think! |
What
TestRewriteDataFilesActionis the slowest single class in the Spark core test module. Each test materializes a large (SCALE = 400000-row) input table via a Spark write before exercising the rewrite under test, and many tests reuse the same input shape across theformatVersionmatrix.This caches the written input data files keyed by table shape (
formatVersion, spec,files,rows,partitions, properties) and reuses them by re-appending the cachedDataFiles to a fresh table. The expensive Spark write of the input now runs once per JVM fork instead of once per test; the rewrite under test still runs per test on its own fresh table.Applied identically to Spark 3.5, 4.0 and 4.1.
Why it is safe
Random(42)seed), so reuse isbyte-identical to regenerating it.
@TempDir, so they survive across tests (not wipedby the per-test temp dir) and are cleaned up after the class.
includeColumnStats()is used when collecting the cached files so lower/upperbounds and value counts are preserved on re-append.
Results
CI core job durations (baseline main run vs. this PR's latest green run; core jobs averaged per Spark version across the JVM/Scala matrix):
Test-only change, applied identically across v3.5, v4.0, and v4.1.
Notes
createTable(int)/createTablePartitioned(...)helpers; the fewin-test
writeRecords(..., SCALE, ...)call sites are not yet cached, so there is more potential for this change.