Flink: deprecate ReaderFunction with a new Converter interface to simplify user experience #10956

Merged: stevenzwu merged 4 commits into apache:main from stevenzwu:flip27-source-converter on Aug 21, 2024

Conversation

@stevenzwu
Contributor

No description provided.


      @Override
      public TypeInformation getProducedType() {
        return TypeInformation.of(RowData.class);
Contributor

Don't we need the RowType here?

Contributor Author

@stevenzwu stevenzwu Aug 18, 2024

You are right. We need to do something like this:

      TypeInformation<RowData> typeInfo =
          FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(readSchema));

I will actually remove the whole IdentityConverter. Earlier I was thinking about creating a default IdentityConverter if the converter is null, but that is no longer needed.

I will construct the proper type info when adding buildStream(env) in the PR for inferring parallelism.

      RowDataFileScanTaskReader rowDataReader =
          new RowDataFileScanTaskReader(tableSchema, readSchema, nameMapping, caseSensitive, filters);
      return new LimitableDataIterator<>(
          new ConverterFileScanTaskReader<>(rowDataReader, converter),
Contributor

Would it be worth adding the converter to the reader instead of adding a new wrapper? It is getting a bit hard to follow this many nested readers.

Contributor Author

I thought about that too. To execute the converter in this method, we would need to override or extend DataIterator. That is also nontrivial, as DataIterator is not a plain CloseableIterator. Running the converter inside ConverterFileScanTaskReader is a little simpler.
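
For readers following this thread, here is a minimal sketch of the wrapper approach being discussed. It is not the code from this PR: `TaskReader`, `ConvertingTaskReader`, and `ConverterWrapperDemo` are hypothetical stand-ins, a plain `java.util.Iterator` stands in for Iceberg's CloseableIterator, and a `java.util.function.Function` stands in for the converter.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-in for a reader that opens one scan task and iterates its records. */
interface TaskReader<T> {
  Iterator<T> open(String task);
}

/** Wraps an existing reader and applies a converter to every record it yields. */
final class ConvertingTaskReader<S, T> implements TaskReader<T> {
  private final TaskReader<S> delegate;
  private final Function<S, T> converter;

  ConvertingTaskReader(TaskReader<S> delegate, Function<S, T> converter) {
    this.delegate = delegate;
    this.converter = converter;
  }

  @Override
  public Iterator<T> open(String task) {
    Iterator<S> source = delegate.open(task);
    // Conversion happens lazily, one record at a time, as the caller iterates.
    return new Iterator<T>() {
      @Override
      public boolean hasNext() {
        return source.hasNext();
      }

      @Override
      public T next() {
        return converter.apply(source.next());
      }
    };
  }
}

final class ConverterWrapperDemo {
  /** Reads two fake records for a task and converts each one to its length. */
  static List<Integer> run() {
    TaskReader<String> rows = task -> List.of(task + "-a", task + "-bb").iterator();
    TaskReader<Integer> lengths = new ConvertingTaskReader<>(rows, String::length);
    List<Integer> out = new ArrayList<>();
    lengths.open("t1").forEachRemaining(out::add);
    return out;
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

The wrapper leaves the underlying reader and the outer iterator untouched, which matches the reasoning above for not extending DataIterator. The cost is one more layer of nesting, which is the readability concern raised in the review.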

@stevenzwu stevenzwu force-pushed the flip27-source-converter branch from 4219f03 to 24e315a Compare August 19, 2024 16:17
import org.apache.iceberg.io.CloseableIterator;

@Internal
public class ConverterFileScanTaskReader<T> implements FileScanTaskReader<T> {
Contributor

Maybe add this as an inner class of ConverterReaderFunction? Both classes are small and not used elsewhere, so this could help us keep the number of source files lower.

Not a strong opinion, just an idea

Contributor Author

That is a great idea. Will do.

Comment on lines +76 to +79
      {FileFormat.AVRO, 2, false},
      {FileFormat.PARQUET, 2, true},
      {FileFormat.PARQUET, 2, false},
      {FileFormat.ORC, 2, true}
Contributor

Why did you opt for this exact parametrization?

I am happy to have tests for Parquet in both cases, as that is the main use case for us now:

      {FileFormat.PARQUET, 2, true},
      {FileFormat.PARQUET, 2, false},

I might have opted to test the future features and keep one test for backward compatibility, like this:

      {FileFormat.AVRO, 2, true},
      {FileFormat.PARQUET, 2, true},
      {FileFormat.PARQUET, 2, false},
      {FileFormat.ORC, 2, true}

Contributor Author

Sounds good to me. Will change.
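
The agreed parametrization can be sketched as a plain parameter matrix. This is an illustration only: the thread does not say what the columns mean, so the sketch assumes the third column is a flag that selects the new code path (`true`) versus the older one kept for backward compatibility (`false`). `ParameterMatrixDemo` and its nested `FileFormat` enum are hypothetical stand-ins for the real test class and Iceberg's enum.

```java
import java.util.Arrays;

final class ParameterMatrixDemo {
  /** Hypothetical stand-in for Iceberg's FileFormat enum. */
  enum FileFormat { AVRO, PARQUET, ORC }

  /** The matrix suggested in the review: every format with the flag on, Parquet also with it off. */
  static Object[][] parameters() {
    return new Object[][] {
      {FileFormat.AVRO, 2, true},
      {FileFormat.PARQUET, 2, true},
      {FileFormat.PARQUET, 2, false},
      {FileFormat.ORC, 2, true}
    };
  }

  /** Counts the rows whose third column (assumed to be the new-path flag) equals the given value. */
  static long countWithFlag(boolean flag) {
    return Arrays.stream(parameters())
        .filter(row -> Boolean.valueOf(flag).equals(row[2]))
        .count();
  }

  public static void main(String[] args) {
    System.out.println("flag=true rows: " + countWithFlag(true));
    System.out.println("flag=false rows: " + countWithFlag(false));
  }
}
```

Under that assumption, three of the four rows exercise the new path across all formats, and a single Parquet row covers the older path.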

Contributor

@pvary pvary left a comment

LGTM. I have some questions where I have no strong opinion, but I wanted to raise them so we decide on them consciously.

@stevenzwu stevenzwu merged commit 85cf79d into apache:main Aug 21, 2024
@stevenzwu stevenzwu deleted the flip27-source-converter branch August 21, 2024 22:05
stevenzwu added a commit to stevenzwu/iceberg that referenced this pull request Aug 22, 2024
stevenzwu added a commit that referenced this pull request Aug 22, 2024
zachdisc pushed a commit to zachdisc/iceberg that referenced this pull request Dec 23, 2024
zachdisc pushed a commit to zachdisc/iceberg that referenced this pull request Dec 23, 2024
czy006 pushed a commit to czy006/iceberg that referenced this pull request Apr 2, 2025
czy006 pushed a commit to czy006/iceberg that referenced this pull request Apr 2, 2025

2 participants