Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,12 @@
import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer;
import org.apache.iceberg.flink.source.enumerator.StaticIcebergEnumerator;
import org.apache.iceberg.flink.source.reader.ColumnStatsWatermarkExtractor;
import org.apache.iceberg.flink.source.reader.ConverterReaderFunction;
import org.apache.iceberg.flink.source.reader.IcebergSourceReader;
import org.apache.iceberg.flink.source.reader.IcebergSourceReaderMetrics;
import org.apache.iceberg.flink.source.reader.MetaDataReaderFunction;
import org.apache.iceberg.flink.source.reader.ReaderFunction;
import org.apache.iceberg.flink.source.reader.RowDataConverter;
import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
import org.apache.iceberg.flink.source.reader.SerializableRecordEmitter;
import org.apache.iceberg.flink.source.reader.SplitWatermarkExtractor;
Expand Down Expand Up @@ -211,20 +213,40 @@ private SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumer
}
}

/**
* Create a source builder.
*
* @deprecated since 1.7.0. Will be removed in 2.0.0; use{@link IcebergSource#forRowData()} or
* {@link IcebergSource#forOutputType(RowDataConverter)} instead
*/
@Deprecated
public static <T> Builder<T> builder() {
return new Builder<>();
}

/** Create a source builder for RowData output type. */
public static Builder<RowData> forRowData() {
return new Builder<>();
}

/**
* Create a source builder that would convert {@link RowData} to the output type {@code T}.
*
* @param converter convert {@link RowData} to output type {@code T}
* @param <T> output type
* @return an IcebergSource builder
*/
public static <T> Builder<T> forOutputType(RowDataConverter<T> converter) {
return new Builder<T>().converter(converter);
}

