Skip to content

Flink 1.20: Update Flink to use planned Avro reads#11386

Merged
RussellSpitzer merged 1 commit into
apache:mainfrom
jbonofre:FLINK_AVRO_PLANNED_READS
Oct 29, 2024
Merged

Flink 1.20: Update Flink to use planned Avro reads#11386
RussellSpitzer merged 1 commit into
apache:mainfrom
jbonofre:FLINK_AVRO_PLANNED_READS

Conversation

@jbonofre
Copy link
Copy Markdown
Member

No description provided.

@github-actions github-actions Bot added the flink label Oct 24, 2024
@jbonofre
Copy link
Copy Markdown
Member Author

@pvary @aokolnychyi @rdblue this PR mimics what has been done in Spark to use Avro planned reads.

@jbonofre
Copy link
Copy Markdown
Member Author

It seems the upsert test is stale. I'm investigating (maybe the schema type mapping).

@jbonofre
Copy link
Copy Markdown
Member Author

The problem seems to be related to:

java.lang.ClassCastException: class java.lang.String cannot be cast to class org.apache.flink.table.data.StringData (java.lang.String is in module java.base of loader 'bootstrap'; org.apache.flink.table.data.StringData is in unnamed module of loader 'app')
        at org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169)
        at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$1(RowData.java:221)
        at org.apache.iceberg.flink.data.RowDataProjection.getValue(RowDataProjection.java:172)
        at org.apache.iceberg.flink.data.RowDataProjection.isNullAt(RowDataProjection.java:193)
        at org.apache.iceberg.flink.data.RowDataUtil.clone(RowDataUtil.java:96)
        at org.apache.iceberg.flink.source.reader.RowDataRecordFactory.clone(RowDataRecordFactory.java:71)
        at org.apache.iceberg.flink.source.reader.RowDataRecordFactory.clone(RowDataRecordFactory.java:28)
        at org.apache.iceberg.flink.source.reader.ArrayPoolDataIteratorBatcher$ArrayPoolBatchIterator.next(ArrayPoolDataIteratorBatcher.java:98)
        at org.apache.iceberg.flink.source.reader.ArrayPoolDataIteratorBatcher$ArrayPoolBatchIterator.next(ArrayPoolDataIteratorBatcher.java:67)
        at org.apache.iceberg.flink.source.reader.IcebergSourceSplitReader.fetch(IcebergSourceSplitReader.java:96)
        at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
        ... 6 more

So it seems related to string read from the GenericRowData. I'm checking.

@jbonofre jbonofre force-pushed the FLINK_AVRO_PLANNED_READS branch from ec483e0 to 0f3ca5b Compare October 26, 2024 06:15
@jbonofre
Copy link
Copy Markdown
Member Author

I fixed the issue on ValueReaders about strings.

Avro.read(Files.localInput(recordsFile))
.project(schema)
.createReaderFunc(FlinkAvroReader::new)
.createResolvingReader(FlinkPlannedAvroReader::create)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QQ: Do we have remaining tests for the old reader? I usually try to keep at least a few tests for the deprecated features as well, so they are not broken unintentionally by future changes (and mark them as deprecated, so we don't forget to remove them when the feature is removed).
If there are other tests? Do we test the same functions for the new reader?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point. I can add tests specific to the "old" FlinkAvroReader.

@jbonofre jbonofre force-pushed the FLINK_AVRO_PLANNED_READS branch 2 times, most recently from c0481ed to a4e7692 Compare October 28, 2024 06:46
@jbonofre
Copy link
Copy Markdown
Member Author

I fixed all types (primitves and arrays) reads. Tests should be happy now 😄

@jbonofre jbonofre force-pushed the FLINK_AVRO_PLANNED_READS branch from a4e7692 to 65b1165 Compare October 28, 2024 06:52
@jbonofre jbonofre added this to the Iceberg 1.7.0 milestone Oct 28, 2024
@RussellSpitzer
Copy link
Copy Markdown
Member

@pvary + @jbonofre how important is this for 1.7? If it is important we need to wrap this up ASAP

@jbonofre
Copy link
Copy Markdown
Member Author

@RussellSpitzer the planned Avro reads has been added to Spark (for Iceberg 1.7.x). This one is not a blocker for 1.7.0 but a good to have to benefit the same performance boost as Spark.

Copy link
Copy Markdown
Contributor

@Fokko Fokko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @pvary, but apart from that, it looks good to me 👍

@RussellSpitzer
Copy link
Copy Markdown
Member

Alright! Chatted with folks and we will do the old-path tests in a follow up.

@RussellSpitzer RussellSpitzer merged commit 602c2b2 into apache:main Oct 29, 2024
@RussellSpitzer
Copy link
Copy Markdown
Member

Thanks @jbonofre for the PR and @Fokko and @pvary for Review. Let's add some additional tests for Spark and Flink in a followup for the old path.

@pvary
Copy link
Copy Markdown
Contributor

pvary commented Nov 4, 2024

@jbonofre: I think we should backport this change to Flink 1.19, and Flink 1.18 as well.

zachdisc pushed a commit to zachdisc/iceberg that referenced this pull request Dec 23, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants