Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1177,7 +1177,22 @@ acceptedBreaks:
old: "class org.apache.iceberg.Metrics"
new: "class org.apache.iceberg.Metrics"
justification: "Java serialization across versions is not guaranteed"
- code: "java.method.addedToInterface"
new: "method org.apache.iceberg.actions.RewriteTablePath org.apache.iceberg.actions.RewriteTablePath::hiveMetaMigrate(boolean)"
justification: "Enhance the RewriteTablePath Procedure"
org.apache.iceberg:iceberg-core:
- code: "java.method.numberOfParametersChanged"
old: "method org.apache.iceberg.RewriteTablePathUtil.RewriteResult<org.apache.iceberg.DataFile>\
\ org.apache.iceberg.RewriteTablePathUtil::rewriteDataManifest(org.apache.iceberg.ManifestFile,\
\ org.apache.iceberg.io.OutputFile, org.apache.iceberg.io.FileIO, int, java.util.Map<java.lang.Integer,\
\ org.apache.iceberg.PartitionSpec>, java.lang.String, java.lang.String) throws\
\ java.io.IOException"
new: "method org.apache.iceberg.RewriteTablePathUtil.RewriteResult<org.apache.iceberg.DataFile>\
\ org.apache.iceberg.RewriteTablePathUtil::rewriteDataManifest(org.apache.iceberg.ManifestFile,\
\ org.apache.iceberg.io.OutputFile, org.apache.iceberg.io.FileIO, int, java.util.Map<java.lang.Integer,\
\ org.apache.iceberg.PartitionSpec>, java.lang.String, java.lang.String, boolean)\
\ throws java.io.IOException"
justification: "Enhance the RewriteTablePath Procedure"
- code: "java.method.removed"
old: "method java.lang.String[] org.apache.iceberg.hadoop.Util::blockLocations(org.apache.iceberg.CombinedScanTask,\
\ org.apache.hadoop.conf.Configuration)"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ public interface RewriteTablePath extends Action<RewriteTablePath, RewriteTableP
*/
RewriteTablePath stagingLocation(String stagingLocation);

/**
* Sets whether the rewrite is performed as part of a Hive metadata migration.
*
* <p>NOTE(review): in the core data-manifest rewrite this flag appears to keep each data file's
* original path (no source-to-target prefix replacement and no source-prefix validation) —
* confirm that this is the intended contract for every implementation of this action.
*
* @param metaMigrate true to run the rewrite in Hive metadata migration mode, false otherwise
* @return this for method chaining
*/
RewriteTablePath hiveMetaMigrate(boolean metaMigrate);

