Flink: apply row-level delete when reading #1517
Conversation
| Avro.ReadBuilder builder = Avro.read(getInputFile(task)) | ||
| .reuseContainers() | ||
| .project(projectedSchema) | ||
| .reuseContainers(false) |
There was a problem hiding this comment.
@JingsongLi , This is used to fix the UT. How can we copy the RowData?
There was a problem hiding this comment.
@chenjunjiedada can you please clarify for me what the UT stands for?
There was a problem hiding this comment.
I mean the unit tests in TestFlinkScan. getRows puts each row returned by inputFormat.nextRecord(null) into a List, but the row object is reused when the file format is Avro, so the contents of the List are wrong.
I didn't find a simple way to copy the GenericRowData, so I set reuseContainers to false to align with the Parquet and ORC cases. I changed it back in 8526b6d since I found the converter could be used to copy the row. But that raises a new concern about double copies for Parquet and ORC. @openinx @JingsongLi @rdblue, should we reuse the container for Flink reads?
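The failure mode described above can be reproduced without Flink. The following is a minimal sketch, assuming a hypothetical `MutableRow` and `ReusingReader` (neither is a Flink or Iceberg class) that mimic a reader returning the same container from every `nextRecord` call. Collecting the returned object directly leaves the list holding one instance many times; copying each record before adding it preserves the values.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are not Flink or Iceberg classes.
class ReusedRowDemo {
  // A minimal mutable row with an explicit copy method.
  static final class MutableRow {
    int id;
    String data;

    MutableRow copy() {
      MutableRow copied = new MutableRow();
      copied.id = id;
      copied.data = data;
      return copied;
    }
  }

  // Mimics a reader with container reuse: every call returns the same instance.
  static final class ReusingReader {
    private final MutableRow reused = new MutableRow();
    private final int count;
    private int next = 0;

    ReusingReader(int count) {
      this.count = count;
    }

    boolean reachedEnd() {
      return next >= count;
    }

    MutableRow nextRecord() {
      reused.id = next;
      reused.data = "row-" + next;
      next++;
      return reused;
    }
  }

  // Adds the reused instance itself, so every list element ends up as the last record.
  static List<Integer> idsWithoutCopy(int count) {
    ReusingReader reader = new ReusingReader(count);
    List<MutableRow> rows = new ArrayList<>();
    while (!reader.reachedEnd()) {
      rows.add(reader.nextRecord());
    }
    return ids(rows);
  }

  // Copies each record before adding it, so each element keeps its own values.
  static List<Integer> idsWithCopy(int count) {
    ReusingReader reader = new ReusingReader(count);
    List<MutableRow> rows = new ArrayList<>();
    while (!reader.reachedEnd()) {
      rows.add(reader.nextRecord().copy());
    }
    return ids(rows);
  }

  private static List<Integer> ids(List<MutableRow> rows) {
    List<Integer> result = new ArrayList<>();
    for (MutableRow row : rows) {
      result.add(row.id);
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(idsWithoutCopy(3)); // prints [2, 2, 2]
    System.out.println(idsWithCopy(3));    // prints [0, 1, 2]
  }
}
```

In the real test the copy would have to be done for RowData, which is what the rest of this thread is about; the sketch only shows why some copy is needed when results are buffered.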
There was a problem hiding this comment.
inputFormat.nextRecord returns a reused record, which is OK in Flink.
There was a problem hiding this comment.
I think we can create a PR to reuse Parquet container for Flink and Spark.
There was a problem hiding this comment.
@JingsongLi , Thanks for your comments. I think it would be better to use an option to set reuse, let me create one.
There was a problem hiding this comment.
I am a slight -1 on an option. If there are no side effects, why do we need to provide this option? (Testing is not a good example; we should consider the user-facing interface.)
There was a problem hiding this comment.
And if this reuse flag is false, I think there may also be some risks.
Note that in the Flink and Spark readers, we are reusing the binary for StringReader.
Maybe these binaries are chunk buffers that have been reused by the Parquet reader (CC: @rdblue), so even if the reuse flag is false, users cannot assume the returned row is safe to hold on to.
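The concern above can be illustrated with a small sketch. It assumes a hypothetical `BinaryView` that wraps a shared `byte[]` without copying, and a reader that refills the same chunk buffer for the next value; neither reflects the actual Flink, Spark, or Parquet classes, and whether the real readers behave this way is exactly the open question in the comment. Under that assumption, a value handed out earlier changes once the buffer is overwritten unless its bytes were copied out first.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration only; BinaryView is not a Flink, Spark, or Parquet class.
class SharedBufferDemo {
  // A string value that points into a byte[] it does not own.
  static final class BinaryView {
    private final byte[] bytes;
    private final int offset;
    private final int length;

    BinaryView(byte[] bytes, int offset, int length) {
      this.bytes = bytes;
      this.offset = offset;
      this.length = length;
    }

    String asString() {
      return new String(bytes, offset, length, StandardCharsets.UTF_8);
    }

    // Copies the viewed bytes into a private array so later buffer writes cannot affect it.
    BinaryView copy() {
      byte[] owned = Arrays.copyOfRange(bytes, offset, offset + length);
      return new BinaryView(owned, 0, owned.length);
    }
  }

  // Reads two values through one chunk buffer and returns both as strings.
  static String[] readTwice(boolean copyFirst) {
    byte[] chunk = "aaa".getBytes(StandardCharsets.UTF_8);
    BinaryView first = new BinaryView(chunk, 0, chunk.length);
    if (copyFirst) {
      first = first.copy();
    }

    // The assumed reader overwrites the same chunk buffer with the next value.
    byte[] nextValue = "bbb".getBytes(StandardCharsets.UTF_8);
    System.arraycopy(nextValue, 0, chunk, 0, nextValue.length);
    BinaryView second = new BinaryView(chunk, 0, chunk.length);

    return new String[] {first.asString(), second.asString()};
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(readTwice(false))); // prints [bbb, bbb]
    System.out.println(Arrays.toString(readTwice(true)));  // prints [aaa, bbb]
  }
}
```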
There was a problem hiding this comment.
@JingsongLi, I created #1522. We could also discuss there.
| private static StructLikeSet rowSet(Table table) throws IOException { | ||
| return rowSet(table, "*"); | ||
| private StructLikeSet rowSet(Table tbl) throws IOException { | ||
| return rowSet(tbl, "*"); |
There was a problem hiding this comment.
Checkstyle reports a hidden variable issue because the parameter has the same name as a class member. I updated the member table to testTable in the PR and will rebase this accordingly.
|
@chenjunjiedada, now that #1497 is merged, can you rebase this one on top of that? That should remove most of the test changes. |
|
Just back from holiday, I will update this tomorrow. |
|
Thanks, and welcome back. |
8526b6d to dc2b9db
|
|
||
| Stream<EncryptedInputFile> encrypted = task.files().stream() | ||
| .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream())) | ||
| .distinct() |
There was a problem hiding this comment.
distinct compares files using equals, which is not overridden for data or delete files. This should instead use the approach that Spark uses:
Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
task.files().stream()
.flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
.forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
Stream<EncryptedInputFile> encrypted = keyMetadata.entrySet().stream()
.map(entry -> EncryptedFiles.encryptedInput(io.newInputFile(entry.getKey()), entry.getValue()));
// decrypt with the batch call to avoid multiple RPCs to a key server, if possible
Iterable<InputFile> decryptedFiles = encryptionManager.decrypt(encrypted::iterator);
There was a problem hiding this comment.
I see, updated.
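The deduplication point raised in this thread can be shown in isolation. The sketch below assumes a hypothetical `FileRef` class that, like the data and delete files discussed above, does not override `equals` or `hashCode`; it is not an Iceberg type, and the string key metadata stands in for the `ByteBuffer` used in the real code. `Stream.distinct()` then falls back to identity comparison and keeps both references to the same path, while a map keyed by path collapses them.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; FileRef is not an Iceberg class.
class DedupByPathDemo {
  // No equals/hashCode override, so two instances with the same path are not equal.
  static final class FileRef {
    final String path;
    final String keyMetadata;

    FileRef(String path, String keyMetadata) {
      this.path = path;
      this.keyMetadata = keyMetadata;
    }
  }

  // distinct() relies on equals, which here is object identity.
  static long countWithDistinct(List<FileRef> files) {
    return files.stream().distinct().count();
  }

  // Keying by path deduplicates regardless of how equals is defined.
  static Map<String, String> keyMetadataByPath(List<FileRef> files) {
    Map<String, String> byPath = new LinkedHashMap<>();
    files.forEach(file -> byPath.put(file.path, file.keyMetadata));
    return byPath;
  }

  // One data file plus the same delete file referenced twice by separate instances.
  static List<FileRef> sample() {
    return Arrays.asList(
        new FileRef("data-1.avro", "k1"),
        new FileRef("deletes-1.avro", "k2"),
        new FileRef("deletes-1.avro", "k2"));
  }

  public static void main(String[] args) {
    System.out.println(countWithDistinct(sample()));        // prints 3
    System.out.println(keyMetadataByPath(sample()).size()); // prints 2
  }
}
```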
| private final HadoopTables tables = new HadoopTables(conf); | ||
| private final FileFormat format; | ||
|
|
||
| private String tableLocation; |
There was a problem hiding this comment.
If this used a Hive table instead, then it wouldn't be necessary to keep state that isn't passed as method arguments. I think that would be less brittle.
There was a problem hiding this comment.
Makes sense to me, updated.
| FlinkSource.Builder builder = FlinkSource.forRowData().tableLoader(TableLoader.fromHadoopTable(tableLocation)); | ||
| Schema projected = testTable.schema().select(columns); | ||
| RowType rowType = FlinkSchemaUtil.convert(projected); | ||
| FlinkInputFormat inputFormat = builder.project(FlinkSchemaUtil.toSchema(rowType)).buildFormat(); |
There was a problem hiding this comment.
Why half-configure the builder above and then finish it here? I think it would be simpler to use this:
Schema projected = testTable.schema().select(columns);
RowType rowType = FlinkSchemaUtil.convert(projected);
FlinkInputFormat inputFormat = FlinkSource.forRowData()
.tableLoader(TableLoader.fromHadoopTable(tableLocation))
.project(FlinkSchemaUtil.toSchema(rowType))
.buildFormat();
| .collect(Collectors.toList()); | ||
| } | ||
|
|
||
| public static List<Row> getRows(FlinkInputFormat inputFormat) throws IOException { |
There was a problem hiding this comment.
I think it is a bad practice to make helper methods in one test suite public and use them in another suite. Instead, helper methods should be moved to an appropriate test utility class. That way, we don't have test utility code that is hard to find because it lives in whatever test was written first.
There was a problem hiding this comment.
OK, I added TestHelpers to contain getRows and getRowData.
ab23ed6 to 5656be8
| inputFormat.open(s); | ||
| while (!inputFormat.reachedEnd()) { | ||
| RowData row = inputFormat.nextRecord(null); | ||
| results.add((Row) converter.toExternal(row)); |
There was a problem hiding this comment.
This seems strange to me. Why convert rows to external and convert them back to internal in the getRowData method? Why not move this implementation into getRowData and convert to external in this one?
Also, is there a better name for these methods? What about readRows or scan? Those would be a bit more clear about what is going on in these. The original was called runFormat, which is also a good name.
There was a problem hiding this comment.
inputFormat.nextRecord() returns a record that will be reused, so the returned RowData needs to be copied; otherwise the elements in the result list are all the same. Currently, there's no explicit API in Flink to copy RowData. Only RowDataSerializer can copy RowData, and it is an internal class. DataStructureConverter.toExternal and toInternal can construct the record by converting between RowData and Row. @JingsongLi @openinx Do you have any suggestions on this?
There was a problem hiding this comment.
I think you can get the serializer from FlinkSource.Builder.build().getType().createSerializer()
Conflicts: flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
5656be8 to 75dce7f
75dce7f to 265524f
c8eecd3 to 3a61448
| private TestHelpers() { | ||
| } | ||
|
|
||
| public static RowData copyRowData(RowData from, RowType rowType) { |
There was a problem hiding this comment.
@JingsongLi, I cannot use RowDataSerializer directly since the returned RowData may contain metadata columns after merging with position deletes. So I created this function to do the copy.
|
Looks like tests are failing with this: |
|
Thanks for the reminder! It worked before, but I switched to using getters to avoid null checking and didn't realize that the serializer cannot copy null values. It was too late and I didn't wait for the build result... |
|
Thanks, @chenjunjiedada! Good to have all of the read paths updated for row-level deletes! |
This includes #1497; I will rebase when #1497 gets merged.