diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 0555ace2ffd7..0ea4bad609df 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -62,6 +62,7 @@ jaxb-api = "2.3.1" jaxb-runtime = "2.3.3" jetty = "9.4.53.v20231009" junit = "5.10.1" +kafka = "3.6.1" kryo-shaded = "4.0.3" microprofile-openapi-api = "3.1.1" mockito = "4.11.0" @@ -152,6 +153,9 @@ jackson214-bom = { module = "com.fasterxml.jackson:jackson-bom", version.ref = jackson215-bom = { module = "com.fasterxml.jackson:jackson-bom", version.ref = "jackson215" } jaxb-api = { module = "javax.xml.bind:jaxb-api", version.ref = "jaxb-api" } jaxb-runtime = { module = "org.glassfish.jaxb:jaxb-runtime", version.ref = "jaxb-runtime" } +kafka-clients = { module = "org.apache.kafka:kafka-clients", version.ref = "kafka" } +kafka-connect-api = { module = "org.apache.kafka:connect-api", version.ref = "kafka" } +kafka-connect-json = { module = "org.apache.kafka:connect-json", version.ref = "kafka" } microprofile-openapi-api = { module = "org.eclipse.microprofile.openapi:microprofile-openapi-api", version.ref = "microprofile-openapi-api" } nessie-client = { module = "org.projectnessie.nessie:nessie-client", version.ref = "nessie" } netty-buffer = { module = "io.netty:netty-buffer", version.ref = "netty-buffer" } diff --git a/kafka-connect/build.gradle b/kafka-connect/build.gradle index 102cf6a89283..08f044c233e9 100644 --- a/kafka-connect/build.gradle +++ b/kafka-connect/build.gradle @@ -30,3 +30,30 @@ project(":iceberg-kafka-connect:iceberg-kafka-connect-events") { useJUnitPlatform() } } + +project(":iceberg-kafka-connect:iceberg-kafka-connect") { + dependencies { + api project(':iceberg-api') + implementation project(':iceberg-core') + implementation project(':iceberg-common') + implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') + implementation project(':iceberg-data') + implementation project(':iceberg-kafka-connect:iceberg-kafka-connect-events') + implementation platform(libs.jackson.bom) + implementation "com.fasterxml.jackson.core:jackson-core" + implementation "com.fasterxml.jackson.core:jackson-databind" + implementation libs.avro.avro + + compileOnly libs.kafka.clients + compileOnly libs.kafka.connect.api + compileOnly libs.kafka.connect.json + + testImplementation libs.hadoop3.client + testRuntimeOnly project(':iceberg-parquet') + testRuntimeOnly project(':iceberg-orc') + } + + test { + useJUnitPlatform() + } +} diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/AvroUtil.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/AvroUtil.java index 8f184c21b401..ff86a7de78b3 100644 --- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/AvroUtil.java +++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/AvroUtil.java @@ -34,7 +34,7 @@ import org.apache.iceberg.types.Types; /** Class for Avro-related utility methods. */ -class AvroUtil { +public class AvroUtil { static final Map FIELD_ID_TO_CLASS = ImmutableMap.of( DataComplete.ASSIGNMENTS_ELEMENT, diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitComplete.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitComplete.java index 8a2cf632407d..940445a22052 100644 --- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitComplete.java +++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitComplete.java @@ -21,6 +21,7 @@ import java.time.OffsetDateTime; import java.util.UUID; import org.apache.avro.Schema; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.types.Types.StructType; import org.apache.iceberg.types.Types.TimestampType; @@ -53,6 +54,7 @@ public CommitComplete(Schema avroSchema) { } public CommitComplete(UUID commitId, OffsetDateTime validThroughTs) { + Preconditions.checkNotNull(commitId, "Commit ID cannot be null"); this.commitId = commitId; this.validThroughTs = validThroughTs; this.avroSchema = AVRO_SCHEMA; diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitToTable.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitToTable.java index df32ff017690..f4806278a0db 100644 --- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitToTable.java +++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitToTable.java @@ -21,6 +21,7 @@ import java.time.OffsetDateTime; import java.util.UUID; import org.apache.avro.Schema; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.types.Types.LongType; import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.types.Types.StructType; @@ -64,6 +65,8 @@ public CommitToTable( TableReference tableReference, Long snapshotId, OffsetDateTime validThroughTs) { + Preconditions.checkNotNull(commitId, "Commit ID cannot be null"); + Preconditions.checkNotNull(tableReference, "Table reference cannot be null"); this.commitId = commitId; this.tableReference = tableReference; this.snapshotId = snapshotId; diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataComplete.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataComplete.java index e91414d2512d..1fa9ae7e0abf 100644 --- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataComplete.java +++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataComplete.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.UUID; import org.apache.avro.Schema; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.types.Types.ListType; import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.types.Types.StructType; @@ -55,6 +56,7 @@ public DataComplete(Schema avroSchema) { } public DataComplete(UUID commitId, List assignments) { + Preconditions.checkNotNull(commitId, "Commit ID cannot be null"); this.commitId = commitId; this.assignments = assignments; this.avroSchema = AVRO_SCHEMA; diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataWritten.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataWritten.java index 5edc401a2919..c6815925a904 100644 --- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataWritten.java +++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataWritten.java @@ -23,6 +23,7 @@ import org.apache.avro.Schema; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.types.Types.ListType; import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.types.Types.StructType; @@ -62,6 +63,8 @@ public DataWritten( TableReference tableReference, List dataFiles, List deleteFiles) { + Preconditions.checkNotNull(commitId, "Commit ID cannot be null"); + Preconditions.checkNotNull(tableReference, "Table reference cannot be null"); this.partitionType = partitionType; this.commitId = commitId; this.tableReference = tableReference; diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/Event.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/Event.java index f8ee619142c9..8b6dbc0f45a6 100644 --- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/Event.java +++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/Event.java @@ -25,6 +25,7 @@ import java.util.UUID; import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Types.IntegerType; import org.apache.iceberg.types.Types.NestedField; @@ -59,6 +60,9 @@ public Event(Schema avroSchema) { } public Event(String groupId, Payload payload) { + Preconditions.checkNotNull(groupId, "Group ID cannot be null"); + Preconditions.checkNotNull(payload, "Payload cannot be null"); + this.id = UUID.randomUUID(); this.type = payload.type(); this.timestamp = OffsetDateTime.now(ZoneOffset.UTC).truncatedTo(ChronoUnit.MICROS); diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/StartCommit.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/StartCommit.java index f26a92249507..ab9d81994c3c 100644 --- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/StartCommit.java +++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/StartCommit.java @@ -20,6 +20,7 @@ import java.util.UUID; import org.apache.avro.Schema; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.types.Types.StructType; import org.apache.iceberg.types.Types.UUIDType; @@ -45,6 +46,7 @@ public StartCommit(Schema avroSchema) { } public StartCommit(UUID commitId) { + Preconditions.checkNotNull(commitId, "Commit ID cannot be null"); this.commitId = commitId; this.avroSchema = AVRO_SCHEMA; } diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java index 4d8d43e69633..d1400f58f74c 100644 --- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java +++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java @@ -27,6 +27,7 @@ import org.apache.avro.util.Utf8; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.types.Types.ListType; import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.types.Types.StringType; @@ -63,6 +64,9 @@ public TableReference(Schema avroSchema) { } public TableReference(String catalog, List namespace, String name) { + Preconditions.checkNotNull(catalog, "Catalog cannot be null"); + Preconditions.checkNotNull(namespace, "Namespace cannot be null"); + Preconditions.checkNotNull(name, "Name cannot be null"); this.catalog = catalog; this.namespace = namespace; this.name = name; diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TopicPartitionOffset.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TopicPartitionOffset.java index 7bcf5327a8b5..6ed67f232fb5 100644 --- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TopicPartitionOffset.java +++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TopicPartitionOffset.java @@ -21,6 +21,7 @@ import java.time.OffsetDateTime; import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.types.Types.IntegerType; import org.apache.iceberg.types.Types.LongType; import org.apache.iceberg.types.Types.NestedField; @@ -58,6 +59,7 @@ public TopicPartitionOffset(Schema avroSchema) { } public TopicPartitionOffset(String topic, int partition, Long offset, OffsetDateTime timestamp) { + Preconditions.checkNotNull(topic, "Topic cannot be null"); this.topic = topic; this.partition = partition; this.offset = offset; diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java new file mode 100644 index 000000000000..aa1ecdd5d1ba --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -0,0 +1,468 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect; + +import static java.util.stream.Collectors.toList; + +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.regex.Pattern; +import org.apache.iceberg.IcebergBuild; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.base.Splitter; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.json.JsonConverterConfig; +import org.apache.kafka.connect.storage.ConverterConfig; +import org.apache.kafka.connect.storage.ConverterType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IcebergSinkConfig extends AbstractConfig { + + private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkConfig.class.getName()); + + public static final String INTERNAL_TRANSACTIONAL_SUFFIX_PROP = + "iceberg.coordinator.transactional.suffix"; + private static final String ROUTE_REGEX = "route-regex"; + private static final String ID_COLUMNS = "id-columns"; + private static final String PARTITION_BY = "partition-by"; + private static final String COMMIT_BRANCH = "commit-branch"; + + private static final String CATALOG_PROP_PREFIX = "iceberg.catalog."; + private static final String HADOOP_PROP_PREFIX = "iceberg.hadoop."; + private static final String KAFKA_PROP_PREFIX = "iceberg.kafka."; + private static final String TABLE_PROP_PREFIX = "iceberg.table."; + private static final String AUTO_CREATE_PROP_PREFIX = "iceberg.tables.auto-create-props."; + private static final String WRITE_PROP_PREFIX = "iceberg.table.write-props."; + + private static final String CATALOG_NAME_PROP = "iceberg.catalog"; + private static final String TABLES_PROP = "iceberg.tables"; + private static final String TABLES_DYNAMIC_PROP = "iceberg.tables.dynamic-enabled"; + private static final String TABLES_ROUTE_FIELD_PROP = "iceberg.tables.route-field"; + private static final String TABLES_DEFAULT_COMMIT_BRANCH = "iceberg.tables.default-commit-branch"; + private static final String TABLES_DEFAULT_ID_COLUMNS = "iceberg.tables.default-id-columns"; + private static final String TABLES_DEFAULT_PARTITION_BY = "iceberg.tables.default-partition-by"; + // FIXME: add config for CDC and upsert mode + private static final String TABLES_AUTO_CREATE_ENABLED_PROP = + "iceberg.tables.auto-create-enabled"; + private static final String TABLES_EVOLVE_SCHEMA_ENABLED_PROP = + "iceberg.tables.evolve-schema-enabled"; + private static final String TABLES_SCHEMA_FORCE_OPTIONAL_PROP = + "iceberg.tables.schema-force-optional"; + private static final String TABLES_SCHEMA_CASE_INSENSITIVE_PROP = + "iceberg.tables.schema-case-insensitive"; + private static final String CONTROL_TOPIC_PROP = "iceberg.control.topic"; + private static final String CONTROL_GROUP_ID_PROP = "iceberg.control.group-id"; + private static final String COMMIT_INTERVAL_MS_PROP = "iceberg.control.commit.interval-ms"; + private static final int COMMIT_INTERVAL_MS_DEFAULT = 300_000; + private static final String COMMIT_TIMEOUT_MS_PROP = "iceberg.control.commit.timeout-ms"; + private static final int COMMIT_TIMEOUT_MS_DEFAULT = 30_000; + private static final String COMMIT_THREADS_PROP = "iceberg.control.commit.threads"; + private static final String CONNECT_GROUP_ID_PROP = "iceberg.connect.group-id"; + private static final String HADDOP_CONF_DIR_PROP = "iceberg.hadoop-conf-dir"; + + private static final String NAME_PROP = "name"; + private static final String BOOTSTRAP_SERVERS_PROP = "bootstrap.servers"; + + private static final String DEFAULT_CATALOG_NAME = "iceberg"; + private static final String DEFAULT_CONTROL_TOPIC = "control-iceberg"; + public static final String DEFAULT_CONTROL_GROUP_PREFIX = "cg-control-"; + + public static final int SCHEMA_UPDATE_RETRIES = 2; // 3 total attempts + public static final int CREATE_TABLE_RETRIES = 2; // 3 total attempts + + @VisibleForTesting static final String COMMA_NO_PARENS_REGEX = ",(?![^()]*+\\))"; + + public static final ConfigDef CONFIG_DEF = newConfigDef(); + + public static String version() { + String kcVersion = IcebergSinkConfig.class.getPackage().getImplementationVersion(); + if (kcVersion == null) { + kcVersion = "unknown"; + } + return IcebergBuild.version() + "-kc-" + kcVersion; + } + + private static ConfigDef newConfigDef() { + ConfigDef configDef = new ConfigDef(); + configDef.define( + TABLES_PROP, + ConfigDef.Type.LIST, + null, + Importance.HIGH, + "Comma-delimited list of destination tables"); + configDef.define( + TABLES_DYNAMIC_PROP, + ConfigDef.Type.BOOLEAN, + false, + Importance.MEDIUM, + "Enable dynamic routing to tables based on a record value"); + configDef.define( + TABLES_ROUTE_FIELD_PROP, + ConfigDef.Type.STRING, + null, + Importance.MEDIUM, + "Source record field for routing records to tables"); + configDef.define( + TABLES_DEFAULT_COMMIT_BRANCH, + ConfigDef.Type.STRING, + null, + Importance.MEDIUM, + "Default branch for commits"); + configDef.define( + TABLES_DEFAULT_ID_COLUMNS, + ConfigDef.Type.STRING, + null, + Importance.MEDIUM, + "Default ID columns for tables, comma-separated"); + configDef.define( + TABLES_DEFAULT_PARTITION_BY, + ConfigDef.Type.STRING, + null, + Importance.MEDIUM, + "Default partition spec to use when creating tables, comma-separated"); + configDef.define( + TABLES_AUTO_CREATE_ENABLED_PROP, + ConfigDef.Type.BOOLEAN, + false, + Importance.MEDIUM, + "Set to true to automatically create destination tables, false otherwise"); + configDef.define( + TABLES_SCHEMA_FORCE_OPTIONAL_PROP, + ConfigDef.Type.BOOLEAN, + false, + Importance.MEDIUM, + "Set to true to set columns as optional during table create and evolution, false to respect schema"); + configDef.define( + TABLES_SCHEMA_CASE_INSENSITIVE_PROP, + ConfigDef.Type.BOOLEAN, + false, + Importance.MEDIUM, + "Set to true to look up table columns by case-insensitive name, false for case-sensitive"); + configDef.define( + TABLES_EVOLVE_SCHEMA_ENABLED_PROP, + ConfigDef.Type.BOOLEAN, + false, + Importance.MEDIUM, + "Set to true to add any missing record fields to the table schema, false otherwise"); + configDef.define( + CATALOG_NAME_PROP, + ConfigDef.Type.STRING, + DEFAULT_CATALOG_NAME, + Importance.MEDIUM, + "Iceberg catalog name"); + configDef.define( + CONTROL_TOPIC_PROP, + ConfigDef.Type.STRING, + DEFAULT_CONTROL_TOPIC, + Importance.MEDIUM, + "Name of the control topic"); + configDef.define( + CONTROL_GROUP_ID_PROP, + ConfigDef.Type.STRING, + null, + Importance.MEDIUM, + "Name of the consumer group to store offsets"); + configDef.define( + CONNECT_GROUP_ID_PROP, + ConfigDef.Type.STRING, + null, + Importance.LOW, + "Name of the Connect consumer group, should not be set under normal conditions"); + configDef.define( + COMMIT_INTERVAL_MS_PROP, + ConfigDef.Type.INT, + COMMIT_INTERVAL_MS_DEFAULT, + Importance.MEDIUM, + "Coordinator interval for performing Iceberg table commits, in millis"); + configDef.define( + COMMIT_TIMEOUT_MS_PROP, + ConfigDef.Type.INT, + COMMIT_TIMEOUT_MS_DEFAULT, + Importance.MEDIUM, + "Coordinator time to wait for worker responses before committing, in millis"); + configDef.define( + COMMIT_THREADS_PROP, + ConfigDef.Type.INT, + Runtime.getRuntime().availableProcessors() * 2, + Importance.MEDIUM, + "Coordinator threads to use for table commits, default is (cores * 2)"); + configDef.define( + HADDOP_CONF_DIR_PROP, + ConfigDef.Type.STRING, + null, + Importance.MEDIUM, + "Coordinator threads to use for table commits, default is (cores * 2)"); + return configDef; + } + + private final Map originalProps; + private final Map catalogProps; + private final Map hadoopProps; + private final Map kafkaProps; + private final Map autoCreateProps; + private final Map writeProps; + private final Map tableConfigMap = Maps.newHashMap(); + private final JsonConverter jsonConverter; + + public IcebergSinkConfig(Map originalProps) { + super(CONFIG_DEF, originalProps); + this.originalProps = originalProps; + + this.catalogProps = PropertyUtil.propertiesWithPrefix(originalProps, CATALOG_PROP_PREFIX); + this.hadoopProps = PropertyUtil.propertiesWithPrefix(originalProps, HADOOP_PROP_PREFIX); + + this.kafkaProps = Maps.newHashMap(loadWorkerProps()); + kafkaProps.putAll(PropertyUtil.propertiesWithPrefix(originalProps, KAFKA_PROP_PREFIX)); + + this.autoCreateProps = + PropertyUtil.propertiesWithPrefix(originalProps, AUTO_CREATE_PROP_PREFIX); + this.writeProps = PropertyUtil.propertiesWithPrefix(originalProps, WRITE_PROP_PREFIX); + + this.jsonConverter = new JsonConverter(); + jsonConverter.configure( + ImmutableMap.of( + JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, + false, + ConverterConfig.TYPE_CONFIG, + ConverterType.VALUE.getName())); + + validate(); + } + + private void validate() { + checkState(!catalogProps().isEmpty(), "Must specify Iceberg catalog properties"); + if (tables() != null) { + checkState(!dynamicTablesEnabled(), "Cannot specify both static and dynamic table names"); + } else if (dynamicTablesEnabled()) { + checkState( + tablesRouteField() != null, "Must specify a route field if using dynamic table names"); + } else { + throw new ConfigException("Must specify table name(s)"); + } + } + + private void checkState(boolean condition, String msg) { + if (!condition) { + throw new ConfigException(msg); + } + } + + public String connectorName() { + return originalProps.get(NAME_PROP); + } + + public String transactionalSuffix() { + // this is for internal use and is not part of the config definition... + return originalProps.get(INTERNAL_TRANSACTIONAL_SUFFIX_PROP); + } + + public Map catalogProps() { + return catalogProps; + } + + public Map hadoopProps() { + return hadoopProps; + } + + public Map kafkaProps() { + return kafkaProps; + } + + public Map autoCreateProps() { + return autoCreateProps; + } + + public Map writeProps() { + return writeProps; + } + + public String catalogName() { + return getString(CATALOG_NAME_PROP); + } + + public List tables() { + return getList(TABLES_PROP); + } + + public boolean dynamicTablesEnabled() { + return getBoolean(TABLES_DYNAMIC_PROP); + } + + public String tablesRouteField() { + return getString(TABLES_ROUTE_FIELD_PROP); + } + + public String tablesDefaultCommitBranch() { + return getString(TABLES_DEFAULT_COMMIT_BRANCH); + } + + public String tablesDefaultIdColumns() { + return getString(TABLES_DEFAULT_ID_COLUMNS); + } + + public String tablesDefaultPartitionBy() { + return getString(TABLES_DEFAULT_PARTITION_BY); + } + + public TableSinkConfig tableConfig(String tableName) { + return tableConfigMap.computeIfAbsent( + tableName, + notUsed -> { + Map tableConfig = + PropertyUtil.propertiesWithPrefix(originalProps, TABLE_PROP_PREFIX + tableName + "."); + + String routeRegexStr = tableConfig.get(ROUTE_REGEX); + Pattern routeRegex = routeRegexStr == null ? null : Pattern.compile(routeRegexStr); + + String idColumnsStr = tableConfig.getOrDefault(ID_COLUMNS, tablesDefaultIdColumns()); + List idColumns = stringToList(idColumnsStr, ","); + + String partitionByStr = + tableConfig.getOrDefault(PARTITION_BY, tablesDefaultPartitionBy()); + List partitionBy = stringToList(partitionByStr, COMMA_NO_PARENS_REGEX); + + String commitBranch = + tableConfig.getOrDefault(COMMIT_BRANCH, tablesDefaultCommitBranch()); + + return new TableSinkConfig(routeRegex, idColumns, partitionBy, commitBranch); + }); + } + + @VisibleForTesting + static List stringToList(String value, String regex) { + if (value == null || value.isEmpty()) { + return ImmutableList.of(); + } + + return Arrays.stream(value.split(regex)).map(String::trim).collect(toList()); + } + + public String controlTopic() { + return getString(CONTROL_TOPIC_PROP); + } + + public String controlGroupId() { + String result = getString(CONTROL_GROUP_ID_PROP); + if (result != null) { + return result; + } + String connectorName = connectorName(); + Preconditions.checkNotNull(connectorName, "Connector name cannot be null"); + return DEFAULT_CONTROL_GROUP_PREFIX + connectorName; + } + + public String connectGroupId() { + String result = getString(CONNECT_GROUP_ID_PROP); + if (result != null) { + return result; + } + + String connectorName = connectorName(); + Preconditions.checkNotNull(connectorName, "Connector name cannot be null"); + return "connect-" + connectorName; + } + + public int commitIntervalMs() { + return getInt(COMMIT_INTERVAL_MS_PROP); + } + + public int commitTimeoutMs() { + return getInt(COMMIT_TIMEOUT_MS_PROP); + } + + public int commitThreads() { + return getInt(COMMIT_THREADS_PROP); + } + + public String hadoopConfDir() { + return getString(HADDOP_CONF_DIR_PROP); + } + + public boolean autoCreateEnabled() { + return getBoolean(TABLES_AUTO_CREATE_ENABLED_PROP); + } + + public boolean evolveSchemaEnabled() { + return getBoolean(TABLES_EVOLVE_SCHEMA_ENABLED_PROP); + } + + public boolean schemaForceOptional() { + return getBoolean(TABLES_SCHEMA_FORCE_OPTIONAL_PROP); + } + + public boolean schemaCaseInsensitive() { + return getBoolean(TABLES_SCHEMA_CASE_INSENSITIVE_PROP); + } + + public JsonConverter jsonConverter() { + return jsonConverter; + } + + /** + * This method attempts to load the Kafka Connect worker properties, which are not exposed to + * connectors. It does this by parsing the Java command used to launch the worker, extracting the + * name of the properties file, and then loading the file.
+ * The sink uses these properties, if available, when initializing its internal Kafka clients. By + * doing this, Kafka-related properties only need to be set in the worker properties and do not + * need to be duplicated in the sink config.
+ * If the worker properties cannot be loaded, then Kafka-related properties must be set via the + * `iceberg.kafka.*` sink configs. + * + * @return The Kafka Connect worker properties + */ + private Map loadWorkerProps() { + String javaCmd = System.getProperty("sun.java.command"); + if (javaCmd != null && !javaCmd.isEmpty()) { + List args = Splitter.on(' ').splitToList(javaCmd); + if (args.size() > 1 + && (args.get(0).endsWith(".ConnectDistributed") + || args.get(0).endsWith(".ConnectStandalone"))) { + Properties result = new Properties(); + try (InputStream in = Files.newInputStream(Paths.get(args.get(1)))) { + result.load(in); + // sanity check that this is the config we want + if (result.containsKey(BOOTSTRAP_SERVERS_PROP)) { + return Maps.fromProperties(result); + } + } catch (Exception e) { + // NO-OP + } + } + } + LOG.info( + "Worker properties not loaded, using only {}* properties for Kafka clients", + KAFKA_PROP_PREFIX); + return ImmutableMap.of(); + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConnector.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConnector.java new file mode 100644 index 000000000000..485b209302d5 --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConnector.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect; + +import static java.util.stream.Collectors.toList; + +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.IntStream; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.sink.SinkConnector; + +public class IcebergSinkConnector extends SinkConnector { + + private Map props; + + @Override + public String version() { + return IcebergSinkConfig.version(); + } + + @Override + public void start(Map connectorProps) { + this.props = connectorProps; + } + + @Override + public Class taskClass() { + // FIXME: update this when the connector channel is added + // return IcebergSinkTask.class; + return null; + } + + @Override + public List> taskConfigs(int maxTasks) { + String txnSuffix = "-txn-" + UUID.randomUUID() + "-"; + return IntStream.range(0, maxTasks) + .mapToObj( + i -> { + Map map = Maps.newHashMap(props); + map.put(IcebergSinkConfig.INTERNAL_TRANSACTIONAL_SUFFIX_PROP, txnSuffix + i); + return map; + }) + .collect(toList()); + } + + @Override + public void stop() {} + + @Override + public ConfigDef config() { + return IcebergSinkConfig.CONFIG_DEF; + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java new file mode 100644 index 000000000000..0ecde1f7dd0b --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect; + +import java.util.List; +import java.util.regex.Pattern; + +public class TableSinkConfig { + + private final Pattern routeRegex; + private final List idColumns; + private final List partitionBy; + private final String commitBranch; + + public TableSinkConfig( + Pattern routeRegex, List idColumns, List partitionBy, String commitBranch) { + this.routeRegex = routeRegex; + this.idColumns = idColumns; + this.partitionBy = partitionBy; + this.commitBranch = commitBranch; + } + + public Pattern routeRegex() { + return routeRegex; + } + + public List idColumns() { + return idColumns; + } + + public List partitionBy() { + return partitionBy; + } + + public String commitBranch() { + return commitBranch; + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java new file mode 100644 index 000000000000..da88b3b50ffe --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.List; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.sink.SinkRecord; + +public class IcebergWriter implements RecordWriter { + private final Table table; + private final String tableName; + private final IcebergSinkConfig config; + private final List writerResults; + + // FIXME: update this when the record converter is added + // private RecordConverter recordConverter; + private TaskWriter writer; + + public IcebergWriter(Table table, String tableName, IcebergSinkConfig config) { + this.table = table; + this.tableName = tableName; + this.config = config; + this.writerResults = Lists.newArrayList(); + initNewWriter(); + } + + private void initNewWriter() { + this.writer = Utilities.createTableWriter(table, tableName, config); + // FIXME: update this when the record converter is added + // this.recordConverter = new RecordConverter(table, config); + } + + @Override + public void write(SinkRecord record) { + try { + // TODO: config to handle tombstones instead of always ignoring? + if (record.value() != null) { + Record row = convertToRow(record); + + // FIXME: add CDC operation support + + writer.write(row); + } + } catch (Exception e) { + throw new DataException( + String.format( + "An error occurred converting record, topic: %s, partition, %d, offset: %d", + record.topic(), record.kafkaPartition(), record.kafkaOffset()), + e); + } + } + + private Record convertToRow(SinkRecord record) { + // FIXME: update this when the record converter is added + return null; + } + + private void flush() { + WriteResult writeResult; + try { + writeResult = writer.complete(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + writerResults.add( + new WriterResult( + TableIdentifier.parse(tableName), + Arrays.asList(writeResult.dataFiles()), + Arrays.asList(writeResult.deleteFiles()), + table.spec().partitionType())); + } + + @Override + public List complete() { + flush(); + + List result = Lists.newArrayList(writerResults); + writerResults.clear(); + + return result; + } + + @Override + public void close() { + try { + writer.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java new file mode 100644 index 000000000000..67d0e850e62e --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types.StructType; +import org.apache.iceberg.util.Tasks; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IcebergWriterFactory { + + private static final Logger LOG = LoggerFactory.getLogger(IcebergWriterFactory.class); + + private final Catalog catalog; + private final IcebergSinkConfig config; + + public IcebergWriterFactory(Catalog catalog, IcebergSinkConfig config) { + this.catalog = catalog; + this.config = config; + } + + public RecordWriter createWriter( + String tableName, SinkRecord sample, boolean ignoreMissingTable) { + TableIdentifier identifier = TableIdentifier.parse(tableName); + Table table; + try { + table = catalog.loadTable(identifier); + } catch (NoSuchTableException nst) { + if (config.autoCreateEnabled()) { + table = autoCreateTable(tableName, sample); + } else if (ignoreMissingTable) { + return new NoOpWriter(); + } else { + throw nst; + } + } + + return new IcebergWriter(table, tableName, config); + } + + @VisibleForTesting + Table autoCreateTable(String tableName, SinkRecord sample) { + StructType structType; + if (sample.valueSchema() == null) { + Type type = SchemaUtils.inferIcebergType(sample.value(), config); + if (type == null) { + throw new DataException("Unable to create table from empty object"); + } + structType = type.asStructType(); + } else { + structType = SchemaUtils.toIcebergType(sample.valueSchema(), config).asStructType(); + } + + org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(structType.fields()); + TableIdentifier identifier = TableIdentifier.parse(tableName); + + List partitionBy = config.tableConfig(tableName).partitionBy(); + PartitionSpec spec; + try { + spec = SchemaUtils.createPartitionSpec(schema, partitionBy); + } catch (Exception e) { + LOG.error( + "Unable to create partition spec {}, table {} will be unpartitioned", + partitionBy, + identifier, + e); + spec = PartitionSpec.unpartitioned(); + } + + PartitionSpec partitionSpec = spec; + AtomicReference result = new AtomicReference<>(); + Tasks.range(1) + .retry(IcebergSinkConfig.CREATE_TABLE_RETRIES) + .run( + notUsed -> { + try { + result.set( + catalog.createTable( + identifier, schema, partitionSpec, config.autoCreateProps())); + } catch (AlreadyExistsException e) { + result.set(catalog.loadTable(identifier)); + } + }); + return result.get(); + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java new file mode 100644 index 000000000000..31abe09cf1a4 --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import java.util.List; +import org.apache.kafka.connect.sink.SinkRecord; + +public class NoOpWriter implements RecordWriter { + @Override + public void write(SinkRecord record) { + // NO-OP + } + + @Override + public List complete() { + // NO-OP + return null; + } + + @Override + public void close() { + // NO-OP + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/PartitionedAppendWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/PartitionedAppendWriter.java new file mode 100644 index 000000000000..1d429e44e675 --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/PartitionedAppendWriter.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.InternalRecordWrapper; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.PartitionedFanoutWriter; + +public class PartitionedAppendWriter extends PartitionedFanoutWriter { + + private final PartitionKey partitionKey; + private final InternalRecordWrapper wrapper; + + public PartitionedAppendWriter( + PartitionSpec spec, + FileFormat format, + FileAppenderFactory appenderFactory, + OutputFileFactory fileFactory, + FileIO io, + long targetFileSize, + Schema schema) { + super(spec, format, appenderFactory, fileFactory, io, targetFileSize); + this.partitionKey = new PartitionKey(spec, schema); + this.wrapper = new InternalRecordWrapper(schema.asStruct()); + } + + @Override + protected PartitionKey partition(Record row) { + partitionKey.partition(wrapper.wrap(row)); + return partitionKey; + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java new file mode 100644 index 000000000000..0b4d7566eab7 --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWriter.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import java.util.List; +import org.apache.kafka.connect.sink.SinkRecord; + +public interface RecordWriter extends Cloneable { + + void write(SinkRecord record); + + List complete(); + + void close(); +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUpdate.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUpdate.java new file mode 100644 index 000000000000..2bb0e65f204b --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUpdate.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import java.util.Collection; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Type.PrimitiveType; + +public class SchemaUpdate { + + public static class Consumer { + private final Map addColumns = Maps.newHashMap(); + private final Map updateTypes = Maps.newHashMap(); + private final Map makeOptionals = Maps.newHashMap(); + + public Collection addColumns() { + return addColumns.values(); + } + + public Collection updateTypes() { + return updateTypes.values(); + } + + public Collection makeOptionals() { + return makeOptionals.values(); + } + + public boolean empty() { + return addColumns.isEmpty() && updateTypes.isEmpty() && makeOptionals.isEmpty(); + } + + public void addColumn(String parentName, String name, Type type) { + AddColumn addCol = new AddColumn(parentName, name, type); + addColumns.put(addCol.key(), addCol); + } + + public void updateType(String name, PrimitiveType type) { + updateTypes.put(name, new UpdateType(name, type)); + } + + public void makeOptional(String name) { + makeOptionals.put(name, new MakeOptional(name)); + } + } + + public static class AddColumn extends SchemaUpdate { + private final String parentName; + private final String name; + private final Type type; + + public AddColumn(String parentName, String name, Type type) { + this.parentName = parentName; + this.name = name; + this.type = type; + } + + public String parentName() { + return parentName; + } + + public String name() { + return name; + } + + public String key() { + return parentName == null ? name : parentName + "." + name; + } + + public Type type() { + return type; + } + } + + public static class UpdateType extends SchemaUpdate { + private final String name; + private final PrimitiveType type; + + public UpdateType(String name, PrimitiveType type) { + this.name = name; + this.type = type; + } + + public String name() { + return name; + } + + public PrimitiveType type() { + return type; + } + } + + public static class MakeOptional extends SchemaUpdate { + private final String name; + + public MakeOptional(String name) { + this.name = name; + } + + public String name() { + return name; + } + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUtils.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUtils.java new file mode 100644 index 000000000000..64fa89041c29 --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUtils.java @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Table; +import org.apache.iceberg.UpdateSchema; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.connect.data.SchemaUpdate.AddColumn; +import org.apache.iceberg.connect.data.SchemaUpdate.MakeOptional; +import org.apache.iceberg.connect.data.SchemaUpdate.UpdateType; +import org.apache.iceberg.relocated.com.google.common.base.Splitter; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Type.PrimitiveType; +import org.apache.iceberg.types.Type.TypeID; +import org.apache.iceberg.types.Types.BinaryType; +import org.apache.iceberg.types.Types.BooleanType; +import org.apache.iceberg.types.Types.DateType; +import org.apache.iceberg.types.Types.DecimalType; +import org.apache.iceberg.types.Types.DoubleType; +import org.apache.iceberg.types.Types.FloatType; +import org.apache.iceberg.types.Types.IntegerType; +import org.apache.iceberg.types.Types.ListType; +import org.apache.iceberg.types.Types.LongType; +import org.apache.iceberg.types.Types.MapType; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StringType; +import org.apache.iceberg.types.Types.StructType; +import org.apache.iceberg.types.Types.TimeType; +import org.apache.iceberg.types.Types.TimestampType; +import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.Tasks; +import org.apache.kafka.connect.data.Date; +import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Time; +import org.apache.kafka.connect.data.Timestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SchemaUtils { + + private static final Logger LOG = LoggerFactory.getLogger(SchemaUtils.class); + + private static final Pattern TRANSFORM_REGEX = Pattern.compile("(\\w+)\\((.+)\\)"); + + public static PrimitiveType needsDataTypeUpdate(Type currentIcebergType, Schema valueSchema) { + if (currentIcebergType.typeId() == TypeID.FLOAT && valueSchema.type() == Schema.Type.FLOAT64) { + return DoubleType.get(); + } + if (currentIcebergType.typeId() == TypeID.INTEGER && valueSchema.type() == Schema.Type.INT64) { + return LongType.get(); + } + return null; + } + + public static void applySchemaUpdates(Table table, SchemaUpdate.Consumer updates) { + if (updates == null || updates.empty()) { + // no updates to apply + return; + } + + Tasks.range(1) + .retry(IcebergSinkConfig.SCHEMA_UPDATE_RETRIES) + .run(notUsed -> commitSchemaUpdates(table, updates)); + } + + private static void commitSchemaUpdates(Table table, SchemaUpdate.Consumer updates) { + // get the latest schema in case another process updated it + table.refresh(); + + // filter out columns that have already been added + List addColumns = + updates.addColumns().stream() + .filter(addCol -> !columnExists(table.schema(), addCol)) + .collect(Collectors.toList()); + + // filter out columns that have the updated type + List updateTypes = + updates.updateTypes().stream() + .filter(updateType -> !typeMatches(table.schema(), updateType)) + .collect(Collectors.toList()); + + // filter out columns that have already been made optional + List makeOptionals = + updates.makeOptionals().stream() + .filter(makeOptional -> !isOptional(table.schema(), makeOptional)) + .collect(Collectors.toList()); + + if (addColumns.isEmpty() && updateTypes.isEmpty() && makeOptionals.isEmpty()) { + // no updates to apply + LOG.info("Schema for table {} already up-to-date", table.name()); + return; + } + + // apply the updates + UpdateSchema updateSchema = table.updateSchema(); + addColumns.forEach( + update -> updateSchema.addColumn(update.parentName(), update.name(), update.type())); + updateTypes.forEach(update -> updateSchema.updateColumn(update.name(), update.type())); + makeOptionals.forEach(update -> updateSchema.makeColumnOptional(update.name())); + updateSchema.commit(); + LOG.info("Schema for table {} updated with new columns", table.name()); + } + + private static boolean columnExists(org.apache.iceberg.Schema schema, AddColumn update) { + return schema.findType(update.key()) != null; + } + + private static boolean typeMatches(org.apache.iceberg.Schema schema, UpdateType update) { + Type type = schema.findType(update.name()); + if (type == null) { + throw new IllegalArgumentException("Invalid column: " + update.name()); + } + return type.typeId() == update.type().typeId(); + } + + private static boolean isOptional(org.apache.iceberg.Schema schema, MakeOptional update) { + NestedField field = schema.findField(update.name()); + if (field == null) { + throw new IllegalArgumentException("Invalid column: " + update.name()); + } + return field.isOptional(); + } + + public static PartitionSpec createPartitionSpec( + org.apache.iceberg.Schema schema, List partitionBy) { + if (partitionBy.isEmpty()) { + return PartitionSpec.unpartitioned(); + } + + PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(schema); + partitionBy.forEach( + partitionField -> { + Matcher matcher = TRANSFORM_REGEX.matcher(partitionField); + if (matcher.matches()) { + String transform = matcher.group(1); + switch (transform) { + case "year": + case "years": + specBuilder.year(matcher.group(2)); + break; + case "month": + case "months": + specBuilder.month(matcher.group(2)); + break; + case "day": + case "days": + specBuilder.day(matcher.group(2)); + break; + case "hour": + case "hours": + specBuilder.hour(matcher.group(2)); + break; + case "bucket": + { + Pair args = transformArgPair(matcher.group(2)); + specBuilder.bucket(args.first(), args.second()); + break; + } + case "truncate": + { + Pair args = transformArgPair(matcher.group(2)); + specBuilder.truncate(args.first(), args.second()); + break; + } + default: + throw new UnsupportedOperationException("Unsupported transform: " + transform); + } + } else { + specBuilder.identity(partitionField); + } + }); + return specBuilder.build(); + } + + private static Pair transformArgPair(String argsStr) { + List parts = Splitter.on(',').splitToList(argsStr); + if (parts.size() != 2) { + throw new IllegalArgumentException("Invalid argument " + argsStr + ", should have 2 parts"); + } + return Pair.of(parts.get(0).trim(), Integer.parseInt(parts.get(1).trim())); + } + + public static Type toIcebergType(Schema valueSchema, IcebergSinkConfig config) { + return new SchemaGenerator(config).toIcebergType(valueSchema); + } + + public static Type inferIcebergType(Object value, IcebergSinkConfig config) { + return new SchemaGenerator(config).inferIcebergType(value); + } + + static class SchemaGenerator { + + private int fieldId = 1; + private final IcebergSinkConfig config; + + SchemaGenerator(IcebergSinkConfig config) { + this.config = config; + } + + @SuppressWarnings("checkstyle:CyclomaticComplexity") + Type toIcebergType(Schema valueSchema) { + switch (valueSchema.type()) { + case BOOLEAN: + return BooleanType.get(); + case BYTES: + if (Decimal.LOGICAL_NAME.equals(valueSchema.name())) { + int scale = Integer.parseInt(valueSchema.parameters().get(Decimal.SCALE_FIELD)); + return DecimalType.of(38, scale); + } + return BinaryType.get(); + case INT8: + case INT16: + return IntegerType.get(); + case INT32: + if (Date.LOGICAL_NAME.equals(valueSchema.name())) { + return DateType.get(); + } else if (Time.LOGICAL_NAME.equals(valueSchema.name())) { + return TimeType.get(); + } + return IntegerType.get(); + case INT64: + if (Timestamp.LOGICAL_NAME.equals(valueSchema.name())) { + return TimestampType.withZone(); + } + return LongType.get(); + case FLOAT32: + return FloatType.get(); + case FLOAT64: + return DoubleType.get(); + case ARRAY: + Type elementType = toIcebergType(valueSchema.valueSchema()); + if (config.schemaForceOptional() || valueSchema.valueSchema().isOptional()) { + return ListType.ofOptional(nextId(), elementType); + } else { + return ListType.ofRequired(nextId(), elementType); + } + case MAP: + Type keyType = toIcebergType(valueSchema.keySchema()); + Type valueType = toIcebergType(valueSchema.valueSchema()); + if (config.schemaForceOptional() || valueSchema.valueSchema().isOptional()) { + return MapType.ofOptional(nextId(), nextId(), keyType, valueType); + } else { + return MapType.ofRequired(nextId(), nextId(), keyType, valueType); + } + case STRUCT: + List structFields = + valueSchema.fields().stream() + .map( + field -> + NestedField.of( + nextId(), + config.schemaForceOptional() || field.schema().isOptional(), + field.name(), + toIcebergType(field.schema()))) + .collect(Collectors.toList()); + return StructType.of(structFields); + case STRING: + default: + return StringType.get(); + } + } + + @SuppressWarnings("checkstyle:CyclomaticComplexity") + public Type inferIcebergType(Object value) { + if (value == null) { + return null; + } else if (value instanceof String) { + return StringType.get(); + } else if (value instanceof Boolean) { + return BooleanType.get(); + } else if (value instanceof BigDecimal) { + BigDecimal bigDecimal = (BigDecimal) value; + return DecimalType.of(bigDecimal.precision(), bigDecimal.scale()); + } else if (value instanceof Integer || value instanceof Long) { + return LongType.get(); + } else if (value instanceof Float || value instanceof Double) { + return DoubleType.get(); + } else if (value instanceof LocalDate) { + return DateType.get(); + } else if (value instanceof LocalTime) { + return TimeType.get(); + } else if (value instanceof java.util.Date || value instanceof OffsetDateTime) { + return TimestampType.withZone(); + } else if (value instanceof LocalDateTime) { + return TimestampType.withoutZone(); + } else if (value instanceof List) { + List list = (List) value; + if (list.isEmpty()) { + return null; + } + Type elementType = inferIcebergType(list.get(0)); + return elementType == null ? null : ListType.ofOptional(nextId(), elementType); + } else if (value instanceof Map) { + Map map = (Map) value; + List structFields = + map.entrySet().stream() + .filter(entry -> entry.getKey() != null && entry.getValue() != null) + .map( + entry -> { + Type valueType = inferIcebergType(entry.getValue()); + return valueType == null + ? null + : NestedField.optional(nextId(), entry.getKey().toString(), valueType); + }) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + if (structFields.isEmpty()) { + return null; + } + return StructType.of(structFields); + } else { + return null; + } + } + + private int nextId() { + return fieldId++; + } + } + + private SchemaUtils() {} +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java new file mode 100644 index 000000000000..ec13b003a21a --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import static java.util.stream.Collectors.toSet; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES; +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT; + +import java.io.IOException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.common.DynClasses; +import org.apache.iceberg.common.DynConstructors; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.common.DynMethods.BoundMethod; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.UnpartitionedWriter; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.base.Splitter; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.primitives.Ints; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Struct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Utilities { + + private static final Logger LOG = LoggerFactory.getLogger(Utilities.class.getName()); + private static final List HADOOP_CONF_FILES = + ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml"); + + public static Catalog loadCatalog(IcebergSinkConfig config) { + return CatalogUtil.buildIcebergCatalog( + config.catalogName(), config.catalogProps(), loadHadoopConfig(config)); + } + + // use reflection here to avoid requiring Hadoop as a dependency + private static Object loadHadoopConfig(IcebergSinkConfig config) { + Class configClass = + DynClasses.builder() + .impl("org.apache.hadoop.hdfs.HdfsConfiguration") + .impl("org.apache.hadoop.conf.Configuration") + .orNull() + .build(); + + if (configClass == null) { + LOG.info("Hadoop not found on classpath, not creating Hadoop config"); + return null; + } + + try { + Object result = DynConstructors.builder().hiddenImpl(configClass).build().newInstance(); + BoundMethod addResourceMethod = + DynMethods.builder("addResource").impl(configClass, URL.class).build(result); + BoundMethod setMethod = + DynMethods.builder("set").impl(configClass, String.class, String.class).build(result); + + // load any config files in the specified config directory + String hadoopConfDir = config.hadoopConfDir(); + if (hadoopConfDir != null) { + HADOOP_CONF_FILES.forEach( + confFile -> { + Path path = Paths.get(hadoopConfDir, confFile); + if (Files.exists(path)) { + try { + addResourceMethod.invoke(path.toUri().toURL()); + } catch (IOException e) { + LOG.warn("Error adding Hadoop resource {}, resource was not added", path, e); + } + } + }); + } + + // set any Hadoop properties specified in the sink config + config.hadoopProps().forEach(setMethod::invoke); + + LOG.info("Hadoop config initialized: {}", configClass.getName()); + return result; + } catch (Exception e) { + LOG.warn( + "Hadoop found on classpath but could not create config, proceeding without config", e); + } + return null; + } + + @SuppressWarnings("unchecked") + public static Object extractFromRecordValue(Object recordValue, String fieldName) { + List fields = Splitter.on('.').splitToList(fieldName); + if (recordValue instanceof Struct) { + return valueFromStruct((Struct) recordValue, fields); + } else if (recordValue instanceof Map) { + return valueFromMap((Map) recordValue, fields); + } else { + throw new UnsupportedOperationException( + "Cannot extract value from type: " + recordValue.getClass().getName()); + } + } + + private static Object valueFromStruct(Struct parent, List fields) { + Struct struct = parent; + for (int idx = 0; idx < fields.size() - 1; idx++) { + Object value = fieldValueFromStruct(struct, fields.get(idx)); + if (value == null) { + return null; + } + Preconditions.checkState(value instanceof Struct, "Expected a struct type"); + struct = (Struct) value; + } + return fieldValueFromStruct(struct, fields.get(fields.size() - 1)); + } + + private static Object fieldValueFromStruct(Struct struct, String fieldName) { + Field structField = struct.schema().field(fieldName); + if (structField == null) { + return null; + } + return struct.get(structField); + } + + @SuppressWarnings("unchecked") + private static Object valueFromMap(Map parent, List fields) { + Map map = parent; + for (int idx = 0; idx < fields.size() - 1; idx++) { + Object value = map.get(fields.get(idx)); + if (value == null) { + return null; + } + Preconditions.checkState(value instanceof Map, "Expected a map type"); + map = (Map) value; + } + return map.get(fields.get(fields.size() - 1)); + } + + public static TaskWriter createTableWriter( + Table table, String tableName, IcebergSinkConfig config) { + Map tableProps = Maps.newHashMap(table.properties()); + tableProps.putAll(config.writeProps()); + + String formatStr = tableProps.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); + FileFormat format = FileFormat.fromString(formatStr); + + long targetFileSize = + PropertyUtil.propertyAsLong( + tableProps, WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT); + + Set identifierFieldIds = table.schema().identifierFieldIds(); + + // override the identifier fields if the config is set + List idCols = config.tableConfig(tableName).idColumns(); + if (!idCols.isEmpty()) { + identifierFieldIds = + idCols.stream() + .map( + colName -> { + NestedField field = table.schema().findField(colName); + if (field == null) { + throw new IllegalArgumentException("ID column not found: " + colName); + } + return field.fieldId(); + }) + .collect(toSet()); + } + + FileAppenderFactory appenderFactory; + if (identifierFieldIds == null || identifierFieldIds.isEmpty()) { + appenderFactory = + new GenericAppenderFactory(table.schema(), table.spec(), null, null, null) + .setAll(tableProps); + } else { + appenderFactory = + new GenericAppenderFactory( + table.schema(), + table.spec(), + Ints.toArray(identifierFieldIds), + TypeUtil.select(table.schema(), Sets.newHashSet(identifierFieldIds)), + null) + .setAll(tableProps); + } + + // (partition ID + task ID + operation ID) must be unique + OutputFileFactory fileFactory = + OutputFileFactory.builderFor(table, 1, System.currentTimeMillis()) + .defaultSpec(table.spec()) + .operationId(UUID.randomUUID().toString()) + .format(format) + .build(); + + // FIXME: add delta writers + + TaskWriter writer; + if (table.spec().isUnpartitioned()) { + writer = + new UnpartitionedWriter<>( + table.spec(), format, appenderFactory, fileFactory, table.io(), targetFileSize); + } else { + writer = + new PartitionedAppendWriter( + table.spec(), + format, + appenderFactory, + fileFactory, + table.io(), + targetFileSize, + table.schema()); + } + return writer; + } + + private Utilities() {} +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/WriterResult.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/WriterResult.java new file mode 100644 index 000000000000..cb3a700da247 --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/WriterResult.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import java.util.List; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.types.Types.StructType; + +public class WriterResult { + + private final TableIdentifier tableIdentifier; + private final List dataFiles; + private final List deleteFiles; + private final StructType partitionStruct; + + public WriterResult( + TableIdentifier tableIdentifier, + List dataFiles, + List deleteFiles, + StructType partitionStruct) { + this.tableIdentifier = tableIdentifier; + this.dataFiles = dataFiles; + this.deleteFiles = deleteFiles; + this.partitionStruct = partitionStruct; + } + + public TableIdentifier tableIdentifier() { + return tableIdentifier; + } + + public List dataFiles() { + return dataFiles; + } + + public List deleteFiles() { + return deleteFiles; + } + + public StructType partitionStruct() { + return partitionStruct; + } +} diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/IcebergSinkConfigTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/IcebergSinkConfigTest.java new file mode 100644 index 000000000000..e2b2f96e09ba --- /dev/null +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/IcebergSinkConfigTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.kafka.common.config.ConfigException; +import org.junit.jupiter.api.Test; + +public class IcebergSinkConfigTest { + + @Test + public void testGetVersion() { + String version = IcebergSinkConfig.version(); + assertThat(version).isNotNull(); + } + + @Test + public void testInvalid() { + Map props = + ImmutableMap.of( + "topics", "source-topic", + "iceberg.catalog.type", "rest", + "iceberg.tables", "db.landing", + "iceberg.tables.dynamic-enabled", "true"); + assertThatThrownBy(() -> new IcebergSinkConfig(props)) + .isInstanceOf(ConfigException.class) + .hasMessage("Cannot specify both static and dynamic table names"); + } + + @Test + public void testGetDefault() { + Map props = + ImmutableMap.of( + "iceberg.catalog.type", "rest", + "topics", "source-topic", + "iceberg.tables", "db.landing"); + IcebergSinkConfig config = new IcebergSinkConfig(props); + assertThat(config.commitIntervalMs()).isEqualTo(300_000); + } + + @Test + public void testStringToList() { + List result = IcebergSinkConfig.stringToList(null, ","); + assertThat(result).isEmpty(); + + result = IcebergSinkConfig.stringToList("", ","); + assertThat(result).isEmpty(); + + result = IcebergSinkConfig.stringToList("one ", ","); + assertThat(result).contains("one"); + + result = IcebergSinkConfig.stringToList("one, two", ","); + assertThat(result).contains("one", "two"); + + result = IcebergSinkConfig.stringToList("bucket(id, 4)", ","); + assertThat(result).contains("bucket(id", "4)"); + + result = + IcebergSinkConfig.stringToList("bucket(id, 4)", IcebergSinkConfig.COMMA_NO_PARENS_REGEX); + assertThat(result).contains("bucket(id, 4)"); + + result = + IcebergSinkConfig.stringToList( + "bucket(id, 4), type", IcebergSinkConfig.COMMA_NO_PARENS_REGEX); + assertThat(result).contains("bucket(id, 4)", "type"); + } + + @Test + public void testStringWithParensToList() {} +} diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/IcebergSinkConnectorTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/IcebergSinkConnectorTest.java new file mode 100644 index 000000000000..86502794b224 --- /dev/null +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/IcebergSinkConnectorTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect; + +import static org.apache.iceberg.connect.IcebergSinkConfig.INTERNAL_TRANSACTIONAL_SUFFIX_PROP; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.kafka.connect.sink.SinkConnector; +import org.junit.jupiter.api.Test; + +public class IcebergSinkConnectorTest { + + @Test + public void testTaskConfigs() { + SinkConnector connector = new IcebergSinkConnector(); + connector.start(ImmutableMap.of()); + List> configs = connector.taskConfigs(3); + assertThat(configs).hasSize(3); + configs.forEach(map -> assertThat(map).containsKey(INTERNAL_TRANSACTIONAL_SUFFIX_PROP)); + } +} diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/BaseWriterTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/BaseWriterTest.java new file mode 100644 index 000000000000..80adc7fc3e03 --- /dev/null +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/BaseWriterTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import org.apache.iceberg.LocationProviders; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.encryption.PlaintextEncryptionManager; +import org.apache.iceberg.inmemory.InMemoryFileIO; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.BeforeEach; + +public class BaseWriterTest { + + protected InMemoryFileIO fileIO; + protected Table table; + + protected static final Schema SCHEMA = + new Schema( + ImmutableList.of( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.required(2, "data", Types.StringType.get()), + Types.NestedField.required(3, "id2", Types.LongType.get())), + ImmutableSet.of(1, 3)); + + protected static final PartitionSpec SPEC = + PartitionSpec.builderFor(SCHEMA).identity("data").build(); + + @BeforeEach + public void before() { + fileIO = new InMemoryFileIO(); + + table = mock(Table.class); + when(table.schema()).thenReturn(SCHEMA); + when(table.spec()).thenReturn(PartitionSpec.unpartitioned()); + when(table.io()).thenReturn(fileIO); + when(table.locationProvider()) + .thenReturn(LocationProviders.locationsFor("file", ImmutableMap.of())); + when(table.encryption()).thenReturn(PlaintextEncryptionManager.instance()); + when(table.properties()).thenReturn(ImmutableMap.of()); + } + + protected WriteResult writeTest( + List rows, IcebergSinkConfig config, Class expectedWriterClass) { + try (TaskWriter writer = Utilities.createTableWriter(table, "name", config)) { + assertThat(writer.getClass()).isEqualTo(expectedWriterClass); + + rows.forEach( + row -> { + try { + writer.write(row); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + + return writer.complete(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/IcebergWriterFactoryTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/IcebergWriterFactoryTest.java new file mode 100644 index 000000000000..93d1d2fa6bea --- /dev/null +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/IcebergWriterFactoryTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Map; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.connect.TableSinkConfig; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.Types.LongType; +import org.apache.iceberg.types.Types.StringType; +import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; + +public class IcebergWriterFactoryTest { + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + @SuppressWarnings("unchecked") + public void testAutoCreateTable(boolean partitioned) { + Catalog catalog = mock(Catalog.class); + when(catalog.loadTable(any())).thenThrow(new NoSuchTableException("no such table")); + + TableSinkConfig tableConfig = mock(TableSinkConfig.class); + if (partitioned) { + when(tableConfig.partitionBy()).thenReturn(ImmutableList.of("data")); + } + + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.autoCreateProps()).thenReturn(ImmutableMap.of("test-prop", "foo1")); + when(config.tableConfig(any())).thenReturn(tableConfig); + + SinkRecord record = mock(SinkRecord.class); + when(record.value()).thenReturn(ImmutableMap.of("id", 123, "data", "foo2")); + + IcebergWriterFactory factory = new IcebergWriterFactory(catalog, config); + factory.autoCreateTable("db.tbl", record); + + ArgumentCaptor identCaptor = ArgumentCaptor.forClass(TableIdentifier.class); + ArgumentCaptor schemaCaptor = ArgumentCaptor.forClass(Schema.class); + ArgumentCaptor specCaptor = ArgumentCaptor.forClass(PartitionSpec.class); + ArgumentCaptor> propsCaptor = ArgumentCaptor.forClass(Map.class); + + verify(catalog) + .createTable( + identCaptor.capture(), + schemaCaptor.capture(), + specCaptor.capture(), + propsCaptor.capture()); + + assertThat(identCaptor.getValue()).isEqualTo(TableIdentifier.of("db", "tbl")); + assertThat(schemaCaptor.getValue().findField("id").type()).isEqualTo(LongType.get()); + assertThat(schemaCaptor.getValue().findField("data").type()).isEqualTo(StringType.get()); + assertThat(specCaptor.getValue().isPartitioned()).isEqualTo(partitioned); + assertThat(propsCaptor.getValue()).containsKey("test-prop"); + } +} diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/PartitionedAppendWriterTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/PartitionedAppendWriterTest.java new file mode 100644 index 000000000000..eaf68798140d --- /dev/null +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/PartitionedAppendWriterTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.connect.TableSinkConfig; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +public class PartitionedAppendWriterTest extends BaseWriterTest { + + @ParameterizedTest + @ValueSource(strings = {"parquet", "orc"}) + public void testPartitionedAppendWriter(String format) { + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); + when(config.writeProps()).thenReturn(ImmutableMap.of("write.format.default", format)); + + when(table.spec()).thenReturn(SPEC); + + Record row1 = GenericRecord.create(SCHEMA); + row1.setField("id", 123L); + row1.setField("data", "hello world!"); + row1.setField("id2", 123L); + + Record row2 = GenericRecord.create(SCHEMA); + row2.setField("id", 234L); + row2.setField("data", "foobar"); + row2.setField("id2", 234L); + + WriteResult result = + writeTest(ImmutableList.of(row1, row2), config, PartitionedAppendWriter.class); + + // 1 data file for each partition (2 total) + assertThat(result.dataFiles()).hasSize(2); + assertThat(result.dataFiles()).allMatch(file -> file.format() == FileFormat.fromString(format)); + assertThat(result.deleteFiles()).hasSize(0); + } +} diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SchemaUtilsTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SchemaUtilsTest.java new file mode 100644 index 000000000000..44be5f2eb9ad --- /dev/null +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SchemaUtilsTest.java @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Table; +import org.apache.iceberg.UpdateSchema; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.transforms.Transform; +import org.apache.iceberg.transforms.Transforms; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types.BinaryType; +import org.apache.iceberg.types.Types.BooleanType; +import org.apache.iceberg.types.Types.DateType; +import org.apache.iceberg.types.Types.DecimalType; +import org.apache.iceberg.types.Types.DoubleType; +import org.apache.iceberg.types.Types.FloatType; +import org.apache.iceberg.types.Types.IntegerType; +import org.apache.iceberg.types.Types.ListType; +import org.apache.iceberg.types.Types.LongType; +import org.apache.iceberg.types.Types.MapType; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StringType; +import org.apache.iceberg.types.Types.StructType; +import org.apache.iceberg.types.Types.TimeType; +import org.apache.iceberg.types.Types.TimestampType; +import org.apache.kafka.connect.data.Date; +import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Time; +import org.apache.kafka.connect.data.Timestamp; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +public class SchemaUtilsTest { + + private static final org.apache.iceberg.Schema SIMPLE_SCHEMA = + new org.apache.iceberg.Schema( + NestedField.required(1, "i", IntegerType.get()), + NestedField.required(2, "f", FloatType.get())); + + private static final org.apache.iceberg.Schema NESTED_SCHEMA = + new org.apache.iceberg.Schema( + NestedField.required(3, "s", StringType.get()), + NestedField.required(4, "st", StructType.of(SIMPLE_SCHEMA.columns()))); + + private static final org.apache.iceberg.Schema SCHEMA_FOR_SPEC = + new org.apache.iceberg.Schema( + NestedField.required(1, "i", IntegerType.get()), + NestedField.required(2, "s", StringType.get()), + NestedField.required(3, "ts1", TimestampType.withZone()), + NestedField.required(4, "ts2", TimestampType.withZone()), + NestedField.required(5, "ts3", TimestampType.withZone()), + NestedField.required(6, "ts4", TimestampType.withZone())); + + @Test + public void testApplySchemaUpdates() { + UpdateSchema updateSchema = mock(UpdateSchema.class); + Table table = mock(Table.class); + when(table.schema()).thenReturn(SIMPLE_SCHEMA); + when(table.updateSchema()).thenReturn(updateSchema); + + // the updates to "i" should be ignored as it already exists and is the same type + SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer(); + consumer.addColumn(null, "i", IntegerType.get()); + consumer.updateType("i", IntegerType.get()); + consumer.makeOptional("i"); + consumer.updateType("f", DoubleType.get()); + consumer.addColumn(null, "s", StringType.get()); + + SchemaUtils.applySchemaUpdates(table, consumer); + verify(table).refresh(); + verify(table).updateSchema(); + + verify(updateSchema).addColumn(isNull(), eq("s"), isA(StringType.class)); + verify(updateSchema).updateColumn(eq("f"), isA(DoubleType.class)); + verify(updateSchema).makeColumnOptional(eq("i")); + verify(updateSchema).commit(); + + // check that there are no unexpected invocations... + verify(updateSchema).addColumn(isNull(), anyString(), any()); + verify(updateSchema).updateColumn(any(), any()); + verify(updateSchema).makeColumnOptional(any()); + } + + @Test + public void testApplyNestedSchemaUpdates() { + UpdateSchema updateSchema = mock(UpdateSchema.class); + Table table = mock(Table.class); + when(table.schema()).thenReturn(NESTED_SCHEMA); + when(table.updateSchema()).thenReturn(updateSchema); + + // the updates to "st.i" should be ignored as it already exists and is the same type + SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer(); + consumer.addColumn("st", "i", IntegerType.get()); + consumer.updateType("st.i", IntegerType.get()); + consumer.makeOptional("st.i"); + consumer.updateType("st.f", DoubleType.get()); + consumer.addColumn("st", "s", StringType.get()); + + SchemaUtils.applySchemaUpdates(table, consumer); + verify(table).refresh(); + verify(table).updateSchema(); + + verify(updateSchema).addColumn(eq("st"), eq("s"), isA(StringType.class)); + verify(updateSchema).updateColumn(eq("st.f"), isA(DoubleType.class)); + verify(updateSchema).makeColumnOptional(eq("st.i")); + verify(updateSchema).commit(); + + // check that there are no unexpected invocations... + verify(updateSchema).addColumn(anyString(), anyString(), any()); + verify(updateSchema).updateColumn(any(), any()); + verify(updateSchema).makeColumnOptional(any()); + } + + @Test + public void testApplySchemaUpdatesNoUpdates() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(SIMPLE_SCHEMA); + + SchemaUtils.applySchemaUpdates(table, null); + verify(table, times(0)).refresh(); + verify(table, times(0)).updateSchema(); + + SchemaUtils.applySchemaUpdates(table, new SchemaUpdate.Consumer()); + verify(table, times(0)).refresh(); + verify(table, times(0)).updateSchema(); + } + + @Test + public void testNeedsDataTypeUpdate() { + // valid updates + assertThat(SchemaUtils.needsDataTypeUpdate(FloatType.get(), Schema.FLOAT64_SCHEMA)) + .isInstanceOf(DoubleType.class); + assertThat(SchemaUtils.needsDataTypeUpdate(IntegerType.get(), Schema.INT64_SCHEMA)) + .isInstanceOf(LongType.class); + + // other updates will be skipped + assertThat(SchemaUtils.needsDataTypeUpdate(IntegerType.get(), Schema.STRING_SCHEMA)).isNull(); + assertThat(SchemaUtils.needsDataTypeUpdate(FloatType.get(), Schema.STRING_SCHEMA)).isNull(); + assertThat(SchemaUtils.needsDataTypeUpdate(StringType.get(), Schema.INT64_SCHEMA)).isNull(); + } + + @Test + public void testCreatePartitionSpecUnpartitioned() { + PartitionSpec spec = SchemaUtils.createPartitionSpec(SCHEMA_FOR_SPEC, ImmutableList.of()); + assertThat(spec.isPartitioned()).isFalse(); + } + + @Test + public void testCreatePartitionSpec() { + List partitionFields = + ImmutableList.of( + "year(ts1)", + "month(ts2)", + "day(ts3)", + "hour(ts4)", + "bucket(i, 4)", + "truncate(s, 10)", + "s"); + PartitionSpec spec = SchemaUtils.createPartitionSpec(SCHEMA_FOR_SPEC, partitionFields); + assertThat(spec.isPartitioned()).isTrue(); + assertThat(spec.fields()).anyMatch(val -> matchingTransform(val, Transforms.year())); + assertThat(spec.fields()).anyMatch(val -> matchingTransform(val, Transforms.month())); + assertThat(spec.fields()).anyMatch(val -> matchingTransform(val, Transforms.day())); + assertThat(spec.fields()).anyMatch(val -> matchingTransform(val, Transforms.hour())); + assertThat(spec.fields()).anyMatch(val -> matchingTransform(val, Transforms.bucket(4))); + assertThat(spec.fields()).anyMatch(val -> matchingTransform(val, Transforms.truncate(10))); + assertThat(spec.fields()).anyMatch(val -> matchingTransform(val, Transforms.identity())); + } + + boolean matchingTransform(PartitionField partitionField, Transform expectedTransform) { + return partitionField.transform().equals(expectedTransform); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testToIcebergType(boolean forceOptional) { + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.schemaForceOptional()).thenReturn(forceOptional); + + assertThat(SchemaUtils.toIcebergType(Schema.BOOLEAN_SCHEMA, config)) + .isInstanceOf(BooleanType.class); + assertThat(SchemaUtils.toIcebergType(Schema.BYTES_SCHEMA, config)) + .isInstanceOf(BinaryType.class); + assertThat(SchemaUtils.toIcebergType(Schema.INT8_SCHEMA, config)) + .isInstanceOf(IntegerType.class); + assertThat(SchemaUtils.toIcebergType(Schema.INT16_SCHEMA, config)) + .isInstanceOf(IntegerType.class); + assertThat(SchemaUtils.toIcebergType(Schema.INT32_SCHEMA, config)) + .isInstanceOf(IntegerType.class); + assertThat(SchemaUtils.toIcebergType(Schema.INT64_SCHEMA, config)).isInstanceOf(LongType.class); + assertThat(SchemaUtils.toIcebergType(Schema.FLOAT32_SCHEMA, config)) + .isInstanceOf(FloatType.class); + assertThat(SchemaUtils.toIcebergType(Schema.FLOAT64_SCHEMA, config)) + .isInstanceOf(DoubleType.class); + assertThat(SchemaUtils.toIcebergType(Schema.STRING_SCHEMA, config)) + .isInstanceOf(StringType.class); + assertThat(SchemaUtils.toIcebergType(Date.SCHEMA, config)).isInstanceOf(DateType.class); + assertThat(SchemaUtils.toIcebergType(Time.SCHEMA, config)).isInstanceOf(TimeType.class); + + Type timestampType = SchemaUtils.toIcebergType(Timestamp.SCHEMA, config); + assertThat(timestampType).isInstanceOf(TimestampType.class); + assertThat(((TimestampType) timestampType).shouldAdjustToUTC()).isTrue(); + + Type decimalType = SchemaUtils.toIcebergType(Decimal.schema(4), config); + assertThat(decimalType).isInstanceOf(DecimalType.class); + assertThat(((DecimalType) decimalType).scale()).isEqualTo(4); + + Type listType = + SchemaUtils.toIcebergType(SchemaBuilder.array(Schema.STRING_SCHEMA).build(), config); + assertThat(listType).isInstanceOf(ListType.class); + assertThat(listType.asListType().elementType()).isInstanceOf(StringType.class); + assertThat(listType.asListType().isElementOptional()).isEqualTo(forceOptional); + + Type mapType = + SchemaUtils.toIcebergType( + SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).build(), config); + assertThat(mapType).isInstanceOf(MapType.class); + assertThat(mapType.asMapType().keyType()).isInstanceOf(StringType.class); + assertThat(mapType.asMapType().valueType()).isInstanceOf(StringType.class); + assertThat(mapType.asMapType().isValueOptional()).isEqualTo(forceOptional); + + Type structType = + SchemaUtils.toIcebergType( + SchemaBuilder.struct().field("i", Schema.INT32_SCHEMA).build(), config); + assertThat(structType).isInstanceOf(StructType.class); + assertThat(structType.asStructType().fieldType("i")).isInstanceOf(IntegerType.class); + assertThat(structType.asStructType().field("i").isOptional()).isEqualTo(forceOptional); + } + + @Test + public void testInferIcebergType() { + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + + assertThat(SchemaUtils.inferIcebergType(1, config)).isInstanceOf(LongType.class); + assertThat(SchemaUtils.inferIcebergType(1L, config)).isInstanceOf(LongType.class); + assertThat(SchemaUtils.inferIcebergType(1.1f, config)).isInstanceOf(DoubleType.class); + assertThat(SchemaUtils.inferIcebergType(1.1d, config)).isInstanceOf(DoubleType.class); + assertThat(SchemaUtils.inferIcebergType("foobar", config)).isInstanceOf(StringType.class); + assertThat(SchemaUtils.inferIcebergType(true, config)).isInstanceOf(BooleanType.class); + assertThat(SchemaUtils.inferIcebergType(LocalDate.now(), config)).isInstanceOf(DateType.class); + assertThat(SchemaUtils.inferIcebergType(LocalTime.now(), config)).isInstanceOf(TimeType.class); + + Type timestampType = SchemaUtils.inferIcebergType(new java.util.Date(), config); + assertThat(timestampType).isInstanceOf(TimestampType.class); + assertThat(((TimestampType) timestampType).shouldAdjustToUTC()).isTrue(); + + timestampType = SchemaUtils.inferIcebergType(OffsetDateTime.now(), config); + assertThat(timestampType).isInstanceOf(TimestampType.class); + assertThat(((TimestampType) timestampType).shouldAdjustToUTC()).isTrue(); + + timestampType = SchemaUtils.inferIcebergType(LocalDateTime.now(), config); + assertThat(timestampType).isInstanceOf(TimestampType.class); + assertThat(((TimestampType) timestampType).shouldAdjustToUTC()).isFalse(); + + Type decimalType = SchemaUtils.inferIcebergType(new BigDecimal("12.345"), config); + assertThat(decimalType).isInstanceOf(DecimalType.class); + assertThat(((DecimalType) decimalType).scale()).isEqualTo(3); + + assertThat(SchemaUtils.inferIcebergType(ImmutableList.of("foobar"), config)) + .isInstanceOf(ListType.class); + assertThat(SchemaUtils.inferIcebergType(ImmutableMap.of("foo", "bar"), config)) + .isInstanceOf(StructType.class); + } + + @Test + public void testInferIcebergTypeEmpty() { + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + + // skip infer for null + assertThat(SchemaUtils.inferIcebergType(null, config)).isNull(); + + // skip infer for empty list + assertThat(SchemaUtils.inferIcebergType(ImmutableList.of(), config)).isNull(); + // skip infer for list if first element is null + List list = Lists.newArrayList(); + list.add(null); + assertThat(SchemaUtils.inferIcebergType(list, config)).isNull(); + // skip infer for list if first element is an empty object + assertThat(SchemaUtils.inferIcebergType(ImmutableList.of(ImmutableMap.of()), config)).isNull(); + + // skip infer for empty object + assertThat(SchemaUtils.inferIcebergType(ImmutableMap.of(), config)).isNull(); + // skip infer for object if values are null + Map map = Maps.newHashMap(); + map.put("col", null); + assertThat(SchemaUtils.inferIcebergType(map, config)).isNull(); + // skip infer for object if values are empty objects + assertThat(SchemaUtils.inferIcebergType(ImmutableMap.of("nested", ImmutableMap.of()), config)) + .isNull(); + } +} diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/UnpartitionedWriterTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/UnpartitionedWriterTest.java new file mode 100644 index 000000000000..807c899502f6 --- /dev/null +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/UnpartitionedWriterTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.connect.TableSinkConfig; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.UnpartitionedWriter; +import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +public class UnpartitionedWriterTest extends BaseWriterTest { + + @ParameterizedTest + @ValueSource(strings = {"parquet", "orc"}) + public void testUnpartitionedWriter(String format) { + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); + when(config.writeProps()).thenReturn(ImmutableMap.of("write.format.default", format)); + + Record row1 = GenericRecord.create(SCHEMA); + row1.setField("id", 123L); + row1.setField("data", "hello world!"); + row1.setField("id2", 123L); + + Record row2 = GenericRecord.create(SCHEMA); + row2.setField("id", 234L); + row2.setField("data", "foobar"); + row2.setField("id2", 234L); + + WriteResult result = writeTest(ImmutableList.of(row1, row2), config, UnpartitionedWriter.class); + + assertThat(result.dataFiles()).hasSize(1); + assertThat(result.dataFiles()).allMatch(file -> file.format() == FileFormat.fromString(format)); + assertThat(result.deleteFiles()).hasSize(0); + } +} diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/UtilitiesTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/UtilitiesTest.java new file mode 100644 index 000000000000..cfa1709da744 --- /dev/null +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/UtilitiesTest.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.hadoop.Configurable; +import org.apache.iceberg.inmemory.InMemoryCatalog; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +public class UtilitiesTest { + + private static final String HADOOP_CONF_TEMPLATE = + "%s%s"; + + @TempDir private Path tempDir; + + public static class TestCatalog extends InMemoryCatalog implements Configurable { + private Configuration conf; + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + } + + @Test + public void testLoadCatalogNoHadoopDir() { + Map props = + ImmutableMap.of( + "topics", + "mytopic", + "iceberg.tables", + "mytable", + "iceberg.hadoop.conf-prop", + "conf-value", + "iceberg.catalog.catalog-impl", + TestCatalog.class.getName()); + IcebergSinkConfig config = new IcebergSinkConfig(props); + Catalog result = Utilities.loadCatalog(config); + + assertThat(result).isInstanceOf(TestCatalog.class); + + Configuration conf = ((TestCatalog) result).conf; + assertThat(conf).isNotNull(); + + // check that the sink config property was added + assertThat(conf.get("conf-prop")).isEqualTo("conf-value"); + + // check that core-site.xml was loaded + assertThat(conf.get("foo")).isEqualTo("bar"); + } + + @ParameterizedTest + @ValueSource(strings = {"core-site.xml", "hdfs-site.xml", "hive-site.xml"}) + public void testLoadCatalogWithHadoopDir(String confFile) throws IOException { + Path path = tempDir.resolve(confFile); + String xml = String.format(HADOOP_CONF_TEMPLATE, "file-prop", "file-value"); + Files.write(path, xml.getBytes(StandardCharsets.UTF_8)); + + Map props = + ImmutableMap.of( + "topics", + "mytopic", + "iceberg.tables", + "mytable", + "iceberg.hadoop-conf-dir", + tempDir.toString(), + "iceberg.hadoop.conf-prop", + "conf-value", + "iceberg.catalog.catalog-impl", + TestCatalog.class.getName()); + IcebergSinkConfig config = new IcebergSinkConfig(props); + Catalog result = Utilities.loadCatalog(config); + + assertThat(result).isInstanceOf(TestCatalog.class); + + Configuration conf = ((TestCatalog) result).conf; + assertThat(conf).isNotNull(); + + // check that the sink config property was added + assertThat(conf.get("conf-prop")).isEqualTo("conf-value"); + + // check that the config file was loaded + assertThat(conf.get("file-prop")).isEqualTo("file-value"); + + // check that core-site.xml was loaded + assertThat(conf.get("foo")).isEqualTo("bar"); + } + + @Test + public void testExtractFromRecordValueStruct() { + Schema valSchema = SchemaBuilder.struct().field("key", Schema.INT64_SCHEMA).build(); + Struct val = new Struct(valSchema).put("key", 123L); + Object result = Utilities.extractFromRecordValue(val, "key"); + assertThat(result).isEqualTo(123L); + } + + @Test + public void testExtractFromRecordValueStructNested() { + Schema idSchema = SchemaBuilder.struct().field("key", Schema.INT64_SCHEMA).build(); + Schema dataSchema = SchemaBuilder.struct().field("id", idSchema).build(); + Schema valSchema = SchemaBuilder.struct().field("data", dataSchema).build(); + + Struct id = new Struct(idSchema).put("key", 123L); + Struct data = new Struct(dataSchema).put("id", id); + Struct val = new Struct(valSchema).put("data", data); + + Object result = Utilities.extractFromRecordValue(val, "data.id.key"); + assertThat(result).isEqualTo(123L); + } + + @Test + public void testExtractFromRecordValueStructNull() { + Schema valSchema = SchemaBuilder.struct().field("key", Schema.INT64_SCHEMA).build(); + Struct val = new Struct(valSchema).put("key", 123L); + + Object result = Utilities.extractFromRecordValue(val, ""); + assertThat(result).isNull(); + + result = Utilities.extractFromRecordValue(val, "xkey"); + assertThat(result).isNull(); + } + + @Test + public void testExtractFromRecordValueMap() { + Map val = ImmutableMap.of("key", 123L); + Object result = Utilities.extractFromRecordValue(val, "key"); + assertThat(result).isEqualTo(123L); + } + + @Test + public void testExtractFromRecordValueMapNested() { + Map id = ImmutableMap.of("key", 123L); + Map data = ImmutableMap.of("id", id); + Map val = ImmutableMap.of("data", data); + + Object result = Utilities.extractFromRecordValue(val, "data.id.key"); + assertThat(result).isEqualTo(123L); + } + + @Test + public void testExtractFromRecordValueMapNull() { + Map val = ImmutableMap.of("key", 123L); + + Object result = Utilities.extractFromRecordValue(val, ""); + assertThat(result).isNull(); + + result = Utilities.extractFromRecordValue(val, "xkey"); + assertThat(result).isNull(); + } +} diff --git a/kafka-connect/kafka-connect/src/test/resources/core-site.xml b/kafka-connect/kafka-connect/src/test/resources/core-site.xml new file mode 100644 index 000000000000..189bc9dc299e --- /dev/null +++ b/kafka-connect/kafka-connect/src/test/resources/core-site.xml @@ -0,0 +1,25 @@ + + + + + + + foo + bar + + \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 7a112dfd6829..3fd93db24928 100644 --- a/settings.gradle +++ b/settings.gradle @@ -195,3 +195,7 @@ if (JavaVersion.current() == JavaVersion.VERSION_1_8) { include ":iceberg-kafka-connect:kafka-connect-events" project(":iceberg-kafka-connect:kafka-connect-events").projectDir = file('kafka-connect/kafka-connect-events') project(":iceberg-kafka-connect:kafka-connect-events").name = "iceberg-kafka-connect-events" + +include ":iceberg-kafka-connect:kafka-connect" +project(":iceberg-kafka-connect:kafka-connect").projectDir = file('kafka-connect/kafka-connect') +project(":iceberg-kafka-connect:kafka-connect").name = "iceberg-kafka-connect"