Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,20 @@ Pipeline Connector Options
<td>Long</td>
<td>单个记录的最大大小(以byte为单位)。</td>
</tr>
<tr>
<td>sharding.suffix.key</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>每个表的分片后缀字段,允许为多个表设置分片后缀字段。默认 sink 表名为 test_table${suffix_key}。默认分片字段为第一个分区列。表之间用';'分隔。表和字段之间用':'分割。例如,我们设置 sharding.suffix.key 为'table1:col1;table2:col2'。</td>
</tr>
<tr>
<td>sharding.suffix.separator</td>
<td>optional</td>
<td style="word-wrap: break-word;">"_"</td>
<td>String</td>
<td>用于分割表名称和分片后缀的分隔符。默认是 '_'。如果设置为 '-',那么表名称会是 test_table-${suffix}。</td>
</tr>
Comment thread
beryllw marked this conversation as resolved.
</tbody>
</table>
</div>
Expand Down
14 changes: 14 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/elasticsearch.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,20 @@ Pipeline Connector Options
<td>Long</td>
<td>The maximum size of a single record in bytes.</td>
</tr>
<tr>
<td>sharding.suffix.key</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Sharding suffix key for each table; sharding suffix keys can be set for multiple tables. With the default separator, the sink table name is test_table_${suffix_key}. The default sharding column is the first partition column. Tables are separated by ';'. Table and column are separated by ':'. For example, we can set sharding.suffix.key to 'table1:col1;table2:col2'.</td>

@lvyanquan lvyanquan Jan 22, 2025

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about making the separator configurable with a default value of $

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be '_' is better?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense.

</tr>
<tr>
<td>sharding.suffix.separator</td>
<td>optional</td>
<td style="word-wrap: break-word;">"_"</td>
<td>String</td>
<td>Separator for sharding suffix in table names, allow defining the separator between table name and sharding suffix. Default value is '_'. For example, if set to '-', the default table name would be test_table-${suffix}</td>
</tr>
</tbody>
</table>
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@

package org.apache.flink.cdc.connectors.elasticsearch.config;

import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.connectors.elasticsearch.v2.NetworkConfig;

import org.apache.http.HttpHost;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.SHARDING_SUFFIX_SEPARATOR;

