Skip to content

Commit 0a2254e

Browse files
364102729agapple
andauthored
add tablestore adapter (#3754)
* add tablestore adapter * add tablestore adapter * fix bug add add log when etl fail * fix bug,tinyint(1)以及rowupdatechange Co-authored-by: agapple <jianghang115@gmail.com>
1 parent 40d8403 commit 0a2254e

File tree

17 files changed

+1634
-2
lines changed

17 files changed

+1634
-2
lines changed

client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ public class CanalClientConfig {
5454
// canal adapters 配置
5555
private List<CanalAdapter> canalAdapters;
5656

57+
private Boolean terminateOnException = false;
58+
5759
public String getCanalServerHost() {
5860
return canalServerHost;
5961
}
@@ -222,6 +224,14 @@ public void setNamespace(String namespace) {
222224
this.namespace = namespace;
223225
}
224226

227+
public Boolean getTerminateOnException() {
228+
return terminateOnException;
229+
}
230+
231+
public void setTerminateOnException(Boolean terminateOnException) {
232+
this.terminateOnException = terminateOnException;
233+
}
234+
225235
public static class CanalAdapter {
226236

227237
private String instance; // 实例名

client-adapter/launcher/pom.xml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,19 @@
141141
<classifier>jar-with-dependencies</classifier>
142142
<scope>provided</scope>
143143
</dependency>
144+
<dependency>
145+
<groupId>com.alibaba.otter</groupId>
146+
<artifactId>client-adapter.tablestore</artifactId>
147+
<version>${project.version}</version>
148+
<exclusions>
149+
<exclusion>
150+
<artifactId>*</artifactId>
151+
<groupId>*</groupId>
152+
</exclusion>
153+
</exclusions>
154+
<classifier>jar-with-dependencies</classifier>
155+
<scope>provided</scope>
156+
</dependency>
144157
<!-- connector plugin -->
145158
<dependency>
146159
<groupId>com.alibaba.otter</groupId>

client-adapter/launcher/src/main/assembly/dev.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,13 @@
6464
<exclude>META-INF/**</exclude>
6565
</excludes>
6666
</fileSet>
67+
<fileSet>
68+
<directory>../tablestore/src/main/resources/</directory>
69+
<outputDirectory>/conf</outputDirectory>
70+
<excludes>
71+
<exclude>META-INF/**</exclude>
72+
</excludes>
73+
</fileSet>
6774
<fileSet>
6875
<directory>target</directory>
6976
<outputDirectory>logs</outputDirectory>

client-adapter/launcher/src/main/assembly/release.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,13 @@
6565
<exclude>META-INF/**</exclude>
6666
</excludes>
6767
</fileSet>
68+
<fileSet>
69+
<directory>../tablestore/src/main/resources/</directory>
70+
<outputDirectory>/conf</outputDirectory>
71+
<excludes>
72+
<exclude>META-INF/**</exclude>
73+
</excludes>
74+
</fileSet>
6875
<fileSet>
6976
<directory>target</directory>
7077
<outputDirectory>logs</outputDirectory>

client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AdapterProcessor.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.util.concurrent.Future;
88
import java.util.concurrent.TimeUnit;
99
import java.util.concurrent.TimeoutException;
10+
import java.util.stream.Collectors;
1011

1112
import org.slf4j.Logger;
1213
import org.slf4j.LoggerFactory;
@@ -217,8 +218,15 @@ private void process() {
217218
canalMsgConsumer.rollback(); // 处理失败, 回滚数据
218219
logger.error(e.getMessage() + " Error sync and rollback, execute times: " + (i + 1));
219220
} else {
220-
canalMsgConsumer.ack();
221-
logger.error(e.getMessage() + " Error sync but ACK!");
221+
if (canalClientConfig.getTerminateOnException()) {
222+
canalMsgConsumer.rollback();
223+
logger.error("Retry fail, turn switch off and abort data transfer.");
224+
syncSwitch.off(canalDestination);
225+
logger.error("finish turn off switch of destination:" + canalDestination);
226+
} else {
227+
canalMsgConsumer.ack();
228+
logger.error(e.getMessage() + " Error sync but ACK!");
229+
}
222230
}
223231
Thread.sleep(500);
224232
}

client-adapter/pom.xml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
<module>escore</module>
3434
<module>kudu</module>
3535
<module>phoenix</module>
36+
<module>tablestore</module>
3637
</modules>
3738

3839
<licenses>
@@ -251,6 +252,25 @@
251252
<version>1.9.0</version>
252253
<scope>test</scope>
253254
</dependency>
255+
256+
<dependency>
257+
<groupId>com.aliyun.openservices</groupId>
258+
<artifactId>tablestore</artifactId>
259+
<version>5.10.3</version>
260+
<classifier>jar-with-dependencies</classifier>
261+
<exclusions>
262+
<exclusion>
263+
<groupId>com.google.protobuf</groupId>
264+
<artifactId>protobuf-java</artifactId>
265+
</exclusion>
266+
<exclusion>
267+
<groupId>org.apache.httpcomponents</groupId>
268+
<artifactId>httpasyncclient</artifactId>
269+
</exclusion>
270+
</exclusions>
271+
</dependency>
272+
273+
254274
</dependencies>
255275
</dependencyManagement>
256276

client-adapter/tablestore/pom.xml

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
3+
<parent>
4+
<artifactId>canal.client-adapter</artifactId>
5+
<groupId>com.alibaba.otter</groupId>
6+
<version>1.1.6-SNAPSHOT</version>
7+
<relativePath>../pom.xml</relativePath>
8+
</parent>
9+
<modelVersion>4.0.0</modelVersion>
10+
<groupId>com.alibaba.otter</groupId>
11+
<artifactId>client-adapter.tablestore</artifactId>
12+
<packaging>jar</packaging>
13+
<name>canal client adapter rdb module for otter ${project.version}</name>
14+
15+
<dependencies>
16+
<dependency>
17+
<groupId>com.alibaba.otter</groupId>
18+
<artifactId>client-adapter.common</artifactId>
19+
<version>${project.version}</version>
20+
<scope>provided</scope>
21+
</dependency>
22+
23+
<dependency>
24+
<groupId>junit</groupId>
25+
<artifactId>junit</artifactId>
26+
<version>4.12</version>
27+
<scope>test</scope>
28+
</dependency>
29+
30+
<dependency>
31+
<groupId>com.aliyun.openservices</groupId>
32+
<artifactId>tablestore</artifactId>
33+
<version>5.10.3</version>
34+
<classifier>jar-with-dependencies</classifier>
35+
<exclusions>
36+
<exclusion>
37+
<groupId>com.google.protobuf</groupId>
38+
<artifactId>protobuf-java</artifactId>
39+
</exclusion>
40+
<exclusion>
41+
<groupId>org.apache.httpcomponents</groupId>
42+
<artifactId>httpasyncclient</artifactId>
43+
</exclusion>
44+
</exclusions>
45+
</dependency>
46+
47+
48+
</dependencies>
49+
50+
<build>
51+
<plugins>
52+
<plugin>
53+
<groupId>org.apache.maven.plugins</groupId>
54+
<artifactId>maven-assembly-plugin</artifactId>
55+
<version>2.4</version>
56+
<configuration>
57+
<descriptorRefs>
58+
<descriptorRef>jar-with-dependencies</descriptorRef>
59+
</descriptorRefs>
60+
</configuration>
61+
<executions>
62+
<execution>
63+
<id>make-assembly</id>
64+
<phase>package</phase>
65+
<goals>
66+
<goal>single</goal>
67+
</goals>
68+
</execution>
69+
</executions>
70+
</plugin>
71+
<plugin>
72+
<artifactId>maven-antrun-plugin</artifactId>
73+
<executions>
74+
<execution>
75+
<phase>package</phase>
76+
<goals>
77+
<goal>run</goal>
78+
</goals>
79+
<configuration>
80+
<tasks>
81+
<copy todir="${project.basedir}/../launcher/target/classes/tablestore" overwrite="true">
82+
<fileset dir="${project.basedir}/target/classes/tablestore" erroronmissingdir="true">
83+
<include name="*.yml" />
84+
</fileset>
85+
</copy>
86+
</tasks>
87+
</configuration>
88+
</execution>
89+
</executions>
90+
</plugin>
91+
</plugins>
92+
</build>
93+
</project>

0 commit comments

Comments
 (0)