/** The action result that contains a summary of the execution. */
interface Result {
/** Staging location of rewritten files */
Expand Down
38 changes: 25 additions & 13 deletions core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ private static List<ManifestFile> manifestFilesInSnapshot(FileIO io, Snapshot sn
* @param specsById map of partition specs by id
* @param sourcePrefix source prefix that will be replaced
* @param targetPrefix target prefix that will replace it
* @param hiveMetaMigrate if true, data file paths are copied unchanged and the source prefix check is skipped
* @return a copy plan of content files in the manifest that was rewritten
*/
public static RewriteResult<DataFile> rewriteDataManifest(
Expand All @@ -301,15 +302,19 @@ public static RewriteResult<DataFile> rewriteDataManifest(
int format,
Map<Integer, PartitionSpec> specsById,
String sourcePrefix,
String targetPrefix)
String targetPrefix,
boolean hiveMetaMigrate)
throws IOException {
PartitionSpec spec = specsById.get(manifestFile.partitionSpecId());
try (ManifestWriter<DataFile> writer =
ManifestFiles.write(format, spec, outputFile, manifestFile.snapshotId());
ManifestReader<DataFile> reader =
ManifestFiles.read(manifestFile, io, specsById).select(Arrays.asList("*"))) {
return StreamSupport.stream(reader.entries().spliterator(), false)
.map(entry -> writeDataFileEntry(entry, spec, sourcePrefix, targetPrefix, writer))
.map(
entry ->
writeDataFileEntry(
entry, spec, sourcePrefix, targetPrefix, writer, hiveMetaMigrate))
.reduce(new RewriteResult<>(), RewriteResult::append);
}
}
Expand Down Expand Up @@ -358,21 +363,28 @@ private static RewriteResult<DataFile> writeDataFileEntry(
PartitionSpec spec,
String sourcePrefix,
String targetPrefix,
ManifestWriter<DataFile> writer) {
ManifestWriter<DataFile> writer,
boolean hiveMetaMigrate) {
RewriteResult<DataFile> result = new RewriteResult<>();
DataFile dataFile = entry.file();
String sourceDataFilePath = dataFile.location();
Preconditions.checkArgument(
sourceDataFilePath.startsWith(sourcePrefix),
"Encountered data file %s not under the source prefix %s",
sourceDataFilePath,
sourcePrefix);
String targetDataFilePath = newPath(sourceDataFilePath, sourcePrefix, targetPrefix);
DataFile newDataFile =
DataFiles.builder(spec).copy(entry.file()).withPath(targetDataFilePath).build();
appendEntryWithFile(entry, writer, newDataFile);
DataFile newDataFile;
if (hiveMetaMigrate) {
newDataFile = DataFiles.builder(spec).copy(entry.file()).build();
appendEntryWithFile(entry, writer, newDataFile);
} else {
Preconditions.checkArgument(
sourceDataFilePath.startsWith(sourcePrefix),
"Encountered data file %s not under the source prefix %s",
sourceDataFilePath,
sourcePrefix);
String targetDataFilePath = newPath(sourceDataFilePath, sourcePrefix, targetPrefix);
newDataFile = DataFiles.builder(spec).copy(entry.file()).withPath(targetDataFilePath).build();
appendEntryWithFile(entry, writer, newDataFile);
}

// keep deleted data file entries but exclude them from copyPlan
if (entry.isLive()) {
if (entry.isLive() && newDataFile != null) {
result.copyPlan().add(Pair.of(sourceDataFilePath, newDataFile.location()));
}
return result;
Expand Down
1 change: 1 addition & 0 deletions spark/v3.5/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
implementation project(':iceberg-orc')
implementation project(':iceberg-parquet')
implementation project(':iceberg-arrow')
implementation project(':iceberg-hive-metastore')
implementation("org.scala-lang.modules:scala-collection-compat_${scalaVersion}:${libs.versions.scala.collection.compat.get()}")
implementation("org.apache.datasketches:datasketches-java:${libs.versions.datasketches.get()}")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
private String startVersionName;
private String endVersionName;
private String stagingDir;
private boolean hiveMetaMigrate;

private final Table table;
private Broadcast<Table> tableBroadcast = null;
Expand Down Expand Up @@ -148,6 +149,12 @@ public RewriteTablePath stagingLocation(String stagingLocation) {
return this;
}

/**
 * Records whether this action runs in Hive metadata migration mode.
 *
 * <p>The stored flag is handed to the data-manifest rewrite when the action executes.
 *
 * @param metaMigrate true to enable Hive metadata migration mode
 * @return this for method chaining
 */
@Override
public RewriteTablePath hiveMetaMigrate(boolean metaMigrate) {
  // Named after the interface parameter; it differs from the field name, so nothing is shadowed.
  this.hiveMetaMigrate = metaMigrate;
  return this;
}

@Override
public Result execute() {
validateInputs();
Expand Down Expand Up @@ -506,7 +513,8 @@ private RewriteContentFileResult rewriteManifests(
stagingDir,
tableMetadata.formatVersion(),
sourcePrefix,
targetPrefix),
targetPrefix,
hiveMetaMigrate),
Encoders.bean(RewriteContentFileResult.class))
// duplicates are expected here as the same data file can have different statuses
// (e.g. added and deleted)
Expand All @@ -518,15 +526,22 @@ private static MapFunction<ManifestFile, RewriteContentFileResult> toManifests(
String stagingLocation,
int format,
String sourcePrefix,
String targetPrefix) {
String targetPrefix,
boolean hiveMetaMigrate) {

return manifestFile -> {
RewriteContentFileResult result = new RewriteContentFileResult();
switch (manifestFile.content()) {
case DATA:
result.appendDataFile(
writeDataManifest(
manifestFile, table, stagingLocation, format, sourcePrefix, targetPrefix));
manifestFile,
table,
stagingLocation,
format,
sourcePrefix,
targetPrefix,
hiveMetaMigrate));
break;
case DELETES:
result.appendDeleteFile(
Expand All @@ -547,14 +562,22 @@ private static RewriteResult<DataFile> writeDataManifest(
String stagingLocation,
int format,
String sourcePrefix,
String targetPrefix) {
String targetPrefix,
boolean hiveMetaMigrate) {
try {
String stagingPath = RewriteTablePathUtil.stagingPath(manifestFile.path(), stagingLocation);
FileIO io = table.getValue().io();
OutputFile outputFile = io.newOutputFile(stagingPath);
Map<Integer, PartitionSpec> specsById = table.getValue().specs();
return RewriteTablePathUtil.rewriteDataManifest(
manifestFile, outputFile, io, format, specsById, sourcePrefix, targetPrefix);
manifestFile,
outputFile,
io,
format,
specsById,
sourcePrefix,
targetPrefix,
hiveMetaMigrate);
} catch (IOException e) {
throw new RuntimeIOException(e);
}
Expand Down
Loading