From a4f341ddf753b1ac1be7002c06a9501134486f15 Mon Sep 17 00:00:00 2001 From: Junbo wang Date: Tue, 14 Jan 2025 19:56:20 +0800 Subject: [PATCH 1/6] [FLINK-36698][pipeline-connector][elasticsearch] Elasticsearch Pipeline Connector support index sharding --- .../pipeline-connectors/elasticsearch.md | 7 + .../pipeline-connectors/elasticsearch.md | 7 + .../config/ElasticsearchSinkOptions.java | 35 +++++ .../ElasticsearchEventSerializer.java | 18 ++- .../sink/ElasticsearchDataSink.java | 7 +- .../sink/ElasticsearchDataSinkFactory.java | 25 +++- .../sink/ElasticsearchDataSinkOptions.java | 8 ++ .../ElasticsearchEventSerializerTest.java | 125 ++++++++++++++++++ 8 files changed, 228 insertions(+), 4 deletions(-) create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchEventSerializerTest.java diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md b/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md index 939cfc6d8bb..2a397f59817 100644 --- a/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md +++ b/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md @@ -164,6 +164,13 @@ Pipeline Connector Options Long 单个记录的最大大小(以byte为单位)。 + + sharding.suffix.key + optional + (none) + Long + 每个表的分片后缀字段,允许为多个表设置分片后缀字段。默认sink表名为test_table${suffix_key}。表之间用';'分隔。例如,我们设置sharding.suffix.key为'table1:col1;table2:col2'。 + diff --git a/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md b/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md index 579d7015e9f..db5fb952a4a 100644 --- a/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md +++ b/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md @@ -164,6 +164,13 @@ Pipeline Connector Options Long The maximum size of a single record in bytes. 
+ + sharding.suffix.key + optional + (none) + Long + Sharding suffix key for each table, allow setting sharding suffix key for multiTables.Default sink table name is test_table${suffix_key}.Tables are separated by ';'.For example, we can set sharding.suffix.key by 'table1:col1;table2:col2'. + diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java index 97e95452069..b18797a1305 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java @@ -17,12 +17,15 @@ package org.apache.flink.cdc.connectors.elasticsearch.config; +import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.connectors.elasticsearch.v2.NetworkConfig; import org.apache.http.HttpHost; import java.io.Serializable; +import java.util.Collections; import java.util.List; +import java.util.Map; /** Elasticsearch DataSink Options reference {@link ElasticsearchSinkOptions}. */ public class ElasticsearchSinkOptions implements Serializable { @@ -37,6 +40,7 @@ public class ElasticsearchSinkOptions implements Serializable { private final int version; private final String username; private final String password; + private final Map shardingKey; /** Constructor for ElasticsearchSinkOptions. 
*/ public ElasticsearchSinkOptions( @@ -50,6 +54,32 @@ public ElasticsearchSinkOptions( int version, String username, String password) { + this( + maxBatchSize, + maxInFlightRequests, + maxBufferedRequests, + maxBatchSizeInBytes, + maxTimeInBufferMS, + maxRecordSizeInBytes, + networkConfig, + version, + username, + password, + Collections.emptyMap()); + } + + public ElasticsearchSinkOptions( + int maxBatchSize, + int maxInFlightRequests, + int maxBufferedRequests, + long maxBatchSizeInBytes, + long maxTimeInBufferMS, + long maxRecordSizeInBytes, + NetworkConfig networkConfig, + int version, + String username, + String password, + Map shardingKey) { this.maxBatchSize = maxBatchSize; this.maxInFlightRequests = maxInFlightRequests; this.maxBufferedRequests = maxBufferedRequests; @@ -60,6 +90,7 @@ public ElasticsearchSinkOptions( this.version = version; this.username = username; this.password = password; + this.shardingKey = shardingKey; } /** @return the maximum batch size */ @@ -113,4 +144,8 @@ public String getUsername() { public String getPassword() { return password; } + + public Map getShardingKey() { + return shardingKey; + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java index 2968babb0f6..a55430f60cf 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java 
@@ -46,6 +46,7 @@ import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -70,8 +71,15 @@ public class ElasticsearchEventSerializer implements ElementConverter shardingKey; + public ElasticsearchEventSerializer(ZoneId zoneId) { + this(zoneId, Collections.emptyMap()); + } + + public ElasticsearchEventSerializer(ZoneId zoneId, Map shardingKey) { this.pipelineZoneId = zoneId; + this.shardingKey = shardingKey; } @Override @@ -145,7 +153,7 @@ private BulkOperationVariant createBulkOperationVariant(DataChangeEvent event) case UPDATE: valueMap = serializeRecord(tableId, event.after(), schema, pipelineZoneId); return new IndexOperation.Builder<>() - .index(tableId.toString()) + .index(tableSharding(tableId, valueMap)) .id(id) .document(valueMap) .build(); @@ -156,6 +164,14 @@ private BulkOperationVariant createBulkOperationVariant(DataChangeEvent event) } } + public String tableSharding(TableId tableId, Map valueMap) { + if (shardingKey.containsKey(tableId)) { + Object value = valueMap.get(shardingKey.get(tableId)); + return value != null ? 
tableId.toString() + value : tableId.toString(); + } + return tableId.toString(); + } + private Object[] generateUniqueId(RecordData recordData, Schema schema, TableId tableId) { List primaryKeys = schema.primaryKeys(); List converters = diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java index 14d7d4ebeec..8c3bcbacdb8 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java @@ -79,7 +79,8 @@ public EventSinkProvider getEventSinkProvider() { } private EventSinkProvider getElasticsearch6SinkProvider() { - ElasticsearchEventSerializer serializer = new ElasticsearchEventSerializer(zoneId); + ElasticsearchEventSerializer serializer = + new ElasticsearchEventSerializer(zoneId, esOptions.getShardingKey()); org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost[] hosts = esOptions.getHosts().stream() .map( @@ -128,7 +129,9 @@ private EventSinkProvider getElasticsearch8SinkProvider() { Elasticsearch8AsyncSinkBuilder sinkBuilder = new Elasticsearch8AsyncSinkBuilder() .setHosts(esOptions.getHosts().toArray(new HttpHost[0])) - .setElementConverter(new ElasticsearchEventSerializer(zoneId)) + .setElementConverter( + new ElasticsearchEventSerializer( + zoneId, esOptions.getShardingKey())) .setMaxBatchSize(esOptions.getMaxBatchSize()) .setMaxInFlightRequests(esOptions.getMaxInFlightRequests()) 
.setMaxBufferedRequests(esOptions.getMaxBufferedRequests()) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactory.java index b8bcf59b928..6369700022e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactory.java @@ -19,6 +19,7 @@ import org.apache.flink.cdc.common.configuration.ConfigOption; import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.factories.DataSinkFactory; import org.apache.flink.cdc.common.factories.FactoryHelper; import org.apache.flink.cdc.common.pipeline.PipelineOptions; @@ -31,8 +32,10 @@ import java.time.ZoneId; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -45,6 +48,7 @@ import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.MAX_RECORD_SIZE_IN_BYTES; import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.MAX_TIME_IN_BUFFER_MS; import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.PASSWORD; +import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.SHARDING_KEY; import static 
org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.USERNAME; import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.VERSION; @@ -85,6 +89,23 @@ private ElasticsearchSinkOptions buildSinkConnectorOptions(Configuration cdcConf String username = cdcConfig.get(USERNAME); String password = cdcConfig.get(PASSWORD); int version = cdcConfig.get(VERSION); + Map shardingMaps = new HashMap<>(); + String shardingKey = cdcConfig.get(SHARDING_KEY); + if (!shardingKey.isEmpty()) { + for (String tables : shardingKey.split(";")) { + String[] splits = tables.split(":"); + if (splits.length == 2) { + TableId tableId = TableId.parse(splits[0]); + shardingMaps.put(tableId, splits[1].trim()); + } else { + throw new IllegalArgumentException( + String.format( + "%s is malformed, please refer to the documents", + SHARDING_KEY.key())); + } + } + } + NetworkConfig networkConfig = new NetworkConfig(hosts, username, password, null, null, null); return new ElasticsearchSinkOptions( @@ -97,7 +118,8 @@ private ElasticsearchSinkOptions buildSinkConnectorOptions(Configuration cdcConf networkConfig, version, username, - password); + password, + shardingMaps); } private List parseHosts(String hostsStr) { @@ -130,6 +152,7 @@ public Set> optionalOptions() { optionalOptions.add(MAX_RECORD_SIZE_IN_BYTES); optionalOptions.add(USERNAME); optionalOptions.add(PASSWORD); + optionalOptions.add(SHARDING_KEY); return optionalOptions; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java index b2629040059..f3f83a7cd4e 100644 --- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java @@ -98,6 +98,14 @@ public class ElasticsearchDataSinkOptions { .noDefaultValue() .withDescription("The password for Elasticsearch authentication."); + /** The sharding for Elasticsearch index, default sink table name is test_table_${suffix}. */ + public static final ConfigOption SHARDING_KEY = + ConfigOptions.key("sharding.suffix.key") + .stringType() + .defaultValue("") + .withDescription( + "Sharding suffix key for each table, allow setting sharding suffix key for multiTables.Default sink table name is test_table${suffix_key}.Tables are separated by ';'.For example, we can set sharding.suffix.key by 'table1:col1;table2:col2'"); + private ElasticsearchDataSinkOptions() { // This class should not be instantiated } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchEventSerializerTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchEventSerializerTest.java new file mode 100644 index 00000000000..8cb68170848 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchEventSerializerTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.elasticsearch.sink; + +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.cdc.common.data.ZonedTimestampData; +import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.types.RowType; +import org.apache.flink.cdc.connectors.elasticsearch.serializer.ElasticsearchEventSerializer; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; + +import org.junit.jupiter.api.Test; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link ElasticsearchEventSerializer}. 
*/ +public class ElasticsearchEventSerializerTest { + TableId tableId = TableId.tableId("test"); + + @Test + void testTableShardingWithString() { + HashMap shardingKey = new HashMap<>(); + shardingKey.put(tableId, "col1"); + String index = getShardingString(shardingKey); + assertThat(index).isEqualTo("testc-10"); + } + + @Test + void testTableShardingWithInteger() { + HashMap shardingKey = new HashMap<>(); + shardingKey.put(tableId, "id"); + String index = getShardingString(shardingKey); + assertThat(index).isEqualTo("test110"); + } + + @Test + void testTableShardingWithNull() { + HashMap shardingKey = new HashMap<>(); + shardingKey.put(tableId, "col2"); + String index = getShardingString(shardingKey); + assertThat(index).isEqualTo("test"); + } + + private String getShardingString(Map shardingKey) { + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), + DataTypes.VARCHAR(255).notNull(), + DataTypes.FLOAT(), + DataTypes.VARCHAR(45), + DataTypes.VARCHAR(55), + DataTypes.TIMESTAMP_TZ() + }, + new String[] {"id", "name", "weight", "col1", "col2", "create_time"}); + BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType); + DataChangeEvent event = + DataChangeEvent.insertEvent( + tableId, + generator.generate( + new Object[] { + 110, + BinaryStringData.fromString("scooter"), + 5.5f, + BinaryStringData.fromString("c-10"), + null, + ZonedTimestampData.fromZonedDateTime( + LocalDateTime.of(2023, 11, 11, 11, 11, 11, 11) + .atZone(ZoneId.systemDefault())) + })); + ElasticsearchEventSerializer serializer = + new ElasticsearchEventSerializer(ZoneId.of("UTC"), shardingKey); + Schema tableSchema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("name", DataTypes.VARCHAR(255).notNull()) + .physicalColumn("weight", DataTypes.FLOAT()) + .physicalColumn("col1", DataTypes.VARCHAR(45)) + .physicalColumn("col2", DataTypes.VARCHAR(55)) + .physicalColumn("create_time", DataTypes.TIMESTAMP_TZ()) + 
.build(); + + serializer.apply(new CreateTableEvent(tableId, tableSchema), new MockContext()); + return serializer.apply(event, new MockContext())._toBulkOperation().index().index(); + } + + class MockContext implements SinkWriter.Context { + + @Override + public long currentWatermark() { + return 0; + } + + @Override + public Long timestamp() { + return null; + } + } +} From 37e1c54fc12a43d967dd988546a10f1474146385 Mon Sep 17 00:00:00 2001 From: wangjunbo Date: Mon, 20 Jan 2025 16:56:59 +0800 Subject: [PATCH 2/6] support use partition column as sharding suffix key --- .../pipeline-connectors/elasticsearch.md | 2 +- .../pipeline-connectors/elasticsearch.md | 2 +- .../ElasticsearchEventSerializer.java | 12 +++++---- .../sink/ElasticsearchDataSinkOptions.java | 2 +- .../ElasticsearchEventSerializerTest.java | 26 ++++++++++++++++--- 5 files changed, 33 insertions(+), 11 deletions(-) diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md b/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md index 2a397f59817..ce3f57b458a 100644 --- a/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md +++ b/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md @@ -169,7 +169,7 @@ Pipeline Connector Options optional (none) Long - 每个表的分片后缀字段,允许为多个表设置分片后缀字段。默认sink表名为test_table${suffix_key}。表之间用';'分隔。例如,我们设置sharding.suffix.key为'table1:col1;table2:col2'。 + 每个表的分片后缀字段,允许为多个表设置分片后缀字段。默认sink表名为test_table${suffix_key}。默认分片字段为第一个分区列。表之间用';'分隔。表和字段之间用‘:’分割。例如,我们设置sharding.suffix.key为'table1:col1;table2:col2'。 diff --git a/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md b/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md index db5fb952a4a..e8a3e9afefb 100644 --- a/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md +++ b/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md @@ -169,7 +169,7 @@ Pipeline Connector Options optional (none) Long - Sharding suffix key 
for each table, allow setting sharding suffix key for multiTables.Default sink table name is test_table${suffix_key}.Tables are separated by ';'.For example, we can set sharding.suffix.key by 'table1:col1;table2:col2'. + Sharding suffix key for each table, allow setting sharding suffix key for multiTables.Default sink table name is test_table${suffix_key}.Default sharding column is first partition column.Tables are separated by ';'.Table and column are separated by ':'.For example, we can set sharding.suffix.key by 'table1:col1;table2:col2'. diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java index a55430f60cf..ca642cf7f3b 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java @@ -153,7 +153,7 @@ private BulkOperationVariant createBulkOperationVariant(DataChangeEvent event) case UPDATE: valueMap = serializeRecord(tableId, event.after(), schema, pipelineZoneId); return new IndexOperation.Builder<>() - .index(tableSharding(tableId, valueMap)) + .index(tableSharding(tableId, schema, valueMap)) .id(id) .document(valueMap) .build(); @@ -164,12 +164,14 @@ private BulkOperationVariant createBulkOperationVariant(DataChangeEvent event) } } - public String tableSharding(TableId tableId, Map valueMap) { + public String tableSharding(TableId tableId, Schema schema, Map valueMap) 
{ + Object value = null; if (shardingKey.containsKey(tableId)) { - Object value = valueMap.get(shardingKey.get(tableId)); - return value != null ? tableId.toString() + value : tableId.toString(); + value = valueMap.get(shardingKey.get(tableId)); + } else if (!schema.partitionKeys().isEmpty()) { + value = valueMap.get(schema.partitionKeys().get(0)); } - return tableId.toString(); + return value != null ? tableId.toString() + value : tableId.toString(); } private Object[] generateUniqueId(RecordData recordData, Schema schema, TableId tableId) { diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java index f3f83a7cd4e..03e73be18d5 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java @@ -104,7 +104,7 @@ public class ElasticsearchDataSinkOptions { .stringType() .defaultValue("") .withDescription( - "Sharding suffix key for each table, allow setting sharding suffix key for multiTables.Default sink table name is test_table${suffix_key}.Tables are separated by ';'.For example, we can set sharding.suffix.key by 'table1:col1;table2:col2'"); + "Sharding suffix key for each table, allow setting sharding suffix key for multiTables.Default sink table name is test_table${suffix_key}.Default sharding column is first partition column.Tables are separated by ';'.Table and column are separated by ':'.For 
example, we can set sharding.suffix.key by 'table1:col1;table2:col2'"); private ElasticsearchDataSinkOptions() { // This class should not be instantiated diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchEventSerializerTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchEventSerializerTest.java index 8cb68170848..cca98a991e8 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchEventSerializerTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchEventSerializerTest.java @@ -32,8 +32,10 @@ import org.junit.jupiter.api.Test; +import java.time.LocalDate; import java.time.LocalDateTime; import java.time.ZoneId; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -59,6 +61,14 @@ void testTableShardingWithInteger() { assertThat(index).isEqualTo("test110"); } + @Test + void testTableShardingWithDate() { + HashMap shardingKey = new HashMap<>(); + shardingKey.put(tableId, "dt"); + String index = getShardingString(shardingKey); + assertThat(index).isEqualTo("test2025-01-01"); + } + @Test void testTableShardingWithNull() { HashMap shardingKey = new HashMap<>(); @@ -67,6 +77,12 @@ void testTableShardingWithNull() { assertThat(index).isEqualTo("test"); } + @Test + void testTableShardingWithPartitionCol() { + String index = getShardingString(Collections.emptyMap()); + assertThat(index).isEqualTo("test2025-01-01"); + } + private String getShardingString(Map shardingKey) { RowType rowType = RowType.of( @@ -76,9 +92,10 @@ private String 
getShardingString(Map shardingKey) { DataTypes.FLOAT(), DataTypes.VARCHAR(45), DataTypes.VARCHAR(55), - DataTypes.TIMESTAMP_TZ() + DataTypes.TIMESTAMP_TZ(), + DataTypes.DATE() }, - new String[] {"id", "name", "weight", "col1", "col2", "create_time"}); + new String[] {"id", "name", "weight", "col1", "col2", "create_time", "dt"}); BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType); DataChangeEvent event = DataChangeEvent.insertEvent( @@ -92,7 +109,8 @@ private String getShardingString(Map shardingKey) { null, ZonedTimestampData.fromZonedDateTime( LocalDateTime.of(2023, 11, 11, 11, 11, 11, 11) - .atZone(ZoneId.systemDefault())) + .atZone(ZoneId.systemDefault())), + (int) LocalDate.of(2025, 1, 1).toEpochDay() })); ElasticsearchEventSerializer serializer = new ElasticsearchEventSerializer(ZoneId.of("UTC"), shardingKey); @@ -104,6 +122,8 @@ private String getShardingString(Map shardingKey) { .physicalColumn("col1", DataTypes.VARCHAR(45)) .physicalColumn("col2", DataTypes.VARCHAR(55)) .physicalColumn("create_time", DataTypes.TIMESTAMP_TZ()) + .physicalColumn("dt", DataTypes.DATE()) + .partitionKey("dt") .build(); serializer.apply(new CreateTableEvent(tableId, tableSchema), new MockContext()); From abe4e6e77de2a844333a36ab8a78e4338f372a06 Mon Sep 17 00:00:00 2001 From: wangjunbo Date: Thu, 23 Jan 2025 10:55:05 +0800 Subject: [PATCH 3/6] add sharding separator --- .../config/ElasticsearchSinkOptions.java | 12 +++++- .../ElasticsearchEventSerializer.java | 9 ++-- .../sink/ElasticsearchDataSink.java | 7 +++- .../sink/ElasticsearchDataSinkFactory.java | 34 ++++++++++++--- .../sink/ElasticsearchDataSinkOptions.java | 12 +++++- .../ElasticsearchDataSinkFactoryTest.java | 42 +++++++++++++++++++ .../ElasticsearchEventSerializerTest.java | 22 ++++++---- 7 files changed, 116 insertions(+), 22 deletions(-) diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java index b18797a1305..362e3c9e676 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java @@ -41,6 +41,7 @@ public class ElasticsearchSinkOptions implements Serializable { private final String username; private final String password; private final Map shardingKey; + private final String shardingSeparator; /** Constructor for ElasticsearchSinkOptions. 
*/ public ElasticsearchSinkOptions( @@ -65,7 +66,8 @@ public ElasticsearchSinkOptions( version, username, password, - Collections.emptyMap()); + Collections.emptyMap(), + "_"); } public ElasticsearchSinkOptions( @@ -79,7 +81,8 @@ public ElasticsearchSinkOptions( int version, String username, String password, - Map shardingKey) { + Map shardingKey, + String shardingSeparator) { this.maxBatchSize = maxBatchSize; this.maxInFlightRequests = maxInFlightRequests; this.maxBufferedRequests = maxBufferedRequests; @@ -91,6 +94,7 @@ public ElasticsearchSinkOptions( this.username = username; this.password = password; this.shardingKey = shardingKey; + this.shardingSeparator = shardingSeparator; } /** @return the maximum batch size */ @@ -148,4 +152,8 @@ public String getPassword() { public Map getShardingKey() { return shardingKey; } + + public String getShardingSeparator() { + return shardingSeparator; + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java index ca642cf7f3b..eb53097c312 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java @@ -72,14 +72,17 @@ public class ElasticsearchEventSerializer implements ElementConverter shardingKey; + private final String shardingSeparator; public ElasticsearchEventSerializer(ZoneId zoneId) { - this(zoneId, Collections.emptyMap()); + 
this(zoneId, Collections.emptyMap(), "_"); } - public ElasticsearchEventSerializer(ZoneId zoneId, Map shardingKey) { + public ElasticsearchEventSerializer( + ZoneId zoneId, Map shardingKey, String shardingSeparator) { this.pipelineZoneId = zoneId; this.shardingKey = shardingKey; + this.shardingSeparator = shardingSeparator; } @Override @@ -171,7 +174,7 @@ public String tableSharding(TableId tableId, Schema schema, Map } else if (!schema.partitionKeys().isEmpty()) { value = valueMap.get(schema.partitionKeys().get(0)); } - return value != null ? tableId.toString() + value : tableId.toString(); + return value != null ? tableId.toString() + shardingSeparator + value : tableId.toString(); } private Object[] generateUniqueId(RecordData recordData, Schema schema, TableId tableId) { diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java index 8c3bcbacdb8..b98d3e86109 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java @@ -80,7 +80,8 @@ public EventSinkProvider getEventSinkProvider() { private EventSinkProvider getElasticsearch6SinkProvider() { ElasticsearchEventSerializer serializer = - new ElasticsearchEventSerializer(zoneId, esOptions.getShardingKey()); + new ElasticsearchEventSerializer( + zoneId, esOptions.getShardingKey(), esOptions.getShardingSeparator()); org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost[] 
hosts = esOptions.getHosts().stream() .map( @@ -131,7 +132,9 @@ private EventSinkProvider getElasticsearch8SinkProvider() { .setHosts(esOptions.getHosts().toArray(new HttpHost[0])) .setElementConverter( new ElasticsearchEventSerializer( - zoneId, esOptions.getShardingKey())) + zoneId, + esOptions.getShardingKey(), + esOptions.getShardingSeparator())) .setMaxBatchSize(esOptions.getMaxBatchSize()) .setMaxInFlightRequests(esOptions.getMaxInFlightRequests()) .setMaxBufferedRequests(esOptions.getMaxBufferedRequests()) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactory.java index 6369700022e..d9ce926627d 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactory.java @@ -48,7 +48,8 @@ import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.MAX_RECORD_SIZE_IN_BYTES; import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.MAX_TIME_IN_BUFFER_MS; import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.PASSWORD; -import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.SHARDING_KEY; +import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.SHARDING_SUFFIX_KEY; +import static 
org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.SHARDING_SUFFIX_SEPARATOR; import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.USERNAME; import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.VERSION; @@ -90,7 +91,8 @@ private ElasticsearchSinkOptions buildSinkConnectorOptions(Configuration cdcConf String password = cdcConfig.get(PASSWORD); int version = cdcConfig.get(VERSION); Map shardingMaps = new HashMap<>(); - String shardingKey = cdcConfig.get(SHARDING_KEY); + String shardingKey = cdcConfig.get(SHARDING_SUFFIX_KEY); + String shardingSeparator = cdcConfig.get(SHARDING_SUFFIX_SEPARATOR); if (!shardingKey.isEmpty()) { for (String tables : shardingKey.split(";")) { String[] splits = tables.split(":"); @@ -101,10 +103,11 @@ private ElasticsearchSinkOptions buildSinkConnectorOptions(Configuration cdcConf throw new IllegalArgumentException( String.format( "%s is malformed, please refer to the documents", - SHARDING_KEY.key())); + SHARDING_SUFFIX_KEY.key())); } } } + validateShardingSeparator(shardingSeparator); NetworkConfig networkConfig = new NetworkConfig(hosts, username, password, null, null, null); @@ -119,7 +122,8 @@ private ElasticsearchSinkOptions buildSinkConnectorOptions(Configuration cdcConf version, username, password, - shardingMaps); + shardingMaps, + shardingSeparator); } private List parseHosts(String hostsStr) { @@ -152,7 +156,8 @@ public Set> optionalOptions() { optionalOptions.add(MAX_RECORD_SIZE_IN_BYTES); optionalOptions.add(USERNAME); optionalOptions.add(PASSWORD); - optionalOptions.add(SHARDING_KEY); + optionalOptions.add(SHARDING_SUFFIX_KEY); + optionalOptions.add(SHARDING_SUFFIX_SEPARATOR); return optionalOptions; } @@ -174,4 +179,23 @@ private void validateRequiredOptions(Configuration configuration) { .collect(Collectors.joining("\n")))); } } + + private void validateShardingSeparator(String separator) { + if 
(!separator.equals(separator.toLowerCase())) { + throw new ValidationException( + String.format( + "%s is malformed, elasticsearch index only support lowercase.", + SHARDING_SUFFIX_SEPARATOR.key())); + } + + String illegalChars = "\\/*?\"<>| ,#"; + for (char c : illegalChars.toCharArray()) { + if (separator.indexOf(c) != -1) { + throw new ValidationException( + String.format( + "%s is malformed, elasticsearch index cannot include \\, /, *, ?, \", <, >, |, ` ` (space character), ,, #", + SHARDING_SUFFIX_SEPARATOR.key())); + } + } + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java index 03e73be18d5..213b446c8d7 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java @@ -99,12 +99,20 @@ public class ElasticsearchDataSinkOptions { .withDescription("The password for Elasticsearch authentication."); /** The sharding for Elasticsearch index, default sink table name is test_table_${suffix}. 
*/ - public static final ConfigOption SHARDING_KEY = + public static final ConfigOption SHARDING_SUFFIX_KEY = ConfigOptions.key("sharding.suffix.key") .stringType() .defaultValue("") .withDescription( - "Sharding suffix key for each table, allow setting sharding suffix key for multiTables.Default sink table name is test_table${suffix_key}.Default sharding column is first partition column.Tables are separated by ';'.Table and column are separated by ':'.For example, we can set sharding.suffix.key by 'table1:col1;table2:col2'"); + "Sharding suffix key for each table, allow setting sharding suffix key for multiTables.Default sink table name is test_table${suffix_key}.Default sharding column is first partition column.Tables are separated by ';'.Table and column are separated by '$'.For example, we can set sharding.suffix.key by 'table1$col1;table2$col2'"); + + /** The sharding for Elasticsearch index, default sink table name is test_table_${suffix}. */ + public static final ConfigOption SHARDING_SUFFIX_SEPARATOR = + ConfigOptions.key("sharding.suffix.separator") + .stringType() + .defaultValue("_") + .withDescription( + "Separator for sharding suffix in table names, allow defining the separator between table name and sharding suffix. Default value is '_'. 
For example, if set to '-', the default table name would be test_table-${suffix}"); private ElasticsearchDataSinkOptions() { // This class should not be instantiated diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactoryTest.java index 32835349d82..90e3f4bb75a 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactoryTest.java @@ -30,6 +30,8 @@ import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -128,8 +130,48 @@ void testPrefixedRequiredOption() { Assertions.assertThat(dataSink).isInstanceOf(ElasticsearchDataSink.class); } + /** + * Test the `validateShardingSeparator` method with illegal sharding separators. This test + * checks two scenarios: 1. Separators containing illegal characters. 2. A separator with + * uppercase letters. 
+ */ + @Test + void testIllegalShardingSeparator() + throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + ElasticsearchDataSinkFactory sinkFactory = + (ElasticsearchDataSinkFactory) getElasticsearchDataSinkFactory(); + Method method = + ElasticsearchDataSinkFactory.class.getDeclaredMethod( + "validateShardingSeparator", String.class); + method.setAccessible(true); + + // Test an array of invalid separators with illegal characters + String[] invalidSeparators = {"*", " ", ">", "<", "|", "?", "\"", ",", "#", "\\"}; + for (String invalidSeparator : invalidSeparators) { + Throwable thrown = + Assertions.catchThrowable(() -> method.invoke(sinkFactory, invalidSeparator)); + Assertions.assertThat(extractException(thrown)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "sharding.suffix.separator is malformed, elasticsearch index cannot include \\, /, *, ?, \", <, >, |, ` ` (space character), ,, #"); + } + + // Test a separator with uppercase letters + Throwable thrown = Assertions.catchThrowable(() -> method.invoke(sinkFactory, "_TEST")); + Assertions.assertThat(extractException(thrown)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "sharding.suffix.separator is malformed, elasticsearch index only support lowercase."); + } + // Helper methods + /** Helper method to extract the actual cause from an InvocationTargetException. 
*/ + private Throwable extractException(Throwable ex) { + Assertions.assertThat(ex).isInstanceOf(InvocationTargetException.class); + return ex.getCause(); + } + private DataSinkFactory getElasticsearchDataSinkFactory() { DataSinkFactory sinkFactory = FactoryDiscoveryUtils.getFactoryByIdentifier( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchEventSerializerTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchEventSerializerTest.java index cca98a991e8..875fc8091ca 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchEventSerializerTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchEventSerializerTest.java @@ -49,7 +49,7 @@ public class ElasticsearchEventSerializerTest { void testTableShardingWithString() { HashMap shardingKey = new HashMap<>(); shardingKey.put(tableId, "col1"); - String index = getShardingString(shardingKey); + String index = getShardingString(shardingKey, ""); assertThat(index).isEqualTo("testc-10"); } @@ -57,7 +57,7 @@ void testTableShardingWithString() { void testTableShardingWithInteger() { HashMap shardingKey = new HashMap<>(); shardingKey.put(tableId, "id"); - String index = getShardingString(shardingKey); + String index = getShardingString(shardingKey, ""); assertThat(index).isEqualTo("test110"); } @@ -65,7 +65,7 @@ void testTableShardingWithInteger() { void testTableShardingWithDate() { HashMap shardingKey = new HashMap<>(); shardingKey.put(tableId, "dt"); - String index = getShardingString(shardingKey); + String index = 
getShardingString(shardingKey, ""); assertThat(index).isEqualTo("test2025-01-01"); } @@ -73,17 +73,23 @@ void testTableShardingWithDate() { void testTableShardingWithNull() { HashMap shardingKey = new HashMap<>(); shardingKey.put(tableId, "col2"); - String index = getShardingString(shardingKey); + String index = getShardingString(shardingKey, ""); assertThat(index).isEqualTo("test"); } @Test void testTableShardingWithPartitionCol() { - String index = getShardingString(Collections.emptyMap()); - assertThat(index).isEqualTo("test2025-01-01"); + String index = getShardingString(Collections.emptyMap(), "_"); + assertThat(index).isEqualTo("test_2025-01-01"); + } + + @Test + void testTableShardingWithSeparator() { + String index = getShardingString(Collections.emptyMap(), "$"); + assertThat(index).isEqualTo("test$2025-01-01"); } - private String getShardingString(Map shardingKey) { + private String getShardingString(Map shardingKey, String shardingSeparator) { RowType rowType = RowType.of( new DataType[] { @@ -113,7 +119,7 @@ private String getShardingString(Map shardingKey) { (int) LocalDate.of(2025, 1, 1).toEpochDay() })); ElasticsearchEventSerializer serializer = - new ElasticsearchEventSerializer(ZoneId.of("UTC"), shardingKey); + new ElasticsearchEventSerializer(ZoneId.of("UTC"), shardingKey, shardingSeparator); Schema tableSchema = Schema.newBuilder() .physicalColumn("id", DataTypes.INT().notNull()) From 92f6efb11167b42ee4e0f2f1ae8f8030058f7cfc Mon Sep 17 00:00:00 2001 From: wangjunbo Date: Thu, 23 Jan 2025 15:44:21 +0800 Subject: [PATCH 4/6] add doc and fix some minor errors --- .../docs/connectors/pipeline-connectors/elasticsearch.md | 7 +++++++ .../docs/connectors/pipeline-connectors/elasticsearch.md | 7 +++++++ .../elasticsearch/config/ElasticsearchSinkOptions.java | 4 +++- .../serializer/ElasticsearchEventSerializer.java | 4 +++- .../elasticsearch/sink/ElasticsearchDataSinkFactory.java | 4 ++-- .../sink/ElasticsearchEventSerializerTest.java | 6 +++--- 6 files 
changed, 25 insertions(+), 7 deletions(-) diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md b/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md index ce3f57b458a..61532812b31 100644 --- a/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md +++ b/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md @@ -171,6 +171,13 @@ Pipeline Connector Options Long 每个表的分片后缀字段,允许为多个表设置分片后缀字段。默认sink表名为test_table${suffix_key}。默认分片字段为第一个分区列。表之间用';'分隔。表和字段之间用‘:’分割。例如,我们设置sharding.suffix.key为'table1:col1;table2:col2'。 + + sharding.suffix.separator + optional + `_` + String + 用于分割表名称和分片后缀的分隔符。默认是 '_'。如果设置为 '-',那么表名称会是 `test_table-${suffix}`。 + diff --git a/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md b/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md index e8a3e9afefb..6cd458acc28 100644 --- a/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md +++ b/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md @@ -171,6 +171,13 @@ Pipeline Connector Options Long Sharding suffix key for each table, allow setting sharding suffix key for multiTables.Default sink table name is test_table${suffix_key}.Default sharding column is first partition column.Tables are separated by ';'.Table and column are separated by ':'.For example, we can set sharding.suffix.key by 'table1:col1;table2:col2'. + + sharding.suffix.separator + optional + `_` + String + Separator for sharding suffix in table names, allow defining the separator between table name and sharding suffix. Default value is '_'. 
For example, if set to '-', the default table name would be `test_table-${suffix}` + diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java index 362e3c9e676..4ea58ab3517 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java @@ -27,6 +27,8 @@ import java.util.List; import java.util.Map; +import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.SHARDING_SUFFIX_SEPARATOR; + /** Elasticsearch DataSink Options reference {@link ElasticsearchSinkOptions}. 
*/ public class ElasticsearchSinkOptions implements Serializable { @@ -67,7 +69,7 @@ public ElasticsearchSinkOptions( username, password, Collections.emptyMap(), - "_"); + SHARDING_SUFFIX_SEPARATOR.defaultValue()); } public ElasticsearchSinkOptions( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java index eb53097c312..800c0f359f7 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java @@ -53,6 +53,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.SHARDING_SUFFIX_SEPARATOR; + /** A serializer for Event to BulkOperationVariant. 
*/ public class ElasticsearchEventSerializer implements ElementConverter { private final ObjectMapper objectMapper = new ObjectMapper(); @@ -75,7 +77,7 @@ public class ElasticsearchEventSerializer implements ElementConverter| ,#"; @Override public DataSink createDataSink(Context context) { @@ -188,8 +189,7 @@ private void validateShardingSeparator(String separator) { SHARDING_SUFFIX_SEPARATOR.key())); } - String illegalChars = "\\/*?\"<>| ,#"; - for (char c : illegalChars.toCharArray()) { + for (char c : ES_INDEX_ILLEGAL_CHARS.toCharArray()) { if (separator.indexOf(c) != -1) { throw new ValidationException( String.format( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchEventSerializerTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchEventSerializerTest.java index 875fc8091ca..43241ca4b4c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchEventSerializerTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchEventSerializerTest.java @@ -1,7 +1,7 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one或多个 + * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with - * this work for additional信息 regarding copyright ownership. + * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at @@ -10,7 +10,7 @@ * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express或 implied. + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ From f407c99ec1b503f46c06dc83087e608634da0e91 Mon Sep 17 00:00:00 2001 From: wangjunbo Date: Thu, 23 Jan 2025 15:48:36 +0800 Subject: [PATCH 5/6] format doc description --- .../docs/connectors/pipeline-connectors/elasticsearch.md | 4 ++-- .../docs/connectors/pipeline-connectors/elasticsearch.md | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md b/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md index 61532812b31..2efe0a8960a 100644 --- a/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md +++ b/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md @@ -169,12 +169,12 @@ Pipeline Connector Options optional (none) Long - 每个表的分片后缀字段,允许为多个表设置分片后缀字段。默认sink表名为test_table${suffix_key}。默认分片字段为第一个分区列。表之间用';'分隔。表和字段之间用‘:’分割。例如,我们设置sharding.suffix.key为'table1:col1;table2:col2'。 + 每个表的分片后缀字段,允许为多个表设置分片后缀字段。默认sink表名为test_table${suffix_key}。默认分片字段为第一个分区列。表之间用';'分隔。表和字段之间用':'分割。例如,我们设置sharding.suffix.key为'table1:col1;table2:col2'。 sharding.suffix.separator optional - `_` + "_" String 用于分割表名称和分片后缀的分隔符。默认是 '_'。如果设置为 '-',那么表名称会是 `test_table-${suffix}`。 diff --git a/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md b/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md index 6cd458acc28..b6dc47f8016 100644 --- a/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md +++ b/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md @@ -169,14 +169,14 @@ 
Pipeline Connector Options optional (none) Long - Sharding suffix key for each table, allow setting sharding suffix key for multiTables.Default sink table name is test_table${suffix_key}.Default sharding column is first partition column.Tables are separated by ';'.Table and column are separated by ':'.For example, we can set sharding.suffix.key by 'table1:col1;table2:col2'. + Sharding suffix key for each table, allow setting sharding suffix key for multiTables.Default sink table name is 'test_table${suffix_key}'.Default sharding column is first partition column.Tables are separated by ';'.Table and column are separated by ':'.For example, we can set sharding.suffix.key by 'table1:col1;table2:col2'. sharding.suffix.separator optional - `_` + "_" String - Separator for sharding suffix in table names, allow defining the separator between table name and sharding suffix. Default value is '_'. For example, if set to '-', the default table name would be `test_table-${suffix}` + Separator for sharding suffix in table names, allow defining the separator between table name and sharding suffix. Default value is '_'. 
For example, if set to '-', the default table name would be 'test_table-${suffix}' From b51f6c8c6ee059971640f1f631c53fd236c36d4d Mon Sep 17 00:00:00 2001 From: wangjunbo Date: Thu, 23 Jan 2025 15:51:22 +0800 Subject: [PATCH 6/6] format doc description --- .../docs/connectors/pipeline-connectors/elasticsearch.md | 4 ++-- .../docs/connectors/pipeline-connectors/elasticsearch.md | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md b/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md index 2efe0a8960a..0b260c7613e 100644 --- a/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md +++ b/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md @@ -169,14 +169,14 @@ Pipeline Connector Options optional (none) Long - 每个表的分片后缀字段,允许为多个表设置分片后缀字段。默认sink表名为test_table${suffix_key}。默认分片字段为第一个分区列。表之间用';'分隔。表和字段之间用':'分割。例如,我们设置sharding.suffix.key为'table1:col1;table2:col2'。 + 每个表的分片后缀字段,允许为多个表设置分片后缀字段。默认 sink 表名为 test_table${suffix_key}。默认分片字段为第一个分区列。表之间用';'分隔。表和字段之间用':'分割。例如,我们设置 sharding.suffix.key 为'table1:col1;table2:col2'。 sharding.suffix.separator optional "_" String - 用于分割表名称和分片后缀的分隔符。默认是 '_'。如果设置为 '-',那么表名称会是 `test_table-${suffix}`。 + 用于分割表名称和分片后缀的分隔符。默认是 '_'。如果设置为 '-',那么表名称会是 test_table-${suffix}。 diff --git a/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md b/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md index b6dc47f8016..3f3d693fee8 100644 --- a/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md +++ b/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md @@ -169,14 +169,14 @@ Pipeline Connector Options optional (none) Long - Sharding suffix key for each table, allow setting sharding suffix key for multiTables.Default sink table name is 'test_table${suffix_key}'.Default sharding column is first partition column.Tables are separated by ';'.Table and column are separated by ':'.For 
example, we can set sharding.suffix.key by 'table1:col1;table2:col2'. + Sharding suffix key for each table, allow setting sharding suffix key for multiTables.Default sink table name is test_table${suffix_key}.Default sharding column is first partition column.Tables are separated by ';'.Table and column are separated by ':'.For example, we can set sharding.suffix.key by 'table1:col1;table2:col2'. sharding.suffix.separator optional "_" String - Separator for sharding suffix in table names, allow defining the separator between table name and sharding suffix. Default value is '_'. For example, if set to '-', the default table name would be 'test_table-${suffix}' + Separator for sharding suffix in table names, allow defining the separator between table name and sharding suffix. Default value is '_'. For example, if set to '-', the default table name would be test_table-${suffix}