Skip to content

Spark: Refactor rewrite input cache setup#16882

Closed
kevinjqliu wants to merge 6 commits into
apache:mainfrom
kevinjqliu:kevinjqliu/codex-cache-rewrite-data-files-input
Closed

Spark: Refactor rewrite input cache setup#16882
kevinjqliu wants to merge 6 commits into
apache:mainfrom
kevinjqliu:kevinjqliu/codex-cache-rewrite-data-files-input

Conversation

@kevinjqliu

Copy link
Copy Markdown
Contributor

Summary

This PR refactors the input cache added in #16740 to make it easier to read and maintain.

It keeps inputCacheDir as the class-scoped location for cached physical files and keeps CACHED_DATA_FILES as the in-memory cache. CachedDataFilesKey now uses structured fields: partition spec, table options, file count, row count, and partition count.

writeRecords now handles cache reuse directly. On a hit, it appends cached files to the fresh table. On a miss, it writes inputs under inputCacheDir, plans files with column stats, caches them, and appends them to the target table.

Verified locally with temporary instrumentation that the cache has both misses and later hits.


AI Disclosure

  • Model: GPT-5
  • Platform/Tool: Codex
  • Human Oversight: fully reviewed
  • Prompt Summary: Refactor Spark rewrite input caching while preserving test semantics.

Keep regular test tables scoped to each test while materializing reusable cache inputs under a class-scoped cache directory. Key cached data files by the table options that shape the generated input files.

Generated-by: Codex
@kevinjqliu

Copy link
Copy Markdown
Contributor Author

cc @Baunsgaard

@nssalian

Copy link
Copy Markdown
Collaborator

@kevinjqliu it would be good to add these to the v4.0 and v4.1 Spark too.

Keep rewrite data file input caching consistent across Spark 4.0 and 4.1 test suites.

Generated-by: Codex
Explain why cached data files clear first_row_id before re-append.

Generated-by: Codex
Keep cached file lists immutable, document cache cleanup, and preserve the first-row-id reset invariant on re-append.

Generated-by: Codex
@nssalian nssalian requested a review from huaxingao June 20, 2026 00:26

@nssalian nssalian left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

@amogh-jahagirdar amogh-jahagirdar left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this @kevinjqliu . The structured CachedDataFilesKey reads better than the string key, so I do think it's an improvement.

Before we go further though, I want to step back and ask whether caching is the right thing to be doing here at all. My worry is that we're adding a fair amount of complexity and potential brittleness for a relatively minor win.

From my reading of the code, here's the overall of what happens (lmk if i'm missing anything) in the tests:

  1. Create a new table.
  2. Materialize the input. Generate ~SCALE (400k) rows on the driver, then createDataFrame, repartition (a shuffle), sortWithinPartitions (a sort), and the spark job to write. So parquet writes, metadata writes, and the commit.
  3. Run the rewrite under test. RewriteDataFiles reads those 400k rows, groups them, and writes new files. This also scales with SCALE.
  4. Assert on the output.

The cache helps step 2 afaict , and only when there's a hit. Following from that,

1/ The rewrite in step 3 is what we're actually testing, and it still reads all 400k rows on every test no matter what. So the most this can ever save is the input write half, which somewhat lines up with the ~10% savings on the original PR.

2/ Hits are limited by the key. The key includes the format version (via options), the cache is static per JVM, and tests that vary shape or properties don't share. So a lot of the materializations are misses the first time they're seen, and the benefit is fairly narrow.

I think we're taking on a fair bit of burden for a relatively narrow win. The key has to stay a complete description of everything that affects the written files, or I think we may silently hand back the wrong ones.

As an additional point, while table.newAppend() reflects what Spark does today in the integration, I'd note that we're not quite really testing the "real" spark write path in the case of a cache hit (and I mean more than data file write here, I mean even the metadata commit path that Spark chooses to use in the future maybe different and then our tests may not fully represent the real write path which may end up masking things that may have otherwise been incidental coverage).

I think my actual question is have we measured how far just lowering SCALE (and/or target file size) gets us? That shrinks both step 2 and step 3, for every test, with no key to maintain and no shared state. Most of the assertions here look like they care about
file count and grouping. If that's true we could probably drop SCALE a fair bit without losing coverage, and get a bigger and simpler win than the cache gives us. Not sure if this analysis was already done, and maybe I just missed that.

If that gets us most of the way we don't need the cache. If it doesn't, at
least we'll know how much of the time is the input write vs the rewrite vs Spark overhead, and we can decide from there.

@kevinjqliu

Copy link
Copy Markdown
Contributor Author

I think we're taking on a fair bit of burden for a relatively narrow win. The key has to stay a complete description of everything that affects the written files, or I think we may silently hand back the wrong ones.

+1 while its great to reduce resource utilization for CI, I would like to prioritize test correctness, readability, and maintainability above all else. If that cost us more resources, then so be it 😄

I do think there are other opportunity to pursue. In general, I think we should modify the tests themselves only as a last resort, and only when its not affecting the semantic or correctness of those tests. In this case I think it might be a good idea to revert #16740 and also forgo this PR.

I can raise this in the devlist thread about CI utilization if necessary

@laskoviymishka laskoviymishka left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to @amogh-jahagirdar’s framing. I’d settle the “should we cache at all?” question before polishing this further.

