Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ jaxb-api = "2.3.1"
jaxb-runtime = "2.3.3"
jetty = "9.4.53.v20231009"
junit = "5.10.1"
kafka = "3.6.1"
kryo-shaded = "4.0.3"
microprofile-openapi-api = "3.1.1"
mockito = "4.11.0"
Expand Down Expand Up @@ -152,6 +153,9 @@ jackson214-bom = { module = "com.fasterxml.jackson:jackson-bom", version.ref =
jackson215-bom = { module = "com.fasterxml.jackson:jackson-bom", version.ref = "jackson215" }
jaxb-api = { module = "javax.xml.bind:jaxb-api", version.ref = "jaxb-api" }
jaxb-runtime = { module = "org.glassfish.jaxb:jaxb-runtime", version.ref = "jaxb-runtime" }
kafka-clients = { module = "org.apache.kafka:kafka-clients", version.ref = "kafka" }
kafka-connect-api = { module = "org.apache.kafka:connect-api", version.ref = "kafka" }
kafka-connect-json = { module = "org.apache.kafka:connect-json", version.ref = "kafka" }
microprofile-openapi-api = { module = "org.eclipse.microprofile.openapi:microprofile-openapi-api", version.ref = "microprofile-openapi-api" }
nessie-client = { module = "org.projectnessie.nessie:nessie-client", version.ref = "nessie" }
netty-buffer = { module = "io.netty:netty-buffer", version.ref = "netty-buffer" }
Expand Down
27 changes: 27 additions & 0 deletions kafka-connect/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,30 @@ project(":iceberg-kafka-connect:iceberg-kafka-connect-events") {
useJUnitPlatform()
}
}

project(":iceberg-kafka-connect:iceberg-kafka-connect") {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the duplicate name? Could this be :iceberg-kafka-connect:iceberg-sink?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was following the convention of the spark projects to some extent, e.g. :iceberg-spark:iceberg-spark-<version>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, so it sounds like the idea is to make the final artifact name iceberg-kafka-connect. That makes sense.

dependencies {
api project(':iceberg-api')
implementation project(':iceberg-core')
implementation project(':iceberg-common')
implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
implementation project(':iceberg-data')
implementation project(':iceberg-kafka-connect:iceberg-kafka-connect-events')
implementation platform(libs.jackson.bom)
implementation "com.fasterxml.jackson.core:jackson-core"
implementation "com.fasterxml.jackson.core:jackson-databind"
implementation libs.avro.avro

compileOnly libs.kafka.clients
compileOnly libs.kafka.connect.api
compileOnly libs.kafka.connect.json

testImplementation libs.hadoop3.client
testRuntimeOnly project(':iceberg-parquet')
testRuntimeOnly project(':iceberg-orc')
}

test {
useJUnitPlatform()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.iceberg.types.Types;

/** Class for Avro-related utility methods. */
class AvroUtil {
public class AvroUtil {
static final Map<Integer, String> FIELD_ID_TO_CLASS =
ImmutableMap.of(
DataComplete.ASSIGNMENTS_ELEMENT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.time.OffsetDateTime;
import java.util.UUID;
import org.apache.avro.Schema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.types.Types.TimestampType;
Expand Down Expand Up @@ -53,6 +54,7 @@ public CommitComplete(Schema avroSchema) {
}

public CommitComplete(UUID commitId, OffsetDateTime validThroughTs) {
Preconditions.checkNotNull(commitId, "Commit ID cannot be null");
this.commitId = commitId;
this.validThroughTs = validThroughTs;
this.avroSchema = AVRO_SCHEMA;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.time.OffsetDateTime;
import java.util.UUID;
import org.apache.avro.Schema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Types.LongType;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;
Expand Down Expand Up @@ -64,6 +65,8 @@ public CommitToTable(
TableReference tableReference,
Long snapshotId,
OffsetDateTime validThroughTs) {
Preconditions.checkNotNull(commitId, "Commit ID cannot be null");
Preconditions.checkNotNull(tableReference, "Table reference cannot be null");
this.commitId = commitId;
this.tableReference = tableReference;
this.snapshotId = snapshotId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.UUID;
import org.apache.avro.Schema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Types.ListType;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;
Expand Down Expand Up @@ -55,6 +56,7 @@ public DataComplete(Schema avroSchema) {
}

public DataComplete(UUID commitId, List<TopicPartitionOffset> assignments) {
Preconditions.checkNotNull(commitId, "Commit ID cannot be null");
this.commitId = commitId;
this.assignments = assignments;
this.avroSchema = AVRO_SCHEMA;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.avro.Schema;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Types.ListType;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;
Expand Down Expand Up @@ -62,6 +63,8 @@ public DataWritten(
TableReference tableReference,
List<DataFile> dataFiles,
List<DeleteFile> deleteFiles) {
Preconditions.checkNotNull(commitId, "Commit ID cannot be null");
Preconditions.checkNotNull(tableReference, "Table reference cannot be null");
this.partitionType = partitionType;
this.commitId = commitId;
this.tableReference = tableReference;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.UUID;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types.IntegerType;
import org.apache.iceberg.types.Types.NestedField;
Expand Down Expand Up @@ -59,6 +60,9 @@ public Event(Schema avroSchema) {
}

public Event(String groupId, Payload payload) {
Preconditions.checkNotNull(groupId, "Group ID cannot be null");
Preconditions.checkNotNull(payload, "Payload cannot be null");

this.id = UUID.randomUUID();
this.type = payload.type();
this.timestamp = OffsetDateTime.now(ZoneOffset.UTC).truncatedTo(ChronoUnit.MICROS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.UUID;
import org.apache.avro.Schema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.types.Types.UUIDType;
Expand All @@ -45,6 +46,7 @@ public StartCommit(Schema avroSchema) {
}

public StartCommit(UUID commitId) {
Preconditions.checkNotNull(commitId, "Commit ID cannot be null");
this.commitId = commitId;
this.avroSchema = AVRO_SCHEMA;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.avro.util.Utf8;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Types.ListType;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StringType;
Expand Down Expand Up @@ -63,6 +64,9 @@ public TableReference(Schema avroSchema) {
}

public TableReference(String catalog, List<String> namespace, String name) {
Preconditions.checkNotNull(catalog, "Catalog cannot be null");
Preconditions.checkNotNull(namespace, "Namespace cannot be null");
Preconditions.checkNotNull(name, "Name cannot be null");
this.catalog = catalog;
this.namespace = namespace;
this.name = name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.time.OffsetDateTime;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Types.IntegerType;
import org.apache.iceberg.types.Types.LongType;
import org.apache.iceberg.types.Types.NestedField;
Expand Down Expand Up @@ -58,6 +59,7 @@ public TopicPartitionOffset(Schema avroSchema) {
}

public TopicPartitionOffset(String topic, int partition, Long offset, OffsetDateTime timestamp) {
Preconditions.checkNotNull(topic, "Topic cannot be null");
this.topic = topic;
this.partition = partition;
this.offset = offset;
Expand Down
Loading