Skip to content

Commit c775478

Browse files
whheagapple
andauthored
set backtick by DbType (#3984)
Co-authored-by: agapple <jianghang.loujh@alibaba-inc.com>
1 parent bd1f91c commit c775478

File tree

5 files changed

+77
-30
lines changed

5 files changed

+77
-30
lines changed

client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.slf4j.LoggerFactory;
1515

1616
import com.alibaba.druid.pool.DruidDataSource;
17+
import com.alibaba.druid.util.JdbcUtils;
1718
import com.alibaba.otter.canal.client.adapter.OuterAdapter;
1819
import com.alibaba.otter.canal.client.adapter.rdb.config.ConfigLoader;
1920
import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
@@ -73,6 +74,11 @@ public Map<String, MirrorDbConfig> getMirrorDbConfigCache() {
7374
@Override
7475
public void init(OuterAdapterConfig configuration, Properties envProperties) {
7576
this.envProperties = envProperties;
77+
78+
// 从jdbc url获取db类型
79+
Map<String, String> properties = configuration.getProperties();
80+
String dbType = JdbcUtils.getDbType(properties.get("jdbc.url"), null);
81+
7682
Map<String, MappingConfig> rdbMappingTmp = ConfigLoader.load(envProperties);
7783
// 过滤不匹配的key的配置
7884
rdbMappingTmp.forEach((key, mappingConfig) -> {
@@ -112,7 +118,6 @@ public void init(OuterAdapterConfig configuration, Properties envProperties) {
112118
}
113119

114120
// 初始化连接池
115-
Map<String, String> properties = configuration.getProperties();
116121
dataSource = new DruidDataSource();
117122
dataSource.setDriverClassName(properties.get("jdbc.driverClassName"));
118123
dataSource.setUrl(properties.get("jdbc.url"));
@@ -125,6 +130,8 @@ public void init(OuterAdapterConfig configuration, Properties envProperties) {
125130
dataSource.setTimeBetweenEvictionRunsMillis(60000);
126131
dataSource.setMinEvictableIdleTimeMillis(300000);
127132
dataSource.setUseUnfairLock(true);
133+
dataSource.setDbType(dbType);
134+
128135
// List<String> array = new ArrayList<>();
129136
// array.add("set names utf8mb4;");
130137
// dataSource.setConnectionInitSqls(array);
@@ -226,7 +233,7 @@ public EtlResult etl(String task, List<String> params) {
226233
public Map<String, Object> count(String task) {
227234
MappingConfig config = rdbMapping.get(task);
228235
MappingConfig.DbMapping dbMapping = config.getDbMapping();
229-
String sql = "SELECT COUNT(1) AS cnt FROM " + SyncUtil.getDbTableName(dbMapping);
236+
String sql = "SELECT COUNT(1) AS cnt FROM " + SyncUtil.getDbTableName(dbMapping, dataSource.getDbType());
230237
Connection conn = null;
231238
Map<String, Object> res = new LinkedHashMap<>();
232239
try {
@@ -252,7 +259,7 @@ public Map<String, Object> count(String task) {
252259
}
253260
}
254261
}
255-
res.put("targetTable", SyncUtil.getDbTableName(dbMapping));
262+
res.put("targetTable", SyncUtil.getDbTableName(dbMapping, dataSource.getDbType()));
256263

257264
return res;
258265
}

client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbEtlService.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
import javax.sql.DataSource;
1515

16+
import com.alibaba.druid.pool.DruidDataSource;
1617
import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
1718
import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig.DbMapping;
1819
import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
@@ -56,8 +57,11 @@ protected boolean executeSqlImport(DataSource srcDS, String sql, List<Object> va
5657
DbMapping dbMapping = (DbMapping) mapping;
5758
Map<String, String> columnsMap = new LinkedHashMap<>();
5859
Map<String, Integer> columnType = new LinkedHashMap<>();
60+
DruidDataSource dataSource = (DruidDataSource) srcDS;
5961

60-
Util.sqlRS(targetDS, "SELECT * FROM " + SyncUtil.getDbTableName(dbMapping) + " LIMIT 1 ", rs -> {
62+
Util.sqlRS(targetDS,
63+
"SELECT * FROM " + SyncUtil.getDbTableName(dbMapping, dataSource.getDbType()) + " LIMIT 1 ",
64+
rs -> {
6165
try {
6266

6367
ResultSetMetaData rsd = rs.getMetaData();
@@ -83,7 +87,9 @@ protected boolean executeSqlImport(DataSource srcDS, String sql, List<Object> va
8387
boolean completed = false;
8488

8589
StringBuilder insertSql = new StringBuilder();
86-
insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
90+
insertSql.append("INSERT INTO ")
91+
.append(SyncUtil.getDbTableName(dbMapping, dataSource.getDbType()))
92+
.append(" (");
8793
columnsMap
8894
.forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
8995

@@ -107,7 +113,7 @@ protected boolean executeSqlImport(DataSource srcDS, String sql, List<Object> va
107113
// 删除数据
108114
Map<String, Object> pkVal = new LinkedHashMap<>();
109115
StringBuilder deleteSql = new StringBuilder(
110-
"DELETE FROM " + SyncUtil.getDbTableName(dbMapping) + " WHERE ");
116+
"DELETE FROM " + SyncUtil.getDbTableName(dbMapping, dataSource.getDbType()) + " WHERE ");
111117
appendCondition(dbMapping, deleteSql, pkVal, rs);
112118
try (PreparedStatement pstmt2 = connTarget.prepareStatement(deleteSql.toString())) {
113119
int k = 1;

client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbMirrorDbSyncService.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,17 @@
77
import java.util.List;
88
import java.util.Map;
99

10-
import javax.sql.DataSource;
11-
1210
import org.apache.commons.lang.StringUtils;
1311
import org.slf4j.Logger;
1412
import org.slf4j.LoggerFactory;
1513

14+
import com.alibaba.druid.pool.DruidDataSource;
1615
import com.alibaba.fastjson2.JSON;
1716
import com.alibaba.fastjson2.JSONWriter.Feature;
1817
import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
1918
import com.alibaba.otter.canal.client.adapter.rdb.config.MirrorDbConfig;
19+
import com.alibaba.otter.canal.client.adapter.rdb.support.SingleDml;
20+
import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
2021
import com.alibaba.otter.canal.client.adapter.support.Dml;
2122

2223
/**
@@ -30,10 +31,10 @@ public class RdbMirrorDbSyncService {
3031
private static final Logger logger = LoggerFactory.getLogger(RdbMirrorDbSyncService.class);
3132

3233
private Map<String, MirrorDbConfig> mirrorDbConfigCache; // 镜像库配置
33-
private DataSource dataSource;
34+
private DruidDataSource dataSource;
3435
private RdbSyncService rdbSyncService; // rdbSyncService代理
3536

36-
public RdbMirrorDbSyncService(Map<String, MirrorDbConfig> mirrorDbConfigCache, DataSource dataSource,
37+
public RdbMirrorDbSyncService(Map<String, MirrorDbConfig> mirrorDbConfigCache, DruidDataSource dataSource,
3738
Integer threads, Map<String, Map<String, Integer>> columnsTypeCache,
3839
boolean skipDupException){
3940
this.mirrorDbConfigCache = mirrorDbConfigCache;
@@ -150,7 +151,13 @@ private void initMappingConfig(String key, MappingConfig baseConfigMap, MirrorDb
150151
*/
151152
private void executeDdl(MirrorDbConfig mirrorDbConfig, Dml ddl) {
152153
try (Connection conn = dataSource.getConnection(); Statement statement = conn.createStatement()) {
153-
statement.execute(ddl.getSql());
154+
// 替换反引号
155+
String sql = ddl.getSql();
156+
String backtick = SyncUtil.getBacktickByDbType(dataSource.getDbType());
157+
if (!"`".equals(backtick)) {
158+
sql = sql.replaceAll("`", backtick);
159+
}
160+
statement.execute(sql);
154161
// 移除对应配置
155162
mirrorDbConfig.getTableConfig().remove(ddl.getTable());
156163
if (logger.isTraceEnabled()) {

client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,11 @@
1515
import java.util.concurrent.Future;
1616
import java.util.function.Function;
1717

18-
import javax.sql.DataSource;
19-
2018
import org.apache.commons.lang.StringUtils;
2119
import org.slf4j.Logger;
2220
import org.slf4j.LoggerFactory;
2321

22+
import com.alibaba.druid.pool.DruidDataSource;
2423
import com.alibaba.fastjson2.JSON;
2524
import com.alibaba.fastjson2.JSONWriter.Feature;
2625
import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
@@ -41,6 +40,7 @@ public class RdbSyncService {
4140

4241
private static final Logger logger = LoggerFactory.getLogger(RdbSyncService.class);
4342

43+
private DruidDataSource dataSource;
4444
// 源库表字段类型缓存: instance.schema.table -> <columnName, jdbcType>
4545
private Map<String, Map<String, Integer>> columnsTypeCache;
4646

@@ -59,13 +59,14 @@ public Map<String, Map<String, Integer>> getColumnsTypeCache() {
5959
return columnsTypeCache;
6060
}
6161

62-
public RdbSyncService(DataSource dataSource, Integer threads, boolean skipDupException){
62+
public RdbSyncService(DruidDataSource dataSource, Integer threads, boolean skipDupException){
6363
this(dataSource, threads, new ConcurrentHashMap<>(), skipDupException);
6464
}
6565

6666
@SuppressWarnings("unchecked")
67-
public RdbSyncService(DataSource dataSource, Integer threads, Map<String, Map<String, Integer>> columnsTypeCache,
67+
public RdbSyncService(DruidDataSource dataSource, Integer threads, Map<String, Map<String, Integer>> columnsTypeCache,
6868
boolean skipDupException){
69+
this.dataSource = dataSource;
6970
this.columnsTypeCache = columnsTypeCache;
7071
this.skipDupException = skipDupException;
7172
try {
@@ -251,15 +252,15 @@ private void insert(BatchExecutor batchExecutor, MappingConfig config, SingleDml
251252
}
252253

253254
DbMapping dbMapping = config.getDbMapping();
254-
255+
String backtick = SyncUtil.getBacktickByDbType(dataSource.getDbType());
255256
Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
256257

257258
StringBuilder insertSql = new StringBuilder();
258-
insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
259+
insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping, dataSource.getDbType())).append(" (");
259260

260-
columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append("`")
261+
columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(backtick)
261262
.append(targetColumnName)
262-
.append("`")
263+
.append(backtick)
263264
.append(","));
264265
int len = insertSql.length();
265266
insertSql.delete(len - 1, len).append(") VALUES (");
@@ -323,13 +324,13 @@ private void update(BatchExecutor batchExecutor, MappingConfig config, SingleDml
323324
}
324325

325326
DbMapping dbMapping = config.getDbMapping();
326-
327+
String backtick = SyncUtil.getBacktickByDbType(dataSource.getDbType());
327328
Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
328329

329330
Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
330331

331332
StringBuilder updateSql = new StringBuilder();
332-
updateSql.append("UPDATE ").append(SyncUtil.getDbTableName(dbMapping)).append(" SET ");
333+
updateSql.append("UPDATE ").append(SyncUtil.getDbTableName(dbMapping, dataSource.getDbType())).append(" SET ");
333334
List<Map<String, ?>> values = new ArrayList<>();
334335
boolean hasMatched = false;
335336
for (String srcColumnName : old.keySet()) {
@@ -342,7 +343,7 @@ private void update(BatchExecutor batchExecutor, MappingConfig config, SingleDml
342343
if (!targetColumnNames.isEmpty()) {
343344
hasMatched = true;
344345
for (String targetColumnName : targetColumnNames) {
345-
updateSql.append("`").append(targetColumnName).append("`").append("=?, ");
346+
updateSql.append(backtick).append(targetColumnName).append(backtick).append("=?, ");
346347
Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
347348
if (type == null) {
348349
throw new RuntimeException("Target column: " + targetColumnName + " not matched");
@@ -379,11 +380,10 @@ private void delete(BatchExecutor batchExecutor, MappingConfig config, SingleDml
379380
}
380381

381382
DbMapping dbMapping = config.getDbMapping();
382-
383383
Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
384384

385385
StringBuilder sql = new StringBuilder();
386-
sql.append("DELETE FROM ").append(SyncUtil.getDbTableName(dbMapping)).append(" WHERE ");
386+
sql.append("DELETE FROM ").append(SyncUtil.getDbTableName(dbMapping, dataSource.getDbType())).append(" WHERE ");
387387

388388
List<Map<String, ?>> values = new ArrayList<>();
389389
// 拼接主键
@@ -402,7 +402,7 @@ private void delete(BatchExecutor batchExecutor, MappingConfig config, SingleDml
402402
private void truncate(BatchExecutor batchExecutor, MappingConfig config) throws SQLException {
403403
DbMapping dbMapping = config.getDbMapping();
404404
StringBuilder sql = new StringBuilder();
405-
sql.append("TRUNCATE TABLE ").append(SyncUtil.getDbTableName(dbMapping));
405+
sql.append("TRUNCATE TABLE ").append(SyncUtil.getDbTableName(dbMapping, dataSource.getDbType()));
406406
batchExecutor.execute(sql.toString(), new ArrayList<>());
407407
if (logger.isTraceEnabled()) {
408408
logger.trace("Truncate target table, sql: {}", sql);
@@ -426,7 +426,7 @@ private Map<String, Integer> getTargetColumnType(Connection conn, MappingConfig
426426
if (columnType == null) {
427427
columnType = new LinkedHashMap<>();
428428
final Map<String, Integer> columnTypeTmp = columnType;
429-
String sql = "SELECT * FROM " + SyncUtil.getDbTableName(dbMapping) + " WHERE 1=2";
429+
String sql = "SELECT * FROM " + SyncUtil.getDbTableName(dbMapping, dataSource.getDbType()) + " WHERE 1=2";
430430
Util.sqlRS(conn, sql, rs -> {
431431
try {
432432
ResultSetMetaData rsd = rs.getMetaData();
@@ -455,14 +455,16 @@ private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sq
455455

456456
private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
457457
List<Map<String, ?>> values, Map<String, Object> d, Map<String, Object> o) {
458+
String backtick = SyncUtil.getBacktickByDbType(dataSource.getDbType());
459+
458460
// 拼接主键
459461
for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
460462
String targetColumnName = entry.getKey();
461463
String srcColumnName = entry.getValue();
462464
if (srcColumnName == null) {
463465
srcColumnName = Util.cleanColumn(targetColumnName);
464466
}
465-
sql.append("`").append(targetColumnName).append("`").append("=? AND ");
467+
sql.append(backtick).append(targetColumnName).append(backtick).append("=? AND ");
466468
Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
467469
if (type == null) {
468470
throw new RuntimeException("Target column: " + targetColumnName + " not matched");

client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SyncUtil.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.alibaba.otter.canal.client.adapter.rdb.support;
22

3+
import com.alibaba.druid.DbType;
34
import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
45
import com.alibaba.otter.canal.client.adapter.support.Util;
56
import org.apache.commons.lang.StringUtils;
@@ -255,12 +256,36 @@ public static void setPStmt(int type, PreparedStatement pstmt, Object value, int
255256
}
256257
}
257258

258-
public static String getDbTableName(MappingConfig.DbMapping dbMapping) {
259+
public static String getDbTableName(MappingConfig.DbMapping dbMapping, String dbType) {
259260
String result = "";
261+
String backtick = getBacktickByDbType(dbType);
260262
if (StringUtils.isNotEmpty(dbMapping.getTargetDb())) {
261-
result += ("`" + dbMapping.getTargetDb() + "`.");
263+
result += (backtick + dbMapping.getTargetDb() + backtick + ".");
262264
}
263-
result += ("`" + dbMapping.getTargetTable() + "`");
265+
result += (backtick + dbMapping.getTargetTable() + backtick);
264266
return result;
265267
}
268+
269+
/**
270+
* 根据DbType返回反引号或空字符串
271+
*
272+
* @param dbTypeName DbType名称
273+
* @return 反引号或空字符串
274+
*/
275+
public static String getBacktickByDbType(String dbTypeName) {
276+
DbType dbType = DbType.of(dbTypeName);
277+
if (dbType == null) {
278+
dbType = DbType.other;
279+
}
280+
281+
// 只有当dbType为MySQL/MariaDB或OceanBase时返回反引号
282+
switch (dbType) {
283+
case mysql:
284+
case mariadb:
285+
case oceanbase:
286+
return "`";
287+
default:
288+
return "";
289+
}
290+
}
266291
}

0 commit comments

Comments
 (0)