CachedDataFilesKey is definitely cleaner than the string key, but if the cache only saves the input-write part — roughly 10%, and only on a hit — we’re carrying quite a bit of shared state for a pretty narrow win.

The concerns here are already concrete, not theoretical:

  • the per-key lock is gone, so two threads can materialize the same 400K-row shape and orphan one result
  • first_row_id gets cleared twice on every hit
  • the key assumes specId=0 and one fixed schema, so it’s not yet a complete description of the files it returns

None of these are hard to fix, but they’re all complexity that simply lowering SCALE would avoid.

Since the cache came from #16740, @Baunsgaard, would you be up for driving the call here? The data we need is small: a rough split between input writes, the rewrite itself, and Spark overhead, plus the smallest SCALE that still preserves the file-count/grouping assertions.

If lowering SCALE gets us most of the win, I’d drop the cache and make this PR that change. If not, we’ll at least know the tradeoff and can harden the cache with a clearer reason for keeping it.

I’d hold the smaller fixes until we pick the direction.


CachedDataFilesKey key =
new CachedDataFilesKey(spec, tableOptions, files, numRecords, partitionCount);
List<DataFile> cachedDataFiles = CACHED_DATA_FILES.get(key);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flagging this as a real cost of the cache rather than a blocker, given the open question above about whether we keep it at all.

This get()-then-cacheDataFiles() is check-then-act, and the per-key lock the old code had was doing real work. Two threads can both see null, both run the full SCALE=400K Spark write into separate UUID dirs, and putIfAbsent keeps one — the loser's dir is orphaned under inputCacheDir for the class lifetime, and clearCachedDataFiles() never reclaims it. Parameterized formatVersion runs don't collide today (formatVersion is in the key), but concurrent test methods or a forkCount=0 profile would.

If the cache stays, CACHED_DATA_FILES.computeIfAbsent(key, k -> ...) makes the heavy build run once per key and keeps the "build each shape once per fork" guarantee. (Same in v4.0/v4.1.)

AppendFiles append = targetTable.newAppend();
dataFiles.stream()
// Ensure reused files don't carry first_row_id from the cache table into the target table.
.map(file -> DataFiles.builder(spec).copy(file).withFirstRowId(null).build())

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same "cost of the cache" note, not a blocker: we clear first_row_id twice. cacheDataFiles already stores files through withFirstRowId(null), so by the time we're here it's already null and this rebuild is a no-op clear plus a fresh builder/object per file on every hit (the common path). The comment also reads as if this is the load-bearing clear, but the one in cacheDataFiles is.

If the cache stays, I'd drop the rebuild here and just dataFiles.forEach(append::appendFile), keeping the explanatory comment up at the cacheDataFiles clear. (Same in v4.0/v4.1.)

@kevinjqliu

Copy link
Copy Markdown
Contributor Author

@laskoviymishka @Baunsgaard could we revert #16740 then? This PR would be unnecessary then 😄

@laskoviymishka

Copy link
Copy Markdown
Contributor

@kevinjqliu I think - this is a right thing to do here.

@kevinjqliu kevinjqliu closed this Jun 22, 2026
@kevinjqliu kevinjqliu deleted the kevinjqliu/codex-cache-rewrite-data-files-input branch June 22, 2026 04:24
@Baunsgaard

Copy link
Copy Markdown
Contributor

Apologies for the silence here, and thanks for driving this forward @kevinjqliu and @laskoviymishka, I was offline for the last few days.

First off: I'm not attached to this solution. Reverting #16740 is fine with me, and lowering SCALE from 400k to something smaller is a perfectly valid alternative that would deliver similar benefits with less machinery. So I'm happy to go either way on the direction.

One point I want to make sure doesn't get lost, though: the ~8% number is the reduction in the entire core job duration, not the saving on the single tests. In other words, caching the input writes of just this one class shaved ~8% off the whole Spark core suite (see the per-version table in #16740: -10.0% / -7.5% / -4.6%). That reframes the tradeoff a bit. It's not "a lot of machinery for 8% of one test", it's "8% of the whole job from one class's input writes."

@amogh-jahagirdar is right that it only helps step 2, and only within a single JVM fork. That narrow scope was deliberate, I kept it intra-fork specifically to minimize the risk of handing back incorrectly cached data. If we broadened it across JVM forks (or to a GitHub Actions cache), the gains would be larger, but so would the correctness risk, which is exactly the burden being called out.

On the root cause: the reason step 2 is so expensive is that we materialize the input through the full Spark write path (createDataFrame, repartition, sortWithinPartitions, the parquet + metadata write, the commit). That's a lot of Spark machinery just to produce deterministic input data that the test doesn't actually care about producing via Spark.

I've been profiling and have some WIP code that writes that input data with native Iceberg writers instead of going through the Spark ecosystem. Just on my Machine not in a PR

spark native delta
732.5 s (12:12) 519.6 s (8:40) -213 s (-29%)

That's a ~29% reduction on the tests themselves, every test, no shared state, no cache key to keep correct, and it still exercises the real rewrite path under test (step 3) unchanged. If that holds up across more reps, it's a bigger and cleaner win than either the cache or lowering SCALE, and it directly addresses the correctness/maintainability concerns raised here.

So my proposal: I'm fine reverting #16740 now to unblock this (I can see it is already done), and I'll follow up with the native-write approach (plus the input-write vs rewrite vs Spark-overhead split)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants