From f99744687f2025ff50a51d36e3dd939e1574c4df Mon Sep 17 00:00:00 2001 From: liliwei Date: Thu, 13 Jan 2022 22:05:52 +0800 Subject: [PATCH 1/4] Test: unit test for parquet\avro table properties --- .../iceberg/avro/TestTableProperties.java | 77 ++++++ .../iceberg/flink/TestTableProperties.java | 229 ++++++++++++++++++ .../iceberg/parquet/TestTableProperties.java | 87 +++++++ 3 files changed, 393 insertions(+) create mode 100644 core/src/test/java/org/apache/iceberg/avro/TestTableProperties.java create mode 100644 flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestTableProperties.java create mode 100644 parquet/src/test/java/org/apache/iceberg/parquet/TestTableProperties.java diff --git a/core/src/test/java/org/apache/iceberg/avro/TestTableProperties.java b/core/src/test/java/org/apache/iceberg/avro/TestTableProperties.java new file mode 100644 index 000000000000..418caf82c7a1 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/avro/TestTableProperties.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.avro; + +import java.io.File; +import java.util.Map; +import org.apache.avro.file.DataFileConstants; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import static org.mockito.Mockito.verify; + +public class TestTableProperties { + + public static final Schema SCHEMA = new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get()) + ); + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Test + public void testAvroProperties() throws Exception { + ImmutableMap properties = ImmutableMap.of( + TableProperties.AVRO_COMPRESSION, DataFileConstants.SNAPPY_CODEC, + TableProperties.DEFAULT_FILE_FORMAT, FileFormat.AVRO.name()); + + File folder = TEMPORARY_FOLDER.newFolder(); + + String warehouse = folder.getAbsolutePath(); + String tablePath = warehouse.concat("/test"); + Assert.assertTrue("Should create the table path correctly.", new File(tablePath).mkdir()); + + PartitionSpec spec = PartitionSpec.unpartitioned(); + Table table = new HadoopTables().create(SCHEMA, spec, properties, tablePath); + + Avro.WriteBuilder writeBuilder = Mockito.spy(Avro.write(Files.localOutput(TEMPORARY_FOLDER.newFile()))); + writeBuilder.forTable(table); + ArgumentCaptor> argument = ArgumentCaptor.forClass(Map.class); + verify(writeBuilder).setAll(argument.capture()); + Map config = argument.getValue(); + + Assert.assertEquals(DataFileConstants.SNAPPY_CODEC, config.get(TableProperties.AVRO_COMPRESSION)); + Assert.assertEquals(FileFormat.AVRO.name(), config.get(TableProperties.DEFAULT_FILE_FORMAT)); + } +} diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestTableProperties.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestTableProperties.java new file mode 100644 index 000000000000..61a58f6bb680 --- /dev/null +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestTableProperties.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink; + +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import org.apache.avro.file.CodecFactory; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Files; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.common.DynFields; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import static org.mockito.Mockito.verify; + +@RunWith(Parameterized.class) +public class TestTableProperties extends FlinkCatalogTestBase { + + @ClassRule + public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = + MiniClusterResource.createWithClassloaderCheckDisabled(); + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private final boolean isStreamingJob; + private final FileFormat format; + + private TableEnvironment tEnv; + + public TestTableProperties(String catalogName, Namespace baseNamespace, FileFormat format, Boolean isStreamingJob) { + super(catalogName, baseNamespace); + this.format = format; + this.isStreamingJob = isStreamingJob; + } + + @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}") + public static Iterable parameters() { + List parameters = Lists.newArrayList(); + for (FileFormat format : new FileFormat[] {FileFormat.ORC, FileFormat.AVRO, FileFormat.PARQUET}) { + for (Boolean isStreaming : new Boolean[] {true, false}) { + for (Object[] catalogParams : FlinkCatalogTestBase.parameters()) { + String catalogName = (String) catalogParams[0]; + Namespace baseNamespace = (Namespace) catalogParams[1]; + parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming}); + } + } + } + return parameters; + } + + @Override + protected TableEnvironment getTableEnv() { + if (tEnv == null) { + synchronized (this) { + EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings + .newInstance(); + if (isStreamingJob) { + settingsBuilder.inStreamingMode(); + StreamExecutionEnvironment env = StreamExecutionEnvironment + .getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG); + env.enableCheckpointing(400); + env.setMaxParallelism(2); + env.setParallelism(2); + tEnv = StreamTableEnvironment.create(env, settingsBuilder.build()); + } else { + settingsBuilder.inBatchMode(); + tEnv = TableEnvironment.create(settingsBuilder.build()); + } + } + } + return tEnv; + } + + @Override + @Before + public void before() { + super.before(); + sql("CREATE DATABASE %s", flinkDatabase); + sql("USE CATALOG %s", catalogName); + sql("USE %s", DATABASE); + } + + @Override + @After + public void clean() { + sql("DROP DATABASE IF EXISTS %s", flinkDatabase); + super.clean(); + } + + @Test + public void testParquetTableProperties() throws Exception { + if (format != FileFormat.PARQUET) { + return; + } + String parquetTableName = "test_table_parquet"; + + String groupSizeBytes = "10000"; + String pageSizeBytes = "10000"; + String dictSizeBytes = "10000"; + String compressionCodec = "uncompressed"; + + sql("CREATE TABLE %s (id int, data varchar) with (" + + "'" + TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES + "'='" + groupSizeBytes + "'," + + "'" + TableProperties.PARQUET_PAGE_SIZE_BYTES + "'='" + pageSizeBytes + "'," + + "'" + TableProperties.PARQUET_DICT_SIZE_BYTES + "'='" + dictSizeBytes + "'," + + "'" + TableProperties.PARQUET_COMPRESSION + "'='" + compressionCodec + "'," + + "'" + TableProperties.DEFAULT_FILE_FORMAT + "'='%s')", + parquetTableName, format); + Table icebergTable = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, parquetTableName)); + + Parquet.WriteBuilder writeBuilder = Mockito.spy(Parquet.write(Files.localOutput(TEMPORARY_FOLDER.newFile()))); + + writeBuilder.forTable(icebergTable); + ArgumentCaptor> argument = ArgumentCaptor.forClass(Map.class); + verify(writeBuilder).setAll(argument.capture()); + Map config = argument.getValue(); + + Assert.assertEquals(groupSizeBytes, config.get(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES)); + Assert.assertEquals(pageSizeBytes, config.get(TableProperties.PARQUET_PAGE_SIZE_BYTES)); + Assert.assertEquals(dictSizeBytes, config.get(TableProperties.PARQUET_DICT_SIZE_BYTES)); + Assert.assertEquals(compressionCodec, config.get(TableProperties.PARQUET_COMPRESSION)); + Assert.assertEquals(FileFormat.PARQUET.name(), config.get(TableProperties.DEFAULT_FILE_FORMAT)); + + DynFields.BoundField createContextFunc = + DynFields.builder().hiddenImpl(Parquet.WriteBuilder.class, "createContextFunc").build(writeBuilder); + Object apply = createContextFunc.get().apply(config); + + Integer rowGroupSize = + (Integer) DynFields.builder().hiddenImpl(apply.getClass(), "rowGroupSize").build(apply).get(); + Assert.assertEquals(groupSizeBytes, String.valueOf(rowGroupSize)); + + Integer pageSize = + (Integer) DynFields.builder().hiddenImpl(apply.getClass(), "pageSize").build(apply).get(); + Assert.assertEquals(pageSizeBytes, String.valueOf(pageSize)); + + Integer dictionaryPageSize = + (Integer) DynFields.builder().hiddenImpl(apply.getClass(), "dictionaryPageSize").build(apply).get(); + Assert.assertEquals(dictSizeBytes, String.valueOf(dictionaryPageSize)); + + CompressionCodecName codec = + (CompressionCodecName) DynFields.builder().hiddenImpl(apply.getClass(), "codec").build(apply).get(); + Assert.assertEquals(CompressionCodecName.UNCOMPRESSED, codec); + + sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, parquetTableName); + } + + @Test + public void testAvroTableProperties() throws Exception { + if (format != FileFormat.AVRO) { + return; + } + String avroTableName = "test_table_avro"; + String compressionCodec = "snappy"; + + sql("CREATE TABLE %s (id int, data varchar) with (" + + "'" + TableProperties.AVRO_COMPRESSION + "'='" + compressionCodec + "'," + + "'" + TableProperties.DEFAULT_FILE_FORMAT + "'='%s')", + avroTableName, format); + Table icebergTable = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, avroTableName)); + + Avro.WriteBuilder writeBuilder = Mockito.spy(Avro.write(Files.localOutput(TEMPORARY_FOLDER.newFile()))); + + writeBuilder.forTable(icebergTable); + ArgumentCaptor> argument = ArgumentCaptor.forClass(Map.class); + verify(writeBuilder).setAll(argument.capture()); + Map config = argument.getValue(); + + Assert.assertEquals(compressionCodec, config.get(TableProperties.AVRO_COMPRESSION)); + Assert.assertEquals(FileFormat.AVRO.name(), config.get(TableProperties.DEFAULT_FILE_FORMAT)); + + DynFields.BoundField createContextFunc = + DynFields.builder().hiddenImpl(Avro.WriteBuilder.class, "createContextFunc").build(writeBuilder); + Object apply = createContextFunc.get().apply(config); + + CodecFactory codecFactory = + (CodecFactory) DynFields.builder().hiddenImpl(apply.getClass(), "codec").build(apply).get(); + + Assert.assertEquals(CodecFactory.snappyCodec().toString(), codecFactory.toString()); + sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, avroTableName); + } + + @Ignore // TODO: test orc table properties + public void testORCTableProperties() throws Exception { + if (format != FileFormat.ORC) { + return; + } + } +} diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestTableProperties.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestTableProperties.java new file mode 100644 index 000000000000..7cc67bc1e8c2 --- /dev/null +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestTableProperties.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.parquet; + +import java.io.File; +import java.util.Map; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.ArgumentCaptor; + +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +public class TestTableProperties { + + public static final Schema SCHEMA = new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get()) + ); + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Test + public void testParquetProperties() throws Exception { + String groupSizeBytes = "10000"; + String pageSizeBytes = "10000"; + String dictSizeBytes = "10000"; + String compressionCodec = "uncompressed"; + + ImmutableMap properties = ImmutableMap.of( + TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, groupSizeBytes, + TableProperties.PARQUET_PAGE_SIZE_BYTES, pageSizeBytes, + TableProperties.PARQUET_DICT_SIZE_BYTES, dictSizeBytes, + TableProperties.PARQUET_COMPRESSION, compressionCodec, + TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name()); + + File folder = TEMPORARY_FOLDER.newFolder(); + + String warehouse = folder.getAbsolutePath(); + String tablePath = warehouse.concat("/test"); + Assert.assertTrue("Should create the table path correctly.", new File(tablePath).mkdir()); + + PartitionSpec spec = PartitionSpec.unpartitioned(); + Table table = new HadoopTables().create(SCHEMA, spec, properties, tablePath); + + Parquet.WriteBuilder writeBuilder = spy(Parquet.write(Files.localOutput(TEMPORARY_FOLDER.newFile()))); + writeBuilder.forTable(table); + ArgumentCaptor> argument = ArgumentCaptor.forClass(Map.class); + verify(writeBuilder).setAll(argument.capture()); + Map config = argument.getValue(); + + Assert.assertEquals(groupSizeBytes, config.get(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES)); + Assert.assertEquals(pageSizeBytes, config.get(TableProperties.PARQUET_PAGE_SIZE_BYTES)); + Assert.assertEquals(dictSizeBytes, config.get(TableProperties.PARQUET_DICT_SIZE_BYTES)); + Assert.assertEquals(compressionCodec, config.get(TableProperties.PARQUET_COMPRESSION)); + Assert.assertEquals(FileFormat.PARQUET.name(), config.get(TableProperties.DEFAULT_FILE_FORMAT)); + } +} From 9d325218c7d8135e4d2cab49df6473ed6a147b0b Mon Sep 17 00:00:00 2001 From: liliwei Date: Tue, 18 Jan 2022 22:52:56 +0800 Subject: [PATCH 2/4] De-reflection --- .../iceberg/flink/TestTableProperties.java | 32 ------------------- 1 file changed, 32 deletions(-) diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestTableProperties.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestTableProperties.java index 61a58f6bb680..d4176d52fc85 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestTableProperties.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestTableProperties.java @@ -21,8 +21,6 @@ import java.util.List; import java.util.Map; -import java.util.function.Function; -import org.apache.avro.file.CodecFactory; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; @@ -35,10 +33,8 @@ import org.apache.iceberg.avro.Avro; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.common.DynFields; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -162,26 +158,6 @@ public void testParquetTableProperties() throws Exception { Assert.assertEquals(compressionCodec, config.get(TableProperties.PARQUET_COMPRESSION)); Assert.assertEquals(FileFormat.PARQUET.name(), config.get(TableProperties.DEFAULT_FILE_FORMAT)); - DynFields.BoundField createContextFunc = - DynFields.builder().hiddenImpl(Parquet.WriteBuilder.class, "createContextFunc").build(writeBuilder); - Object apply = createContextFunc.get().apply(config); - - Integer rowGroupSize = - (Integer) DynFields.builder().hiddenImpl(apply.getClass(), "rowGroupSize").build(apply).get(); - Assert.assertEquals(groupSizeBytes, String.valueOf(rowGroupSize)); - - Integer pageSize = - (Integer) DynFields.builder().hiddenImpl(apply.getClass(), "pageSize").build(apply).get(); - Assert.assertEquals(pageSizeBytes, String.valueOf(pageSize)); - - Integer dictionaryPageSize = - (Integer) DynFields.builder().hiddenImpl(apply.getClass(), "dictionaryPageSize").build(apply).get(); - Assert.assertEquals(dictSizeBytes, String.valueOf(dictionaryPageSize)); - - CompressionCodecName codec = - (CompressionCodecName) DynFields.builder().hiddenImpl(apply.getClass(), "codec").build(apply).get(); - Assert.assertEquals(CompressionCodecName.UNCOMPRESSED, codec); - sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, parquetTableName); } @@ -209,14 +185,6 @@ public void testAvroTableProperties() throws Exception { Assert.assertEquals(compressionCodec, config.get(TableProperties.AVRO_COMPRESSION)); Assert.assertEquals(FileFormat.AVRO.name(), config.get(TableProperties.DEFAULT_FILE_FORMAT)); - DynFields.BoundField createContextFunc = - DynFields.builder().hiddenImpl(Avro.WriteBuilder.class, "createContextFunc").build(writeBuilder); - Object apply = createContextFunc.get().apply(config); - - CodecFactory codecFactory = - (CodecFactory) DynFields.builder().hiddenImpl(apply.getClass(), "codec").build(apply).get(); - - Assert.assertEquals(CodecFactory.snappyCodec().toString(), codecFactory.toString()); sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, avroTableName); } From 26a9bf7d94f226a67f1d92850fca912aa72cb65d Mon Sep 17 00:00:00 2001 From: liliwei Date: Wed, 19 Jan 2022 08:25:35 +0800 Subject: [PATCH 3/4] delete flink unit test --- .../iceberg/flink/TestTableProperties.java | 197 ------------------ 1 file changed, 197 deletions(-) delete mode 100644 flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestTableProperties.java diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestTableProperties.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestTableProperties.java deleted file mode 100644 index d4176d52fc85..000000000000 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestTableProperties.java +++ /dev/null @@ -1,197 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.flink; - -import java.util.List; -import java.util.Map; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.EnvironmentSettings; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; -import org.apache.flink.test.util.MiniClusterWithClientResource; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.Files; -import org.apache.iceberg.Table; -import org.apache.iceberg.TableProperties; -import org.apache.iceberg.avro.Avro; -import org.apache.iceberg.catalog.Namespace; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.parquet.Parquet; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; - -import static org.mockito.Mockito.verify; - -@RunWith(Parameterized.class) -public class TestTableProperties extends FlinkCatalogTestBase { - - @ClassRule - public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = - MiniClusterResource.createWithClassloaderCheckDisabled(); - - @ClassRule - public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); - - private final boolean isStreamingJob; - private final FileFormat format; - - private TableEnvironment tEnv; - - public TestTableProperties(String catalogName, Namespace baseNamespace, FileFormat format, Boolean isStreamingJob) { - super(catalogName, baseNamespace); - this.format = format; - this.isStreamingJob = isStreamingJob; - } - - @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}") - public static Iterable parameters() { - List parameters = Lists.newArrayList(); - for (FileFormat format : new FileFormat[] {FileFormat.ORC, FileFormat.AVRO, FileFormat.PARQUET}) { - for (Boolean isStreaming : new Boolean[] {true, false}) { - for (Object[] catalogParams : FlinkCatalogTestBase.parameters()) { - String catalogName = (String) catalogParams[0]; - Namespace baseNamespace = (Namespace) catalogParams[1]; - parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming}); - } - } - } - return parameters; - } - - @Override - protected TableEnvironment getTableEnv() { - if (tEnv == null) { - synchronized (this) { - EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings - .newInstance(); - if (isStreamingJob) { - settingsBuilder.inStreamingMode(); - StreamExecutionEnvironment env = StreamExecutionEnvironment - .getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG); - env.enableCheckpointing(400); - env.setMaxParallelism(2); - env.setParallelism(2); - tEnv = StreamTableEnvironment.create(env, settingsBuilder.build()); - } else { - settingsBuilder.inBatchMode(); - tEnv = TableEnvironment.create(settingsBuilder.build()); - } - } - } - return tEnv; - } - - @Override - @Before - public void before() { - super.before(); - sql("CREATE DATABASE %s", flinkDatabase); - sql("USE CATALOG %s", catalogName); - sql("USE %s", DATABASE); - } - - @Override - @After - public void clean() { - sql("DROP DATABASE IF EXISTS %s", flinkDatabase); - super.clean(); - } - - @Test - public void testParquetTableProperties() throws Exception { - if (format != FileFormat.PARQUET) { - return; - } - String parquetTableName = "test_table_parquet"; - - String groupSizeBytes = "10000"; - String pageSizeBytes = "10000"; - String dictSizeBytes = "10000"; - String compressionCodec = "uncompressed"; - - sql("CREATE TABLE %s (id int, data varchar) with (" + - "'" + TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES + "'='" + groupSizeBytes + "'," + - "'" + TableProperties.PARQUET_PAGE_SIZE_BYTES + "'='" + pageSizeBytes + "'," + - "'" + TableProperties.PARQUET_DICT_SIZE_BYTES + "'='" + dictSizeBytes + "'," + - "'" + TableProperties.PARQUET_COMPRESSION + "'='" + compressionCodec + "'," + - "'" + TableProperties.DEFAULT_FILE_FORMAT + "'='%s')", - parquetTableName, format); - Table icebergTable = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, parquetTableName)); - - Parquet.WriteBuilder writeBuilder = Mockito.spy(Parquet.write(Files.localOutput(TEMPORARY_FOLDER.newFile()))); - - writeBuilder.forTable(icebergTable); - ArgumentCaptor> argument = ArgumentCaptor.forClass(Map.class); - verify(writeBuilder).setAll(argument.capture()); - Map config = argument.getValue(); - - Assert.assertEquals(groupSizeBytes, config.get(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES)); - Assert.assertEquals(pageSizeBytes, config.get(TableProperties.PARQUET_PAGE_SIZE_BYTES)); - Assert.assertEquals(dictSizeBytes, config.get(TableProperties.PARQUET_DICT_SIZE_BYTES)); - Assert.assertEquals(compressionCodec, config.get(TableProperties.PARQUET_COMPRESSION)); - Assert.assertEquals(FileFormat.PARQUET.name(), config.get(TableProperties.DEFAULT_FILE_FORMAT)); - - sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, parquetTableName); - } - - @Test - public void testAvroTableProperties() throws Exception { - if (format != FileFormat.AVRO) { - return; - } - String avroTableName = "test_table_avro"; - String compressionCodec = "snappy"; - - sql("CREATE TABLE %s (id int, data varchar) with (" + - "'" + TableProperties.AVRO_COMPRESSION + "'='" + compressionCodec + "'," + - "'" + TableProperties.DEFAULT_FILE_FORMAT + "'='%s')", - avroTableName, format); - Table icebergTable = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, avroTableName)); - - Avro.WriteBuilder writeBuilder = Mockito.spy(Avro.write(Files.localOutput(TEMPORARY_FOLDER.newFile()))); - - writeBuilder.forTable(icebergTable); - ArgumentCaptor> argument = ArgumentCaptor.forClass(Map.class); - verify(writeBuilder).setAll(argument.capture()); - Map config = argument.getValue(); - - Assert.assertEquals(compressionCodec, config.get(TableProperties.AVRO_COMPRESSION)); - Assert.assertEquals(FileFormat.AVRO.name(), config.get(TableProperties.DEFAULT_FILE_FORMAT)); - - sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, avroTableName); - } - - @Ignore // TODO: test orc table properties - public void testORCTableProperties() throws Exception { - if (format != FileFormat.ORC) { - return; - } - } -} From e0b776abe8f0ac225ef7d2157d975647c72d478f Mon Sep 17 00:00:00 2001 From: liliwei Date: Wed, 19 Jan 2022 08:27:31 +0800 Subject: [PATCH 4/4] fix format --- .../java/org/apache/iceberg/avro/TestTableProperties.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/org/apache/iceberg/avro/TestTableProperties.java b/core/src/test/java/org/apache/iceberg/avro/TestTableProperties.java index 418caf82c7a1..0e3fd9d15152 100644 --- a/core/src/test/java/org/apache/iceberg/avro/TestTableProperties.java +++ b/core/src/test/java/org/apache/iceberg/avro/TestTableProperties.java @@ -36,8 +36,8 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; public class TestTableProperties { @@ -65,7 +65,7 @@ public void testAvroProperties() throws Exception { PartitionSpec spec = PartitionSpec.unpartitioned(); Table table = new HadoopTables().create(SCHEMA, spec, properties, tablePath); - Avro.WriteBuilder writeBuilder = Mockito.spy(Avro.write(Files.localOutput(TEMPORARY_FOLDER.newFile()))); + Avro.WriteBuilder writeBuilder = spy(Avro.write(Files.localOutput(TEMPORARY_FOLDER.newFile()))); writeBuilder.forTable(table); ArgumentCaptor> argument = ArgumentCaptor.forClass(Map.class); verify(writeBuilder).setAll(argument.capture());