MR: apply row-level delete files when reading#1497
Conversation
|
|
||
| DeleteFile eqDeletes = FileHelpers.writeDeleteFile( | ||
| table, Files.localOutput(temp.newFile()), Row.of(0), dataDeletes, dataSchema); | ||
| table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), dataDeletes, dataSchema); |
There was a problem hiding this comment.
Can you import this class directly to avoid so many changes in this file?
| .forEach(set::add); | ||
| return set; | ||
| } | ||
|
|
There was a problem hiding this comment.
Nit: unnecessary whitespace change
| checkResiduals(task); | ||
| } | ||
| splits.add(new IcebergSplit(conf, task)); | ||
| splits.add(new IcebergSplit(conf, task, table.io(), table.encryption())); |
There was a problem hiding this comment.
While I would like to get the encryption manager and io changes in, I don't think that they should be mixed into this commit. Was it necessary to do this for some reason?
There was a problem hiding this comment.
The GenericDeleteFilter constructor needs FileIO as parameter.
There was a problem hiding this comment.
In that case, this can pass the FileIO somehow, or we can work on getting the other PR done before this one. But I don't think we should mix the two features together.
There was a problem hiding this comment.
For now, this could create a new HadoopFileIO and use that instead. That would be the easiest path forward.
| public static final Schema SCHEMA = new Schema( | ||
| required(1, "id", Types.IntegerType.get()), | ||
| required(2, "data", Types.StringType.get()) | ||
| ); |
There was a problem hiding this comment.
Why not put the schema and spec in the parent class, DeletesReadTest? The data it generates is for this schema.
| } | ||
|
|
||
| protected void generateTestData() throws IOException { | ||
| this.records = new ArrayList<>(); |
There was a problem hiding this comment.
We prefer using Lists.newArrayList()
| public void writeTestDataFile() throws IOException { | ||
| File tableDir = temp.newFolder(); | ||
| tableDir.delete(); | ||
| this.table = TestTables.create(tableDir, "test", SCHEMA, SPEC, 2); |
There was a problem hiding this comment.
I think a better way to break down the class would be to have an abstract Table createTable(String name, Schema, Spec) method. Then the table and dataFile fields don't need to be shared. I also don't think that there is a need to make records public either.
There was a problem hiding this comment.
Make sense to me. Updated.
| return iterable; | ||
| } | ||
|
|
||
| @SuppressWarnings("unchecked") |
There was a problem hiding this comment.
deletes.filter(...) needs this.
| case GENERIC: | ||
| DeleteFilter deletes = new GenericDeleteFilter(io, currentTask, tableSchema, readSchema); | ||
| Schema requiredSchema = deletes.requiredSchema(); | ||
| iter = deletes.filter(openTask(currentTask, requiredSchema)); |
There was a problem hiding this comment.
Why not return deletes.filter(...) here? That would remove the need for iter and break.
| } | ||
| } | ||
|
|
||
| return parameters; |
There was a problem hiding this comment.
Is there a simpler way to configure this? Normally, we build these using literals instead of a block of code.
There was a problem hiding this comment.
Yes, I just updated it.
| return rowSet(table, "*"); | ||
| } | ||
|
|
||
| private static StructLikeSet rowSet(Table table, String... columns) { |
There was a problem hiding this comment.
This method is what reads the rows from the table using Spark. Deleting this method and using the one in DeletesReadTest makes this test suite use the exact same read path as the generics -- IcebergGenerics.
You can probably make this method abstract and implement it in both classes to get around this. You'll also need to implement a read using the input format or Hive runner to test the Hive code.
There was a problem hiding this comment.
Sorry, I missed this. I just added this back and also use input format to read records.
8faeac6 to
84310d9
Compare
rdsr
left a comment
There was a problem hiding this comment.
LGTM!. The changes in MR to incorporate deletes are minimal and clean!
84310d9 to
e1102d7
Compare
| } | ||
|
|
||
| private abstract static class TestInputFormat<T> { | ||
| public abstract static class TestInputFormat<T> { |
There was a problem hiding this comment.
Why was this changed to public?
There was a problem hiding this comment.
This is because the testInputFormat is type of TestIcebergInputFormats.TestInputFormat.Factory<Record> which needs to access the TestInputFormat
There was a problem hiding this comment.
I tried to minimize the changes to access modifiers in TestIcebergInputFormat, the last place that needs the modifier of TestInputFormat to be public is getRecords method.
0a952f9 to
f5d382d
Compare
| records.add(record.copy("id", 122, "data", "g")); | ||
|
|
||
| this.dataFile = FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()), Row.of(0), records); | ||
|
|
There was a problem hiding this comment.
Nit: whitespace-only change.
|
|
||
| @Before | ||
| public void writeTestDataFile() throws IOException { | ||
| public void prepareData() throws IOException { |
There was a problem hiding this comment.
Nit: unnecessary method renames cause more changed lines than needed.
| private List<Record> records = null; | ||
| private DataFile dataFile = null; | ||
| private DataFile dataFile; | ||
| private List<Record> records; |
There was a problem hiding this comment.
Changing the order of these two lines and dropping the default also causes unnecessary changes.
| .bucket("data", 16) | ||
| .build(); | ||
|
|
||
| protected final String testTableName = "test"; |
There was a problem hiding this comment.
I think this should be private. If it is needed by subclases, it should be passed into methods, not shared. I think this is only used by Spark, so it should be easy to fix.
3e6140b to
f20d633
Compare
|
@chenjunjiedada, I went ahead and fixed the remaining issues and opened a PR against your branch. Could you please take a look? |
|
Merged. Thanks, @chenjunjiedada! |
This applies row-level delete files when reading for IcebergInputFormat. This also includes changes from #985.
This also refactors the deletes read unit tests to a separated base test class to avoid duplication.