Skip to content

Commit 4612d1d

Browse files
committed
set backtick by DbType
1 parent b54bea5 commit 4612d1d

File tree

5 files changed

+69
-8
lines changed

5 files changed

+69
-8
lines changed

client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.slf4j.LoggerFactory;
1515

1616
import com.alibaba.druid.pool.DruidDataSource;
17+
import com.alibaba.druid.util.JdbcUtils;
1718
import com.alibaba.otter.canal.client.adapter.OuterAdapter;
1819
import com.alibaba.otter.canal.client.adapter.rdb.config.ConfigLoader;
1920
import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
@@ -73,12 +74,19 @@ public Map<String, MirrorDbConfig> getMirrorDbConfigCache() {
7374
@Override
7475
public void init(OuterAdapterConfig configuration, Properties envProperties) {
7576
this.envProperties = envProperties;
77+
78+
// 从jdbc url获取db类型
79+
Map<String, String> properties = configuration.getProperties();
80+
String dbType = JdbcUtils.getDbType(properties.get("jdbc.url"), null);
81+
7682
Map<String, MappingConfig> rdbMappingTmp = ConfigLoader.load(envProperties);
7783
// 过滤不匹配的key的配置
7884
rdbMappingTmp.forEach((key, mappingConfig) -> {
7985
if ((mappingConfig.getOuterAdapterKey() == null && configuration.getKey() == null)
8086
|| (mappingConfig.getOuterAdapterKey() != null && mappingConfig.getOuterAdapterKey()
8187
.equalsIgnoreCase(configuration.getKey()))) {
88+
// 在dbMapping中保存db类型
89+
mappingConfig.getDbMapping().setDbType(dbType);
8290
rdbMapping.put(key, mappingConfig);
8391
}
8492
});
@@ -112,7 +120,6 @@ public void init(OuterAdapterConfig configuration, Properties envProperties) {
112120
}
113121

114122
// 初始化连接池
115-
Map<String, String> properties = configuration.getProperties();
116123
dataSource = new DruidDataSource();
117124
dataSource.setDriverClassName(properties.get("jdbc.driverClassName"));
118125
dataSource.setUrl(properties.get("jdbc.url"));
@@ -125,6 +132,8 @@ public void init(OuterAdapterConfig configuration, Properties envProperties) {
125132
dataSource.setTimeBetweenEvictionRunsMillis(60000);
126133
dataSource.setMinEvictableIdleTimeMillis(300000);
127134
dataSource.setUseUnfairLock(true);
135+
dataSource.setDbType(dbType);
136+
128137
// List<String> array = new ArrayList<>();
129138
// array.add("set names utf8mb4;");
130139
// dataSource.setConnectionInitSqls(array);

client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfig.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import org.apache.commons.lang.StringUtils;
77

8+
import com.alibaba.druid.DbType;
89
import com.alibaba.otter.canal.client.adapter.support.AdapterConfig;
910

1011
/**
@@ -109,6 +110,8 @@ public static class DbMapping implements AdapterMapping {
109110
private int readBatch = 5000;
110111
private int commitBatch = 5000; // etl等批量提交大小
111112

113+
private String dbType = DbType.mysql.name();
114+
112115
private Map<String, String> allMapColumns;
113116

114117
public boolean getMirrorDb() {
@@ -221,5 +224,13 @@ public Map<String, String> getAllMapColumns() {
221224
public void setAllMapColumns(Map<String, String> allMapColumns) {
222225
this.allMapColumns = allMapColumns;
223226
}
227+
228+
public String getDbType() {
229+
return dbType;
230+
}
231+
232+
public void setDbType(String dbType) {
233+
this.dbType = dbType;
234+
}
224235
}
225236
}

client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbMirrorDbSyncService.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
1919
import com.alibaba.otter.canal.client.adapter.rdb.config.MirrorDbConfig;
2020
import com.alibaba.otter.canal.client.adapter.rdb.support.SingleDml;
21+
import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
2122
import com.alibaba.otter.canal.client.adapter.support.Dml;
2223

2324
/**
@@ -153,7 +154,15 @@ private void initMappingConfig(String key, MappingConfig baseConfigMap, MirrorDb
153154
*/
154155
private void executeDdl(MirrorDbConfig mirrorDbConfig, Dml ddl) {
155156
try (Connection conn = dataSource.getConnection(); Statement statement = conn.createStatement()) {
156-
statement.execute(ddl.getSql());
157+
// 替换反引号
158+
String sql = ddl.getSql();
159+
String backtick = SyncUtil.getBacktickByDbType(mirrorDbConfig.getMappingConfig()
160+
.getDbMapping()
161+
.getDbType());
162+
if (!"`".equals(backtick)) {
163+
sql = sql.replaceAll("`", backtick);
164+
}
165+
statement.execute(sql);
157166
// 移除对应配置
158167
mirrorDbConfig.getTableConfig().remove(ddl.getTable());
159168
if (logger.isTraceEnabled()) {

client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -241,15 +241,17 @@ private void insert(BatchExecutor batchExecutor, MappingConfig config, SingleDml
241241
}
242242

243243
DbMapping dbMapping = config.getDbMapping();
244+
String dbTypeName = dbMapping.getDbType();
245+
String backtick = SyncUtil.getBacktickByDbType(dbTypeName);
244246

245247
Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
246248

247249
StringBuilder insertSql = new StringBuilder();
248250
insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
249251

250-
columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append("`")
252+
columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(backtick)
251253
.append(targetColumnName)
252-
.append("`")
254+
.append(backtick)
253255
.append(","));
254256
int len = insertSql.length();
255257
insertSql.delete(len - 1, len).append(") VALUES (");
@@ -313,6 +315,8 @@ private void update(BatchExecutor batchExecutor, MappingConfig config, SingleDml
313315
}
314316

315317
DbMapping dbMapping = config.getDbMapping();
318+
String dbTypeName = dbMapping.getDbType();
319+
String backtick = SyncUtil.getBacktickByDbType(dbTypeName);
316320

317321
Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
318322

@@ -332,7 +336,7 @@ private void update(BatchExecutor batchExecutor, MappingConfig config, SingleDml
332336
if (!targetColumnNames.isEmpty()) {
333337
hasMatched = true;
334338
for (String targetColumnName : targetColumnNames) {
335-
updateSql.append("`").append(targetColumnName).append("`").append("=?, ");
339+
updateSql.append(backtick).append(targetColumnName).append(backtick).append("=?, ");
336340
Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
337341
if (type == null) {
338342
throw new RuntimeException("Target column: " + targetColumnName + " not matched");
@@ -445,14 +449,17 @@ private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sq
445449

446450
private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
447451
List<Map<String, ?>> values, Map<String, Object> d, Map<String, Object> o) {
452+
String dbTypeName = dbMapping.getDbType();
453+
String backtick = SyncUtil.getBacktickByDbType(dbTypeName);
454+
448455
// 拼接主键
449456
for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
450457
String targetColumnName = entry.getKey();
451458
String srcColumnName = entry.getValue();
452459
if (srcColumnName == null) {
453460
srcColumnName = Util.cleanColumn(targetColumnName);
454461
}
455-
sql.append("`").append(targetColumnName).append("`").append("=? AND ");
462+
sql.append(backtick).append(targetColumnName).append(backtick).append("=? AND ");
456463
Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
457464
if (type == null) {
458465
throw new RuntimeException("Target column: " + targetColumnName + " not matched");

client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SyncUtil.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.alibaba.otter.canal.client.adapter.rdb.support;
22

3+
import com.alibaba.druid.DbType;
34
import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
45
import com.alibaba.otter.canal.client.adapter.support.Util;
56
import org.apache.commons.lang.StringUtils;
@@ -257,10 +258,34 @@ public static void setPStmt(int type, PreparedStatement pstmt, Object value, int
257258

258259
public static String getDbTableName(MappingConfig.DbMapping dbMapping) {
259260
String result = "";
261+
String backtick = getBacktickByDbType(dbMapping.getDbType());
260262
if (StringUtils.isNotEmpty(dbMapping.getTargetDb())) {
261-
result += ("`" + dbMapping.getTargetDb() + "`.");
263+
result += (backtick + dbMapping.getTargetDb() + backtick + ".");
262264
}
263-
result += ("`" + dbMapping.getTargetTable() + "`");
265+
result += (backtick + dbMapping.getTargetTable() + backtick);
264266
return result;
265267
}
268+
269+
/**
270+
* 根据DbType返回反引号或空字符串
271+
*
272+
* @param dbTypeName DbType名称
273+
* @return 反引号或空字符串
274+
*/
275+
public static String getBacktickByDbType(String dbTypeName) {
276+
DbType dbType = DbType.of(dbTypeName);
277+
if (dbType == null) {
278+
dbType = DbType.other;
279+
}
280+
281+
// 只有当dbType为MySQL/MariaDB或OceanBase时返回反引号
282+
switch (dbType) {
283+
case mysql:
284+
case mariadb:
285+
case oceanbase:
286+
return "`";
287+
default:
288+
return "";
289+
}
290+
}
266291
}

0 commit comments

Comments
 (0)