From 32bd1bd72a9586f559b0e30b37975bac64e688e5 Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Mon, 6 Apr 2020 07:52:34 -0700 Subject: [PATCH 1/5] Spark: Implement an action to remove orphan files --- .../apache/iceberg/io/LocationProvider.java | 6 + .../org/apache/iceberg/LocationProviders.java | 10 + .../iceberg/hadoop/HiddenPathFilter.java | 43 +++ .../main/java/org/apache/iceberg/Action.java | 24 ++ .../main/java/org/apache/iceberg/Actions.java | 45 +++ .../iceberg/RemoveOrphanFilesAction.java | 197 +++++++++++ .../RemoveOrphanFilesActionResult.java | 34 ++ .../iceberg/TestRemoveOrphanFilesAction.java | 327 ++++++++++++++++++ 8 files changed, 686 insertions(+) create mode 100644 core/src/main/java/org/apache/iceberg/hadoop/HiddenPathFilter.java create mode 100644 spark/src/main/java/org/apache/iceberg/Action.java create mode 100644 spark/src/main/java/org/apache/iceberg/Actions.java create mode 100644 spark/src/main/java/org/apache/iceberg/RemoveOrphanFilesAction.java create mode 100644 spark/src/main/java/org/apache/iceberg/RemoveOrphanFilesActionResult.java create mode 100644 spark/src/test/java/org/apache/iceberg/TestRemoveOrphanFilesAction.java diff --git a/api/src/main/java/org/apache/iceberg/io/LocationProvider.java b/api/src/main/java/org/apache/iceberg/io/LocationProvider.java index 58f70c1de883..e3b6174af645 100644 --- a/api/src/main/java/org/apache/iceberg/io/LocationProvider.java +++ b/api/src/main/java/org/apache/iceberg/io/LocationProvider.java @@ -29,6 +29,12 @@ * Implementations must be {@link Serializable} because instances will be serialized to tasks. */ public interface LocationProvider extends Serializable { + + /** + * Return a fully-qualified data location. + */ + String dataLocation(); + /** * Return a fully-qualified data file location for the given filename. * diff --git a/core/src/main/java/org/apache/iceberg/LocationProviders.java b/core/src/main/java/org/apache/iceberg/LocationProviders.java index af4414e9ade5..ab03a53faabe 100644 --- a/core/src/main/java/org/apache/iceberg/LocationProviders.java +++ b/core/src/main/java/org/apache/iceberg/LocationProviders.java @@ -53,6 +53,11 @@ static class DefaultLocationProvider implements LocationProvider { String.format("%s/data", tableLocation))); } + @Override + public String dataLocation() { + return dataLocation; + } + @Override public String newDataLocation(PartitionSpec spec, StructLike partitionData, String filename) { return String.format("%s/%s/%s", dataLocation, spec.partitionToPath(partitionData), filename); @@ -76,6 +81,11 @@ static class ObjectStoreLocationProvider implements LocationProvider { this.context = pathContext(tableLocation); } + @Override + public String dataLocation() { + return storageLocation; + } + @Override public String newDataLocation(PartitionSpec spec, StructLike partitionData, String filename) { return newDataLocation(String.format("%s/%s", spec.partitionToPath(partitionData), filename)); diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HiddenPathFilter.java b/core/src/main/java/org/apache/iceberg/hadoop/HiddenPathFilter.java new file mode 100644 index 000000000000..89fd744e8c4e --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/hadoop/HiddenPathFilter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.hadoop; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; + +/** + * A {@link PathFilter} that filters out hidden paths. A path is considered to + * be hidden when the path name starts with a period ('.') or an underscore ('_'). + */ +public class HiddenPathFilter implements PathFilter { + + private static final HiddenPathFilter INSTANCE = new HiddenPathFilter(); + + private HiddenPathFilter() {} + + public static HiddenPathFilter get() { + return INSTANCE; + } + + @Override + public boolean accept(Path p) { + return !p.getName().startsWith("_") && !p.getName().startsWith("."); + } +} diff --git a/spark/src/main/java/org/apache/iceberg/Action.java b/spark/src/main/java/org/apache/iceberg/Action.java new file mode 100644 index 000000000000..af18e40a4876 --- /dev/null +++ b/spark/src/main/java/org/apache/iceberg/Action.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +public interface Action { + R execute(); +} diff --git a/spark/src/main/java/org/apache/iceberg/Actions.java b/spark/src/main/java/org/apache/iceberg/Actions.java new file mode 100644 index 000000000000..a8e431b90da4 --- /dev/null +++ b/spark/src/main/java/org/apache/iceberg/Actions.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import org.apache.spark.sql.SparkSession; + +public class Actions { + + private SparkSession spark; + private Table table; + + private Actions(SparkSession spark, Table table) { + this.spark = spark; + this.table = table; + } + + public static Actions forTable(SparkSession spark, Table table) { + return new Actions(spark, table); + } + + public static Actions forTable(Table table) { + return new Actions(SparkSession.active(), table); + } + + public RemoveOrphanFilesAction removeOrphanFiles() { + return new RemoveOrphanFilesAction(spark, table); + } +} diff --git a/spark/src/main/java/org/apache/iceberg/RemoveOrphanFilesAction.java b/spark/src/main/java/org/apache/iceberg/RemoveOrphanFilesAction.java new file mode 100644 index 000000000000..41e37160ad32 --- /dev/null +++ b/spark/src/main/java/org/apache/iceberg/RemoveOrphanFilesAction.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.function.Predicate; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.hadoop.HiddenPathFilter; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.util.Tasks; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.util.SerializableConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import parquet.Preconditions; + +public class RemoveOrphanFilesAction implements Action { + + private static final Logger LOG = LoggerFactory.getLogger(RemoveOrphanFilesAction.class); + + private final SparkSession spark; + private final JavaSparkContext sparkContext; + private final SerializableConfiguration hadoopConf; + private final FileIO fileIO; + private final int partitionDiscoveryParallelism; + private final String dataLocation; + + private String allDataFilesTable = null; + private Long olderThanTimestamp = null; + private boolean dryRun = false; + + RemoveOrphanFilesAction(SparkSession spark, Table table) { + this.spark = spark; + this.sparkContext = new JavaSparkContext(spark.sparkContext()); + this.hadoopConf = new SerializableConfiguration(spark.sessionState().newHadoopConf()); + this.fileIO = table.io(); + this.partitionDiscoveryParallelism = spark.sessionState().conf().parallelPartitionDiscoveryParallelism(); + this.dataLocation = table.locationProvider().dataLocation(); + } + + public RemoveOrphanFilesAction allDataFilesTable(String newAllDataFilesTable) { + this.allDataFilesTable = newAllDataFilesTable; + return this; + } + + public RemoveOrphanFilesAction olderThan(long newOlderThanTimestamp) { + this.olderThanTimestamp = newOlderThanTimestamp; + return this; + } + + public RemoveOrphanFilesAction dryRun(boolean newDryRun) { + this.dryRun = newDryRun; + return this; + } + + @Override + public RemoveOrphanFilesActionResult execute() { + Preconditions.checkArgument(allDataFilesTable != null, "allDataFilesTable must be set"); + Preconditions.checkArgument(olderThanTimestamp != null, "olderThanTimestamp should be set"); + + Dataset validDataFileDF = buildValidDataFileDF(); + Dataset actualDataFileDF = buildActualDataFileDF(); + + Column joinCond = validDataFileDF.col("file_path").equalTo(actualDataFileDF.col("file_path")); + List orphanDataFiles = actualDataFileDF.join(validDataFileDF, joinCond, "leftanti") + .as(Encoders.STRING()) + .collectAsList(); + + if (!dryRun) { + Tasks.foreach(orphanDataFiles) + .noRetry() + .suppressFailureWhenFinished() + .onFailure((file, exc) -> LOG.warn("Failed to delete data file: {}", file, exc)) + .run(fileIO::deleteFile); + } + + return new RemoveOrphanFilesActionResult(orphanDataFiles); + } + + private Dataset buildValidDataFileDF() { + return spark.read().format("iceberg") + .load(allDataFilesTable) + .select("file_path") + .distinct(); + } + + private Dataset buildActualDataFileDF() { + List topLevelDirs = Lists.newArrayList(); + List matchingTopLevelFiles = Lists.newArrayList(); + + try { + Path dataPath = new Path(dataLocation); + FileSystem fs = dataPath.getFileSystem(hadoopConf.value()); + + for (FileStatus file : fs.listStatus(dataPath, HiddenPathFilter.get())) { + // TODO: handle custom metadata folders + // we need to ignore the metadata folder when data is written to the root table location + if (file.isDirectory() && !"metadata".equals(file.getPath().getName())) { + topLevelDirs.add(file.getPath().toString()); + } else if (file.isFile() && file.getModificationTime() < olderThanTimestamp) { + matchingTopLevelFiles.add(file.getPath().toString()); + } + } + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to determine top-level files and dirs"); + } + + JavaRDD matchingTopLevelFileRDD = sparkContext.parallelize(matchingTopLevelFiles, 1); + + if (topLevelDirs.isEmpty()) { + return spark.createDataset(matchingTopLevelFileRDD.rdd(), Encoders.STRING()).toDF("file_path"); + } + + int parallelism = Math.min(topLevelDirs.size(), partitionDiscoveryParallelism); + JavaRDD topLevelDirRDD = sparkContext.parallelize(topLevelDirs, parallelism); + + Broadcast conf = sparkContext.broadcast(hadoopConf); + JavaRDD matchingLeafFileRDD = topLevelDirRDD.mapPartitions(listDirsRecursively(conf, olderThanTimestamp)); + + JavaRDD matchingFileRDD = matchingTopLevelFileRDD.union(matchingLeafFileRDD); + return spark.createDataset(matchingFileRDD.rdd(), Encoders.STRING()).toDF("file_path"); + } + + private static FlatMapFunction, String> listDirsRecursively( + Broadcast conf, + long olderThanTimestamp) { + + return (FlatMapFunction, String>) dirs -> { + List files = Lists.newArrayList(); + Predicate predicate = file -> file.getModificationTime() < olderThanTimestamp; + dirs.forEachRemaining(dir -> { + List dirFiles = listDirRecursively(dir, predicate, conf.value().value()); + files.addAll(dirFiles); + }); + return files.iterator(); + }; + } + + private static List listDirRecursively(String dir, Predicate predicate, Configuration conf) { + try { + Path path = new Path(dir); + FileSystem fs = path.getFileSystem(conf); + + List childDirs = Lists.newArrayList(); + List matchingFiles = Lists.newArrayList(); + + for (FileStatus file : fs.listStatus(path, HiddenPathFilter.get())) { + if (file.isDirectory()) { + childDirs.add(file.getPath().toString()); + } else if (file.isFile() && predicate.test(file)) { + matchingFiles.add(file.getPath().toString()); + } + } + + for (String childDir : childDirs) { + List childDirFiles = listDirRecursively(childDir, predicate, conf); + matchingFiles.addAll(childDirFiles); + } + + return matchingFiles; + } catch (IOException e) { + throw new RuntimeIOException(e); + } + } +} diff --git a/spark/src/main/java/org/apache/iceberg/RemoveOrphanFilesActionResult.java b/spark/src/main/java/org/apache/iceberg/RemoveOrphanFilesActionResult.java new file mode 100644 index 000000000000..8d713368adf3 --- /dev/null +++ b/spark/src/main/java/org/apache/iceberg/RemoveOrphanFilesActionResult.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.util.List; + +public class RemoveOrphanFilesActionResult { + private final List dataFiles; + + public RemoveOrphanFilesActionResult(List dataFiles) { + this.dataFiles = dataFiles; + } + + public List dataFiles() { + return dataFiles; + } +} diff --git a/spark/src/test/java/org/apache/iceberg/TestRemoveOrphanFilesAction.java b/spark/src/test/java/org/apache/iceberg/TestRemoveOrphanFilesAction.java new file mode 100644 index 000000000000..505fe9511dc2 --- /dev/null +++ b/spark/src/test/java/org/apache/iceberg/TestRemoveOrphanFilesAction.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.hadoop.HiddenPathFilter; +import org.apache.iceberg.spark.source.ThreeColumnRecord; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.apache.iceberg.types.Types.NestedField.optional; + +public class TestRemoveOrphanFilesAction { + + private static final HadoopTables TABLES = new HadoopTables(new Configuration()); + private static final Schema SCHEMA = new Schema( + optional(1, "c1", Types.IntegerType.get()), + optional(2, "c2", Types.StringType.get()), + optional(3, "c3", Types.StringType.get()) + ); + private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA) + .truncate("c2", 2) + .identity("c3") + .build(); + + private static SparkSession spark; + + @BeforeClass + public static void startSpark() { + TestRemoveOrphanFilesAction.spark = SparkSession.builder() + .master("local[2]") + .getOrCreate(); + } + + @AfterClass + public static void stopSpark() { + SparkSession currentSpark = TestRemoveOrphanFilesAction.spark; + TestRemoveOrphanFilesAction.spark = null; + currentSpark.stop(); + } + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + private String tableLocation = null; + + @Before + public void setupTableLocation() throws Exception { + File tableDir = temp.newFolder(); + this.tableLocation = tableDir.toURI().toString(); + } + + @Test + public void testDryRun() throws IOException, InterruptedException { + Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation); + + List records = Lists.newArrayList( + new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA") + ); + + Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1); + + df.select("c1", "c2", "c3") + .write() + .format("iceberg") + .mode("append") + .save(tableLocation); + + df.select("c1", "c2", "c3") + .write() + .format("iceberg") + .mode("append") + .save(tableLocation); + + List validFiles = spark.read().format("iceberg") + .load(tableLocation + "#files") + .select("file_path") + .as(Encoders.STRING()) + .collectAsList(); + Assert.assertEquals("Should be 2 valid files", 2, validFiles.size()); + + df.write().mode("append").parquet(tableLocation + "/data"); + + Path dataPath = new Path(tableLocation + "/data"); + FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf()); + List allFiles = Arrays.stream(fs.listStatus(dataPath, HiddenPathFilter.get())) + .filter(FileStatus::isFile) + .map(file -> file.getPath().toString()) + .collect(Collectors.toList()); + Assert.assertEquals("Should be 3 files", 3, allFiles.size()); + + List invalidFiles = Lists.newArrayList(allFiles); + invalidFiles.removeAll(validFiles); + Assert.assertEquals("Should be 1 invalid file", 1, invalidFiles.size()); + + // sleep for 1 second to unsure files will be old enough + Thread.sleep(1000); + + Actions actions = Actions.forTable(table); + + RemoveOrphanFilesActionResult result1 = actions.removeOrphanFiles() + .allDataFilesTable(tableLocation + "#all_data_files") + .olderThan(System.currentTimeMillis()) + .dryRun(true) + .execute(); + Assert.assertEquals("Action should find 1 data file", invalidFiles, result1.dataFiles()); + Assert.assertTrue("Invalid file should be present", fs.exists(new Path(invalidFiles.get(0)))); + + RemoveOrphanFilesActionResult result2 = actions.removeOrphanFiles() + .allDataFilesTable(tableLocation + "#all_data_files") + .olderThan(System.currentTimeMillis()) + .execute(); + Assert.assertEquals("Action should delete 1 data file", invalidFiles, result2.dataFiles()); + Assert.assertFalse("Invalid file should not be present", fs.exists(new Path(invalidFiles.get(0)))); + + List expectedRecords = Lists.newArrayList(); + expectedRecords.addAll(records); + expectedRecords.addAll(records); + + Dataset resultDF = spark.read().format("iceberg").load(tableLocation); + List actualRecords = resultDF + .as(Encoders.bean(ThreeColumnRecord.class)) + .collectAsList(); + Assert.assertEquals("Rows must match", expectedRecords, actualRecords); + } + + @Test + public void testAllValidFilesAreKept() throws IOException, InterruptedException { + Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), tableLocation); + + List records1 = Lists.newArrayList( + new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA") + ); + Dataset df1 = spark.createDataFrame(records1, ThreeColumnRecord.class).coalesce(1); + + // original append + df1.select("c1", "c2", "c3") + .write() + .format("iceberg") + .mode("append") + .save(tableLocation); + + List records2 = Lists.newArrayList( + new ThreeColumnRecord(2, "AAAAAAAAAA", "AAAA") + ); + Dataset df2 = spark.createDataFrame(records2, ThreeColumnRecord.class).coalesce(1); + + // dynamic partition overwrite + df2.select("c1", "c2", "c3") + .write() + .format("iceberg") + .mode("overwrite") + .save(tableLocation); + + // second append + df2.select("c1", "c2", "c3") + .write() + .format("iceberg") + .mode("append") + .save(tableLocation); + + List snapshots = Lists.newArrayList(table.snapshots()); + + List snapshotFiles1 = snapshotFiles(snapshots.get(0).snapshotId()); + Assert.assertEquals(1, snapshotFiles1.size()); + + List snapshotFiles2 = snapshotFiles(snapshots.get(1).snapshotId()); + Assert.assertEquals(1, snapshotFiles2.size()); + + List snapshotFiles3 = snapshotFiles(snapshots.get(2).snapshotId()); + Assert.assertEquals(2, snapshotFiles3.size()); + + df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data"); + df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA"); + df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA/c3=AAAA"); + df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data/invalid/invalid"); + + // sleep for 1 second to unsure files will be old enough + Thread.sleep(1000); + + Actions actions = Actions.forTable(table); + + RemoveOrphanFilesActionResult result = actions.removeOrphanFiles() + .allDataFilesTable(tableLocation + "#all_data_files") + .olderThan(System.currentTimeMillis()) + .execute(); + + Assert.assertEquals("Should delete 4 data files", 4, result.dataFiles().size()); + + Path dataPath = new Path(tableLocation + "/data"); + FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf()); + + for (String fileLocation : snapshotFiles1) { + Assert.assertTrue("All snapshot files must remain", fs.exists(new Path(fileLocation))); + } + + for (String fileLocation : snapshotFiles2) { + Assert.assertTrue("All snapshot files must remain", fs.exists(new Path(fileLocation))); + } + + for (String fileLocation : snapshotFiles3) { + Assert.assertTrue("All snapshot files must remain", fs.exists(new Path(fileLocation))); + } + } + + @Test + public void testMetadataFolderIsIntact() throws InterruptedException { + // write data directly to the table location + Map props = Maps.newHashMap(); + props.put(TableProperties.WRITE_NEW_DATA_LOCATION, tableLocation); + Table table = TABLES.create(SCHEMA, SPEC, props, tableLocation); + + List records = Lists.newArrayList( + new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA") + ); + Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1); + + df.select("c1", "c2", "c3") + .write() + .format("iceberg") + .mode("append") + .save(tableLocation); + + df.write().mode("append").parquet(tableLocation + "/c2_trunc=AA/c3=AAAA"); + + // sleep for 1 second to unsure files will be old enough + Thread.sleep(1000); + + Actions actions = Actions.forTable(table); + + RemoveOrphanFilesActionResult result = actions.removeOrphanFiles() + .allDataFilesTable(tableLocation + "#all_data_files") + .olderThan(System.currentTimeMillis()) + .execute(); + + Assert.assertEquals("Should delete 1 data file", 1, result.dataFiles().size()); + + Dataset resultDF = spark.read().format("iceberg").load(tableLocation); + List actualRecords = resultDF + .as(Encoders.bean(ThreeColumnRecord.class)) + .collectAsList(); + Assert.assertEquals("Rows must match", records, actualRecords); + } + + @Test + public void testOlderThanTimestamp() throws InterruptedException { + Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), tableLocation); + + List records = Lists.newArrayList( + new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA") + ); + Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1); + + df.select("c1", "c2", "c3") + .write() + .format("iceberg") + .mode("append") + .save(tableLocation); + + df.write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA/c3=AAAA"); + df.write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA/c3=AAAA"); + + Thread.sleep(1000); + + long timestamp = System.currentTimeMillis(); + + Thread.sleep(1000); + + df.write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA/c3=AAAA"); + + Actions actions = Actions.forTable(table); + + RemoveOrphanFilesActionResult result = actions.removeOrphanFiles() + .allDataFilesTable(tableLocation + "#all_data_files") + .olderThan(timestamp) + .execute(); + + Assert.assertEquals("Should delete only 2 data files", 2, result.dataFiles().size()); + } + + private List snapshotFiles(long snapshotId) { + return spark.read().format("iceberg") + .option("snapshot-id", snapshotId) + .load(tableLocation + "#files") + .select("file_path") + .as(Encoders.STRING()) + .collectAsList(); + } +} From 2c25af79c5f646b342080aa8a0a414eed0b9f6d2 Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Wed, 8 Apr 2020 17:18:43 -0700 Subject: [PATCH 2/5] Rework the action to clean the whole table location --- .../apache/iceberg/io/LocationProvider.java | 5 - .../org/apache/iceberg/LocationProviders.java | 10 -- .../iceberg/RemoveOrphanFilesAction.java | 116 +++++++++++------ .../RemoveOrphanFilesActionResult.java | 34 ----- .../iceberg/TestRemoveOrphanFilesAction.java | 119 +++++++++++++++--- .../source/TestIcebergSourceHiveTables.java | 15 ++- .../source/TestIcebergSourceTablesBase.java | 43 +++++++ 7 files changed, 234 insertions(+), 108 deletions(-) delete mode 100644 spark/src/main/java/org/apache/iceberg/RemoveOrphanFilesActionResult.java diff --git a/api/src/main/java/org/apache/iceberg/io/LocationProvider.java b/api/src/main/java/org/apache/iceberg/io/LocationProvider.java index e3b6174af645..7b273d3eabe4 100644 --- a/api/src/main/java/org/apache/iceberg/io/LocationProvider.java +++ b/api/src/main/java/org/apache/iceberg/io/LocationProvider.java @@ -30,11 +30,6 @@ */ public interface LocationProvider extends Serializable { - /** - * Return a fully-qualified data location. - */ - String dataLocation(); - /** * Return a fully-qualified data file location for the given filename. * diff --git a/core/src/main/java/org/apache/iceberg/LocationProviders.java b/core/src/main/java/org/apache/iceberg/LocationProviders.java index ab03a53faabe..af4414e9ade5 100644 --- a/core/src/main/java/org/apache/iceberg/LocationProviders.java +++ b/core/src/main/java/org/apache/iceberg/LocationProviders.java @@ -53,11 +53,6 @@ static class DefaultLocationProvider implements LocationProvider { String.format("%s/data", tableLocation))); } - @Override - public String dataLocation() { - return dataLocation; - } - @Override public String newDataLocation(PartitionSpec spec, StructLike partitionData, String filename) { return String.format("%s/%s/%s", dataLocation, spec.partitionToPath(partitionData), filename); @@ -81,11 +76,6 @@ static class ObjectStoreLocationProvider implements LocationProvider { this.context = pathContext(tableLocation); } - @Override - public String dataLocation() { - return storageLocation; - } - @Override public String newDataLocation(PartitionSpec spec, StructLike partitionData, String filename) { return newDataLocation(String.format("%s/%s", spec.partitionToPath(partitionData), filename)); diff --git a/spark/src/main/java/org/apache/iceberg/RemoveOrphanFilesAction.java b/spark/src/main/java/org/apache/iceberg/RemoveOrphanFilesAction.java index 41e37160ad32..bfefaeb35a52 100644 --- a/spark/src/main/java/org/apache/iceberg/RemoveOrphanFilesAction.java +++ b/spark/src/main/java/org/apache/iceberg/RemoveOrphanFilesAction.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.Iterator; import java.util.List; +import java.util.function.Consumer; import java.util.function.Predicate; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -30,7 +31,6 @@ import org.apache.hadoop.fs.Path; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.hadoop.HiddenPathFilter; -import org.apache.iceberg.io.FileIO; import org.apache.iceberg.util.Tasks; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -46,32 +46,38 @@ import org.slf4j.LoggerFactory; import parquet.Preconditions; -public class RemoveOrphanFilesAction implements Action { +public class RemoveOrphanFilesAction implements Action> { private static final Logger LOG = LoggerFactory.getLogger(RemoveOrphanFilesAction.class); private final SparkSession spark; private final JavaSparkContext sparkContext; private final SerializableConfiguration hadoopConf; - private final FileIO fileIO; private final int partitionDiscoveryParallelism; - private final String dataLocation; + private final Table table; + private final TableOperations ops; - private String allDataFilesTable = null; + private String location = null; private Long olderThanTimestamp = null; - private boolean dryRun = false; + private Consumer deleteFunc = new Consumer() { + @Override + public void accept(String file) { + table.io().deleteFile(file); + } + }; RemoveOrphanFilesAction(SparkSession spark, Table table) { this.spark = spark; this.sparkContext = new JavaSparkContext(spark.sparkContext()); this.hadoopConf = new SerializableConfiguration(spark.sessionState().newHadoopConf()); - this.fileIO = table.io(); this.partitionDiscoveryParallelism = spark.sessionState().conf().parallelPartitionDiscoveryParallelism(); - this.dataLocation = table.locationProvider().dataLocation(); + this.table = table; + this.ops = ((HasTableOperations) table).operations(); + this.location = table.location(); } - public RemoveOrphanFilesAction allDataFilesTable(String newAllDataFilesTable) { - this.allDataFilesTable = newAllDataFilesTable; + public RemoveOrphanFilesAction location(String newLocation) { + this.location = newLocation; return this; } @@ -80,61 +86,88 @@ public RemoveOrphanFilesAction olderThan(long newOlderThanTimestamp) { return this; } - public RemoveOrphanFilesAction dryRun(boolean newDryRun) { - this.dryRun = newDryRun; + public RemoveOrphanFilesAction deleteWith(Consumer newDeleteFunc) { + this.deleteFunc = newDeleteFunc; return this; } @Override - public RemoveOrphanFilesActionResult execute() { - Preconditions.checkArgument(allDataFilesTable != null, "allDataFilesTable must be set"); - Preconditions.checkArgument(olderThanTimestamp != null, "olderThanTimestamp should be set"); + public List execute() { + Preconditions.checkArgument(olderThanTimestamp != null, "olderThanTimestamp must be set"); Dataset validDataFileDF = buildValidDataFileDF(); - Dataset actualDataFileDF = buildActualDataFileDF(); + Dataset validMetadataFileDF = buildValidMetadataFileDF(); + Dataset validFileDF = validDataFileDF.union(validMetadataFileDF); + Dataset actualFileDF = buildActualFileDF(); - Column joinCond = validDataFileDF.col("file_path").equalTo(actualDataFileDF.col("file_path")); - List orphanDataFiles = actualDataFileDF.join(validDataFileDF, joinCond, "leftanti") + Column joinCond = validFileDF.col("file_path").equalTo(actualFileDF.col("file_path")); + List orphanFiles = actualFileDF.join(validFileDF, joinCond, "leftanti") .as(Encoders.STRING()) .collectAsList(); - if (!dryRun) { - Tasks.foreach(orphanDataFiles) - .noRetry() - .suppressFailureWhenFinished() - .onFailure((file, exc) -> LOG.warn("Failed to delete data file: {}", file, exc)) - .run(fileIO::deleteFile); - } + Tasks.foreach(orphanFiles) + .noRetry() + .suppressFailureWhenFinished() + .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc)) + .run(deleteFunc::accept); - return new RemoveOrphanFilesActionResult(orphanDataFiles); + return orphanFiles; } private Dataset buildValidDataFileDF() { + String allDataFilesMetadataTable = metadataTableName(MetadataTableType.ALL_DATA_FILES); return spark.read().format("iceberg") - .load(allDataFilesTable) - .select("file_path") - .distinct(); + .load(allDataFilesMetadataTable) + .select("file_path"); + } + + private Dataset buildValidMetadataFileDF() { + String allManifestsMetadataTable = metadataTableName(MetadataTableType.ALL_MANIFESTS); + Dataset manifestDF = spark.read().format("iceberg") + .load(allManifestsMetadataTable) + .selectExpr("path as file_path"); + + List otherMetadataFiles = Lists.newArrayList(); + + for (Snapshot snapshot : table.snapshots()) { + String manifestListLocation = snapshot.manifestListLocation(); + if (manifestListLocation != null) { + otherMetadataFiles.add(manifestListLocation); + } + } + + otherMetadataFiles.add(ops.metadataFileLocation("version-hint.text")); + + TableMetadata metadata = ops.current(); + otherMetadataFiles.add(metadata.file().location()); + for (TableMetadata.MetadataLogEntry previousMetadataFile : metadata.previousFiles()) { + otherMetadataFiles.add(previousMetadataFile.file()); + } + + Dataset otherMetadataFileDF = spark + .createDataset(otherMetadataFiles, Encoders.STRING()) + .toDF("file_path"); + + return manifestDF.union(otherMetadataFileDF); } - private Dataset buildActualDataFileDF() { + private Dataset buildActualFileDF() { List topLevelDirs = Lists.newArrayList(); List matchingTopLevelFiles = Lists.newArrayList(); try { - Path dataPath = new Path(dataLocation); - FileSystem fs = dataPath.getFileSystem(hadoopConf.value()); + Path path = new Path(location); + FileSystem fs = path.getFileSystem(hadoopConf.value()); - for (FileStatus file : fs.listStatus(dataPath, HiddenPathFilter.get())) { - // TODO: handle custom metadata folders - // we need to ignore the metadata folder when data is written to the root table location - if (file.isDirectory() && !"metadata".equals(file.getPath().getName())) { + for (FileStatus file : fs.listStatus(path, HiddenPathFilter.get())) { + if (file.isDirectory()) { topLevelDirs.add(file.getPath().toString()); } else if (file.isFile() && file.getModificationTime() < olderThanTimestamp) { matchingTopLevelFiles.add(file.getPath().toString()); } } } catch (IOException e) { - throw new RuntimeIOException(e, "Failed to determine top-level files and dirs"); + throw new RuntimeIOException(e, "Failed to determine top-level files and dirs in {}", location); } JavaRDD matchingTopLevelFileRDD = sparkContext.parallelize(matchingTopLevelFiles, 1); @@ -194,4 +227,13 @@ private static List listDirRecursively(String dir, Predicate throw new RuntimeIOException(e); } } + + private String metadataTableName(MetadataTableType type) { + String tableName = table.toString(); + if (tableName.contains("/")) { + return tableName + "#" + type; + } else { + return tableName.replaceFirst("(hadoop\\.)|(hive\\.)", "") + "." + type; + } + } } diff --git a/spark/src/main/java/org/apache/iceberg/RemoveOrphanFilesActionResult.java b/spark/src/main/java/org/apache/iceberg/RemoveOrphanFilesActionResult.java deleted file mode 100644 index 8d713368adf3..000000000000 --- a/spark/src/main/java/org/apache/iceberg/RemoveOrphanFilesActionResult.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg; - -import java.util.List; - -public class RemoveOrphanFilesActionResult { - private final List dataFiles; - - public RemoveOrphanFilesActionResult(List dataFiles) { - this.dataFiles = dataFiles; - } - - public List dataFiles() { - return dataFiles; - } -} diff --git a/spark/src/test/java/org/apache/iceberg/TestRemoveOrphanFilesAction.java b/spark/src/test/java/org/apache/iceberg/TestRemoveOrphanFilesAction.java index 505fe9511dc2..4d6bcaa243d7 100644 --- a/spark/src/test/java/org/apache/iceberg/TestRemoveOrphanFilesAction.java +++ b/spark/src/test/java/org/apache/iceberg/TestRemoveOrphanFilesAction.java @@ -136,19 +136,17 @@ public void testDryRun() throws IOException, InterruptedException { Actions actions = Actions.forTable(table); - RemoveOrphanFilesActionResult result1 = actions.removeOrphanFiles() - .allDataFilesTable(tableLocation + "#all_data_files") + List result1 = actions.removeOrphanFiles() .olderThan(System.currentTimeMillis()) - .dryRun(true) + .deleteWith(s -> { }) .execute(); - Assert.assertEquals("Action should find 1 data file", invalidFiles, result1.dataFiles()); + Assert.assertEquals("Action should find 1 file", invalidFiles, result1); Assert.assertTrue("Invalid file should be present", fs.exists(new Path(invalidFiles.get(0)))); - RemoveOrphanFilesActionResult result2 = actions.removeOrphanFiles() - .allDataFilesTable(tableLocation + "#all_data_files") + List result2 = actions.removeOrphanFiles() .olderThan(System.currentTimeMillis()) .execute(); - Assert.assertEquals("Action should delete 1 data file", invalidFiles, result2.dataFiles()); + Assert.assertEquals("Action should delete 1 file", invalidFiles, result2); Assert.assertFalse("Invalid file should not be present", fs.exists(new Path(invalidFiles.get(0)))); List expectedRecords = Lists.newArrayList(); @@ -218,12 +216,11 @@ public void testAllValidFilesAreKept() throws IOException, InterruptedException Actions actions = Actions.forTable(table); - RemoveOrphanFilesActionResult result = actions.removeOrphanFiles() - .allDataFilesTable(tableLocation + "#all_data_files") + List result = actions.removeOrphanFiles() .olderThan(System.currentTimeMillis()) .execute(); - Assert.assertEquals("Should delete 4 data files", 4, result.dataFiles().size()); + Assert.assertEquals("Should delete 4 files", 4, result.size()); Path dataPath = new Path(tableLocation + "/data"); FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf()); @@ -241,6 +238,51 @@ public void testAllValidFilesAreKept() throws IOException, InterruptedException } } + @Test + public void testWapFilesAreKept() throws InterruptedException { + Map props = Maps.newHashMap(); + props.put(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, "true"); + Table table = TABLES.create(SCHEMA, SPEC, props, tableLocation); + + List records = Lists.newArrayList( + new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA") + ); + Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class); + + // normal write + df.select("c1", "c2", "c3") + .write() + .format("iceberg") + .mode("append") + .save(tableLocation); + + spark.conf().set("spark.wap.id", "1"); + + // wap write + df.select("c1", "c2", "c3") + .write() + .format("iceberg") + .mode("append") + .save(tableLocation); + + Dataset resultDF = spark.read().format("iceberg").load(tableLocation); + List actualRecords = resultDF + .as(Encoders.bean(ThreeColumnRecord.class)) + .collectAsList(); + Assert.assertEquals("Should not return data from the staged snapshot", records, actualRecords); + + // sleep for 1 second to unsure files will be old enough + Thread.sleep(1000); + + Actions actions = Actions.forTable(table); + + List result = actions.removeOrphanFiles() + .olderThan(System.currentTimeMillis()) + .execute(); + + Assert.assertTrue("Should not delete any files", result.isEmpty()); + } + @Test public void testMetadataFolderIsIntact() throws InterruptedException { // write data directly to the table location @@ -266,12 +308,11 @@ public void testMetadataFolderIsIntact() throws InterruptedException { Actions actions = Actions.forTable(table); - RemoveOrphanFilesActionResult result = actions.removeOrphanFiles() - .allDataFilesTable(tableLocation + "#all_data_files") + List result = actions.removeOrphanFiles() .olderThan(System.currentTimeMillis()) .execute(); - Assert.assertEquals("Should delete 1 data file", 1, result.dataFiles().size()); + Assert.assertEquals("Should delete 1 file", 1, result.size()); Dataset resultDF = spark.read().format("iceberg").load(tableLocation); List actualRecords = resultDF @@ -308,12 +349,58 @@ public void testOlderThanTimestamp() throws InterruptedException { Actions actions = Actions.forTable(table); - RemoveOrphanFilesActionResult result = actions.removeOrphanFiles() - .allDataFilesTable(tableLocation + "#all_data_files") + List result = actions.removeOrphanFiles() .olderThan(timestamp) .execute(); - Assert.assertEquals("Should delete only 2 data files", 2, result.dataFiles().size()); + Assert.assertEquals("Should delete only 2 files", 2, result.size()); + } + + @Test + public void testRemoveUnreachableMetadataVersionFiles() throws InterruptedException { + Map props = Maps.newHashMap(); + props.put(TableProperties.WRITE_NEW_DATA_LOCATION, tableLocation); + props.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, "1"); + Table table = TABLES.create(SCHEMA, SPEC, props, tableLocation); + + List records = Lists.newArrayList( + new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA") + ); + Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class); + + df.select("c1", "c2", "c3") + .write() + .format("iceberg") + .mode("append") + .save(tableLocation); + + df.select("c1", "c2", "c3") + .write() + .format("iceberg") + .mode("append") + .save(tableLocation); + + // sleep for 1 second to unsure files will be old enough + Thread.sleep(1000); + + Actions actions = Actions.forTable(table); + + List result = actions.removeOrphanFiles() + .olderThan(System.currentTimeMillis()) + .execute(); + + Assert.assertEquals("Should delete 1 file", 1, result.size()); + Assert.assertTrue("Should remove v1 file", result.get(0).contains("v1.metadata.json")); + + List expectedRecords = Lists.newArrayList(); + expectedRecords.addAll(records); + expectedRecords.addAll(records); + + Dataset resultDF = spark.read().format("iceberg").load(tableLocation); + List actualRecords = resultDF + .as(Encoders.bean(ThreeColumnRecord.class)) + .collectAsList(); + Assert.assertEquals("Rows must match", expectedRecords, actualRecords); } private List snapshotFiles(long snapshotId) { diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java index 669c8769370f..6854a09b5d39 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java @@ -19,7 +19,10 @@ package org.apache.iceberg.spark.source; +import java.io.IOException; import java.util.HashMap; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.iceberg.PartitionSpec; @@ -78,12 +81,12 @@ public static void stopMetastoreAndSpark() { } @After - public void dropTable() throws Exception { - clients.run(client -> { - client.dropTable(TestIcebergSourceHiveTables.currentIdentifier.namespace().level(0), - TestIcebergSourceHiveTables.currentIdentifier.name()); - return null; - }); + public void dropTable() throws IOException { + Table table = catalog.loadTable(currentIdentifier); + Path tablePath = new Path(table.location()); + FileSystem fs = tablePath.getFileSystem(hiveConf); + fs.delete(tablePath, true); + catalog.dropTable(currentIdentifier, false); } @Override diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index fc27ea887f93..396afe507634 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -26,6 +26,7 @@ import java.util.List; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.iceberg.Actions; import org.apache.iceberg.DataFile; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.PartitionSpec; @@ -688,4 +689,46 @@ public void testPartitionsTable() { Assert.assertEquals("Actual results should have one row", 1, actualAfterFirstCommit.size()); TestHelpers.assertEqualsSafe(partitionsTable.schema().asStruct(), expected.get(0), actualAfterFirstCommit.get(0)); } + + @Test + public void testRemoveOrphanFilesActionSupport() throws InterruptedException { + TableIdentifier tableIdentifier = TableIdentifier.of("db", "table"); + Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned()); + + List records = Lists.newArrayList( + new SimpleRecord(1, "1") + ); + + Dataset df = spark.createDataFrame(records, SimpleRecord.class); + + df.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(loadLocation(tableIdentifier)); + + df.write().mode("append").parquet(table.location() + "/data"); + + // sleep for 1 second to ensure files will be old enough + Thread.sleep(1000); + + Actions actions = Actions.forTable(table); + + List result1 = actions.removeOrphanFiles() + .location(table.location() + "/metadata") + .olderThan(System.currentTimeMillis()) + .execute(); + Assert.assertTrue("Should not delete any metadata files", result1.isEmpty()); + + List result2 = actions.removeOrphanFiles() + .olderThan(System.currentTimeMillis()) + .execute(); + Assert.assertEquals("Should delete 1 data file", 1, result2.size()); + + Dataset resultDF = spark.read().format("iceberg").load(loadLocation(tableIdentifier)); + List actualRecords = resultDF + .as(Encoders.bean(SimpleRecord.class)) + .collectAsList(); + + Assert.assertEquals("Rows must match", records, actualRecords); + } } From e0c96f353f6abc6804bdf1944e8677c1a62c3191 Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Wed, 8 Apr 2020 17:41:26 -0700 Subject: [PATCH 3/5] Remove extra line --- api/src/main/java/org/apache/iceberg/io/LocationProvider.java | 1 - 1 file changed, 1 deletion(-) diff --git a/api/src/main/java/org/apache/iceberg/io/LocationProvider.java b/api/src/main/java/org/apache/iceberg/io/LocationProvider.java index 7b273d3eabe4..58f70c1de883 100644 --- a/api/src/main/java/org/apache/iceberg/io/LocationProvider.java +++ b/api/src/main/java/org/apache/iceberg/io/LocationProvider.java @@ -29,7 +29,6 @@ * Implementations must be {@link Serializable} because instances will be serialized to tasks. */ public interface LocationProvider extends Serializable { - /** * Return a fully-qualified data file location for the given filename. * From f3e694856dbf5aaa0c4577dcc38ae1fce413b3fe Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Thu, 9 Apr 2020 18:58:04 -0700 Subject: [PATCH 4/5] Rework listing and add javadocs --- .../main/java/org/apache/iceberg/Action.java | 10 ++ .../iceberg/RemoveOrphanFilesAction.java | 133 +++++++++++------- .../iceberg/TestRemoveOrphanFilesAction.java | 81 ++++++++++- 3 files changed, 173 insertions(+), 51 deletions(-) diff --git a/spark/src/main/java/org/apache/iceberg/Action.java b/spark/src/main/java/org/apache/iceberg/Action.java index af18e40a4876..8e2317687d78 100644 --- a/spark/src/main/java/org/apache/iceberg/Action.java +++ b/spark/src/main/java/org/apache/iceberg/Action.java @@ -19,6 +19,16 @@ package org.apache.iceberg; +/** + * An action performed on a table. + * + * @param the Java type of the result produced by this action + */ public interface Action { + /** + * Executes this action. + * + * @return the result of this action + */ R execute(); } diff --git a/spark/src/main/java/org/apache/iceberg/RemoveOrphanFilesAction.java b/spark/src/main/java/org/apache/iceberg/RemoveOrphanFilesAction.java index bfefaeb35a52..2956dc669b90 100644 --- a/spark/src/main/java/org/apache/iceberg/RemoveOrphanFilesAction.java +++ b/spark/src/main/java/org/apache/iceberg/RemoveOrphanFilesAction.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.Iterator; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Predicate; import org.apache.hadoop.conf.Configuration; @@ -44,8 +45,21 @@ import org.apache.spark.util.SerializableConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import parquet.Preconditions; +/** + * An action that removes orphan metadata and data files by listing a given location and comparing + * the actual files in that location with data and metadata files referenced by all valid snapshots. + * The location must be accessible for listing via the Hadoop {@link FileSystem}. + *

+ * By default, this action cleans up the table location returned by {@link Table#location()} and + * removes unreachable files that are older than 3 days using {@link Table#io()}. The behavior can be modified + * by passing a custom location to {@link #location} and a custom timestamp to {@link #olderThan(long)}. + * For example, someone might point this action to the data folder to clean up only orphan data files. + * In addition, there is a way to configure an alternative delete method via {@link #deleteWith(Consumer)}. + *

+ * Note: It is dangerous to call this action with a short retention interval as it might corrupt + * the state of the table if another operation is writing at the same time. + */ public class RemoveOrphanFilesAction implements Action> { private static final Logger LOG = LoggerFactory.getLogger(RemoveOrphanFilesAction.class); @@ -58,7 +72,7 @@ public class RemoveOrphanFilesAction implements Action> { private final TableOperations ops; private String location = null; - private Long olderThanTimestamp = null; + private long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3); private Consumer deleteFunc = new Consumer() { @Override public void accept(String file) { @@ -76,16 +90,34 @@ public void accept(String file) { this.location = table.location(); } + /** + * Removes orphan files in the given location. + * + * @param newLocation a location + * @return this for method chaining + */ public RemoveOrphanFilesAction location(String newLocation) { this.location = newLocation; return this; } + /** + * Removes orphan files that are older than the given timestamp. + * + * @param newOlderThanTimestamp a timestamp in milliseconds + * @return this for method chaining + */ public RemoveOrphanFilesAction olderThan(long newOlderThanTimestamp) { this.olderThanTimestamp = newOlderThanTimestamp; return this; } + /** + * Passes an alternative delete implementation that will be used to delete orphan files. + * + * @param newDeleteFunc a delete func + * @return this for method chaining + */ public RemoveOrphanFilesAction deleteWith(Consumer newDeleteFunc) { this.deleteFunc = newDeleteFunc; return this; @@ -93,8 +125,6 @@ public RemoveOrphanFilesAction deleteWith(Consumer newDeleteFunc) { @Override public List execute() { - Preconditions.checkArgument(olderThanTimestamp != null, "olderThanTimestamp must be set"); - Dataset validDataFileDF = buildValidDataFileDF(); Dataset validMetadataFileDF = buildValidMetadataFileDF(); Dataset validFileDF = validDataFileDF.union(validMetadataFileDF); @@ -152,77 +182,63 @@ private Dataset buildValidMetadataFileDF() { } private Dataset buildActualFileDF() { - List topLevelDirs = Lists.newArrayList(); - List matchingTopLevelFiles = Lists.newArrayList(); + List subDirs = Lists.newArrayList(); + List matchingFiles = Lists.newArrayList(); - try { - Path path = new Path(location); - FileSystem fs = path.getFileSystem(hadoopConf.value()); + Predicate predicate = file -> file.getModificationTime() < olderThanTimestamp; - for (FileStatus file : fs.listStatus(path, HiddenPathFilter.get())) { - if (file.isDirectory()) { - topLevelDirs.add(file.getPath().toString()); - } else if (file.isFile() && file.getModificationTime() < olderThanTimestamp) { - matchingTopLevelFiles.add(file.getPath().toString()); - } - } - } catch (IOException e) { - throw new RuntimeIOException(e, "Failed to determine top-level files and dirs in {}", location); - } + // list at most 3 levels and only dirs that have less than 10 direct sub dirs on the driver + listDirRecursively(location, predicate, hadoopConf.value(), 3, 10, subDirs, matchingFiles); - JavaRDD matchingTopLevelFileRDD = sparkContext.parallelize(matchingTopLevelFiles, 1); + JavaRDD matchingFileRDD = sparkContext.parallelize(matchingFiles, 1); - if (topLevelDirs.isEmpty()) { - return spark.createDataset(matchingTopLevelFileRDD.rdd(), Encoders.STRING()).toDF("file_path"); + if (subDirs.isEmpty()) { + return spark.createDataset(matchingFileRDD.rdd(), Encoders.STRING()).toDF("file_path"); } - int parallelism = Math.min(topLevelDirs.size(), partitionDiscoveryParallelism); - JavaRDD topLevelDirRDD = sparkContext.parallelize(topLevelDirs, parallelism); + int parallelism = Math.min(subDirs.size(), partitionDiscoveryParallelism); + JavaRDD subDirRDD = sparkContext.parallelize(subDirs, parallelism); Broadcast conf = sparkContext.broadcast(hadoopConf); - JavaRDD matchingLeafFileRDD = topLevelDirRDD.mapPartitions(listDirsRecursively(conf, olderThanTimestamp)); + JavaRDD matchingLeafFileRDD = subDirRDD.mapPartitions(listDirsRecursively(conf, olderThanTimestamp)); - JavaRDD matchingFileRDD = matchingTopLevelFileRDD.union(matchingLeafFileRDD); - return spark.createDataset(matchingFileRDD.rdd(), Encoders.STRING()).toDF("file_path"); + JavaRDD completeMatchingFileRDD = matchingFileRDD.union(matchingLeafFileRDD); + return spark.createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING()).toDF("file_path"); } - private static FlatMapFunction, String> listDirsRecursively( - Broadcast conf, - long olderThanTimestamp) { + private static void listDirRecursively( + String dir, Predicate predicate, Configuration conf, int maxDepth, + int maxDirectSubDirs, List remainingSubDirs, List matchingFiles) { - return (FlatMapFunction, String>) dirs -> { - List files = Lists.newArrayList(); - Predicate predicate = file -> file.getModificationTime() < olderThanTimestamp; - dirs.forEachRemaining(dir -> { - List dirFiles = listDirRecursively(dir, predicate, conf.value().value()); - files.addAll(dirFiles); - }); - return files.iterator(); - }; - } + // stop listing whenever we reach the max depth + if (maxDepth <= 0) { + remainingSubDirs.add(dir); + return; + } - private static List listDirRecursively(String dir, Predicate predicate, Configuration conf) { try { Path path = new Path(dir); FileSystem fs = path.getFileSystem(conf); - List childDirs = Lists.newArrayList(); - List matchingFiles = Lists.newArrayList(); + List subDirs = Lists.newArrayList(); for (FileStatus file : fs.listStatus(path, HiddenPathFilter.get())) { if (file.isDirectory()) { - childDirs.add(file.getPath().toString()); + subDirs.add(file.getPath().toString()); } else if (file.isFile() && predicate.test(file)) { matchingFiles.add(file.getPath().toString()); } } - for (String childDir : childDirs) { - List childDirFiles = listDirRecursively(childDir, predicate, conf); - matchingFiles.addAll(childDirFiles); + // stop listing if the number of direct sub dirs is bigger than maxDirectSubDirs + if (subDirs.size() > maxDirectSubDirs) { + remainingSubDirs.addAll(subDirs); + return; } - return matchingFiles; + for (String subDir : subDirs) { + listDirRecursively(subDir, predicate, conf, maxDepth - 1, maxDirectSubDirs, remainingSubDirs, matchingFiles); + } } catch (IOException e) { throw new RuntimeIOException(e); } @@ -236,4 +252,25 @@ private String metadataTableName(MetadataTableType type) { return tableName.replaceFirst("(hadoop\\.)|(hive\\.)", "") + "." + type; } } + + private static FlatMapFunction, String> listDirsRecursively( + Broadcast conf, + long olderThanTimestamp) { + + return (FlatMapFunction, String>) dirs -> { + List subDirs = Lists.newArrayList(); + List files = Lists.newArrayList(); + + Predicate predicate = file -> file.getModificationTime() < olderThanTimestamp; + + int maxDepth = Integer.MAX_VALUE; + int maxDirectSubDirs = Integer.MAX_VALUE; + + dirs.forEachRemaining(dir -> { + listDirRecursively(dir, predicate, conf.value().value(), maxDepth, maxDirectSubDirs, subDirs, files); + }); + + return files.iterator(); + }; + } } diff --git a/spark/src/test/java/org/apache/iceberg/TestRemoveOrphanFilesAction.java b/spark/src/test/java/org/apache/iceberg/TestRemoveOrphanFilesAction.java index 4d6bcaa243d7..9f4bc57c04dc 100644 --- a/spark/src/test/java/org/apache/iceberg/TestRemoveOrphanFilesAction.java +++ b/spark/src/test/java/org/apache/iceberg/TestRemoveOrphanFilesAction.java @@ -137,16 +137,21 @@ public void testDryRun() throws IOException, InterruptedException { Actions actions = Actions.forTable(table); List result1 = actions.removeOrphanFiles() + .deleteWith(s -> { }) + .execute(); + Assert.assertTrue("Default olderThan interval should be safe", result1.isEmpty()); + + List result2 = actions.removeOrphanFiles() .olderThan(System.currentTimeMillis()) .deleteWith(s -> { }) .execute(); - Assert.assertEquals("Action should find 1 file", invalidFiles, result1); + Assert.assertEquals("Action should find 1 file", invalidFiles, result2); Assert.assertTrue("Invalid file should be present", fs.exists(new Path(invalidFiles.get(0)))); - List result2 = actions.removeOrphanFiles() + List result3 = actions.removeOrphanFiles() .olderThan(System.currentTimeMillis()) .execute(); - Assert.assertEquals("Action should delete 1 file", invalidFiles, result2); + Assert.assertEquals("Action should delete 1 file", invalidFiles, result3); Assert.assertFalse("Invalid file should not be present", fs.exists(new Path(invalidFiles.get(0)))); List expectedRecords = Lists.newArrayList(); @@ -403,6 +408,76 @@ public void testRemoveUnreachableMetadataVersionFiles() throws InterruptedExcept Assert.assertEquals("Rows must match", expectedRecords, actualRecords); } + @Test + public void testManyTopLevelPartitions() throws InterruptedException { + Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), tableLocation); + + List records = Lists.newArrayList(); + for (int i = 0; i < 100; i++) { + records.add(new ThreeColumnRecord(i, String.valueOf(i), String.valueOf(i))); + } + + Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class); + + df.select("c1", "c2", "c3") + .write() + .format("iceberg") + .mode("append") + .save(tableLocation); + + // sleep for 1 second to unsure files will be old enough + Thread.sleep(1000); + + Actions actions = Actions.forTable(table); + + List result = actions.removeOrphanFiles() + .olderThan(System.currentTimeMillis()) + .execute(); + + Assert.assertTrue("Should not delete any files", result.isEmpty()); + + Dataset resultDF = spark.read().format("iceberg").load(tableLocation); + List actualRecords = resultDF + .as(Encoders.bean(ThreeColumnRecord.class)) + .collectAsList(); + Assert.assertEquals("Rows must match", records, actualRecords); + } + + @Test + public void testManyLeafPartitions() throws InterruptedException { + Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), tableLocation); + + List records = Lists.newArrayList(); + for (int i = 0; i < 100; i++) { + records.add(new ThreeColumnRecord(i, String.valueOf(i % 3), String.valueOf(i))); + } + + Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class); + + df.select("c1", "c2", "c3") + .write() + .format("iceberg") + .mode("append") + .save(tableLocation); + + // sleep for 1 second to unsure files will be old enough + Thread.sleep(1000); + + Actions actions = Actions.forTable(table); + + List result = actions.removeOrphanFiles() + .olderThan(System.currentTimeMillis()) + .execute(); + + Assert.assertTrue("Should not delete any files", result.isEmpty()); + + Dataset resultDF = spark.read().format("iceberg").load(tableLocation); + List actualRecords = resultDF + .as(Encoders.bean(ThreeColumnRecord.class)) + .collectAsList(); + Assert.assertEquals("Rows must match", records, actualRecords); + } + private List snapshotFiles(long snapshotId) { return spark.read().format("iceberg") .option("snapshot-id", snapshotId) From 6c01b75d0d738bc4b227b3deb5ffb29e73071dc6 Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Thu, 9 Apr 2020 19:17:01 -0700 Subject: [PATCH 5/5] Explain metadataTableName --- .../java/org/apache/iceberg/RemoveOrphanFilesAction.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/spark/src/main/java/org/apache/iceberg/RemoveOrphanFilesAction.java b/spark/src/main/java/org/apache/iceberg/RemoveOrphanFilesAction.java index 2956dc669b90..a82ad2409de9 100644 --- a/spark/src/main/java/org/apache/iceberg/RemoveOrphanFilesAction.java +++ b/spark/src/main/java/org/apache/iceberg/RemoveOrphanFilesAction.java @@ -248,8 +248,11 @@ private String metadataTableName(MetadataTableType type) { String tableName = table.toString(); if (tableName.contains("/")) { return tableName + "#" + type; - } else { + } else if (tableName.startsWith("hadoop.") || tableName.startsWith("hive.")) { + // HiveCatalog and HadoopCatalog prepend a logical name which we need to drop for Spark 2.4 return tableName.replaceFirst("(hadoop\\.)|(hive\\.)", "") + "." + type; + } else { + return tableName + "." + type; } }