/** Elasticsearch DataSink Options reference {@link ElasticsearchSinkOptions}. */
public class ElasticsearchSinkOptions implements Serializable {
Expand All @@ -37,6 +42,8 @@ public class ElasticsearchSinkOptions implements Serializable {
private final int version;
private final String username;
private final String password;
private final Map<TableId, String> shardingKey;
private final String shardingSeparator;

/** Constructor for ElasticsearchSinkOptions. */
public ElasticsearchSinkOptions(
Expand All @@ -50,6 +57,34 @@ public ElasticsearchSinkOptions(
int version,
String username,
String password) {
this(
maxBatchSize,
maxInFlightRequests,
maxBufferedRequests,
maxBatchSizeInBytes,
maxTimeInBufferMS,
maxRecordSizeInBytes,
networkConfig,
version,
username,
password,
Collections.emptyMap(),
SHARDING_SUFFIX_SEPARATOR.defaultValue());
}

public ElasticsearchSinkOptions(
int maxBatchSize,
int maxInFlightRequests,
int maxBufferedRequests,
long maxBatchSizeInBytes,
long maxTimeInBufferMS,
long maxRecordSizeInBytes,
NetworkConfig networkConfig,
int version,
String username,
String password,
Map<TableId, String> shardingKey,
String shardingSeparator) {
this.maxBatchSize = maxBatchSize;
this.maxInFlightRequests = maxInFlightRequests;
this.maxBufferedRequests = maxBufferedRequests;
Expand All @@ -60,6 +95,8 @@ public ElasticsearchSinkOptions(
this.version = version;
this.username = username;
this.password = password;
this.shardingKey = shardingKey;
this.shardingSeparator = shardingSeparator;
}

/** @return the maximum batch size */
Expand Down Expand Up @@ -113,4 +150,12 @@ public String getUsername() {
public String getPassword() {
return password;
}

/** @return the sharding suffix column configured for each table, keyed by table id */
public Map<TableId, String> getShardingKey() {
return shardingKey;
}

/** @return the separator placed between the table name and the sharding suffix */
public String getShardingSeparator() {
return shardingSeparator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,15 @@
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.SHARDING_SUFFIX_SEPARATOR;

/** A serializer for Event to BulkOperationVariant. */
public class ElasticsearchEventSerializer implements ElementConverter<Event, BulkOperationVariant> {
private final ObjectMapper objectMapper = new ObjectMapper();
Expand All @@ -70,8 +73,18 @@ public class ElasticsearchEventSerializer implements ElementConverter<Event, Bul
/** ZoneId from pipeline config to support timestamp with local time zone. */
private final ZoneId pipelineZoneId;

private final Map<TableId, String> shardingKey;
private final String shardingSeparator;

/**
 * Creates a serializer with an empty sharding key map and the default value of the {@code
 * sharding.suffix.separator} option.
 *
 * @param zoneId the zone id kept as the pipeline zone id
 */
public ElasticsearchEventSerializer(ZoneId zoneId) {
this(zoneId, Collections.emptyMap(), SHARDING_SUFFIX_SEPARATOR.defaultValue());
}

/**
 * Creates a serializer with explicit sharding settings.
 *
 * @param zoneId the zone id kept as the pipeline zone id
 * @param shardingKey per-table column whose value is appended to the index name
 * @param shardingSeparator separator placed between the table id and the sharding value
 */
public ElasticsearchEventSerializer(
ZoneId zoneId, Map<TableId, String> shardingKey, String shardingSeparator) {
this.pipelineZoneId = zoneId;
this.shardingKey = shardingKey;
this.shardingSeparator = shardingSeparator;
}

@Override
Expand Down Expand Up @@ -145,7 +158,7 @@ private BulkOperationVariant createBulkOperationVariant(DataChangeEvent event)
case UPDATE:
valueMap = serializeRecord(tableId, event.after(), schema, pipelineZoneId);
return new IndexOperation.Builder<>()
.index(tableId.toString())
.index(tableSharding(tableId, schema, valueMap))
.id(id)
.document(valueMap)
.build();
Expand All @@ -156,6 +169,16 @@ private BulkOperationVariant createBulkOperationVariant(DataChangeEvent event)
}
}

public String tableSharding(TableId tableId, Schema schema, Map<String, Object> valueMap) {
Object value = null;
if (shardingKey.containsKey(tableId)) {
Comment thread
beryllw marked this conversation as resolved.
value = valueMap.get(shardingKey.get(tableId));
} else if (!schema.partitionKeys().isEmpty()) {
value = valueMap.get(schema.partitionKeys().get(0));
}
return value != null ? tableId.toString() + shardingSeparator + value : tableId.toString();
}

private Object[] generateUniqueId(RecordData recordData, Schema schema, TableId tableId) {
List<String> primaryKeys = schema.primaryKeys();
List<ElasticsearchRowConverter.SerializationConverter> converters =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ public EventSinkProvider getEventSinkProvider() {
}

private EventSinkProvider getElasticsearch6SinkProvider() {
ElasticsearchEventSerializer serializer = new ElasticsearchEventSerializer(zoneId);
ElasticsearchEventSerializer serializer =
new ElasticsearchEventSerializer(
zoneId, esOptions.getShardingKey(), esOptions.getShardingSeparator());
org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost[] hosts =
esOptions.getHosts().stream()
.map(
Expand Down Expand Up @@ -128,7 +130,11 @@ private EventSinkProvider getElasticsearch8SinkProvider() {
Elasticsearch8AsyncSinkBuilder<Event> sinkBuilder =
new Elasticsearch8AsyncSinkBuilder<Event>()
.setHosts(esOptions.getHosts().toArray(new HttpHost[0]))
.setElementConverter(new ElasticsearchEventSerializer(zoneId))
.setElementConverter(
new ElasticsearchEventSerializer(
zoneId,
esOptions.getShardingKey(),
esOptions.getShardingSeparator()))
.setMaxBatchSize(esOptions.getMaxBatchSize())
.setMaxInFlightRequests(esOptions.getMaxInFlightRequests())
.setMaxBufferedRequests(esOptions.getMaxBufferedRequests())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.factories.DataSinkFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
Expand All @@ -31,8 +32,10 @@

import java.time.ZoneId;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -45,13 +48,16 @@
import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.MAX_RECORD_SIZE_IN_BYTES;
import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.MAX_TIME_IN_BUFFER_MS;
import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.PASSWORD;
import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.SHARDING_SUFFIX_KEY;
import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.SHARDING_SUFFIX_SEPARATOR;
import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.USERNAME;
import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.VERSION;

/** Factory for creating {@link ElasticsearchDataSink}. */
public class ElasticsearchDataSinkFactory implements DataSinkFactory {

public static final String IDENTIFIER = "elasticsearch";
/** Characters that are rejected when they appear in the sharding separator. */
private static final String ES_INDEX_ILLEGAL_CHARS = "\\/*?\"<>| ,#";

@Override
public DataSink createDataSink(Context context) {
Expand Down Expand Up @@ -85,6 +91,25 @@ private ElasticsearchSinkOptions buildSinkConnectorOptions(Configuration cdcConf
String username = cdcConfig.get(USERNAME);
String password = cdcConfig.get(PASSWORD);
int version = cdcConfig.get(VERSION);
Map<TableId, String> shardingMaps = new HashMap<>();
String shardingKey = cdcConfig.get(SHARDING_SUFFIX_KEY);
String shardingSeparator = cdcConfig.get(SHARDING_SUFFIX_SEPARATOR);
if (!shardingKey.isEmpty()) {
for (String tables : shardingKey.split(";")) {
String[] splits = tables.split(":");
if (splits.length == 2) {
TableId tableId = TableId.parse(splits[0]);
shardingMaps.put(tableId, splits[1].trim());
} else {
throw new IllegalArgumentException(
String.format(
"%s is malformed, please refer to the documents",
SHARDING_SUFFIX_KEY.key()));
}
}
}
validateShardingSeparator(shardingSeparator);

NetworkConfig networkConfig =
new NetworkConfig(hosts, username, password, null, null, null);
return new ElasticsearchSinkOptions(
Expand All @@ -97,7 +122,9 @@ private ElasticsearchSinkOptions buildSinkConnectorOptions(Configuration cdcConf
networkConfig,
version,
username,
password);
password,
shardingMaps,
shardingSeparator);
}

private List<HttpHost> parseHosts(String hostsStr) {
Expand Down Expand Up @@ -130,6 +157,8 @@ public Set<ConfigOption<?>> optionalOptions() {
optionalOptions.add(MAX_RECORD_SIZE_IN_BYTES);
optionalOptions.add(USERNAME);
optionalOptions.add(PASSWORD);
optionalOptions.add(SHARDING_SUFFIX_KEY);
optionalOptions.add(SHARDING_SUFFIX_SEPARATOR);
return optionalOptions;
}

Expand All @@ -151,4 +180,22 @@ private void validateRequiredOptions(Configuration configuration) {
.collect(Collectors.joining("\n"))));
}
}

/**
 * Validates the separator placed between the table name and the sharding suffix.
 *
 * <p>The separator is rejected when it contains an uppercase character or any character
 * listed in {@code ES_INDEX_ILLEGAL_CHARS}.
 *
 * @param separator the configured value of the sharding separator option
 * @throws ValidationException if the separator is not lowercase or contains an illegal
 *     character
 */
private void validateShardingSeparator(String separator) {
    // Use Locale.ROOT so the lowercase check does not depend on the JVM default locale.
    if (!separator.equals(separator.toLowerCase(java.util.Locale.ROOT))) {
        throw new ValidationException(
                String.format(
                        "%s is malformed, elasticsearch index only support lowercase.",
                        SHARDING_SUFFIX_SEPARATOR.key()));
    }

    for (char c : ES_INDEX_ILLEGAL_CHARS.toCharArray()) {
        if (separator.indexOf(c) != -1) {
            throw new ValidationException(
                    String.format(
                            "%s is malformed, elasticsearch index cannot include \\, /, *, ?, \", <, >, |, ` ` (space character), ,, #",
                            SHARDING_SUFFIX_SEPARATOR.key()));
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,22 @@ public class ElasticsearchDataSinkOptions {
.noDefaultValue()
.withDescription("The password for Elasticsearch authentication.");

/**
 * Per-table sharding column for the Elasticsearch index name. Entries have the form {@code
 * table:column} and are separated by {@code ';'}; the value of the column is appended to the
 * sink table name as a suffix.
 */
public static final ConfigOption<String> SHARDING_SUFFIX_KEY =
        ConfigOptions.key("sharding.suffix.key")
                .stringType()
                .defaultValue("")
                .withDescription(
                        "Sharding suffix key for each table, allow setting sharding suffix key for multiTables.Default sink table name is test_table${suffix_key}.Default sharding column is first partition column.Tables are separated by ';'.Table and column are separated by ':'.For example, we can set sharding.suffix.key by 'table1:col1;table2:col2'");

/** The separator placed between the table name and the sharding suffix, {@code "_"} by default. */
public static final ConfigOption<String> SHARDING_SUFFIX_SEPARATOR =
ConfigOptions.key("sharding.suffix.separator")
.stringType()
.defaultValue("_")
.withDescription(
"Separator for sharding suffix in table names, allow defining the separator between table name and sharding suffix. Default value is '_'. For example, if set to '-', the default table name would be test_table-${suffix}");

/** Private constructor; this class is not meant to be instantiated. */
private ElasticsearchDataSinkOptions() {
// This class should not be instantiated
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -128,8 +130,48 @@ void testPrefixedRequiredOption() {
Assertions.assertThat(dataSink).isInstanceOf(ElasticsearchDataSink.class);
}

/**
 * Test the `validateShardingSeparator` method with illegal sharding separators. This test
 * checks two scenarios: 1. Separators containing illegal characters. 2. A separator with
 * uppercase letters.
 */
@Test
void testIllegalShardingSeparator()
        throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
    ElasticsearchDataSinkFactory sinkFactory =
            (ElasticsearchDataSinkFactory) getElasticsearchDataSinkFactory();
    // The method under test is private, so it is reached through reflection.
    Method method =
            ElasticsearchDataSinkFactory.class.getDeclaredMethod(
                    "validateShardingSeparator", String.class);
    method.setAccessible(true);

    // Test an array of invalid separators with illegal characters; "/" is included so that
    // every character named in the error message is exercised.
    String[] invalidSeparators = {"*", " ", ">", "<", "|", "?", "\"", ",", "#", "\\", "/"};
    for (String invalidSeparator : invalidSeparators) {
        Throwable thrown =
                Assertions.catchThrowable(() -> method.invoke(sinkFactory, invalidSeparator));
        Assertions.assertThat(extractException(thrown))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "sharding.suffix.separator is malformed, elasticsearch index cannot include \\, /, *, ?, \", <, >, |, ` ` (space character), ,, #");
    }

    // Test a separator with uppercase letters
    Throwable thrown = Assertions.catchThrowable(() -> method.invoke(sinkFactory, "_TEST"));
    Assertions.assertThat(extractException(thrown))
            .isInstanceOf(ValidationException.class)
            .hasMessageContaining(
                    "sharding.suffix.separator is malformed, elasticsearch index only support lowercase.");
}

// Helper methods

/**
 * Helper method to extract the actual cause from an InvocationTargetException.
 *
 * <p>Asserts that {@code ex} is an {@link InvocationTargetException} before unwrapping it.
 *
 * @param ex the throwable caught from a reflective invocation
 * @return the cause of {@code ex}
 */
private Throwable extractException(Throwable ex) {
Assertions.assertThat(ex).isInstanceOf(InvocationTargetException.class);
return ex.getCause();
}

private DataSinkFactory getElasticsearchDataSinkFactory() {
DataSinkFactory sinkFactory =
FactoryDiscoveryUtils.getFactoryByIdentifier(
Expand Down
Loading