public static class Builder<T> {
private TableLoader tableLoader;
private Table table;
private SplitAssignerFactory splitAssignerFactory;
private SerializableComparator<IcebergSourceSplit> splitComparator;
private ReaderFunction<T> readerFunction;
private RowDataConverter<T> converter;
private ReadableConfig flinkConfig = new Configuration();
private final ScanContext.Builder contextBuilder = ScanContext.builder();
private TableSchema projectedFlinkSchema;
Expand Down Expand Up @@ -255,11 +277,28 @@ public Builder<T> splitComparator(
return this;
}

/**
* @deprecated since 1.7.0. Will be removed in 2.0.0; use{@link
* IcebergSource#forOutputType(RowDataConverter)} instead to produce output type other than
* {@link RowData}.
*/
@Deprecated
public Builder<T> readerFunction(ReaderFunction<T> newReaderFunction) {
Preconditions.checkState(
converter == null,
"Cannot set reader function when builder was created via IcebergSource.forOutputType(Converter)");
this.readerFunction = newReaderFunction;
return this;
}

/**
* Don't need to be public. It is set by {@link IcebergSource#forOutputType(RowDataConverter)}.
*/
private Builder<T> converter(RowDataConverter<T> newConverter) {
this.converter = newConverter;
return this;
}

public Builder<T> flinkConfig(ReadableConfig config) {
this.flinkConfig = config;
return this;
Expand Down Expand Up @@ -510,25 +549,7 @@ public IcebergSource<T> build() {
ScanContext context = contextBuilder.build();
context.validate();
if (readerFunction == null) {
if (table instanceof BaseMetadataTable) {
MetaDataReaderFunction rowDataReaderFunction =
new MetaDataReaderFunction(
flinkConfig, table.schema(), context.project(), table.io(), table.encryption());
this.readerFunction = (ReaderFunction<T>) rowDataReaderFunction;
} else {
RowDataReaderFunction rowDataReaderFunction =
new RowDataReaderFunction(
flinkConfig,
table.schema(),
context.project(),
context.nameMapping(),
context.caseSensitive(),
table.io(),
table.encryption(),
context.filters(),
context.limit());
this.readerFunction = (ReaderFunction<T>) rowDataReaderFunction;
}
this.readerFunction = readerFunction(context);
}

if (splitAssignerFactory == null) {
Expand All @@ -549,5 +570,40 @@ public IcebergSource<T> build() {
table,
emitter);
}

private ReaderFunction<T> readerFunction(ScanContext context) {
if (table instanceof BaseMetadataTable) {
MetaDataReaderFunction rowDataReaderFunction =
new MetaDataReaderFunction(
flinkConfig, table.schema(), context.project(), table.io(), table.encryption());
return (ReaderFunction<T>) rowDataReaderFunction;
} else {
if (converter == null) {
return (ReaderFunction<T>)
new RowDataReaderFunction(
flinkConfig,
table.schema(),
context.project(),
context.nameMapping(),
context.caseSensitive(),
table.io(),
table.encryption(),
context.filters(),
context.limit());
} else {
return new ConverterReaderFunction<>(
converter,
flinkConfig,
table.schema(),
context.project(),
context.nameMapping(),
context.caseSensitive(),
table.io(),
table.encryption(),
context.filters(),
context.limit());
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.source.reader;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.RowDataToAvroConverters;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.flink.FlinkSchemaUtil;

public class AvroGenericRecordConverter implements RowDataConverter<GenericRecord> {
private final Schema avroSchema;
private final RowDataToAvroConverters.RowDataToAvroConverter flinkConverter;
private final TypeInformation<GenericRecord> outputTypeInfo;

private AvroGenericRecordConverter(Schema avroSchema, RowType rowType) {
this.avroSchema = avroSchema;
this.flinkConverter = RowDataToAvroConverters.createConverter(rowType);
this.outputTypeInfo = new GenericRecordAvroTypeInfo(avroSchema);
}

public static AvroGenericRecordConverter fromIcebergSchema(
org.apache.iceberg.Schema icebergSchema, String tableName) {
RowType rowType = FlinkSchemaUtil.convert(icebergSchema);
Schema avroSchema = AvroSchemaUtil.convert(icebergSchema, tableName);
return new AvroGenericRecordConverter(avroSchema, rowType);
}

public static AvroGenericRecordConverter fromAvroSchema(Schema avroSchema, String tableName) {
DataType dataType = AvroSchemaConverter.convertToDataType(avroSchema.toString());
LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType);
RowType rowType = RowType.of(logicalType.getChildren().toArray(new LogicalType[0]));
return new AvroGenericRecordConverter(avroSchema, rowType);
}

@Override
public GenericRecord apply(RowData rowData) {
return (GenericRecord) flinkConverter.convert(avroSchema, rowData);
}

@Override
public TypeInformation<GenericRecord> getProducedType() {
return outputTypeInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,21 @@
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.source.AvroGenericRecordFileScanTaskReader;
import org.apache.iceberg.flink.source.DataIterator;
import org.apache.iceberg.flink.source.IcebergSource;
import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
import org.apache.iceberg.flink.source.RowDataToAvroGenericRecordConverter;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/** Read Iceberg rows as {@link GenericRecord}. */
/**
* Read Iceberg rows as {@link GenericRecord}.
*
* @deprecated since 1.7.0. Will be removed in 2.0.0; use {@link
* IcebergSource#forOutputType(RowDataConverter)} and {@link AvroGenericRecordConverter}
* instead.
*/
@Deprecated
public class AvroGenericRecordReaderFunction extends DataIteratorReaderFunction<GenericRecord> {
private final String tableName;
private final Schema readSchema;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.source.reader;

import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.encryption.InputFilesDecryptor;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.source.DataIterator;
import org.apache.iceberg.flink.source.FileScanTaskReader;
import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

@Internal
public class ConverterReaderFunction<T> extends DataIteratorReaderFunction<T> {
private final RowDataConverter<T> converter;
private final Schema tableSchema;
private final Schema readSchema;
private final String nameMapping;
private final boolean caseSensitive;
private final FileIO io;
private final EncryptionManager encryption;
private final List<Expression> filters;
private final long limit;

private transient RecordLimiter recordLimiter = null;

public ConverterReaderFunction(
RowDataConverter<T> converter,
ReadableConfig config,
Schema tableSchema,
Schema projectedSchema,
String nameMapping,
boolean caseSensitive,
FileIO io,
EncryptionManager encryption,
List<Expression> filters,
long limit) {
super(new ListDataIteratorBatcher<>(config));
this.converter = converter;
this.tableSchema = tableSchema;
this.readSchema = readSchema(tableSchema, projectedSchema);
this.nameMapping = nameMapping;
this.caseSensitive = caseSensitive;
this.io = io;
this.encryption = encryption;
this.filters = filters;
this.limit = limit;
}

@Override
protected DataIterator<T> createDataIterator(IcebergSourceSplit split) {
RowDataFileScanTaskReader rowDataReader =
new RowDataFileScanTaskReader(tableSchema, readSchema, nameMapping, caseSensitive, filters);
return new LimitableDataIterator<>(
new ConverterFileScanTaskReader<>(rowDataReader, converter),
split.task(),
io,
encryption,
lazyLimiter());
}

private static Schema readSchema(Schema tableSchema, Schema projectedSchema) {
Preconditions.checkNotNull(tableSchema, "Table schema can't be null");
return projectedSchema == null ? tableSchema : projectedSchema;
}

/** Lazily create RecordLimiter to avoid the need to make it serializable */
private RecordLimiter lazyLimiter() {
if (recordLimiter == null) {
this.recordLimiter = RecordLimiter.create(limit);
}

return recordLimiter;
}

private static class ConverterFileScanTaskReader<T> implements FileScanTaskReader<T> {
private final RowDataFileScanTaskReader rowDataReader;
private final RowDataConverter<T> converter;

ConverterFileScanTaskReader(
RowDataFileScanTaskReader rowDataReader, RowDataConverter<T> converter) {
this.rowDataReader = rowDataReader;
this.converter = converter;
}

@Override
public CloseableIterator<T> open(
FileScanTask fileScanTask, InputFilesDecryptor inputFilesDecryptor) {
return CloseableIterator.transform(
rowDataReader.open(fileScanTask, inputFilesDecryptor), converter);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public RecordsWithSplitIds<RecordAndPosition<T>> fetch() throws IOException {
} else {
// return an empty result, which will lead to split fetch to be idle.
// SplitFetcherManager will then close idle fetcher.
return new RecordsBySplits(Collections.emptyMap(), Collections.emptySet());
return new RecordsBySplits<>(Collections.emptyMap(), Collections.emptySet());
}
}

Expand Down
Loading