diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index a19898f443e0..de8192c73925 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1177,7 +1177,22 @@ acceptedBreaks: old: "class org.apache.iceberg.Metrics" new: "class org.apache.iceberg.Metrics" justification: "Java serialization across versions is not guaranteed" + - code: "java.method.addedToInterface" + new: "method org.apache.iceberg.actions.RewriteTablePath org.apache.iceberg.actions.RewriteTablePath::hiveMetaMigrate(boolean)" + justification: "Enhance the RewriteTablePath Procedure" org.apache.iceberg:iceberg-core: + - code: "java.method.numberOfParametersChanged" + old: "method org.apache.iceberg.RewriteTablePathUtil.RewriteResult\ + \ org.apache.iceberg.RewriteTablePathUtil::rewriteDataManifest(org.apache.iceberg.ManifestFile,\ + \ org.apache.iceberg.io.OutputFile, org.apache.iceberg.io.FileIO, int, java.util.Map, java.lang.String, java.lang.String) throws\ + \ java.io.IOException" + new: "method org.apache.iceberg.RewriteTablePathUtil.RewriteResult\ + \ org.apache.iceberg.RewriteTablePathUtil::rewriteDataManifest(org.apache.iceberg.ManifestFile,\ + \ org.apache.iceberg.io.OutputFile, org.apache.iceberg.io.FileIO, int, java.util.Map, java.lang.String, java.lang.String, boolean)\ + \ throws java.io.IOException" + justification: "Enhance the RewriteTablePath Procedure" - code: "java.method.removed" old: "method java.lang.String[] org.apache.iceberg.hadoop.Util::blockLocations(org.apache.iceberg.CombinedScanTask,\ \ org.apache.hadoop.conf.Configuration)" diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteTablePath.java b/api/src/main/java/org/apache/iceberg/actions/RewriteTablePath.java index a63647011c3b..e9f6e9dd6393 100644 --- a/api/src/main/java/org/apache/iceberg/actions/RewriteTablePath.java +++ b/api/src/main/java/org/apache/iceberg/actions/RewriteTablePath.java @@ -86,6 +86,14 @@ public interface RewriteTablePath extends Action manifestFilesInSnapshot(FileIO io, 
Snapshot sn
    * @param specsById map of partition specs by id
    * @param sourcePrefix source prefix that will be replaced
    * @param targetPrefix target prefix that will replace it
+   * @param hiveMetaMigrate if true, keep data file paths as-is instead of rewriting the prefix
    * @return a copy plan of content files in the manifest that was rewritten
    */
   public static RewriteResult rewriteDataManifest(
@@ -301,7 +302,8 @@ public static RewriteResult rewriteDataManifest(
       int format,
       Map specsById,
       String sourcePrefix,
-      String targetPrefix)
+      String targetPrefix,
+      boolean hiveMetaMigrate)
       throws IOException {
     PartitionSpec spec = specsById.get(manifestFile.partitionSpecId());
     try (ManifestWriter writer =
@@ -309,7 +311,10 @@ public static RewriteResult rewriteDataManifest(
         ManifestReader reader =
             ManifestFiles.read(manifestFile, io, specsById).select(Arrays.asList("*"))) {
       return StreamSupport.stream(reader.entries().spliterator(), false)
-          .map(entry -> writeDataFileEntry(entry, spec, sourcePrefix, targetPrefix, writer))
+          .map(
+              entry ->
+                  writeDataFileEntry(
+                      entry, spec, sourcePrefix, targetPrefix, writer, hiveMetaMigrate))
           .reduce(new RewriteResult<>(), RewriteResult::append);
     }
   }
@@ -358,21 +363,27 @@ private static RewriteResult writeDataFileEntry(
       PartitionSpec spec,
       String sourcePrefix,
       String targetPrefix,
-      ManifestWriter writer) {
+      ManifestWriter writer,
+      boolean hiveMetaMigrate) {
     RewriteResult result = new RewriteResult<>();
     DataFile dataFile = entry.file();
     String sourceDataFilePath = dataFile.location();
-    Preconditions.checkArgument(
-        sourceDataFilePath.startsWith(sourcePrefix),
-        "Encountered data file %s not under the source prefix %s",
-        sourceDataFilePath,
-        sourcePrefix);
-    String targetDataFilePath = newPath(sourceDataFilePath, sourcePrefix, targetPrefix);
-    DataFile newDataFile =
-        DataFiles.builder(spec).copy(entry.file()).withPath(targetDataFilePath).build();
+    DataFiles.Builder newDataFileBuilder = DataFiles.builder(spec).copy(dataFile);
+    if (!hiveMetaMigrate) {
+      // regular path rewrite: the file must live under the source prefix so it can be relocated
+      Preconditions.checkArgument(
+          sourceDataFilePath.startsWith(sourcePrefix),
+          "Encountered data file %s not under the source prefix %s",
+          sourceDataFilePath,
+          sourcePrefix);
+      newDataFileBuilder.withPath(newPath(sourceDataFilePath, sourcePrefix, targetPrefix));
+    }
+    // in hive meta migrate mode the entry is copied as-is, so data files keep their location
+    // NOTE(review): copyPlan then holds identical source/target pairs for live files - confirm
+    DataFile newDataFile = newDataFileBuilder.build();
     appendEntryWithFile(entry, writer, newDataFile);
     // keep deleted data file entries but exclude them from copyPlan
     if (entry.isLive()) {
       result.copyPlan().add(Pair.of(sourceDataFilePath, newDataFile.location()));
     }
     return result;
diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle
index 7ed9a86a561a..ee60608ea1ff 100644
--- a/spark/v3.5/build.gradle
+++ b/spark/v3.5/build.gradle
@@ -60,6 +60,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
     implementation project(':iceberg-orc')
     implementation project(':iceberg-parquet')
     implementation project(':iceberg-arrow')
+    implementation project(':iceberg-hive-metastore')
 
     implementation("org.scala-lang.modules:scala-collection-compat_${scalaVersion}:${libs.versions.scala.collection.compat.get()}")
     implementation("org.apache.datasketches:datasketches-java:${libs.versions.datasketches.get()}")
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
index 98d6c10b029a..c9c90458d862 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
@@ -95,6 +95,7 @@ public class RewriteTablePathSparkAction extends BaseSparkAction tableBroadcast = null;
@@ -148,6 +149,12
@@ public RewriteTablePath stagingLocation(String stagingLocation) {
     return this;
   }
 
+  @Override
+  public RewriteTablePath hiveMetaMigrate(boolean pMetaMigrate) {
+    this.hiveMetaMigrate = pMetaMigrate; // forwarded to the data manifest rewrite
+    return this;
+  }
+
   @Override
   public Result execute() {
     validateInputs();
@@ -506,7 +513,8 @@ private RewriteContentFileResult rewriteManifests(
                 stagingDir,
                 tableMetadata.formatVersion(),
                 sourcePrefix,
-                targetPrefix),
+                targetPrefix,
+                hiveMetaMigrate), // value of the action's hiveMetaMigrate field
             Encoders.bean(RewriteContentFileResult.class))
         // duplicates are expected here as the same data file can have different statuses
         // (e.g. added and deleted)
@@ -518,7 +526,8 @@ private static MapFunction toManifests(
       String stagingLocation,
       int format,
       String sourcePrefix,
-      String targetPrefix) {
+      String targetPrefix,
+      boolean hiveMetaMigrate) { // passed on to writeDataManifest for DATA manifests
     return manifestFile -> {
       RewriteContentFileResult result = new RewriteContentFileResult();
 
@@ -526,7 +535,13 @@ private static MapFunction toManifests(
         case DATA:
           result.appendDataFile(
               writeDataManifest(
-                  manifestFile, table, stagingLocation, format, sourcePrefix, targetPrefix));
+                  manifestFile,
+                  table,
+                  stagingLocation,
+                  format,
+                  sourcePrefix,
+                  targetPrefix,
+                  hiveMetaMigrate));
           break;
         case DELETES:
           result.appendDeleteFile(
@@ -547,14 +562,22 @@ private static RewriteResult writeDataManifest(
       String stagingLocation,
       int format,
       String sourcePrefix,
-      String targetPrefix) {
+      String targetPrefix,
+      boolean hiveMetaMigrate) {
     try {
       String stagingPath = RewriteTablePathUtil.stagingPath(manifestFile.path(), stagingLocation);
       FileIO io = table.getValue().io();
       OutputFile outputFile = io.newOutputFile(stagingPath);
       Map specsById = table.getValue().specs();
       return RewriteTablePathUtil.rewriteDataManifest(
-          manifestFile, outputFile, io, format, specsById, sourcePrefix, targetPrefix);
+          manifestFile,
+          outputFile,
+          io,
+          format,
+          specsById,
+          sourcePrefix,
+          targetPrefix,
+          hiveMetaMigrate); // handed unchanged to RewriteTablePathUtil
     } catch (IOException e) {
       throw new RuntimeIOException(e);
     }
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/Hive2IcebergProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/Hive2IcebergProcedure.java
new file mode 100644
index 000000000000..28792f871016
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/Hive2IcebergProcedure.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteTablePath;
+import org.apache.iceberg.actions.SnapshotTable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkTableUtil;
+import org.apache.iceberg.spark.actions.RewriteTablePathSparkAction;
+import org.apache.iceberg.spark.actions.SparkActions;
+import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.catalyst.catalog.CatalogUtils;
+import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.catalog.V1Table;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Spark procedure that converts a Hive table in the session catalog to an Iceberg table.
+ *
+ * <p>It snapshots the source table into a timestamp-suffixed Iceberg table, rewrites that
+ * snapshot's metadata with the source table location as the target prefix, and then sets the
+ * Iceberg table properties on the source table's catalog entry.
+ */
+public class Hive2IcebergProcedure extends BaseProcedure {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Hive2IcebergProcedure.class);
+
+  // timestamp pattern appended to the source name to build the snapshot table name
+  private static final DateTimeFormatter SNAPSHOT_SUFFIX_FORMAT =
+      DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {
+        ProcedureParameter.required("source_table", DataTypes.StringType),
+        ProcedureParameter.optional("parallelism", DataTypes.IntegerType)
+      };
+
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField("imported_files_count", DataTypes.LongType, false, Metadata.empty())
+          });
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new Builder<Hive2IcebergProcedure>() {
+      @Override
+      protected Hive2IcebergProcedure doBuild() {
+        return new Hive2IcebergProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private Hive2IcebergProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  /**
+   * Runs the conversion: snapshot, metadata path rewrite, then catalog property update.
+   *
+   * @param args row with {@code source_table} at ordinal 0 and {@code parallelism} at ordinal 1
+   * @return a single row with the number of data files imported by the snapshot
+   * @throws IllegalArgumentException if an argument or an intermediate result is invalid
+   * @throws RuntimeException if a table cannot be resolved or the catalog update fails
+   */
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    String sourceTable = args.getString(0);
+    Preconditions.checkArgument(
+        sourceTable != null && !sourceTable.isEmpty(),
+        "Cannot handle an empty identifier for argument source_table");
+
+    // the suffix makes the name differ from the source, so no collision check is needed
+    String snapshotTable =
+        sourceTable + "_snapshot_" + LocalDateTime.now().format(SNAPSHOT_SUFFIX_FORMAT);
+    long importedFilesCount = snapshotSourceTable(args, sourceTable, snapshotTable);
+
+    TableCatalog sourceCatalog;
+    Identifier sourceIdentifier;
+    CatalogTable sourceSparkTable;
+    try {
+      Spark3Util.CatalogAndIdentifier catalogAndIdent =
+          resolve("hive2iceberg source", sourceTable);
+      sourceCatalog = checkTargetCatalog(catalogAndIdent.catalog());
+      sourceIdentifier = catalogAndIdent.identifier();
+      sourceSparkTable = ((V1Table) sourceCatalog.loadTable(sourceIdentifier)).v1Table();
+      LOG.info("hive2iceberg source table: {}.", sourceSparkTable.qualifiedName());
+    } catch (Exception e) {
+      LOG.error("parse Source Table Error.", e);
+      throw new RuntimeException(e);
+    }
+
+    SparkTable snapshotSparkTable;
+    try {
+      Spark3Util.CatalogAndIdentifier catalogAndIdent =
+          resolve("hive2iceberg target", snapshotTable);
+      snapshotSparkTable = (SparkTable) tableCatalog().loadTable(catalogAndIdent.identifier());
+      LOG.info("hive2iceberg dest table: {}.", snapshotSparkTable.name());
+    } catch (Exception e) {
+      LOG.error("parse Dest Table Error.", e);
+      throw new RuntimeException(e);
+    }
+
+    Table snapshot = snapshotSparkTable.table();
+    String targetPrefix = CatalogUtils.URIToString(sourceSparkTable.storage().locationUri().get());
+    rewriteMetadata(snapshot, targetPrefix);
+
+    try {
+      String targetMetadataFileLocation =
+          targetPrefix + "/metadata/" + fileName(getMetadataLocation(snapshot));
+      // alter through the catalog the source was loaded from, not whatever catalog is current
+      sourceCatalog.alterTable(
+          sourceIdentifier,
+          TableChange.setProperty("metadata_location", targetMetadataFileLocation),
+          TableChange.setProperty("table_type", "ICEBERG"),
+          // NOTE(review): key was "provide"; assumed to be a typo for "provider" - confirm
+          TableChange.setProperty("provider", "iceberg"));
+    } catch (Exception e) {
+      LOG.error("ReWrite metaMigrate Failed.", e);
+      throw new RuntimeException(e);
+    }
+
+    return new InternalRow[] {newInternalRow(importedFilesCount)};
+  }
+
+  /** Creates the Iceberg snapshot table and returns the number of imported data files. */
+  private long snapshotSourceTable(InternalRow args, String sourceTable, String snapshotTable) {
+    SnapshotTable action = SparkActions.get().snapshotTable(sourceTable).as(snapshotTable);
+    if (!args.isNullAt(1)) {
+      int parallelism = args.getInt(1);
+      Preconditions.checkArgument(parallelism > 0, "Parallelism should be larger than 0");
+      action = action.executeWith(SparkTableUtil.migrationService(parallelism));
+    }
+
+    Map<String, String> properties = Maps.newHashMap();
+    SnapshotTable.Result result = action.tableProperties(properties).execute();
+    long importedFilesCount = result.importedDataFilesCount();
+    Preconditions.checkArgument(importedFilesCount > 0, "snapShot may have failed.");
+    return importedFilesCount;
+  }
+
+  /** Resolves {@code name} against the session's current catalog. */
+  private Spark3Util.CatalogAndIdentifier resolve(String context, String name) {
+    CatalogPlugin currentCatalog = spark().sessionState().catalogManager().currentCatalog();
+    return Spark3Util.catalogAndIdentifier(context, spark(), name, currentCatalog);
+  }
+
+  /** Rewrites the snapshot's metadata paths, staging the result under the target location. */
+  private void rewriteMetadata(Table snapshot, String targetPrefix) {
+    RewriteTablePathSparkAction rewriteAction = SparkActions.get().rewriteTablePath(snapshot);
+    rewriteAction.stagingLocation(targetPrefix + "/metadata");
+    // with this flag the data file paths inside data manifests are left untouched
+    rewriteAction.hiveMetaMigrate(true);
+    RewriteTablePath.Result rewrite =
+        rewriteAction.rewriteLocationPrefix(snapshot.location(), targetPrefix).execute();
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(rewrite.fileListLocation()), "rewrite may have failed.");
+  }
+
+  /** Returns the location of the table's current metadata file. */
+  private String getMetadataLocation(Table tbl) {
+    return ((HasTableOperations) tbl).operations().current().metadataFileLocation();
+  }
+
+  /**
+   * Verifies that the catalog is the session catalog and a {@link TableCatalog}.
+   *
+   * @param catalog catalog resolved for the source table
+   * @return the same catalog, cast to {@link TableCatalog}
+   * @throws IllegalArgumentException if the catalog is not {@code spark_catalog} or not a table
+   *     catalog
+   */
+  protected TableCatalog checkTargetCatalog(CatalogPlugin catalog) {
+    // currently the import code relies on being able to look up the table in the session catalog
+    Preconditions.checkArgument(
+        catalog.name().equalsIgnoreCase("spark_catalog"),
+        "Cannot rewrite a table that isn't in the session catalog (i.e. spark_catalog). "
+            + "Found source catalog: %s.",
+        catalog.name());
+
+    Preconditions.checkArgument(
+        catalog instanceof TableCatalog,
+        "Cannot rewrite as catalog %s of class %s is not a table catalog",
+        catalog.name(),
+        catalog.getClass().getName());
+
+    return (TableCatalog) catalog;
+  }
+
+  /**
+   * Returns the last segment of a storage path.
+   *
+   * @param path a location that uses {@code /} as separator, e.g. a metadata file location
+   * @return the text after the last {@code /}, or the whole path if it has none
+   */
+  protected String fileName(String path) {
+    // table locations are URIs, so the separator is always '/', never the platform separator
+    return path.substring(path.lastIndexOf('/') + 1);
+  }
+
+  @Override
+  public String description() {
+    return "Hive2IcebergProcedure";
+  }
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteTablePathProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteTablePathProcedure.java
index b936dcfcedfe..0cc728714fc7 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteTablePathProcedure.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteTablePathProcedure.java
@@ -45,6 +45,8 @@ public class RewriteTablePathProcedure extends BaseProcedure {
       ProcedureParameter.optional("end_version", DataTypes.StringType);
   private static final ProcedureParameter STAGING_LOCATION_PARAM =
       ProcedureParameter.optional("staging_location", DataTypes.StringType);
+  private static final
ProcedureParameter HIVE_META_MIGRATE_PARAM =
+      ProcedureParameter.optional("hive_meta_migrate", DataTypes.BooleanType);
 
   private static final ProcedureParameter[] PARAMETERS =
       new ProcedureParameter[] {
@@ -53,7 +55,8 @@ public class RewriteTablePathProcedure extends BaseProcedure {
         TARGET_PREFIX_PARAM,
         START_VERSION_PARAM,
         END_VERSION_PARM,
-        STAGING_LOCATION_PARAM
+        STAGING_LOCATION_PARAM,
+        HIVE_META_MIGRATE_PARAM // optional boolean; call() reads it with a false fallback
       };
 
   private static final StructType OUTPUT_TYPE =
@@ -95,6 +98,7 @@ public InternalRow[] call(InternalRow args) {
     String startVersion = input.asString(START_VERSION_PARAM, null);
     String endVersion = input.asString(END_VERSION_PARM, null);
     String stagingLocation = input.asString(STAGING_LOCATION_PARAM, null);
+    boolean hiveMetaMigrate = input.asBoolean(HIVE_META_MIGRATE_PARAM, false);
 
     return withIcebergTable(
         tableIdent,
@@ -110,7 +114,9 @@ public InternalRow[] call(InternalRow args) {
           if (stagingLocation != null) {
             action.stagingLocation(stagingLocation);
           }
-
+          if (hiveMetaMigrate) {
+            action.hiveMetaMigrate(true); // only set on explicit opt-in
+          }
           return toOutputRows(action.rewriteLocationPrefix(sourcePrefix, targetPrefix).execute());
         });
   }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
index 353970443025..febef132fe8c 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
@@ -63,6 +63,7 @@ private static Map> initProcedureBuilders() {
     mapBuilder.put("fast_forward", FastForwardBranchProcedure::builder);
     mapBuilder.put("compute_table_stats", ComputeTableStatsProcedure::builder);
     mapBuilder.put("rewrite_table_path", RewriteTablePathProcedure::builder);
+    mapBuilder.put("hive_to_iceberg", Hive2IcebergProcedure::builder); // builder keyed by name
     return mapBuilder.build();
   }
 