From d34332e882d89a0d005947ed41b5da62472727ec Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 29 Apr 2020 20:18:52 +0800 Subject: [PATCH] Using FileIO and EncryptionManager for MR InputFiles --- .../mr/mapreduce/IcebergInputFormat.java | 47 +++++++++++++++++-- 1 file changed, 42 insertions(+), 5 deletions(-) diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java index 0d35644ab89f..c7589fb63217 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java +++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java @@ -61,14 +61,17 @@ import org.apache.iceberg.data.avro.DataReader; import org.apache.iceberg.data.orc.GenericOrcReader; import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.expressions.Evaluator; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; -import org.apache.iceberg.hadoop.HadoopInputFile; +import org.apache.iceberg.hadoop.HadoopFileIO; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.hadoop.Util; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.mr.SerializationUtil; import org.apache.iceberg.orc.ORC; @@ -252,7 +255,7 @@ public List getSplits(JobContext context) { //TODO: We do not support residual evaluation for HIVE and PIG in memory data model yet checkResiduals(task); } - splits.add(new IcebergSplit(conf, task)); + splits.add(new IcebergSplit(conf, task, fileIO(table), table.encryption())); }); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to close table scan: %s", scan); @@ -273,6 +276,14 @@ private static void checkResiduals(CombinedScanTask task) { }); } + private FileIO fileIO(Table table) { + if (table.io() instanceof HadoopFileIO) { + return new HadoopFileIO(((HadoopFileIO) table.io()).conf()); + } else { + return table.io(); + } + } + @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) { return new IcebergRecordReader<>(); @@ -290,12 +301,16 @@ private static final class IcebergRecordReader extends RecordReader private T currentRow; private Iterator currentIterator; private Closeable currentCloseable; + private FileIO io; + private EncryptionManager encryptionManager; @Override public void initialize(InputSplit split, TaskAttemptContext newContext) { Configuration conf = newContext.getConfiguration(); // For now IcebergInputFormat does its own split planning and does not accept FileSplit instances CombinedScanTask task = ((IcebergSplit) split).task; + this.io = ((IcebergSplit) split).io; + this.encryptionManager = ((IcebergSplit) split).encryptionManager; this.context = newContext; this.tasks = task.files().iterator(); this.tableSchema = SchemaParser.fromJson(conf.get(TABLE_SCHEMA)); @@ -379,8 +394,10 @@ private Iterator open(FileScanTask currentTask) { private Iterator open(FileScanTask currentTask, Schema readSchema) { DataFile file = currentTask.file(); - // TODO we should make use of FileIO to create inputFile - InputFile inputFile = HadoopInputFile.fromLocation(file.path(), context.getConfiguration()); + InputFile inputFile = encryptionManager.decrypt(EncryptedFiles.encryptedInput( + io.newInputFile(file.path().toString()), + file.keyMetadata())); + CloseableIterable iterable; switch (file.format()) { case AVRO: @@ -543,10 +560,14 @@ static class IcebergSplit extends InputSplit implements Writable { private CombinedScanTask task; private transient String[] locations; private transient Configuration conf; + private FileIO io; + private EncryptionManager encryptionManager; - IcebergSplit(Configuration conf, CombinedScanTask task) { + IcebergSplit(Configuration conf, CombinedScanTask task, FileIO io, EncryptionManager encryptionManager) { this.task = task; this.conf = conf; + this.io = io; + this.encryptionManager = encryptionManager; } @Override @@ -572,6 +593,14 @@ public void write(DataOutput out) throws IOException { byte[] data = SerializationUtil.serializeToBytes(this.task); out.writeInt(data.length); out.write(data); + + byte[] ioData = SerializationUtil.serializeToBytes(io); + out.writeInt(ioData.length); + out.write(ioData); + + byte[] encryptionManagerData = SerializationUtil.serializeToBytes(encryptionManager); + out.writeInt(encryptionManagerData.length); + out.write(encryptionManagerData); } @Override @@ -579,6 +608,14 @@ public void readFields(DataInput in) throws IOException { byte[] data = new byte[in.readInt()]; in.readFully(data); this.task = SerializationUtil.deserializeFromBytes(data); + + byte[] ioData = new byte[in.readInt()]; + in.readFully(ioData); + this.io = SerializationUtil.deserializeFromBytes(ioData); + + byte[] encryptionManagerData = new byte[in.readInt()]; + in.readFully(encryptionManagerData); + this.encryptionManager = SerializationUtil.deserializeFromBytes(encryptionManagerData); } } }