Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,17 @@
import org.apache.iceberg.data.avro.DataReader;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mr.SerializationUtil;
import org.apache.iceberg.orc.ORC;
Expand Down Expand Up @@ -252,7 +255,7 @@ public List<InputSplit> getSplits(JobContext context) {
//TODO: We do not support residual evaluation for HIVE and PIG in memory data model yet
checkResiduals(task);
}
splits.add(new IcebergSplit(conf, task));
splits.add(new IcebergSplit(conf, task, fileIO(table), table.encryption()));
});
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close table scan: %s", scan);
Expand All @@ -273,6 +276,14 @@ private static void checkResiduals(CombinedScanTask task) {
});
}

private FileIO fileIO(Table table) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:seems like this method can be made static?

if (table.io() instanceof HadoopFileIO) {
return new HadoopFileIO(((HadoopFileIO) table.io()).conf());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this create a new HadoopFileIO? The table's FileIO should work fine.

I think this might be based on the handling in Spark, which creates a new one that wraps a broadcasted manifest. But there is no broadcast for MR so I don't think we need this method. It should work fine to use table.io().

} else {
return table.io();
}
}

@Override
public RecordReader<Void, T> createRecordReader(InputSplit split, TaskAttemptContext context) {
return new IcebergRecordReader<>();
Expand All @@ -290,12 +301,16 @@ private static final class IcebergRecordReader<T> extends RecordReader<Void, T>
private T currentRow;
private Iterator<T> currentIterator;
private Closeable currentCloseable;
private FileIO io;
private EncryptionManager encryptionManager;

@Override
public void initialize(InputSplit split, TaskAttemptContext newContext) {
Configuration conf = newContext.getConfiguration();
// For now IcebergInputFormat does its own split planning and does not accept FileSplit instances
CombinedScanTask task = ((IcebergSplit) split).task;
this.io = ((IcebergSplit) split).io;
this.encryptionManager = ((IcebergSplit) split).encryptionManager;
this.context = newContext;
this.tasks = task.files().iterator();
this.tableSchema = SchemaParser.fromJson(conf.get(TABLE_SCHEMA));
Expand Down Expand Up @@ -379,8 +394,10 @@ private Iterator<T> open(FileScanTask currentTask) {

private Iterator<T> open(FileScanTask currentTask, Schema readSchema) {
DataFile file = currentTask.file();
// TODO we should make use of FileIO to create inputFile
InputFile inputFile = HadoopInputFile.fromLocation(file.path(), context.getConfiguration());
InputFile inputFile = encryptionManager.decrypt(EncryptedFiles.encryptedInput(
io.newInputFile(file.path().toString()),
file.keyMetadata()));

CloseableIterable<T> iterable;
switch (file.format()) {
case AVRO:
Expand Down Expand Up @@ -543,10 +560,14 @@ static class IcebergSplit extends InputSplit implements Writable {
private CombinedScanTask task;
private transient String[] locations;
private transient Configuration conf;
private FileIO io;
private EncryptionManager encryptionManager;

IcebergSplit(Configuration conf, CombinedScanTask task) {
IcebergSplit(Configuration conf, CombinedScanTask task, FileIO io, EncryptionManager encryptionManager) {
this.task = task;
this.conf = conf;
this.io = io;
this.encryptionManager = encryptionManager;
}

@Override
Expand All @@ -572,13 +593,29 @@ public void write(DataOutput out) throws IOException {
byte[] data = SerializationUtil.serializeToBytes(this.task);
out.writeInt(data.length);
out.write(data);

byte[] ioData = SerializationUtil.serializeToBytes(io);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rdsr, I just noticed this. Do we need custom Serializable implementations? If all of the fields are Serializable and this is Serializable then the default implementation should just work.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to verify this, we can add a test for serializability, too.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good. I'll double check this.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rdblue, can you elaborate more on what is the default implementation here? I don't think there's a default implementation for Serializable in MR. There's a default impl. for Writables

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java serialization will work without implementing write and readFields as long as all of the fields in an object are Serializable. The only reason to implement this is to wrap a non-serializable object or to customize in some way how serialization happens. Because all 3 fields that need to be serialized are already Serializable (FileScanTask, FileIO, and EncryptionManager) I don't think we need these methods.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To follow up on this, I don't think that we need to remove it in this PR. But we should be able to remove these in a follow-up.

out.writeInt(ioData.length);
out.write(ioData);

byte[] encryptionManagerData = SerializationUtil.serializeToBytes(encryptionManager);
out.writeInt(encryptionManagerData.length);
out.write(encryptionManagerData);
}

@Override
public void readFields(DataInput in) throws IOException {
byte[] data = new byte[in.readInt()];
in.readFully(data);
this.task = SerializationUtil.deserializeFromBytes(data);

byte[] ioData = new byte[in.readInt()];
in.readFully(ioData);
this.io = SerializationUtil.deserializeFromBytes(ioData);

byte[] encryptionManagerData = new byte[in.readInt()];
in.readFully(encryptionManagerData);
this.encryptionManager = SerializationUtil.deserializeFromBytes(encryptionManagerData);
}
}
}