Core: update manifest delete file size after rewrite table action#15470
Conversation
| "1.10.0": | ||
| org.apache.iceberg:iceberg-api: | ||
| - code: "java.class.defaultSerializationChanged" | ||
| old: "class org.apache.iceberg.encryption.EncryptingFileIO" | ||
| new: "class org.apache.iceberg.encryption.EncryptingFileIO" | ||
| justification: "New method for Manifest List reading" | ||
| org.apache.iceberg:iceberg-core: | ||
| - code: "java.class.noLongerInheritsFromClass" |
There was a problem hiding this comment.
Was this moved unintentionally?
There was a problem hiding this comment.
It's just what the ./gradlew revapi generated for me. As best as I can tell, this is what it did:
- The existing 1.10.0 block moved earlier in the file (alphabetical sort?)
- YAML indent changed from - code: (4-space) to - code: (2-space)
- Our 4 new entries added (java.element.noLongerDeprecated, two java.method.numberOfParametersChanged, one java.method.removed for rewriteDeleteManifest)
There was a problem hiding this comment.
Could you keep only the four new entries and revert the reordering/re-indentation of the existing 1.10.0 block? Manually adding the new entries into the existing block (keeping the original 4-space indent and ordering) would make the diff much smaller and easier to review.
There was a problem hiding this comment.
I just want to mention that we must avoid breaking the API here. Please follow https://iceberg.apache.org/contribute/#minor-version-deprecations-required to avoid breaking the API
277c0e2 to
8922c70
Compare
|
hmmm. In #15586 I've been working on skipping the HEAD call within s3afs and azure abfs client openFile() calls, and had been thinking that the manifest data would be best as it'd save another 100 mS/file on what is often a critical path. We need accurate manifests for that. |
| // manifest was not updated. Readers that use file_size_in_bytes to elide a stat() call may | ||
| // fail. | ||
| @TestTemplate | ||
| public void testDeleteFileSizeInBytesAfterRewrite() throws Exception { |
There was a problem hiding this comment.
are there equivalents of this for the other operations which add files to the manifests? if so they'd be handy being added for regression testing there too -though not in this PR
steveloughran
left a comment
There was a problem hiding this comment.
reviewed again. I think changes within RewriteTablePathUtil can eliminate the need for a HEAD call with a v3 puffin file.
…ask, eliminating the separate `rewritePositionDeletes()` Spark job. Each manifest-writing task now also rewrites the delete files that manifest references, measures the actual size via `getLength()`, and records it in the manifest entry.
…ustification "rewriteDeleteManifest signature changed to accept PositionDeleteReaderWriter for inline position delete rewriting; rewritePositionDeleteFile now returns file size to avoid getLength HEAD call".
58725c3 to
78107a1
Compare
steveloughran
left a comment
There was a problem hiding this comment.
I like the change.
|
@mbutrovich as you're changing the signature of |
Sounds good, thanks @wypoon! @anuragmantri suggested I do the change in one version first to make it easier for reviewers (reduce the diff), get consensus on the approach, and then once folks are good with it I can generate the changes for the other versions. I'm happy to do that at any time. |
|
@mbutrovich it is true that having changes for multiple versions of Spark and Flink clutters up a PR and makes reviewing more difficult. However, you can at least put up the changes for one version of Flink and for Delta conversion, and then later for all the versions. |
amogh-jahagirdar
left a comment
There was a problem hiding this comment.
Thanks @mbutrovich I had some comments but I agree, I think we should aim to get this into a patch release. thanks for your work on this!
| * rewritten. Equality delete files are excluded because they hold no absolute paths and are not | ||
| * rewritten. | ||
| */ | ||
| private Set<DeleteFile> positionDeletesToRewrite(Set<ManifestFile> metaFiles) { |
There was a problem hiding this comment.
I would probably just have the caller pass in the Set deleteManifests. I'm not sure why the original code kept calling this "metaFiles" but that's not a good name for it.
There was a problem hiding this comment.
Done. positionDeletesToRewrite now takes a pre-filtered Set<ManifestFile> deleteManifests, and the local metaFiles is renamed manifestFiles.
| // Enumerate the distinct position delete files referenced by the delete manifests being | ||
| // rewritten (metadata-only pass). |
There was a problem hiding this comment.
I don't think the comment is particularly helpful, it's pretty apparent from the code what's happening here, I'd remove it.
There was a problem hiding this comment.
Done, removed/trimmed.
| * @param stagingLocation staging location for rewritten files (referred delete file will be | ||
| * rewritten here) | ||
| * @return a copy plan of content files in the manifest that was rewritten | ||
| * @deprecated since 1.11.0, will be removed in 1.12.0; use the overload that accepts the map of |
There was a problem hiding this comment.
I think you're going to have to change to 1.12.0 and 1.13.0 respectively now
There was a problem hiding this comment.
Done. Updated to "since 1.12.0, will be removed in 1.13.0".
| * @param targetPrefix target prefix to replace it | ||
| * @param posDeleteReaderWriter class to read and write position delete files | ||
| */ | ||
| public static void rewritePositionDeleteFile( |
There was a problem hiding this comment.
Why don't we deprecate this one as well?
There was a problem hiding this comment.
Done. rewritePositionDeleteFile is now @Deprecated and delegates to the new method; it had no remaining in-tree callers.
| * @param posDeleteReaderWriter class to read and write position delete files | ||
| * @return the size in bytes of the rewritten file | ||
| */ | ||
| public static long rewritePositionDeleteFileReturningLength( |
There was a problem hiding this comment.
If we deprecate the old one, I think we can just give this a shorter name like rewritePositionDelete.
There was a problem hiding this comment.
Done. Renamed to rewritePositionDelete.
| * Enumerate the distinct position delete files referenced by the delete manifests being | ||
| * rewritten. Equality delete files are excluded because they hold no absolute paths and are not | ||
| * rewritten. |
There was a problem hiding this comment.
Not sure this comment is really necessary (especially the second statement since that's apparent from the name)
There was a problem hiding this comment.
Done, removed/trimmed.
| .isEqualTo(1); | ||
| } | ||
|
|
||
| // Regression test: when the same position delete file is referenced from manifests in different |
There was a problem hiding this comment.
Since we have to change the other spark modules in this PR why not include the tests for those as well?
There was a problem hiding this comment.
Done. Ported the new tests to v3.5 and v4.0; the three modules are identical.
| Encoders.tuple(Encoders.STRING(), Encoders.LONG())) | ||
| .collectAsList(); | ||
|
|
||
| Map<String, Long> sizesBySourcePath = Maps.newHashMap(); |
There was a problem hiding this comment.
Minor: I'd presize the map to the length of the rewrittenSizes.
There was a problem hiding this comment.
Done. Maps.newHashMapWithExpectedSize(...).
| // file was not rewritten (e.g. deleted entries that are not copied to the target). | ||
| long fileSizeInBytes = | ||
| rewrittenDeleteFileSizes.getOrDefault(file.location(), file.fileSizeInBytes()); |
There was a problem hiding this comment.
Do we have a test which exercises the fallback to using file.fileSizeInBytes() for the DELETED entries? I couldn't find one..
There was a problem hiding this comment.
Added testRewriteDeleteManifestFallsBackToOriginalSizeForDeletedEntries in TestRewriteTablePathUtil (a manifest with one live and one DELETED entry, a size map with only the live file; asserts the DELETED entry keeps its original size). Did it at the core level since a DELETED entry's file is usually still live in an earlier manifest, so a full copy rewrites it and the fallback branch never runs.
| .collectAsList(); | ||
|
|
||
| // DeleteFile does not override equals(); DeleteFileSet dedupes by path so a delete file shared | ||
| // across manifests is rewritten only once. |
There was a problem hiding this comment.
Don't think the comment is neccessary here, also I think you can use DeleteFileSet.of(referencedDeleteFiles) below.
There was a problem hiding this comment.
Done. Used DeleteFileSet.of(referencedDeleteFiles) for enumeration and dropped the comment. The physical-rewrite dedup by location() is kept separately so multi-DV Puffin files don't collide (kevin's @819).
kevinjqliu
left a comment
There was a problem hiding this comment.
Thanks for working on this! I have few nits on comments plus a question on DV location
| * <ul> | ||
| * <li>Rebuild version files to staging | ||
| * <li>Rebuild manifest list files to staging | ||
| * <li>Rebuild manifest to staging | ||
| * <li>Get all files needed to move | ||
| * </ul> |
There was a problem hiding this comment.
| * <ul> | |
| * <li>Rebuild version files to staging | |
| * <li>Rebuild manifest list files to staging | |
| * <li>Rewrite referenced position delete files to staging | |
| * <li>Rebuild manifests to staging | |
| * <li>Get all files needed to move | |
| * </ul> |
update the docs
There was a problem hiding this comment.
Done. The phase list now includes the position-delete rewrite step and "Rebuild manifests".
| // Enumerate the distinct position delete files referenced by the delete manifests being | ||
| // rewritten (metadata-only pass). | ||
| Set<DeleteFile> deleteFilesToRewrite = positionDeletesToRewrite(metaFiles); | ||
|
|
||
| // Rewrite those delete files in parallel (deduped by path) and collect the actual size of each | ||
| // rewritten file. The size is measured from the writer on the executor that produced the file, | ||
| // avoiding both the end-of-job burst of getLength()/HEAD calls and the file system races where | ||
| // an in-progress write underreports its length. | ||
| Map<String, Long> rewrittenDeleteFileSizes = rewritePositionDeletes(deleteFilesToRewrite); | ||
|
|
||
| // Rewrite manifests (metadata-only), stamping file_size_in_bytes from the measured sizes. |
There was a problem hiding this comment.
nit: on comments, i feel shorter comments are better.
a lot of of the additional context can be move to the function javadoc if necessary.
i feel just this is enough
// rebuild position delete files
// rebuild manifest files
There was a problem hiding this comment.
Done. Trimmed to // rebuild position delete files / // rebuild manifest files.
| sourcePrefixArg, | ||
| targetPrefixArg, | ||
| posDeleteReaderWriter); | ||
| return new Tuple2<>(deleteFile.location(), rewrittenLength); |
There was a problem hiding this comment.
For v3 tables, multiple DV DeleteFiles can point to the same Puffin file while using different content offsets/sizes. In this action, the staged rewrite path is derived only from deleteFile.location(), so those logical DVs can try to rewrite the same physical Puffin file to the same staged path.
I verified this locally with a v3 table containing two DVs in one Puffin; rewrite_table_path failed with AlreadyExistsException.
Can we dedupe the physical DV rewrite work by location, while still applying the rewritten Puffin file size to each manifest entry?
There was a problem hiding this comment.
for context:
positionDeletesToRewritecan return twoDeleteFileswith the same location, as long as they have different DV blob offsets/sizesrewritePositionDeletewrites a new file perDeleteFile- so the first will write to disk, second will fail with
AlreadyExistingException
There was a problem hiding this comment.
This is probably the key correctness issue to deal with
There was a problem hiding this comment.
Confirmed and fixed. DeleteFileSet dedupes by (location, offset, size), so two DVs in one Puffin both reach the rewrite and collide on the staging path. The rewrite phase now dedupes by location() so the Puffin is rewritten once, and the size map is keyed by location so each entry gets the right whole-Puffin size. Added testSharedPuffinDeleteFileSizeAfterRewrite.
| @@ -669,7 +769,7 @@ | |||
| * @param sourcePrefix source prefix that will be replaced | |||
| * @param targetPrefix target prefix to replace it | |||
| */ | |||
There was a problem hiding this comment.
nit: add return to java docs
* @return the size in bytes of the rewritten DV file
There was a problem hiding this comment.
Done. Added @return to the rewriteDVFile javadoc.
|
Thanks for the detailed pass, @amogh-jahagirdar and @kevinjqliu. Sorry for the delayed response, I was on vacation for a few days. I reworked the PR per the latest feedback. API is now additive plus deprecation (9-param |
steveloughran
left a comment
There was a problem hiding this comment.
looked at the code; all LGTM. that leave's @kevinjqliu's concerns about duplicate file writes to the same path.
Thanks @steveloughran. @kevinjqliu's duplicate-write concern is fixed in bbbe466: the rewrite phase dedupes physical files by |
…r_rewrite # Conflicts: # core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
# Conflicts: # core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
kevinjqliu
left a comment
There was a problem hiding this comment.
LGTM, thank you for the comprehensive change and tests 🙏
|
cc @amogh-jahagirdar @szehon-ho please take a look when you get a chance |
|
Codex flagged a similar issue with rewrite table action not updating the manifest file size in manifest lists 😄 (similar issue, just up one level). I think someone mentioned it somewhere in this PR. Here are the relevant links:
Just flagging since i think the fix would look very similar to this PR and we might want to include it in our patch release |
There was a problem hiding this comment.
I think the fix looks good, after looking at it with fresh eyes I think we really should look at how to make the core APIs less cumbersome but that's for a follow on.
I do think the issue that @kevinjqliu pointed out is legitimate, but I think that can be fixed in a separate PR and included in the patches since this PR is fairly large already.
| * the rewritten file; entries absent from the map keep their original size | ||
| * @return a copy plan of content files in the manifest that was rewritten | ||
| */ | ||
| public static RewriteResult<DeleteFile> rewriteDeleteManifest( |
There was a problem hiding this comment.
Ok I don't think we need to do it in this PR but after looking at this, these APIs are getting pretty unfortunate, we're passing in 10 arguments now.
I think we'll want to introduce some builders and some structure like RewriteContext, and have structures which kick back things like a DeleteManifestRewriter.
I think what I'm asking here is more fundamental changes (and possibly an overhaul/elimination of this Util class) so like I said, definitely for something in the future.
|
Thank you @mbutrovich and @kevinjqliu @steveloughran @szehon-ho @wypoon for reviewing. I'll go ahead and merge this |
|
I'll take a look at #16910 implementation for the manifest file size in manifest lists issue |
There was a problem hiding this comment.
Thanks for the fix — this is well-scoped and nicely tested. Two questions, no inline needed:
-
Enumerating live delete files vs. the old copy-plan set. The new flow enumerates live position-delete entries straight from the delete manifests (positionDeletesToRewrite), whereas the previous flow derived the files from the manifest-rewrite copy plan (which also pulled in DELETED entries via the unconditionaltoRewrite().add(...)). The net effect is that delete files appearing only as DELETED are no longer physically staged (an improvement), butrewrittenDeleteFilePathsCountcan now differ from before. Since that's a user-visible result field, can you confirm the count change is intentional? -
Broadcasting the size map.sparkContext().broadcast(rewrittenDeleteFileSizes)broadcasts a map with one entry per distinct delete-file location. The sizes are already collected to the driver, so it's bounded by that, but for tables with very many delete files this is still a sizable driver-collected + broadcast map. Could you add a short comment noting the scaling assumption (and confirm you've considered it for large delete-heavy tables)?
Edit: I think these are addressed already, no never mind!
Which issue does this PR close?
Closes #12554.
Rationale for this change
rewriteTablePathrewrites position delete files (updating embedded data file paths), which changes their size.file_size_in_bytesin the rewritten manifest reflects the original size, not the rewritten size. Readers that trust this field (Trino, Impala, Comet, iceberg-rust) fail with errors like "end of stream not reached."What changes are included in this PR?
The position delete rewrite is a dedicated distributed phase, separate from manifest rewriting:
location()so a Puffin file holding multiple DVs is written once. Each task measures its rewritten file's size from the writer (writer.length()) and emitspath -> size; the driver collects this into a map and broadcasts it.file_size_in_bytesfrom the broadcast map keyed by location. Entries absent from the map (for example DELETED entries not copied to the target) keep their original size.Parallelism is at delete-file granularity. Sizes are measured on the workers from closed files, so there is no end-of-job burst of
getLength()/HEAD calls and no in-progress-write length race.API surface
The new methods are additive and the changed methods are only deprecated (no removals or signature changes), so this should not require
.palantir/revapi.ymlchanges. Confirm with./gradlew revapi.RewriteTablePathUtil.rewriteDeleteManifest: new 10-param overload takes the measuredMap<String, Long>of rewritten delete-file sizes and is metadata-only. The 9-param overload is@Deprecated(since 1.12.0, removed in 1.13.0) and delegates with an empty map.RewriteTablePathUtil.rewritePositionDelete: new public method that rewrites a position delete file and returns the rewritten size, measured from the writer after close. The existingvoid rewritePositionDeleteFileis@Deprecated(since 1.12.0, removed in 1.13.0) and delegates to it.Applied to Spark 3.5 / 4.0 / 4.1 together. Flink and Delta callers use the still-present deprecated overloads and are unaffected.
How are these changes tested?
New tests in
TestRewriteTablePathsAction(all three Spark modules):testDeleteFileSizeInBytesAfterRewrite: position deletes under a deeply nested path; assertsfile_size_in_bytesin the rewritten manifest matches the on-disk size.testSharedDeleteFileSizeAcrossManifests: same delete file referenced from multiple manifests; asserts size correctness across all rewritten delete manifests.testMultipleDistinctDeleteFileSizesAfterRewrite: two distinct delete files of different sizes in one manifest; guards the per-path size mapping.testSharedPuffinDeleteFileSizeAfterRewrite: two DVs packed in one Puffin file; regression test that the physical file is deduped by location (noAlreadyExistsException) and the rewritten size is stamped into each entry.TestRewriteTablePathUtiladdstestRewriteDeleteManifestFallsBackToOriginalSizeForDeletedEntries: a delete manifest with one live and one DELETED position delete entry, asserting the DELETED entry (absent from the measured-size map) keeps its original size.AI Usage
I am more familiar with the iceberg-rust codebase, so Claude helped me navigate the code, prototype a design, and draft the PR description (in DataFusion Comet's PR template). Claude also helped me with the API change failures in CI.