diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md b/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md
index 939cfc6d8bb..0b260c7613e 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md
@@ -164,6 +164,20 @@ Pipeline Connector Options
Long |
单个记录的最大大小(以byte为单位)。 |
+
+ | sharding.suffix.key |
+ optional |
+ (none) |
+ String |
+ 每个表的分片后缀字段,允许为多个表设置分片后缀字段。默认 sink 表名为 test_table_${suffix_key}。默认分片字段为第一个分区列。表之间用 ';' 分隔,表和字段之间用 ':' 分隔。例如,可以将 sharding.suffix.key 设置为 'table1:col1;table2:col2'。 |
+
+
+ | sharding.suffix.separator |
+ optional |
+ "_" |
+ String |
+ 用于分割表名称和分片后缀的分隔符。默认是 '_'。如果设置为 '-',那么表名称会是 test_table-${suffix}。 |
+
diff --git a/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md b/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md
index 579d7015e9f..3f3d693fee8 100644
--- a/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md
+++ b/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md
@@ -164,6 +164,20 @@ Pipeline Connector Options
Long |
The maximum size of a single record in bytes. |
+
+ | sharding.suffix.key |
+ optional |
+ (none) |
+ String |
+ Sharding suffix key for each table; allows setting a sharding suffix key for multiple tables. The default sink table name is test_table_${suffix_key}. The default sharding column is the first partition column. Tables are separated by ';'. Table and column are separated by ':'. For example, sharding.suffix.key can be set to 'table1:col1;table2:col2'. |
+
+
+ | sharding.suffix.separator |
+ optional |
+ "_" |
+ String |
+ Separator for the sharding suffix in table names; defines the separator between the table name and the sharding suffix. Default value is '_'. For example, if set to '-', the sink table name would be test_table-${suffix}. |
+
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java
index 97e95452069..4ea58ab3517 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java
@@ -17,12 +17,17 @@
package org.apache.flink.cdc.connectors.elasticsearch.config;
+import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.connectors.elasticsearch.v2.NetworkConfig;
import org.apache.http.HttpHost;
import java.io.Serializable;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.SHARDING_SUFFIX_SEPARATOR;
/** Elasticsearch DataSink Options reference {@link ElasticsearchSinkOptions}. */
public class ElasticsearchSinkOptions implements Serializable {
@@ -37,6 +42,8 @@ public class ElasticsearchSinkOptions implements Serializable {
private final int version;
private final String username;
private final String password;
+ private final Map shardingKey;
+ private final String shardingSeparator;
/** Constructor for ElasticsearchSinkOptions. */
public ElasticsearchSinkOptions(
@@ -50,6 +57,34 @@ public ElasticsearchSinkOptions(
int version,
String username,
String password) {
+ this(
+ maxBatchSize,
+ maxInFlightRequests,
+ maxBufferedRequests,
+ maxBatchSizeInBytes,
+ maxTimeInBufferMS,
+ maxRecordSizeInBytes,
+ networkConfig,
+ version,
+ username,
+ password,
+ Collections.emptyMap(),
+ SHARDING_SUFFIX_SEPARATOR.defaultValue());
+ }
+
+ public ElasticsearchSinkOptions(
+ int maxBatchSize,
+ int maxInFlightRequests,
+ int maxBufferedRequests,
+ long maxBatchSizeInBytes,
+ long maxTimeInBufferMS,
+ long maxRecordSizeInBytes,
+ NetworkConfig networkConfig,
+ int version,
+ String username,
+ String password,
+ Map shardingKey,
+ String shardingSeparator) {
this.maxBatchSize = maxBatchSize;
this.maxInFlightRequests = maxInFlightRequests;
this.maxBufferedRequests = maxBufferedRequests;
@@ -60,6 +95,8 @@ public ElasticsearchSinkOptions(
this.version = version;
this.username = username;
this.password = password;
+ this.shardingKey = shardingKey;
+ this.shardingSeparator = shardingSeparator;
}
/** @return the maximum batch size */
@@ -113,4 +150,12 @@ public String getUsername() {
public String getPassword() {
return password;
}
+
+ /** @return the sharding keys passed to the constructor (empty map for the short constructor) */
+ public Map getShardingKey() {
+ return shardingKey;
+ }
+
+ /** @return the sharding separator passed to the constructor */
+ public String getShardingSeparator() {
+ return shardingSeparator;
+ }
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java
index 2968babb0f6..800c0f359f7 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java
@@ -46,12 +46,15 @@
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
+import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.SHARDING_SUFFIX_SEPARATOR;
+
/** A serializer for Event to BulkOperationVariant. */
public class ElasticsearchEventSerializer implements ElementConverter {
private final ObjectMapper objectMapper = new ObjectMapper();
@@ -70,8 +73,18 @@ public class ElasticsearchEventSerializer implements ElementConverter shardingKey;
+ private final String shardingSeparator;
+
+ /**
+ * Creates a serializer with no explicitly configured sharding keys and the default value of
+ * {@code sharding.suffix.separator}.
+ *
+ * @param zoneId stored as the pipeline zone id
+ */
public ElasticsearchEventSerializer(ZoneId zoneId) {
+ this(zoneId, Collections.emptyMap(), SHARDING_SUFFIX_SEPARATOR.defaultValue());
+ }
+
+ /**
+ * Creates a serializer with explicit sharding settings.
+ *
+ * @param zoneId stored as the pipeline zone id
+ * @param shardingKey sharding column name per table, looked up by table id
+ * @param shardingSeparator string placed between the table id and the sharding value
+ */
+ public ElasticsearchEventSerializer(
+ ZoneId zoneId, Map shardingKey, String shardingSeparator) {
this.pipelineZoneId = zoneId;
+ this.shardingKey = shardingKey;
+ this.shardingSeparator = shardingSeparator;
}
@Override
@@ -145,7 +158,7 @@ private BulkOperationVariant createBulkOperationVariant(DataChangeEvent event)
case UPDATE:
valueMap = serializeRecord(tableId, event.after(), schema, pipelineZoneId);
return new IndexOperation.Builder<>()
- .index(tableId.toString())
+ .index(tableSharding(tableId, schema, valueMap))
.id(id)
.document(valueMap)
.build();
@@ -156,6 +169,16 @@ private BulkOperationVariant createBulkOperationVariant(DataChangeEvent event)
}
}
+    /**
+     * Resolves the Elasticsearch index name for a record, appending a sharding suffix when one is
+     * available.
+     *
+     * <p>The suffix column is the one registered for {@code tableId} in {@code shardingKey};
+     * otherwise the first partition key of {@code schema} is used. When no column applies, or the
+     * record holds no value for it, the plain table identifier is returned.
+     *
+     * @param tableId identifier of the table the record belongs to
+     * @param schema schema of the table, consulted for its partition keys
+     * @param valueMap serialized record in which the sharding column is looked up
+     * @return {@code tableId + shardingSeparator + value}, or {@code tableId} alone
+     */
+    public String tableSharding(TableId tableId, Schema schema, Map valueMap) {
+        final String baseIndex = tableId.toString();
+        final Object suffix;
+        if (shardingKey.containsKey(tableId)) {
+            suffix = valueMap.get(shardingKey.get(tableId));
+        } else if (schema.partitionKeys().isEmpty()) {
+            suffix = null;
+        } else {
+            suffix = valueMap.get(schema.partitionKeys().get(0));
+        }
+        if (suffix == null) {
+            return baseIndex;
+        }
+        return baseIndex + shardingSeparator + suffix;
+    }
+
private Object[] generateUniqueId(RecordData recordData, Schema schema, TableId tableId) {
List primaryKeys = schema.primaryKeys();
List converters =
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java
index 14d7d4ebeec..b98d3e86109 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java
@@ -79,7 +79,9 @@ public EventSinkProvider getEventSinkProvider() {
}
private EventSinkProvider getElasticsearch6SinkProvider() {
- ElasticsearchEventSerializer serializer = new ElasticsearchEventSerializer(zoneId);
+ ElasticsearchEventSerializer serializer =
+ new ElasticsearchEventSerializer(
+ zoneId, esOptions.getShardingKey(), esOptions.getShardingSeparator());
org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost[] hosts =
esOptions.getHosts().stream()
.map(
@@ -128,7 +130,11 @@ private EventSinkProvider getElasticsearch8SinkProvider() {
Elasticsearch8AsyncSinkBuilder sinkBuilder =
new Elasticsearch8AsyncSinkBuilder()
.setHosts(esOptions.getHosts().toArray(new HttpHost[0]))
- .setElementConverter(new ElasticsearchEventSerializer(zoneId))
+ .setElementConverter(
+ new ElasticsearchEventSerializer(
+ zoneId,
+ esOptions.getShardingKey(),
+ esOptions.getShardingSeparator()))
.setMaxBatchSize(esOptions.getMaxBatchSize())
.setMaxInFlightRequests(esOptions.getMaxInFlightRequests())
.setMaxBufferedRequests(esOptions.getMaxBufferedRequests())
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactory.java
index b8bcf59b928..83f7c115218 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactory.java
@@ -19,6 +19,7 @@
import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.factories.DataSinkFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
@@ -31,8 +32,10 @@
import java.time.ZoneId;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -45,6 +48,8 @@
import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.MAX_RECORD_SIZE_IN_BYTES;
import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.MAX_TIME_IN_BUFFER_MS;
import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.PASSWORD;
+import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.SHARDING_SUFFIX_KEY;
+import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.SHARDING_SUFFIX_SEPARATOR;
import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.USERNAME;
import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.VERSION;
@@ -52,6 +57,7 @@
public class ElasticsearchDataSinkFactory implements DataSinkFactory {
public static final String IDENTIFIER = "elasticsearch";
+ private static final String ES_INDEX_ILLEGAL_CHARS = "\\/*?\"<>| ,#";
@Override
public DataSink createDataSink(Context context) {
@@ -85,6 +91,25 @@ private ElasticsearchSinkOptions buildSinkConnectorOptions(Configuration cdcConf
String username = cdcConfig.get(USERNAME);
String password = cdcConfig.get(PASSWORD);
int version = cdcConfig.get(VERSION);
+ Map shardingMaps = new HashMap<>();
+ String shardingKey = cdcConfig.get(SHARDING_SUFFIX_KEY);
+ String shardingSeparator = cdcConfig.get(SHARDING_SUFFIX_SEPARATOR);
+        if (!shardingKey.isEmpty()) {
+            // Expected format: "table1:col1;table2:col2".
+            for (String tables : shardingKey.split(";")) {
+                String[] splits = tables.split(":");
+                if (splits.length == 2) {
+                    // Trim both parts so that whitespace around ';' or ':' ends up neither in
+                    // the table identifier nor in the column name.
+                    TableId tableId = TableId.parse(splits[0].trim());
+                    shardingMaps.put(tableId, splits[1].trim());
+                } else {
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "%s is malformed, please refer to the documents",
+                                    SHARDING_SUFFIX_KEY.key()));
+                }
+            }
+        }
+ validateShardingSeparator(shardingSeparator);
+
NetworkConfig networkConfig =
new NetworkConfig(hosts, username, password, null, null, null);
return new ElasticsearchSinkOptions(
@@ -97,7 +122,9 @@ private ElasticsearchSinkOptions buildSinkConnectorOptions(Configuration cdcConf
networkConfig,
version,
username,
- password);
+ password,
+ shardingMaps,
+ shardingSeparator);
}
private List parseHosts(String hostsStr) {
@@ -130,6 +157,8 @@ public Set> optionalOptions() {
optionalOptions.add(MAX_RECORD_SIZE_IN_BYTES);
optionalOptions.add(USERNAME);
optionalOptions.add(PASSWORD);
+ optionalOptions.add(SHARDING_SUFFIX_KEY);
+ optionalOptions.add(SHARDING_SUFFIX_SEPARATOR);
return optionalOptions;
}
@@ -151,4 +180,22 @@ private void validateRequiredOptions(Configuration configuration) {
.collect(Collectors.joining("\n"))));
}
}
+
+ private void validateShardingSeparator(String separator) {
+ if (!separator.equals(separator.toLowerCase())) {
+ throw new ValidationException(
+ String.format(
+ "%s is malformed, elasticsearch index only support lowercase.",
+ SHARDING_SUFFIX_SEPARATOR.key()));
+ }
+
+ for (char c : ES_INDEX_ILLEGAL_CHARS.toCharArray()) {
+ if (separator.indexOf(c) != -1) {
+ throw new ValidationException(
+ String.format(
+ "%s is malformed, elasticsearch index cannot include \\, /, *, ?, \", <, >, |, ` ` (space character), ,, #",
+ SHARDING_SUFFIX_SEPARATOR.key()));
+ }
+ }
+ }
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java
index b2629040059..213b446c8d7 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java
@@ -98,6 +98,22 @@ public class ElasticsearchDataSinkOptions {
.noDefaultValue()
.withDescription("The password for Elasticsearch authentication.");
+    /** Per-table column whose value is appended to the index name as the sharding suffix. */
+    public static final ConfigOption SHARDING_SUFFIX_KEY =
+            ConfigOptions.key("sharding.suffix.key")
+                    .stringType()
+                    .defaultValue("")
+                    .withDescription(
+                            "Sharding suffix key for each table, allow setting sharding suffix key for multiTables. Default sink table name is test_table_${suffix_key}. Default sharding column is first partition column. Tables are separated by ';'. Table and column are separated by ':'. For example, we can set sharding.suffix.key by 'table1:col1;table2:col2'.");
+
+ /** Separator inserted between the table name and the sharding suffix in the index name. */
+ public static final ConfigOption SHARDING_SUFFIX_SEPARATOR =
+ ConfigOptions.key("sharding.suffix.separator")
+ .stringType()
+ .defaultValue("_")
+ .withDescription(
+ "Separator for sharding suffix in table names, allow defining the separator between table name and sharding suffix. Default value is '_'. For example, if set to '-', the default table name would be test_table-${suffix}");
+
private ElasticsearchDataSinkOptions() {
// This class should not be instantiated
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactoryTest.java
index 32835349d82..90e3f4bb75a 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactoryTest.java
@@ -30,6 +30,8 @@
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -128,8 +130,48 @@ void testPrefixedRequiredOption() {
Assertions.assertThat(dataSink).isInstanceOf(ElasticsearchDataSink.class);
}
+    /**
+     * Test the `validateShardingSeparator` method with illegal sharding separators. This test
+     * checks two scenarios: 1. Separators containing any of the illegal characters. 2. A
+     * separator with uppercase letters.
+     */
+    @Test
+    void testIllegalShardingSeparator()
+            throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+        ElasticsearchDataSinkFactory sinkFactory =
+                (ElasticsearchDataSinkFactory) getElasticsearchDataSinkFactory();
+        Method method =
+                ElasticsearchDataSinkFactory.class.getDeclaredMethod(
+                        "validateShardingSeparator", String.class);
+        method.setAccessible(true);
+
+        // One entry per character the factory rejects, including '/'
+        String[] invalidSeparators = {"*", " ", ">", "<", "|", "?", "\"", ",", "#", "\\", "/"};
+        for (String invalidSeparator : invalidSeparators) {
+            Throwable thrown =
+                    Assertions.catchThrowable(() -> method.invoke(sinkFactory, invalidSeparator));
+            Assertions.assertThat(extractException(thrown))
+                    .isInstanceOf(ValidationException.class)
+                    .hasMessageContaining(
+                            "sharding.suffix.separator is malformed, elasticsearch index cannot include \\, /, *, ?, \", <, >, |, ` ` (space character), ,, #");
+        }
+
+        // Test a separator with uppercase letters
+        Throwable thrown = Assertions.catchThrowable(() -> method.invoke(sinkFactory, "_TEST"));
+        Assertions.assertThat(extractException(thrown))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "sharding.suffix.separator is malformed, elasticsearch index only support lowercase.");
+    }
+
// Helper methods
+ /**
+ * Asserts that {@code ex} is an {@link InvocationTargetException} and returns its cause, i.e.
+ * the exception raised by the reflectively invoked method.
+ */
+ private Throwable extractException(Throwable ex) {
+ Assertions.assertThat(ex).isInstanceOf(InvocationTargetException.class);
+ return ex.getCause();
+ }
+
private DataSinkFactory getElasticsearchDataSinkFactory() {
DataSinkFactory sinkFactory =
FactoryDiscoveryUtils.getFactoryByIdentifier(
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchEventSerializerTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchEventSerializerTest.java
new file mode 100644
index 00000000000..43241ca4b4c
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchEventSerializerTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.elasticsearch.sink;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.cdc.common.data.ZonedTimestampData;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.connectors.elasticsearch.serializer.ElasticsearchEventSerializer;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link ElasticsearchEventSerializer}, focused on how the target index name is derived
+ * from the table id, the sharding column value and the sharding separator.
+ */
+public class ElasticsearchEventSerializerTest {
+ // Table id shared by all tests; its string form "test" is the prefix of every expected index.
+ TableId tableId = TableId.tableId("test");
+
+ // Explicit sharding key on a VARCHAR column; the value "c-10" is appended with an empty separator.
+ @Test
+ void testTableShardingWithString() {
+ HashMap shardingKey = new HashMap<>();
+ shardingKey.put(tableId, "col1");
+ String index = getShardingString(shardingKey, "");
+ assertThat(index).isEqualTo("testc-10");
+ }
+
+ // Explicit sharding key on the INT column "id" (value 110).
+ @Test
+ void testTableShardingWithInteger() {
+ HashMap shardingKey = new HashMap<>();
+ shardingKey.put(tableId, "id");
+ String index = getShardingString(shardingKey, "");
+ assertThat(index).isEqualTo("test110");
+ }
+
+ // Explicit sharding key on the DATE column "dt" (2025-01-01).
+ @Test
+ void testTableShardingWithDate() {
+ HashMap shardingKey = new HashMap<>();
+ shardingKey.put(tableId, "dt");
+ String index = getShardingString(shardingKey, "");
+ assertThat(index).isEqualTo("test2025-01-01");
+ }
+
+ // The sharding column "col2" is null in the record, so the index is the bare table id.
+ @Test
+ void testTableShardingWithNull() {
+ HashMap shardingKey = new HashMap<>();
+ shardingKey.put(tableId, "col2");
+ String index = getShardingString(shardingKey, "");
+ assertThat(index).isEqualTo("test");
+ }
+
+ // No explicit sharding key: the first partition key of the schema ("dt") is used instead.
+ @Test
+ void testTableShardingWithPartitionCol() {
+ String index = getShardingString(Collections.emptyMap(), "_");
+ assertThat(index).isEqualTo("test_2025-01-01");
+ }
+
+ // Same partition-key fallback, with "$" as the separator instead of "_".
+ @Test
+ void testTableShardingWithSeparator() {
+ String index = getShardingString(Collections.emptyMap(), "$");
+ assertThat(index).isEqualTo("test$2025-01-01");
+ }
+
+ /**
+ * Builds a seven-column table partitioned by {@code dt}, applies a {@link CreateTableEvent}
+ * followed by one insert event to a serializer configured with the given sharding settings,
+ * and returns the index name of the resulting bulk index operation.
+ */
+ private String getShardingString(Map shardingKey, String shardingSeparator) {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
+ DataTypes.VARCHAR(255).notNull(),
+ DataTypes.FLOAT(),
+ DataTypes.VARCHAR(45),
+ DataTypes.VARCHAR(55),
+ DataTypes.TIMESTAMP_TZ(),
+ DataTypes.DATE()
+ },
+ new String[] {"id", "name", "weight", "col1", "col2", "create_time", "dt"});
+ BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType);
+ // Values are listed in the same order as the column names above; col2 is deliberately null.
+ DataChangeEvent event =
+ DataChangeEvent.insertEvent(
+ tableId,
+ generator.generate(
+ new Object[] {
+ 110,
+ BinaryStringData.fromString("scooter"),
+ 5.5f,
+ BinaryStringData.fromString("c-10"),
+ null,
+ ZonedTimestampData.fromZonedDateTime(
+ LocalDateTime.of(2023, 11, 11, 11, 11, 11, 11)
+ .atZone(ZoneId.systemDefault())),
+ (int) LocalDate.of(2025, 1, 1).toEpochDay()
+ }));
+ ElasticsearchEventSerializer serializer =
+ new ElasticsearchEventSerializer(ZoneId.of("UTC"), shardingKey, shardingSeparator);
+ Schema tableSchema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT().notNull())
+ .physicalColumn("name", DataTypes.VARCHAR(255).notNull())
+ .physicalColumn("weight", DataTypes.FLOAT())
+ .physicalColumn("col1", DataTypes.VARCHAR(45))
+ .physicalColumn("col2", DataTypes.VARCHAR(55))
+ .physicalColumn("create_time", DataTypes.TIMESTAMP_TZ())
+ .physicalColumn("dt", DataTypes.DATE())
+ .partitionKey("dt")
+ .build();
+
+ // The create-table event is applied first, then the data change event.
+ serializer.apply(new CreateTableEvent(tableId, tableSchema), new MockContext());
+ return serializer.apply(event, new MockContext())._toBulkOperation().index().index();
+ }
+
+ /** Minimal {@link SinkWriter.Context} stub: watermark is 0 and there is no timestamp. */
+ class MockContext implements SinkWriter.Context {
+
+ @Override
+ public long currentWatermark() {
+ return 0;
+ }
+
+ @Override
+ public Long timestamp() {
+ return null;
+ }
+ }
+}