diff --git a/README.md b/README.md
index a873fb94f4..7004fec5b1 100644
--- a/README.md
+++ b/README.md
@@ -231,6 +231,8 @@ reader和writer包括name和parameter,分别表示插件名称和插件参数
* [MongoDB读取插件](docs/mongodbreader.md)
* [Stream读取插件](docs/streamreader.md)
* [Carbondata读取插件](docs/carbondatareader.md)
+* [MySQL binlog读取插件](docs/binlog.md)
+* [KafKa读取插件](docs/kafkareader.md)
### 5.2 写入插件
@@ -244,10 +246,23 @@ reader和writer包括name和parameter,分别表示插件名称和插件参数
* [Redis写入插件](docs/rediswriter.md)
* [Stream写入插件](docs/streamwriter.md)
* [Carbondata写入插件](docs/carbondatawriter.md)
+* [Kafka写入插件](docs/kafkawriter.md)
+* [Hive写入插件](docs/hivewriter.md)
+
+[断点续传和实时采集功能介绍](docs/restore.md)
+
+[数据源开启Kerberos](docs/kerberos.md)
+
+[统计指标说明](docs/statistics.md)
## 6.版本说明
- 1.flinkx的分支版本跟flink的版本对应,比如:flinkx v1.4.0 对应 flink1.4.0,现在支持flink1.4和1.5
+ 1.flinkx的分支版本跟flink的版本对应,比如:flinkx v1.5.0 对应 flink1.5.0,版本说明:
+
+| 插件版本 | flink版本 |
+| ----- | ------- |
+| 1.5.x | 1.5.4 |
+| 1.8.x | 1.8.1 |
## 7.招聘信息
diff --git a/docs/binlog.md b/docs/binlog.md
new file mode 100644
index 0000000000..d3c591f96f
--- /dev/null
+++ b/docs/binlog.md
@@ -0,0 +1,191 @@
+# MySQL binlog读取插件(*reader)
+
+## 1. 配置样例
+
+```json
+{
+ "job": {
+ "content": [{
+ "reader": {
+ "parameter": {
+ "jdbcUrl" : "jdbc:mysql://127.0.0.1:3306/test?charset=utf8",
+ "username" : "username",
+ "password" : "password",
+ "host" : "127.0.0.1",
+ "port": 3306,
+ "table" : [ "test_sink" ],
+ "filter" : "",
+ "cat" : "insert,update,delete",
+ "start" : {
+ "journalName" : "bin.000004",
+ "timestamp" : 123123
+ },
+ "pavingData" : false,
+ "bufferSize" : 1024
+ },
+ "name": "binlogreader"
+ },
+ "writer": {
+
+ }
+ }]
+ },
+ "setting": {
+
+ }
+}
+```
+
+## 2. 参数说明
+
+* **name**
+
+ * 描述:插件名,此处填写插件名称。
+
+ * 必选:是
+
+ * 默认值:无
+
+* **jdbcUrl**
+
+ * 描述:MySQL数据库的jdbc连接字符串,参考文档:[Mysql官方文档](http://dev.mysql.com/doc/connector-j/en/connector-j-reference-configuration-properties.html)
+
+ * 必选:是
+
+ * 默认值:无
+
+* **username**
+
+ * 描述:数据源的用户名
+
+ * 必选:是
+
+ * 默认值:无
+
+* **password**
+
+ * 描述:数据源指定用户名的密码
+
+ * 必选:是
+
+ * 默认值:无
+
+* **host**
+
+ * 描述:启动MySQL slave的机器ip。
+
+ * 必选:是
+
+ * 默认值:无
+
+* **port**
+
+ * 描述:启动MySQL slave的端口
+
+ * 必选:否
+
+ * 默认值:3306
+
+* **table**
+
+ * 描述:需要解析的数据表。
+
+ * 注意:指定此参数后filter参数将无效。
+
+ * 必选:否
+
+ * 默认值:无
+
+* **filter**
+
+ * 描述:过滤表名的Perl正则表达式。
+
+ * 例子:
+
+ * 所有表:.* or .*\\..*
+
+ * canal schema下所有表: canal\\..*
+
+ * canal下的以canal打头的表:canal\\.canal.*
+
+ * canal schema下的一张表:canal\\.test1
+
+ * 必选:否
+
+ * 默认值:无
+
+* **cat**
+
+ * 描述:需要解析的数据更新类型,包括insert、update、delete三种。
+
+ * 注意:以英文逗号分割的格式填写。
+
+ * 必选:否
+
+ * 默认值:null
+
+* **start**
+
+ * 描述:要读取的binlog文件的开始位置。
+
+ * 参数:
+
+ * journalName:采集起点按文件开始时的文件名称;
+
+ * timestamp:采集七点按时间开始时的时间戳;
+
+ * 默认值:无
+
+* **pavingData**
+
+ * 描述:是否将解析出的json数据拍平
+
+ * 示例:假设解析的表为tb1,数据库为test,对tb1中的id字段做update操作,id原来的值为1,更新后为2,则pavingData为true时数据格式为:
+
+ ```json
+ {
+ "type":"update",
+ "schema":"test",
+ "table":"tb1",
+ "ts":1231232,
+ "ingestion":123213,
+ "before_id":1,
+ "after_id":2
+ }
+ ```
+
+ pavingData为false时:
+
+ ```json
+ {
+ "message":{
+ "type":"update",
+ "schema":"test",
+ "table":"tb1",
+ "ts":1231232,
+ "ingestion":123213,
+ "before_id":{
+ "id":1
+ },
+ "after_id":{
+ "id":2
+ }
+ }
+ }
+ ```
+
+ 其中”ts“是数据变更时间,ingestion是插件解析这条数据的纳秒时间
+
+ * 必选:否
+
+ * 默认值:false
+
+* **bufferSize**
+
+ * 描述:并发缓存大小
+
+ * 注意:必须为2的幂
+
+ * 必选:否
+
+ * 默认值:1024
diff --git a/docs/hbasereader.md b/docs/hbasereader.md
index a076698337..cdf1c68a04 100644
--- a/docs/hbasereader.md
+++ b/docs/hbasereader.md
@@ -2,42 +2,48 @@
## 1. 配置样例
-```
+```json
{
- "job": {
- "setting": {},
- "content": [{
- "reader": {
- "name": "hbasereader",
- "parameter": {
- "hbaseConfig": {
- "hbase.zookeeper.property.clientPort": "2181",
- "hbase.rootdir": "hdfs://ns1/hbase",
- "hbase.cluster.distributed": "true",
- "hbase.zookeeper.quorum": "host1,host2,host3",
- "zookeeper.znode.parent": "/hbase"
- },
- "table": "tableTest",
- "encodig": "utf-8",
- "column": [{
- "name": "rowkey",
- "type": "string"
- },
- {
- "name": "cf1:id",
- "type": "string"
- }
- ],
- "range": {
- "startRowkey": "",
- "endRowkey": "",
- "isBinaryRowkey": true
- }
- }
- },
- "writer": {}
- }]
- }
+ "job": {
+ "setting": {},
+ "content": [{
+ "reader": {
+ "name": "hbasereader",
+ "parameter": {
+ "hbaseConfig": {
+ "hbase.zookeeper.property.clientPort": "2181",
+ "hbase.rootdir": "hdfs://ns1/hbase",
+ "hbase.cluster.distributed": "true",
+ "hbase.zookeeper.quorum": "host1,host2,host3",
+ "zookeeper.znode.parent": "/hbase",
+ "hbase.security.authentication":"Kerberos",
+ "hbase.security.authorization":true,
+ "hbase.master.kerberos.principal":"hbase/node1@TEST.COM",
+ "hbase.master.keytab.file":"hbase.keytab",
+ "hbase.regionserver.keytab.file":"hbase.keytab",
+ "hbase.regionserver.kerberos.principal":"hbase/node1@TEST.COM"
+ },
+ "table": "tableTest",
+ "encodig": "utf-8",
+ "column": [{
+ "name": "rowkey",
+ "type": "string"
+ },
+ {
+ "name": "cf1:id",
+ "type": "string"
+ }
+ ],
+ "range": {
+ "startRowkey": "",
+ "endRowkey": "",
+ "isBinaryRowkey": true
+ }
+ }
+ },
+ "writer": {}
+ }]
+ }
}
```
@@ -45,7 +51,7 @@
* **hbaseConfig**
- * 描述:hbase的连接配置,以json的形式组织 (见hbase-site.xml)
+ * 描述:hbase的连接配置,以json的形式组织 (见hbase-site.xml),开启kerberos的话参考文档[数据源开启Kerberos](kerberos.md)
* 必选:是
diff --git a/docs/hbasewriter.md b/docs/hbasewriter.md
index ea2579fb85..52a6948578 100644
--- a/docs/hbasewriter.md
+++ b/docs/hbasewriter.md
@@ -2,7 +2,7 @@
## 1. 配置样例
-```
+```json
{
"job": {
"setting": {
@@ -17,7 +17,13 @@
"hbase.rootdir": "hdfs://ns1/hbase",
"hbase.cluster.distributed": "true",
"hbase.zookeeper.quorum": "host1,host2,host3",
- "zookeeper.znode.parent": "/hbase"
+ "zookeeper.znode.parent": "/hbase",
+ "hbase.security.authentication":"Kerberos",
+ "hbase.security.authorization":true,
+ "hbase.master.kerberos.principal":"hbase/node1@TEST.COM",
+ "hbase.master.keytab.file":"hbase.keytab",
+ "hbase.regionserver.keytab.file":"hbase.keytab",
+ "hbase.regionserver.kerberos.principal":"hbase/node1@TEST.COM"
},
"table": "tableTest",
"rowkeyColumn": [{
@@ -50,7 +56,7 @@
* **hbaseConfig**
- * 描述:hbase的连接配置,以json的形式组织 (见hbase-site.xml)
+ * 描述:hbase的连接配置,以json的形式组织 (见hbase-site.xml),开启kerberos的话参考文档[数据源开启Kerberos](kerberos.md)
* 必选:是
diff --git a/docs/hdfsreader.md b/docs/hdfsreader.md
index 457d4ee8d4..3ad7e03d0d 100644
--- a/docs/hdfsreader.md
+++ b/docs/hdfsreader.md
@@ -41,7 +41,7 @@
* **path**
- * 描述:要读取的文件路径,多个路径可以用逗号隔开
+ * 描述:要读取的文件路径,多个路径可以用逗号隔开,开启kerberos的话参考文档[数据源开启Kerberos](kerberos.md)。
* 必选:是
diff --git a/docs/hdfswriter.md b/docs/hdfswriter.md
index c42839d726..0c2a81aae3 100644
--- a/docs/hdfswriter.md
+++ b/docs/hdfswriter.md
@@ -43,7 +43,7 @@
* **defaultFS**
- * 描述:Hadoop hdfs文件系统namenode节点地址。格式:hdfs://ip:端口;例如:hdfs://127.0.0.1:9000
+ * 描述:Hadoop hdfs文件系统namenode节点地址。格式:hdfs://ip:端口;例如:hdfs://127.0.0.1:9
* 必选:是
diff --git a/docs/hivewriter.md b/docs/hivewriter.md
new file mode 100644
index 0000000000..9d381ec564
--- /dev/null
+++ b/docs/hivewriter.md
@@ -0,0 +1,107 @@
+# Hive写入插件(hivewriter)
+
+## 1. 配置样例
+
+```json
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+
+ },
+ "writer": {
+ "parameter": {
+ "hadoopConfig": {
+ "dfs.ha.namenodes.ns1" : "nn1,nn2",
+ "fs.defaultFS" : "hdfs://ns1",
+ "dfs.namenode.rpc-address.ns1.nn2" : "node002:9000",
+ "dfs.client.failover.proxy.provider.ns1" : "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
+ "dfs.namenode.rpc-address.ns1.nn1" : "node001:9000",
+ "dfs.nameservices" : "ns1",
+ "fs.hdfs.impl.disable.cache" : "true",
+ "fs.hdfs.impl" : "org.apache.hadoop.hdfs.DistributedFileSystem"
+ },
+
+ "fieldDelimiter": "\u0001",
+ "encoding": "utf-8",
+ "fileType": "orc",
+
+ "partitionType" : "MINUTE",
+ "partition" : "pt",
+
+ "writeMode" : "append",
+
+ "analyticalRules" : "stream_${schema}_${table}_flinkxtest",
+ "password" : "",
+ "tablesColumn" : "{\"date_test\":[{\"type\":\"INT\",\"key\":\"before_id\",\"comment\":\"\"},{\"comment\":\"\",\"type\":\"INT\",\"key\":\"after_id\"},{\"type\":\"DATETIME\",\"key\":\"before_datetime1\",\"comment\":\"\"},{\"comment\":\"\",\"type\":\"DATETIME\",\"key\":\"after_datetime1\"},{\"type\":\"TIMESTAMP\",\"key\":\"before_timestamp\",\"comment\":\"\"},{\"comment\":\"\",\"type\":\"TIMESTAMP\",\"key\":\"after_timestamp\"},{\"comment\":\"\",\"type\":\"varchar\",\"key\":\"type\"},{\"comment\":\"\",\"type\":\"varchar\",\"key\":\"schema\"},{\"comment\":\"\",\"type\":\"varchar\",\"key\":\"table\"},{\"comment\":\"\",\"type\":\"bigint\",\"key\":\"ts\"}]}",
+ "jdbcUrl" : "jdbc:hive2://node001:10000/data_map",
+
+ "charsetName" : "utf-8",
+ "username" : ""
+ },
+ "name": "hivewriter"
+ }
+ }
+ ]
+ }
+}
+```
+
+## 2. 参数说明
+
+* **name**
+
+ * 描述:插件名称hivewriter,hivewriter一般结合mysql binlog插件使用,hive插件底层使用的是hdfswriter插件的共,所以需要填写hdfswriter插件需要的参数。hivewriter插件支持同时写入多张表的多个分区,以及自动创建hive表。开启kerberos的话参考文档[数据源开启Kerberos](kerberos.md)。
+
+ * 必选:是
+
+ * 默认值:无
+
+* **partitionType**
+
+ * 描述:分区类型,包括 DAY、HOUR、MINUTE三种
+
+ * DAY:天分区
+
+ * HOUR:小时分区
+
+ * MINUTE:分钟分区
+
+ * 必选:是
+
+ * 默认值:无
+
+* **analyticalRules**
+
+ * 描述:表名映射规则。以“stream_\${schema}_\${table}_flinkxtest”为列,创建表时会将规则中的schema和table替换
+
+ * 必选:否
+
+ * 默认值:无
+
+* **tablesColumn**
+
+ * 描述:写入hive表的表结构信息,示例:
+
+ ```json
+ {
+ "date_test": [ //表名
+
+ {
+ "type": "INT",
+ "key": "before_id",
+ "comment": ""
+ },
+ {
+ "comment": "",
+ "type": "INT",
+ "key": "after_id"
+ }
+ ]
+ }
+ ```
+
+ * 必选:是
+
+ * 默认值:无
diff --git a/docs/images/restore1.png b/docs/images/restore1.png
new file mode 100644
index 0000000000..7104d7f2b1
Binary files /dev/null and b/docs/images/restore1.png differ
diff --git a/docs/images/restore2.png b/docs/images/restore2.png
new file mode 100644
index 0000000000..fdcb071904
Binary files /dev/null and b/docs/images/restore2.png differ
diff --git a/docs/images/restore3.jpg b/docs/images/restore3.jpg
new file mode 100644
index 0000000000..c1810b092e
Binary files /dev/null and b/docs/images/restore3.jpg differ
diff --git a/docs/images/restore4.png b/docs/images/restore4.png
new file mode 100644
index 0000000000..39aa8ca0cf
Binary files /dev/null and b/docs/images/restore4.png differ
diff --git a/docs/images/restore5.png b/docs/images/restore5.png
new file mode 100644
index 0000000000..597bef156e
Binary files /dev/null and b/docs/images/restore5.png differ
diff --git a/docs/kafkareader.md b/docs/kafkareader.md
new file mode 100644
index 0000000000..4a43a235f3
--- /dev/null
+++ b/docs/kafkareader.md
@@ -0,0 +1,82 @@
+# Kafka读取插件(**reader)
+
+## 1. 配置样例
+
+```json
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "parameter": {
+ "topic" : "yxabc",
+ "codec" : "plain",
+ "encoding" : "utf-8",
+ "consumerSettings" : {
+ "zookeeper.connect" : "127.0.0.1:2181/kafka",
+ "group.id" : "default",
+ "auto.commit.interval.ms" : "1000",
+ "auto.offset.reset" : "smallest"
+ }
+ },
+ "name": "kafka09reader"
+ },
+ "writer": {
+
+ }
+ }
+ ],
+ "setting": {
+ "errorLimit": {
+ "record": 1
+ },
+ "speed": {
+ "bytes": 1048576,
+ "channel": 1
+ }
+ }
+ }
+}
+```
+
+## 2. 参数说明
+
+* **name**
+
+ * 描述:插件名,目前支持版本09、10、11,名称分别为 kafka09reader、kafka10reader、kafka11reader。
+
+ * 必选:是
+
+ * 默认值:无
+
+* **topic**
+
+ * 描述:要消费的topic。
+
+ * 必选:是
+
+ * 默认值:无
+
+* **encoding**
+
+ * 描述:编码
+
+ * 必选:否
+
+ * 默认值:utf-8
+
+* **codec**
+
+ * 描述:编码解码器类型,支持 json、plain
+
+ * 必选:否
+
+ * 默认值:plain
+
+* **consumerSettings**
+
+ * 描述:kafka连接配置
+
+ * 必选:是
+
+ * 默认值:无
diff --git a/docs/kafkawriter.md b/docs/kafkawriter.md
new file mode 100644
index 0000000000..adf6998f7e
--- /dev/null
+++ b/docs/kafkawriter.md
@@ -0,0 +1,94 @@
+# Kafka写入插件(**writer)
+
+## 1. 配置样例
+
+```json
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+
+ },
+ "writer": {
+ "parameter": {
+ "timezone" : "",
+
+ "encoding" : "utf-8",
+
+ "producerSettings" : {
+
+ "zookeeper.connect" : "127.0.0.1:2181/kafka"
+
+ },
+ "topic" : "mufeng_est",
+
+ "brokerList" : "172.16.8.107:9092"
+
+ },
+ "name": "kafka09writer"
+ }
+ }
+ ],
+ "setting": {
+ "errorLimit": {
+ "record": 1
+ },
+ "speed": {
+ "bytes": 1048576,
+ "channel": 1
+ }
+ }
+ }
+}
+```
+
+## 2. 参数说明
+
+* **name**
+
+ * 描述:插件名,目前支持版本09、10、11,名称分别为 kafka09writer、kafka10writer、kafka11writer。
+
+ * 必选:是
+
+ * 默认值:无
+
+* **topic**
+
+ * 描述:topic。
+
+ * 必选:是
+
+ * 默认值:无
+
+* **encoding**
+
+ * 描述:编码
+
+ * 必选:否
+
+ * 默认值:utf-8
+
+* **brokerList**
+
+ * 描述:kafka broker地址列表
+
+ * 必选:是
+
+ * 默认值:无
+
+* **timezone**
+
+ * 描述:时区
+
+ * 必选:是
+
+ * 默认值:无
+
+* **producerSettings**
+
+ * 描述:kafka连接配置
+
+ * 必选:是
+
+ * 默认值:无
diff --git a/docs/kerberos.md b/docs/kerberos.md
new file mode 100644
index 0000000000..f8c137bfdf
--- /dev/null
+++ b/docs/kerberos.md
@@ -0,0 +1,115 @@
+# 数据源开启Kerberos安全认证
+
+目前FlinkX的部分插件支持了kerberos认证,有Hive、Hbase、HDFS三个插件。
+
+### 1.Kerberos证书加载方式
+
+目前支持两种方式,一种是从本地加载,即任务运行的机器上对应的目录必须存在配置里指定的证书文件,另一种是从sftp服务器下载,需要配置sftp服务器的配置信息。
+
+使用本地配置示例:
+
+```json
+"hbaseConfig": {
+ "hbase.zookeeper.property.clientPort": "2181",
+ "hbase.rootdir": "hdfs://ns1/hbase",
+ "hbase.cluster.distributed": "true",
+ "hbase.zookeeper.quorum": "host1,host2,host3",
+ "zookeeper.znode.parent": "/hbase",
+ "hbase.security.authentication":"Kerberos",
+ "hbase.security.authorization":true,
+ "hbase.master.kerberos.principal":"hbase/node1@TEST.COM",
+ "hbase.master.keytab.file":"hbase.keytab",
+ "hbase.regionserver.keytab.file":"hbase.keytab",
+ "hbase.regionserver.kerberos.principal":"hbase/node1@TEST.COM",
+ "java.security.krb5.conf":"krb5.conf",
+ "useLocalFile":true
+ }
+```
+
+从sftp下载配置示例:
+
+```json
+"hbaseConfig": {
+ "hbase.zookeeper.property.clientPort": "2181",
+ "hbase.rootdir": "hdfs://ns1/hbase",
+ "hbase.cluster.distributed": "true",
+ "hbase.zookeeper.quorum": "host1,host2,host3",
+ "zookeeper.znode.parent": "/hbase",
+ "hbase.security.authentication":"Kerberos",
+ "hbase.security.authorization":true,
+ "hbase.master.kerberos.principal":"hbase/node1@TEST.COM",
+ "hbase.master.keytab.file":"hbase.keytab",
+ "hbase.regionserver.keytab.file":"hbase.keytab",
+ "hbase.regionserver.kerberos.principal":"hbase/node1@TEST.COM",
+ "remoteDir":"/sftp/flinkx/keytab/hbase",
+ "sftp":{
+ "host":"127.0.0.1",
+ "port":"22",
+ "username":"",
+ "password":""
+ }
+ }
+```
+
+从sftp下载时的查找顺序:
+
+1.在/sftp/flinkx/keytab/hbase目录下查找hbase.keytab文件,如果找不到则2
+
+2.假设任务运行在node1机器上,则在/sftp/flinkx/keytab/hbase/node1下找hbase.keytab文件,找不到则报错;
+
+### 2.各数据源的配置
+
+#### hbase
+
+```json
+"hbaseConfig": {
+ "hbase.zookeeper.property.clientPort": "2181",
+ "hbase.rootdir": "hdfs://ns1/hbase",
+ "hbase.cluster.distributed": "true",
+ "hbase.zookeeper.quorum": "host1,host2,host3",
+ "zookeeper.znode.parent": "/hbase",
+ "hbase.security.authentication":"Kerberos",
+ "hbase.security.authorization":true,
+ "hbase.master.kerberos.principal":"hbase/node1@TEST.COM",
+ "hbase.master.keytab.file":"hbase.keytab",
+ "hbase.regionserver.keytab.file":"hbase.keytab",
+ "hbase.regionserver.kerberos.principal":"hbase/node1@TEST.COM",
+ "java.security.krb5.conf":"krb5.conf"
+ }
+```
+
+#### hive
+
+```json
+"hadoopConf":{
+ "dfs.ha.namenodes.ns1": "nn1,nn2",
+ "dfs.namenode.rpc-address.ns1.nn2": "node03:9000",
+ "dfs.client.failover.proxy.provider.ns1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
+ "dfs.namenode.rpc-address.ns1.nn1": "node02:9000",
+ "dfs.nameservices": "ns1"
+ "hadoop.security.authorization": "true",
+ "hadoop.security.authentication": "Kerberos",
+ "dfs.namenode.kerberos.principal": "hdfs/_HOST@HADOOP.COM",
+ "dfs.namenode.keytab.file": "hdfs.keytab",
+ "java.security.krb5.conf": "krb5.conf"
+}
+```
+
+jdbcUrl格式:jdbc:hive2://127.0.0.1:10000/default;principal=hive/node1@HADOOP.COM
+
+#### hdfs
+
+```json
+"hadoopConf":{
+ "dfs.ha.namenodes.ns1": "nn1,nn2",
+ "dfs.namenode.rpc-address.ns1.nn2": "node03:9000",
+ "dfs.client.failover.proxy.provider.ns1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
+ "dfs.namenode.rpc-address.ns1.nn1": "node02:9000",
+ "dfs.nameservices": "ns1"
+ "hadoop.security.authorization": "true",
+ "hadoop.security.authentication": "Kerberos",
+ "dfs.namenode.kerberos.principal": "hdfs/_HOST@HADOOP.COM",
+ "dfs.namenode.keytab.file": "hdfs.keytab",
+ "java.security.krb5.conf": "krb5.conf"
+}
+```
diff --git a/docs/rdbreader.md b/docs/rdbreader.md
index f30a661b58..afa8c07e1f 100644
--- a/docs/rdbreader.md
+++ b/docs/rdbreader.md
@@ -37,7 +37,8 @@
"requestAccumulatorInterval": 2,
"increColumn": "id",
"startLocation": null,
- "useMaxFunc": true
+ "useMaxFunc": true,
+ "orderByColumn": "id"
},
"name": "mysqlreader"
},
@@ -56,7 +57,7 @@
* **name**
- * 描述:插件名,此处填写插件名称,当前支持的关系数据库插件包括:mysqlreader,oraclereader,sqlserverreader,postgresqlreader,db2reader。
+ * 描述:插件名,此处填写插件名称,当前支持的关系数据库插件包括:mysqlreader,oraclereader,sqlserverreader,postgresqlreader,db2reader,gbasereader。
* 必选:是
* 默认值:无
@@ -76,6 +77,8 @@
- [PostgreSql官方文档](https://jdbc.postgresql.org/documentation/head/connect.html)
- [Db2官方文档](https://www.ibm.com/analytics/us/en/db2/)
+
+ - [Gbase官方文档](http://www.gbase.cn/download.html)
* 必选:是
@@ -219,10 +222,10 @@
```
"column": [{
- "name": "col",
- "type": "datetime",
- "format": "yyyy-MM-dd hh:mm:ss",
- "value": "value"
+ "name": "col",
+ "type": "datetime",
+ "format": "yyyy-MM-dd hh:mm:ss",
+ "value": "value"
}]
```
@@ -239,3 +242,11 @@
* 必选:是
* 默认值:无
+
+* **orderByColumn**
+
+ * 描述:排序字段,读取PostgreSQL数据时,如果中途任务失败,没有关闭事务,会导致表里的数据顺序改变,再次运行任务时由于数据顺序不对会影响数据的准确性,因此使用orderByColumn指定的字段进行排序避免这种情况。
+
+ * 必选:否
+
+ * 默认值:无
diff --git a/docs/rdbwriter.md b/docs/rdbwriter.md
index 092fccdd2a..c8003af66c 100644
--- a/docs/rdbwriter.md
+++ b/docs/rdbwriter.md
@@ -4,33 +4,33 @@
```
{
- "job": {
- "content": [{
- "reader": {},
- "writer": {
- "name": "*writer",
-
- "parameter": {
- "connection": [{
- "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test?useCursorFetch=true",
- "table": [
- "tableTest"
- ]
- }],
- "username": "username",
- "password": "password",
- "column": [],
-
- "writeMode": "insert",
- "batchSize": 1024,
- "preSql": "",
- "postSql": "",
- "updateKey": ""
- }
- }
- }]
- },
- "setting": {}
+ "job": {
+ "content": [{
+ "reader": {},
+ "writer": {
+ "name": "*writer",
+
+ "parameter": {
+ "connection": [{
+ "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test?useCursorFetch=true",
+ "table": [
+ "tableTest"
+ ]
+ }],
+ "username": "username",
+ "password": "password",
+ "column": [],
+
+ "writeMode": "insert",
+ "batchSize": 1024,
+ "preSql": "",
+ "postSql": "",
+ "updateKey": ""
+ }
+ }
+ }]
+ },
+ "setting": {}
}
```
@@ -38,7 +38,8 @@
* **name**
- * 描述:插件名,此处可填写:mysqlwriter,oraclewriter,sqlserverwriter,postgresqlwriter,db2writer
+ * 描述:插件名,此处可填写:mysqlwriter,oraclewriter,sqlserverwriter,postgresqlwriter,db2writer,gbasewriter
+
* 必选:是
默认值:无
diff --git a/docs/restore.md b/docs/restore.md
index c664be8625..682ad95036 100644
--- a/docs/restore.md
+++ b/docs/restore.md
@@ -1,90 +1,230 @@
-# 断点续传
+### 1.功能介绍
-## 1.什么是断点续传
+#### **1.1** **断点续传**
-**断点续传**是指同步任务在运行过程中因某种原因导致任务失败,无需重跑整个任 务,只需从上次失败的位置继续同步即可,类似于下载文件时因网络原因失败,不 需要重新下载文件,只需要继续下载就行。
+断点续传是指数据同步任务在运行过程中因各种原因导致任务失败,不需要重头同步数据,只需要从上次失败的位置继续同步即可,类似于下载文件时因网络原因失败,不需要重新下载文件,只需要继续下载就行,可以大大节省时间和计算资源。
-同步任务支持断点续传的好处:
+#### **2.1** **实时采集**
-- 节省时间:不需要重跑任务,只需要从断点继续同步,节省重跑的时间;
+根据数据源的数据是否实时变化可以把数据同步分为离线数据同步和实时数据同步,上面介绍的断点续传就是离线数据同步里的功能,实时采集其实就是实时数据同步,当数据源里的数据发生了增删改操作,同步任务监听到这些变化,将变化的数据实时同步到目标数据源。除了数据实时变化外,实时采集和离线数据同步的另一个区别是:实时采集任务是不会停止的,任务会一直监听数据源是否有变化。
-- 节省资源:如果数据量巨大,重跑整个任务会占用集群资源,影响其它任务运 行;
+### 2.Flink**中的checkpoint机制**
-- 减少运维成本:断点续传结合任务的失败重试机制,可以让任务自己运行完成, 不人为干涉;
+断点续传和实时采集都依赖于flink的checkpoint机制,这里简单介绍一下。 Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator的状态来生成Snapshot,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些Snapshot进行恢复,从而修正因为故障带来的程序数据状态中断。
-## 2.工作原理
+
+

+
-##### 2.1 checkpoint
+Checkpoint触发时,会向多个分布式的Stream Source中插入一个Barrier标记,这些Barrier会随着Stream中的数据记录一起流向下游的各个Operator。当一个Operator接收到一个Barrier时,它会暂停处理Steam中新接收到的数据记录。因为一个Operator可能存在多个输入的Stream,而每个Stream中都会存在对应的Barrier,该Operator要等到所有的输入Stream中的Barrier都到达。当所有Stream中的Barrier都已经到达该Operator,这时所有的Barrier在时间上看来是同一个时刻点(表示已经对齐),在等待所有Barrier到达的过程中,Operator的Buffer中可能已经缓存了一些比Barrier早到达Operator的数据记录(Outgoing Records),这时该Operator会将数据记录(Outgoing Records)发射(Emit)出去,作为下游Operator的输入,最后将Barrier对应Snapshot发射(Emit)出去作为此次Checkpoint的结果数据。
-断点续传基于flink的checkpoint功能实现,Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地对任务中的Operator/task的状态生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常。支持快照恢复的数据源必须支持在一定时间内重放事件,比如Kafka。
+### **3.断点续传**
-##### 2.2 从断点读取
+#### **3.1** **前提条件**
-当前支持的数据源中可以实现数据重放的有关系数据库,ES,MongoDb等,只要是能根据过滤条件查询的数据源都满足条件。对于关系数据库,任务的状态就是恢复字段的读取位置,任务恢复就是拿到待恢复的快照信息,将里面的位置信息拼接到查询sql的过滤条件里,然后进行数据查询。目前只实现了关系数据库的失败恢复。
+同步任务要支持断点续传,对数据源有一些强制性的要求:
-```
-select
-id, name, address, email
-from
-student
-where
-id > ${restoreLocatio}
-```
+- 数据源(这里特指关系数据库)中必须包含一个升序的字段,比如主键或者日期类型的字段,同步过程中会使用checkpoint机制记录这个字段的值,任务恢复运行时使用这个字段构造查询条件过滤已经同步过的数据,如果这个字段的值不是升序的,那么任务恢复时过滤的数据就是错误的,最终导致数据的缺失或重复;
+
+- 数据源必须支持数据过滤,如果不支持的话,任务就无法从断点处恢复运行,会导致数据重复;
+
+- 目标数据源必须支持事务,比如关系数据库,文件类型的数据源也可以通过临时文件的方式支持;
-##### 2.3 写入控制
+#### **3.2** **任务运行的详细过程**
-**写入hdfs和ftp文件**时,是先将数据写入临时文件,flink出发checkpoint生成时将临时文件转为正式的数据文件;
+我们用一个具体的任务详细介绍一下整个过程,任务详情如下:
-**写入关系数据库**是通过事务控制的,关闭事务自动提交功能,flink出发checkpoint生成时提交事务;
+| **数据源** | mysql表,假设表名data_test,表中包含主键字段id |
+| -------------------- | ---------------------------------------------------------------------- |
+| **目标数据源** | hdfs文件系统,假设写入路径为 /data_test |
+| **并发数** | 2 |
+| **checkpoint****配置** | 时间间隔为60s,checkpoint的StateBackend为FsStateBackend,路径为 /flinkx/checkpoint |
+| **jobId** | 用来构造数据文件的名称,假设为 abc123 |
-##### 2.4 要求
+##### **1)** **读取数据**
-数据要求:断点续传要求数据按照某个字段升序排列,比如id字段或时间字段。
+读取数据时首先要构造数据分片,构造数据分片就是根据通道索引和checkpoint记录的位置构造查询sql,sql模板如下:
-数据库:要求必须支持事务。
+```sql
+select * from data_test
+where id mod ${channel_num}=${channel_index}
+and id > ${offset}
+```
+
+如果是第一次运行,或者上一次任务失败时还没有触发checkpoint,那么offset就不存在,根据offset和通道可以确定具体的查询sql:
-## 3.如何配置
+offset存在时
-3.1 启动参数
+```sql
+第一个通道:
+select * from data_test
+where id mod 2=0
+and id > ${offset_0};
+第二个通道
+select * from data_test
+where id mod 2=1
+and id > ${offset_1};
```
--confProp
-"{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}"
--s
-/flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*
+offset不存在时
+
+```sql
+第一个通道:
+select * from data_test
+where id mod 2=0;
+
+第二个通道
+select * from data_test
+where id mod 2=1;
```
-- confProp:checkpoint的配置参数
-
- - flink.checkpoint.interval:checkpoint时间间隔,不填此参数,checkpoint不会弃用;
+数据分片构造好之后,每个通道就根据自己的数据分片去读数据了。
+
+##### **2**)写数据
+
+写数据前会先做几个操作:
+
+1. 检测 /data_test 目录是否存在,如果目录不存在,则创建这个目录,如果目录存在,进行2操作;
+
+2. 判断是不是以覆盖模式写数据,如果是,则删除 /data_test目录,然后再创建目录,如果不是,则进行3操作;
+
+3. 检测 /data_test/.data 目录是否存在,如果存在就先删除,再创建,确保没有其它任务因异常失败遗留的脏数据文件;
+
+ 数据写入hdfs是单条写入的,不支持批量写入。数据会先写入/data_test/.data/目录下,数据文件的命名格式为: channelIndex.jobId.fileIndex,包含通道索引,jobId,文件索引三个部分,文件最掐灭。
+
+##### **3**)checkpoint触发时
+
+在FlinkX中“状态”表示的是标识字段id的值,我们假设checkpoint触发时两个通道的读取和写入情况如图中所示:
+
+
+

+
+
+checkpoint触发后,两个reader先生成Snapshot记录读取状态,通道0的状态为 id=12,通道1的状态为 id=11。Snapshot生成之后向数据流里面插入barrier,barrier随数据流向Writer。以Writer_0为例,以Writer_0接收Reader_0和Reader_1的发来的数据,假设先收到了Reader_0的barrier,这个时候Writer_0停止写出数据到HDFS,将接收到的数据先放到 InputBuffer里面,一直等待Reader_1的barrier到达之后再将Buffer里的数据全部写出,然后生成Writer的Snapshot,整个checkpoint结束后,记录的任务状态为:
+
+> Reader_0:id=12
+>
+> Reader_1:id=11
+>
+> Writer_0:id=无法确定
+>
+> Writer_1:id=无法确定
+
+任务状态会记录到配置的HDFS目录/flinkx/checkpoint/abc123下。因为每个Writer会接收两个Reader的数据,以及各个通道的数据读写速率可能不一样,所以导致writer接收到的数据顺序是不确定的,但是这不影响数据的准确性,因为读取数据时只需要Reader记录的状态就可以构造查询sql,我们只要确保这些数据真的写到HDF就行了。在Writer生成Snapshot之前,会做一系列操作保证接收到的数据全部写入HDFS:
+
+- close写入HDFS文件的数据流,这时候会在/data_test/.data目录下生成两个两个文件:
- - flink.checkpoint.timeout:超时时间;
+ - /data_test/.data/0.abc123.0
- - flink.checkpoint.stateBackend:checkpoint的路径,目前只支持本地文件系统和hdfs路径;
+ - /data_test/.data/1.abc123.0
-- s:恢复任务时checkpoint的路径
+- 将生成的两个数据文件移动到/data_test目录下;
-3.2 任务配置
+- 更新文件名称模板更新为:channelIndex.abc123.1;
+快照生成后任务继续读写数据,如果生成快照的过程中有任何异常,任务会直接失败,这样这次快照就不会生成,任务恢复时会从上一个成功的快照恢复。
+
+##### **4**)任务正常结束
+
+任务正常结束时也会做和生成快照时同样的操作,close文件流,移动临时数据文件等。
+
+##### **5**)任务异常终止
+
+任务如果异常结束,假设任务结束时最后一个checkpoint记录的状态为:
+
+> Reader_0:id=12
+>
+> Reader_1:id=11
+
+那么任务恢复的时候就会把各个通道记录的状态赋值给offset,再次读取数据时构造的sql为:
+
+```sql
+第一个通道:
+select * from data_test
+where id mod 2=0
+and id > 12;
+
+第二个通道
+select * from data_test
+where id mod 2=1
+and id > 11;
```
-"restore": {
- "isRestore": false,
- "restoreColumnName": "",
- "restoreColumnIndex": 0
- }
-```
-- isRestore:断点续传开关,默认为false
+这样就可以从上一次失败的位置继续读取数据了。
+
+#### **3.3** **支持断点续传的插件**
+
+理论上只要支持过滤数据的数据源,和支持事务的数据源都可以支持断点续传的功能,目前FlinkX支持的插件如下:
+
+| Reader | Writer |
+| -------------- | --------------- |
+| mysql等关系数据读取插件 | HDFS |
+| | FTP |
+| | mysql等关系数据库写入插件 |
+
+### **4.基于binlog的实时采集**
+
+目前FlinkX支持实时采集的插件有KafKa,binlog插件,binlog插件是专门针对mysql数据库做实时采集的,如果要支持其它的数据源,只需要把数据打到Kafka,然后再用FlinkX的Kafka插件消费数据即可,比如oracle,只需要使用oracle的ogg将数据打到Kafka。这里我们专门讲解一下mysql的实时采集插件binlog。
+
+#### **4.1 binlog**
+
+binlog是Mysql sever层维护的一种二进制日志,与innodb引擎中的redo/undo log是完全不同的日志;其主要是用来记录对mysql数据更新或潜在发生更新的SQL语句,并以"事务"的形式保存在磁盘中。
+
+binlog的作用主要有:
+
+- 复制:MySQL Replication在Master端开启binlog,Master把它的二进制日志传递给slaves并回放来达到master-slave数据一致的目的;
+
+- 数据恢复:通过mysqlbinlog工具恢复数据;
+
+- 增量备份;
+
+#### **4.2 MySQL**主备复制
+
+有了记录数据变化的binlog日志还不够,我们还需要借助MySQL的主备复制功能:主备复制是指 一台服务器充当主数据库服务器,另一台或多台服务器充当从数据库服务器,主服务器中的数据自动复制到从服务器之中。
+
+
+

+
+
+主备复制的过程:
+
+1. MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看);
+
+2. MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log);
+
+3. MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据;
+
+#### **4.3 canal**
+
+有了binlog日志数据和MySQL的主备复制功能,我们只需要模拟一台Slave,将接收到的binlog数据解析出来就可以做到实时采集MySQL的数据变化,阿里巴巴贡献的canal组件就实现了这样的功能。
+
+
+

+
+
+canal工作原理:
+
+1. canal模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
+
+2. MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
+
+3. canal解析 binary log 对象(原始为 byte 流)
+
+#### **4.4** **写入Hive**
+
+binlog插件可以监听多张表的数据变更情况,解析出的数据中包含表名称信息,读取到的数据可以全部写入目标数据库的一张表,也可以根据数据中包含的表名信息写入不同的表,目前只有Hive插件支持这个功能。Hive插件目前只有写入插件,功能基于HDFS的写入插件实现,也就是说从binlog读取,写入hive也支持失败恢复的功能。
+
+
+

+
+
+写入Hive的过程:
-- restoreColumnName:用来做断点续传的字段名称
+1. 从数据中解析出MySQL的表名,然后根据表名映射规则转换成对应的Hive表名;
-- restoreColumnIndex:字段索引
+2. 检查Hive表是否存在,如果不存在就创建Hive表;
-## 4 支持的插件
+3. 查询Hive表的相关信息,构造HdfsOutputFormat;
-| reader | writer |
-| ------------------------------------------- | ------------------------------------------- |
-| mysql,oracle,sqlserver,db2,postgresql,gbase | hdfs |
-| | ftp |
-| | mysql,oracle,sqlserver,db2,postgresql,gbase |
+4. 调用HdfsOutputFormat将数据写入HDFS;
diff --git a/docs/statistics.md b/docs/statistics.md
new file mode 100644
index 0000000000..8c07821462
--- /dev/null
+++ b/docs/statistics.md
@@ -0,0 +1,161 @@
+FlinkX使用了flink内置Accumulator和Metric来记录任务的一些统计指标:
+
+| 指标名称 | 含义 |
+| ---------------- | ----------- |
+| numRead | 累计读取数据条数 |
+| byteRead | 累计读取数据字节数 |
+| readDuration | 读取数据的总时间 |
+| | |
+| numWrite | 累计写入数据条数 |
+| byteWrite | 累计写入数据字节数 |
+| writeDuration | 写入数据的总时间 |
+| nErrors | 累计错误记录数 |
+| nullErrors | 累计空指针错误记录数 |
+| duplicateErrors | 累计主键冲突错误记录数 |
+| conversionErrors | 累计类型转换错误记录数 |
+| otherErrors | 累计其它错误记录数 |
+
+### 获取统计指标的方式
+
+#### 1.Local模式运行
+
+local模式运行时,任务结束后会在控制台打印这些指标:
+
+```
+---------------------------------
+numWrite | 100
+last_write_num_0 | 0
+conversionErrors | 0
+writeDuration | 12251
+numRead | 100
+duplicateErrors | 0
+snapshotWrite | 0
+readDuration | 12247
+otherErrors | 0
+byteRead | 2329
+last_write_location_0 | 0
+byteWrite | 2329
+nullErrors | 0
+nErrors | 0
+---------------------------------
+```
+
+#### 2.yarn模式运行
+
+##### 2.1 通过Flink REST接口获取
+
+任务运行期间,可以通过Flink REST接口获取Accumulator数据,名称和上面给出的一致。
+
+api:http://host:8088/proxy/application_1569335225689_4172//jobs/d5582272d29ff38e10416a4043a86cad/accumulators
+
+返回数据示例:
+
+```json
+{
+ "job-accumulators": [],
+ "user-task-accumulators": [
+ {
+ "name": "numWrite",
+ "type": "LongCounter",
+ "value": "0"
+ },
+ {
+ "name": "last_write_num_0",
+ "type": "LongCounter",
+ "value": "0"
+ },
+ {
+ "name": "conversionErrors",
+ "type": "LongCounter",
+ "value": "0"
+ },
+ {
+ "name": "writeDuration",
+ "type": "LongCounter",
+ "value": "0"
+ },
+ {
+ "name": "numRead",
+ "type": "LongCounter",
+ "value": "0"
+ },
+ {
+ "name": "duplicateErrors",
+ "type": "LongCounter",
+ "value": "0"
+ },
+ {
+ "name": "snapshotWrite",
+ "type": "LongCounter",
+ "value": "0"
+ },
+ {
+ "name": "readDuration",
+ "type": "LongCounter",
+ "value": "0"
+ },
+ {
+ "name": "otherErrors",
+ "type": "LongCounter",
+ "value": "0"
+ },
+ {
+ "name": "byteRead",
+ "type": "LongCounter",
+ "value": "0"
+ },
+ {
+ "name": "last_write_location_0",
+ "type": "LongCounter",
+ "value": "0"
+ },
+ {
+ "name": "byteWrite",
+ "type": "LongCounter",
+ "value": "0"
+ },
+ {
+ "name": "nullErrors",
+ "type": "LongCounter",
+ "value": "0"
+ },
+ {
+ "name": "nErrors",
+ "type": "LongCounter",
+ "value": "0"
+ }
+ ],
+ "serialized-user-task-accumulators": {}
+}
+```
+
+##### 2.2 将指标输出到其它系统
+
+比如将指标输出到prometheus,在flink的配置文件里增加配置即可:
+
+```
+metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
+metrics.reporter.promgateway.interval: 500 MILLISECONDS
+metrics.reporter.promgateway.host: 127.0.0.1
+metrics.reporter.promgateway.port: 9091
+metrics.reporter.promgateway.jobName: testjob
+metrics.reporter.promgateway.randomJobNameSuffix: true
+metrics.reporter.promgateway.deleteOnShutdown: false
+```
+
+通过prometheus获取数据时的名称为:
+
+| FlinkX中指标名称 | prometheus中指标名称 |
+| ---------------- | ----------------------------------------------------------- |
+| numRead | flink_taskmanager_job_task_operator_flinkx_byteRead |
+| byteRead | flink_taskmanager_job_task_operator_flinkx_byteRead |
+| readDuration | flink_taskmanager_job_task_operator_flinkx_readDuration |
+| | |
+| numWrite | flink_taskmanager_job_task_operator_flinkx_numWrite |
+| byteWrite | flink_taskmanager_job_task_operator_flinkx_byteWrite |
+| writeDuration | flink_taskmanager_job_task_operator_flinkx_writeDuration |
+| nErrors | flink_taskmanager_job_task_operator_flinkx_nErrors |
+| nullErrors | flink_taskmanager_job_task_operator_flinkx_nullErrors |
+| duplicateErrors | flink_taskmanager_job_task_operator_flinkx_duplicateErrors |
+| conversionErrors | flink_taskmanager_job_task_operator_flinkx_conversionErrors |
+| otherErrors | flink_taskmanager_job_task_operator_flinkx_otherErrors |
diff --git a/flinkx-binlog/flinkx-binlog-reader/pom.xml b/flinkx-binlog/flinkx-binlog-reader/pom.xml
index 701e001b3d..d9f3b89e9b 100644
--- a/flinkx-binlog/flinkx-binlog-reader/pom.xml
+++ b/flinkx-binlog/flinkx-binlog-reader/pom.xml
@@ -34,9 +34,9 @@
- org.slf4j:slf4j-log4j12
- log4j:log4j
org.slf4j:slf4j-api
+ log4j:log4j
+
netty-all:io.netty
com.google.guava:guava
diff --git a/flinkx-carbondata/flinkx-carbondata-core/pom.xml b/flinkx-carbondata/flinkx-carbondata-core/pom.xml
index dbbbec4c64..e04bc70cc3 100644
--- a/flinkx-carbondata/flinkx-carbondata-core/pom.xml
+++ b/flinkx-carbondata/flinkx-carbondata-core/pom.xml
@@ -12,11 +12,6 @@
flinkx-carbondata-core
-
- com.dtstack.flinkx
- flinkx-core
- 1.6
-
org.apache.carbondata
diff --git a/flinkx-carbondata/flinkx-carbondata-reader/pom.xml b/flinkx-carbondata/flinkx-carbondata-reader/pom.xml
index bdd98d40c6..a0951fece0 100644
--- a/flinkx-carbondata/flinkx-carbondata-reader/pom.xml
+++ b/flinkx-carbondata/flinkx-carbondata-reader/pom.xml
@@ -34,18 +34,14 @@
-
- com.dtstack.flinkx:flinkx-core
- com.dtstack.flinkx:flinkx-rdb
-
-
- org.apache.flink:*
com.data-artisans:*
org.scala-lang:*
io.netty:*
- org.slf4j:slf4j-api
org.apache.hadoop:*
xerces:xercesImpl
+ org.slf4j:slf4j-api
+ org.slf4j:slf4j-log4j12
+ ch.qos.logback:*
diff --git a/flinkx-carbondata/flinkx-carbondata-writer/pom.xml b/flinkx-carbondata/flinkx-carbondata-writer/pom.xml
index ba853b35c0..e5d7b1f848 100644
--- a/flinkx-carbondata/flinkx-carbondata-writer/pom.xml
+++ b/flinkx-carbondata/flinkx-carbondata-writer/pom.xml
@@ -36,18 +36,15 @@
- org.apache.flink:flink-jdbc
-
- com.dtstack.flinkx:flinkx-core
- com.dtstack.flinkx:flinkx-rdb
-
com.google.code.gson:*
- org.apache.flink:*
com.data-artisans:*
org.scala-lang:*
io.netty:*
org.apache.hadoop:*
xerces:xercesImpl
+ org.slf4j:slf4j-api
+
+ ch.qos.logback:*
diff --git a/flinkx-carbondata/pom.xml b/flinkx-carbondata/pom.xml
index f7682edf3b..16cd07a911 100644
--- a/flinkx-carbondata/pom.xml
+++ b/flinkx-carbondata/pom.xml
@@ -17,4 +17,13 @@
flinkx-carbondata-reader
+
+
+ com.dtstack.flinkx
+ flinkx-core
+ 1.6
+ provided
+
+
+
\ No newline at end of file
diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/Main.java b/flinkx-core/src/main/java/com/dtstack/flinkx/Main.java
index 53c734694a..cd4bcb17c5 100644
--- a/flinkx-core/src/main/java/com/dtstack/flinkx/Main.java
+++ b/flinkx-core/src/main/java/com/dtstack/flinkx/Main.java
@@ -151,12 +151,6 @@ private static StreamExecutionEnvironment openCheckpointConf(StreamExecutionEnvi
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
- String backendPath = properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_DATAURI_KEY);
- if(backendPath != null){
- //set checkpoint save path on file system,hdfs://, file://
- env.setStateBackend(new FsStateBackend(backendPath.trim()));
- LOG.info("Set StateBackend:" + backendPath);
- }
}
return env;
}
diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/authenticate/KerberosUtil.java b/flinkx-core/src/main/java/com/dtstack/flinkx/authenticate/KerberosUtil.java
index 2acb1101f8..d914e64f75 100644
--- a/flinkx-core/src/main/java/com/dtstack/flinkx/authenticate/KerberosUtil.java
+++ b/flinkx-core/src/main/java/com/dtstack/flinkx/authenticate/KerberosUtil.java
@@ -62,7 +62,7 @@ public class KerberosUtil {
}
}
- public static void login(Configuration conf, String principal, String keytab) throws IOException {
+ public static UserGroupInformation loginAndReturnUGI(Configuration conf, String principal, String keytab) throws IOException {
if (conf == null) {
throw new IllegalArgumentException("kerberos conf can not be null");
}
@@ -83,7 +83,7 @@ public static void login(Configuration conf, String principal, String keytab) th
UserGroupInformation.setConfiguration(conf);
LOG.info("login user:{} with keytab:{}", principal, keytab);
- UserGroupInformation.loginUserFromKeytab(principal, keytab);
+ return UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
}
private static void reloadKrb5Conf(Configuration conf){
diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/constants/ConfigConstrant.java b/flinkx-core/src/main/java/com/dtstack/flinkx/constants/ConfigConstrant.java
index baa2362e27..502b3327f2 100644
--- a/flinkx-core/src/main/java/com/dtstack/flinkx/constants/ConfigConstrant.java
+++ b/flinkx-core/src/main/java/com/dtstack/flinkx/constants/ConfigConstrant.java
@@ -31,10 +31,4 @@ public class ConfigConstrant {
public static final String FLINK_CHECKPOINT_INTERVAL_KEY = "flink.checkpoint.interval";
public static final String FLINK_CHECKPOINT_TIMEOUT_KEY = "flink.checkpoint.timeout";
-
- public static final String FLINK_MAXCONCURRENTCHECKPOINTS_KEY = "flink.max.concurrent.checkpoints";
-
- public static final String FLINK_CHECKPOINT_CLEANUPMODE_KEY = "flink.checkpoint.cleanup.mode";
-
- public static final String FLINK_CHECKPOINT_DATAURI_KEY = "flink.checkpoint.stateBackend";
}
diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/outputformat/FileOutputFormat.java b/flinkx-core/src/main/java/com/dtstack/flinkx/outputformat/FileOutputFormat.java
index 02976364da..8202b20a76 100644
--- a/flinkx-core/src/main/java/com/dtstack/flinkx/outputformat/FileOutputFormat.java
+++ b/flinkx-core/src/main/java/com/dtstack/flinkx/outputformat/FileOutputFormat.java
@@ -21,6 +21,7 @@
import com.dtstack.flinkx.exception.WriteRecordException;
import com.dtstack.flinkx.restore.FormatState;
+import com.dtstack.flinkx.util.ExceptionUtil;
import org.apache.commons.lang.ObjectUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.types.Row;
@@ -126,11 +127,12 @@ protected void actionBeforeWriteData(){
try{
// 覆盖模式并且不是从检查点恢复时先删除数据目录
- if(!APPEND_MODE.equalsIgnoreCase(writeMode) && formatState.getState() == null){
+ if(!APPEND_MODE.equalsIgnoreCase(writeMode) && formatState != null && formatState.getState() == null){
coverageData();
}
} catch (Exception e){
- throw new RuntimeException("");
+ LOG.error("e = {}", ExceptionUtil.getErrorMessage(e));
+ throw new RuntimeException(e);
}
try {
diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/util/FileSystemUtil.java b/flinkx-core/src/main/java/com/dtstack/flinkx/util/FileSystemUtil.java
index bc17d38fdf..f692dbf6b6 100644
--- a/flinkx-core/src/main/java/com/dtstack/flinkx/util/FileSystemUtil.java
+++ b/flinkx-core/src/main/java/com/dtstack/flinkx/util/FileSystemUtil.java
@@ -25,7 +25,9 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.security.UserGroupInformation;
+import java.security.PrivilegedAction;
import java.util.Map;
/**
@@ -45,7 +47,7 @@ public class FileSystemUtil {
public static FileSystem getFileSystem(Map hadoopConfig, String defaultFS, String jobId, String plugin) throws Exception {
if(openKerberos(hadoopConfig)){
- loginHdfs(hadoopConfig, jobId, plugin);
+ return getFsWithKerberos(hadoopConfig, jobId, plugin, defaultFS);
}
return FileSystem.get(getConfiguration(hadoopConfig, defaultFS));
@@ -59,7 +61,7 @@ private static boolean openKerberos(Map hadoopConfig){
return AUTHENTICATION_TYPE.equalsIgnoreCase(MapUtils.getString(hadoopConfig, KEY_HADOOP_SECURITY_AUTHENTICATION));
}
- private static void loginHdfs(Map hadoopConfig, String jobId, String plugin) throws Exception{
+ private static FileSystem getFsWithKerberos(Map hadoopConfig, String jobId, String plugin, String defaultFS) throws Exception{
String keytab = getKeytab(hadoopConfig);
String principal = getPrincipal(hadoopConfig);
@@ -67,7 +69,17 @@ private static void loginHdfs(Map hadoopConfig, String jobId, St
principal = KerberosUtil.findPrincipalFromKeytab(principal, keytab);
KerberosUtil.loadKrb5Conf(hadoopConfig, jobId, plugin);
- KerberosUtil.login(getConfiguration(hadoopConfig, null), principal, keytab);
+ UserGroupInformation ugi = KerberosUtil.loginAndReturnUGI(getConfiguration(hadoopConfig, defaultFS), principal, keytab);
+ return ugi.doAs(new PrivilegedAction() {
+ @Override
+ public FileSystem run(){
+ try {
+ return FileSystem.get(getConfiguration(hadoopConfig, defaultFS));
+ } catch (Exception e){
+ throw new RuntimeException("Get FileSystem with kerberos error:", e);
+ }
+ }
+ });
}
private static String getPrincipal(Map hadoopConfig){
diff --git a/flinkx-db2/flinkx-db2-reader/pom.xml b/flinkx-db2/flinkx-db2-reader/pom.xml
index 81b3a15af4..67ccad55ba 100644
--- a/flinkx-db2/flinkx-db2-reader/pom.xml
+++ b/flinkx-db2/flinkx-db2-reader/pom.xml
@@ -40,6 +40,11 @@
+
+ org.slf4j:slf4j-api
+ log4j:log4j
+ ch.qos.logback:*
+
diff --git a/flinkx-db2/flinkx-db2-writer/pom.xml b/flinkx-db2/flinkx-db2-writer/pom.xml
index f866a3e96a..35a097cad5 100644
--- a/flinkx-db2/flinkx-db2-writer/pom.xml
+++ b/flinkx-db2/flinkx-db2-writer/pom.xml
@@ -40,6 +40,11 @@
+
+ org.slf4j:slf4j-api
+ log4j:log4j
+ ch.qos.logback:*
+
diff --git a/flinkx-es/flinkx-es-reader/pom.xml b/flinkx-es/flinkx-es-reader/pom.xml
index 9f617b5854..5f5bfc50f6 100644
--- a/flinkx-es/flinkx-es-reader/pom.xml
+++ b/flinkx-es/flinkx-es-reader/pom.xml
@@ -37,7 +37,9 @@
com.data-artisans:*
org.scala-lang:*
io.netty:*
-
+ org.slf4j:slf4j-api
+ log4j:log4j
+ ch.qos.logback:*
diff --git a/flinkx-es/flinkx-es-writer/pom.xml b/flinkx-es/flinkx-es-writer/pom.xml
index 885ec9737c..a40d8e428f 100644
--- a/flinkx-es/flinkx-es-writer/pom.xml
+++ b/flinkx-es/flinkx-es-writer/pom.xml
@@ -35,10 +35,12 @@
com.google.code.gson:*
-
com.data-artisans:*
org.scala-lang:*
io.netty:*
+ org.slf4j:slf4j-api
+ log4j:log4j
+ ch.qos.logback:*
diff --git a/flinkx-examples/examples/mysql_to_hbase.json b/flinkx-examples/examples/mysql_to_hbase.json
index 51618b0bd7..253af486a9 100644
--- a/flinkx-examples/examples/mysql_to_hbase.json
+++ b/flinkx-examples/examples/mysql_to_hbase.json
@@ -44,16 +44,7 @@
"zookeeper.znode.parent": "/hbase"
},
"table": "tb1",
- "rowkeyColumn": [
- {
- "index": 0,
- "type": "string"
- },
- {
- "value": "_postfix",
- "type": "string"
- }
- ],
+ "rowkeyColumn": "col1#col2",
"column": [
{
"name": "cf1:id",
diff --git a/flinkx-ftp/flinkx-ftp-reader/pom.xml b/flinkx-ftp/flinkx-ftp-reader/pom.xml
index 7ca313e801..29e9474c29 100644
--- a/flinkx-ftp/flinkx-ftp-reader/pom.xml
+++ b/flinkx-ftp/flinkx-ftp-reader/pom.xml
@@ -53,12 +53,12 @@ under the License.
- com.dtstack.flinkx:flinkx-core
- org.apache.flink:*
com.data-artisans:*
org.scala-lang:*
io.netty:*
org.slf4j:slf4j-api
+ log4j:log4j
+ ch.qos.logback:*
diff --git a/flinkx-ftp/flinkx-ftp-writer/pom.xml b/flinkx-ftp/flinkx-ftp-writer/pom.xml
index 2022d3ead1..29db430811 100644
--- a/flinkx-ftp/flinkx-ftp-writer/pom.xml
+++ b/flinkx-ftp/flinkx-ftp-writer/pom.xml
@@ -52,12 +52,13 @@ under the License.
- com.dtstack.flinkx:flinkx-core
com.google.code.gson:*
- org.apache.flink:*
com.data-artisans:*
org.scala-lang:*
io.netty:*
+ org.slf4j:slf4j-api
+ log4j:log4j
+ ch.qos.logback:*
diff --git a/flinkx-gbase/flinkx-gbase-reader/pom.xml b/flinkx-gbase/flinkx-gbase-reader/pom.xml
index 20dd0f9bcb..7d2067b424 100644
--- a/flinkx-gbase/flinkx-gbase-reader/pom.xml
+++ b/flinkx-gbase/flinkx-gbase-reader/pom.xml
@@ -39,6 +39,11 @@
+
+ org.slf4j:slf4j-api
+ log4j:log4j
+ ch.qos.logback:*
+
diff --git a/flinkx-gbase/flinkx-gbase-writer/pom.xml b/flinkx-gbase/flinkx-gbase-writer/pom.xml
index caaea83b91..504a17703b 100644
--- a/flinkx-gbase/flinkx-gbase-writer/pom.xml
+++ b/flinkx-gbase/flinkx-gbase-writer/pom.xml
@@ -39,6 +39,11 @@
+
+ org.slf4j:slf4j-api
+ log4j:log4j
+ ch.qos.logback:*
+
diff --git a/flinkx-hbase/flinkx-hbase-core/src/main/java/com/dtstack/flinkx/hbase/HbaseHelper.java b/flinkx-hbase/flinkx-hbase-core/src/main/java/com/dtstack/flinkx/hbase/HbaseHelper.java
index 479c4dc5cf..f5426f4690 100644
--- a/flinkx-hbase/flinkx-hbase-core/src/main/java/com/dtstack/flinkx/hbase/HbaseHelper.java
+++ b/flinkx-hbase/flinkx-hbase-core/src/main/java/com/dtstack/flinkx/hbase/HbaseHelper.java
@@ -29,9 +29,11 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -66,13 +68,7 @@ public static org.apache.hadoop.hbase.client.Connection getHbaseConnection(Map hbaseConfigMap, String jobId, String plugin){
+ for (String key : KEYS_KERBEROS_REQUIRED) {
+ if(StringUtils.isEmpty(MapUtils.getString(hbaseConfigMap, key))){
+ throw new IllegalArgumentException(String.format("Must provide [%s] when authentication is Kerberos", key));
+ }
+ }
+
+ String principal = getPrincipal(hbaseConfigMap);
+ String keytab = getKeytab(hbaseConfigMap);
+
+ keytab = KerberosUtil.loadFile(hbaseConfigMap, keytab, jobId, plugin);
+ principal = KerberosUtil.findPrincipalFromKeytab(principal, keytab);
+ KerberosUtil.loadKrb5Conf(hbaseConfigMap, jobId, plugin);
+
+ Configuration conf = FileSystemUtil.getConfiguration(hbaseConfigMap, null);
+
+ UserGroupInformation ugi;
+ try {
+ ugi = KerberosUtil.loginAndReturnUGI(conf, principal, keytab);
+ } catch (Exception e){
+ throw new RuntimeException("Login kerberos error", e);
+ }
+
+ return ugi.doAs(new PrivilegedAction() {
+ @Override
+ public Connection run() {
+ try {
+ Configuration hConfiguration = getConfig(hbaseConfigMap);
+ return ConnectionFactory.createConnection(hConfiguration);
+ } catch (IOException e) {
+ LOG.error("Get connection fail with config:{}", hbaseConfigMap);
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ }
+
public static Configuration getConfig(Map hbaseConfigMap){
Configuration hConfiguration = HBaseConfiguration.create();
for (Map.Entry entry : hbaseConfigMap.entrySet()) {
@@ -103,22 +136,6 @@ public static boolean openKerberos(Map hbaseConfigMap){
return AUTHENTICATION_TYPE.equalsIgnoreCase(MapUtils.getString(hbaseConfigMap, KEY_HBASE_SECURITY_AUTHENTICATION));
}
- private static void login(Map hbaseConfigMap, String jobId, String plugin){
- try {
- String principal = getPrincipal(hbaseConfigMap);
- String keytab = getKeytab(hbaseConfigMap);
-
- keytab = KerberosUtil.loadFile(hbaseConfigMap, keytab, jobId, plugin);
- principal = KerberosUtil.findPrincipalFromKeytab(principal, keytab);
- KerberosUtil.loadKrb5Conf(hbaseConfigMap, jobId, plugin);
-
- Configuration conf = FileSystemUtil.getConfiguration(hbaseConfigMap, null);
- KerberosUtil.login(conf, principal, keytab);
- } catch (Exception e){
- throw new RuntimeException("login kerberos error", e);
- }
- }
-
private static String getKeytab(Map hbaseConfigMap){
String keytab = MapUtils.getString(hbaseConfigMap, KEY_HBASE_MASTER_KEYTAB_FILE);
if(StringUtils.isNotEmpty(keytab)){
diff --git a/flinkx-hbase/flinkx-hbase-reader/pom.xml b/flinkx-hbase/flinkx-hbase-reader/pom.xml
index e7f06daac8..49d2a772fe 100644
--- a/flinkx-hbase/flinkx-hbase-reader/pom.xml
+++ b/flinkx-hbase/flinkx-hbase-reader/pom.xml
@@ -48,14 +48,12 @@
-
- com.dtstack.flinkx:flinkx-core
-
- org.apache.flink:*
com.data-artisans:*
org.scala-lang:*
- io.netty:*
+
org.slf4j:slf4j-api
+
+ ch.qos.logback:*
diff --git a/flinkx-hbase/flinkx-hbase-writer/pom.xml b/flinkx-hbase/flinkx-hbase-writer/pom.xml
index 361bdcb4b8..b4dd4ad694 100644
--- a/flinkx-hbase/flinkx-hbase-writer/pom.xml
+++ b/flinkx-hbase/flinkx-hbase-writer/pom.xml
@@ -34,15 +34,14 @@
-
- com.dtstack.flinkx:flinkx-core
com.google.code.gson:*
-
- org.apache.flink:*
com.data-artisans:*
org.scala-lang:*
- io.netty:*
+
+ org.slf4j:slf4j-api
+ log4j:log4j
+ ch.qos.logback:*
diff --git a/flinkx-hdfs/flinkx-hdfs-reader/pom.xml b/flinkx-hdfs/flinkx-hdfs-reader/pom.xml
index afb0f02ac3..9772adcd27 100644
--- a/flinkx-hdfs/flinkx-hdfs-reader/pom.xml
+++ b/flinkx-hdfs/flinkx-hdfs-reader/pom.xml
@@ -79,6 +79,9 @@ under the License.
+
+
+
com.data-artisans:*
org.scala-lang:*
io.netty:*
diff --git a/flinkx-hdfs/flinkx-hdfs-writer/pom.xml b/flinkx-hdfs/flinkx-hdfs-writer/pom.xml
index f9c9c0fcc1..ea275816bb 100644
--- a/flinkx-hdfs/flinkx-hdfs-writer/pom.xml
+++ b/flinkx-hdfs/flinkx-hdfs-writer/pom.xml
@@ -79,6 +79,9 @@ under the License.
+ org.slf4j:slf4j-api
+
+ ch.qos.logback:*
com.google.code.gson:*
com.data-artisans:*
org.scala-lang:*
diff --git a/flinkx-hive/flinkx-hive-core/src/main/java/com/dtstack/flinkx/hive/EStoreType.java b/flinkx-hive/flinkx-hive-core/src/main/java/com/dtstack/flinkx/hive/EStoreType.java
index 53c7cd5b69..9800f10839 100644
--- a/flinkx-hive/flinkx-hive-core/src/main/java/com/dtstack/flinkx/hive/EStoreType.java
+++ b/flinkx-hive/flinkx-hive-core/src/main/java/com/dtstack/flinkx/hive/EStoreType.java
@@ -22,5 +22,5 @@
* @author toutian
*/
public enum EStoreType {
- TEXT, ORC
+ TEXT, ORC, PARQUET
}
diff --git a/flinkx-hive/flinkx-hive-core/src/main/java/com/dtstack/flinkx/hive/util/DBUtil.java b/flinkx-hive/flinkx-hive-core/src/main/java/com/dtstack/flinkx/hive/util/DBUtil.java
index 6be8752991..f1f5b97163 100755
--- a/flinkx-hive/flinkx-hive-core/src/main/java/com/dtstack/flinkx/hive/util/DBUtil.java
+++ b/flinkx-hive/flinkx-hive-core/src/main/java/com/dtstack/flinkx/hive/util/DBUtil.java
@@ -27,8 +27,11 @@
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.io.IOException;
+import java.security.PrivilegedAction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
@@ -49,6 +52,8 @@
*/
public final class DBUtil {
+ private static Logger LOG = LoggerFactory.getLogger(DBUtil.class);
+
public static final String SQLSTATE_USERNAME_PWD_ERROR = "28000";
public static final String SQLSTATE_CANNOT_ACQUIRE_CONNECT = "08004";
@@ -77,9 +82,13 @@ private DBUtil() {
public static Connection getConnection(ConnectionInfo connectionInfo) {
if(openKerberos(connectionInfo.getJdbcUrl())){
- login(connectionInfo);
+ return getConnectionWithKerberos(connectionInfo);
+ } else {
+ return getConnectionWithRetry(connectionInfo);
}
+ }
+ private static Connection getConnectionWithRetry(ConnectionInfo connectionInfo){
try {
return RetryUtil.executeWithRetry(new Callable() {
@Override
@@ -92,6 +101,36 @@ public Connection call() throws Exception {
}
}
+ private static Connection getConnectionWithKerberos(ConnectionInfo connectionInfo){
+ if(connectionInfo.getHiveConf() == null || connectionInfo.getHiveConf().isEmpty()){
+ throw new IllegalArgumentException("hiveConf can not be null or empty");
+ }
+
+ String keytab = getKeytab(connectionInfo.getHiveConf());
+ String principal = getPrincipal(connectionInfo.getJdbcUrl());
+
+ keytab = KerberosUtil.loadFile(connectionInfo.getHiveConf(), keytab, connectionInfo.getJobId(), connectionInfo.getPlugin());
+ principal = KerberosUtil.findPrincipalFromKeytab(principal, keytab);
+ KerberosUtil.loadKrb5Conf(connectionInfo.getHiveConf(), connectionInfo.getJobId(), connectionInfo.getPlugin());
+
+ Configuration conf = FileSystemUtil.getConfiguration(connectionInfo.getHiveConf(), null);
+
+ UserGroupInformation ugi;
+ try {
+ ugi = KerberosUtil.loginAndReturnUGI(conf, principal, keytab);
+ } catch (Exception e){
+ throw new RuntimeException("Login kerberos error:", e);
+ }
+
+ LOG.info("current ugi:{}", ugi);
+ return ugi.doAs(new PrivilegedAction() {
+ @Override
+ public Connection run(){
+ return getConnectionWithRetry(connectionInfo);
+ }
+ });
+ }
+
private static boolean openKerberos(final String jdbcUrl){
String[] splits = jdbcUrl.split(JDBC_REGEX);
if (splits.length == 2) {
@@ -108,28 +147,6 @@ private static boolean openKerberos(final String jdbcUrl){
return false;
}
- private static void login(ConnectionInfo info){
- try {
- if(info.getHiveConf() == null || info.getHiveConf().isEmpty()){
- throw new IllegalArgumentException("hiveConf can not be null or empty");
- }
-
- String keytab = getKeytab(info.getHiveConf());
- String principal = getPrincipal(info.getJdbcUrl());
-
- keytab = KerberosUtil.loadFile(info.getHiveConf(), keytab, info.getJobId(), info.getPlugin());
- principal = KerberosUtil.findPrincipalFromKeytab(principal, keytab);
- KerberosUtil.loadKrb5Conf(info.getHiveConf(), info.getJobId(), info.getPlugin());
-
- Configuration conf = FileSystemUtil.getConfiguration(info.getHiveConf(), null);
- conf.set("hadoop.security.authentication", "Kerberos");
-
- KerberosUtil.login(conf, principal, keytab);
- } catch (IOException e) {
- throw new RuntimeException("Login kerberos for hive error", e);
- }
- }
-
private static String getPrincipal(String jdbcUrl){
String[] splits = jdbcUrl.split(JDBC_REGEX);
if (splits.length == 2) {
diff --git a/flinkx-hive/flinkx-hive-core/src/main/java/com/dtstack/flinkx/hive/util/HiveUtil.java b/flinkx-hive/flinkx-hive-core/src/main/java/com/dtstack/flinkx/hive/util/HiveUtil.java
index c984617315..44355bb94f 100644
--- a/flinkx-hive/flinkx-hive-core/src/main/java/com/dtstack/flinkx/hive/util/HiveUtil.java
+++ b/flinkx-hive/flinkx-hive-core/src/main/java/com/dtstack/flinkx/hive/util/HiveUtil.java
@@ -34,7 +34,7 @@
import java.util.regex.Pattern;
import static com.dtstack.flinkx.hive.EStoreType.*;
-import static com.dtstack.flinkx.hive.EWriteModeType.*;
+import static com.dtstack.flinkx.hive.EWriteModeType.OVERWRITE;
/**
* @author toutian
@@ -119,7 +119,7 @@ private void createTable(Connection connection, TableInfo tableInfo) {
DBUtil.executeSqlWithoutResultSet(connection, sql);
} catch (Exception e) {
if (overwrite || !e.getMessage().contains(TableExistException) && !e.getMessage().contains(TableAlreadyExistsException)) {
- logger.error("create table happens error:{}", e);
+ logger.error("create table happens error:", e);
throw new RuntimeException("create table happens error", e);
} else {
if (logger.isDebugEnabled()) {
@@ -153,7 +153,9 @@ private void fillTableInfo(Connection connection, TableInfo tableInfo) {
}
} else if (matcher.group(3).contains(ORC_FORMAT)) {
tableInfo.setStore(ORC.name());
- } else {
+ } else if (matcher.group(3).contains(PARQUET_FORMAT)) {
+ tableInfo.setStore(PARQUET.name());
+ }else {
throw new RuntimeException("Unsupported fileType:" + matcher.group(3));
}
}
@@ -193,8 +195,10 @@ public static String getCreateTableHql(TableInfo tableInfo) {
fieldsb.append(" ROW FORMAT DELIMITED FIELDS TERMINATED BY '");
fieldsb.append(tableInfo.getDelimiter());
fieldsb.append("' LINES TERMINATED BY '\\n' STORED AS TEXTFILE ");
- } else {
+ } else if(ORC.name().equalsIgnoreCase(tableInfo.getStore())) {
fieldsb.append(" STORED AS ORC ");
+ }else{
+ fieldsb.append(" STORED AS PARQUET ");
}
return fieldsb.toString();
}
@@ -271,7 +275,7 @@ public static String convertType(String type) {
}
public static ObjectInspector columnTypeToObjectInspetor(String columnType) {
- ObjectInspector objectInspector = null;
+ ObjectInspector objectInspector;
switch (columnType.toUpperCase()) {
case "TINYINT":
objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(Byte.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
diff --git a/flinkx-hive/flinkx-hive-writer/pom.xml b/flinkx-hive/flinkx-hive-writer/pom.xml
index 993ad453fe..7fdfe387ff 100644
--- a/flinkx-hive/flinkx-hive-writer/pom.xml
+++ b/flinkx-hive/flinkx-hive-writer/pom.xml
@@ -73,6 +73,9 @@ under the License.
+ org.slf4j:slf4j-api
+
+ ch.qos.logback:*
com.google.code.gson:*
com.data-artisans:*
org.scala-lang:*
diff --git a/flinkx-hive/flinkx-hive-writer/src/main/java/com/dtstack/flinkx/hive/writer/HiveOutputFormat.java b/flinkx-hive/flinkx-hive-writer/src/main/java/com/dtstack/flinkx/hive/writer/HiveOutputFormat.java
index 51ace36523..f15f9f6926 100644
--- a/flinkx-hive/flinkx-hive-writer/src/main/java/com/dtstack/flinkx/hive/writer/HiveOutputFormat.java
+++ b/flinkx-hive/flinkx-hive-writer/src/main/java/com/dtstack/flinkx/hive/writer/HiveOutputFormat.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
diff --git a/flinkx-kafka09/flinkx-kafka09-reader/pom.xml b/flinkx-kafka09/flinkx-kafka09-reader/pom.xml
index e58e3b8d5c..0bd86261f3 100644
--- a/flinkx-kafka09/flinkx-kafka09-reader/pom.xml
+++ b/flinkx-kafka09/flinkx-kafka09-reader/pom.xml
@@ -34,9 +34,9 @@
- org.slf4j:slf4j-log4j12
- log4j:log4j
org.slf4j:slf4j-api
+
+ ch.qos.logback:*
netty-all:io.netty
com.google.guava:guava
diff --git a/flinkx-kafka09/flinkx-kafka09-writer/pom.xml b/flinkx-kafka09/flinkx-kafka09-writer/pom.xml
index 7c92c30762..1191c76809 100644
--- a/flinkx-kafka09/flinkx-kafka09-writer/pom.xml
+++ b/flinkx-kafka09/flinkx-kafka09-writer/pom.xml
@@ -34,9 +34,9 @@
- org.slf4j:slf4j-log4j12
- log4j:log4j
org.slf4j:slf4j-api
+
+ ch.qos.logback:*
netty-all:io.netty
com.google.guava:guava
diff --git a/flinkx-kafka10/flinkx-kafka10-reader/pom.xml b/flinkx-kafka10/flinkx-kafka10-reader/pom.xml
index b9bd1c2663..3010dc6a27 100644
--- a/flinkx-kafka10/flinkx-kafka10-reader/pom.xml
+++ b/flinkx-kafka10/flinkx-kafka10-reader/pom.xml
@@ -34,9 +34,9 @@
- org.slf4j:slf4j-log4j12
- log4j:log4j
org.slf4j:slf4j-api
+ log4j:log4j
+ ch.qos.logback:*
netty-all:io.netty
com.google.guava:guava
diff --git a/flinkx-kafka10/flinkx-kafka10-writer/pom.xml b/flinkx-kafka10/flinkx-kafka10-writer/pom.xml
index cc867622ce..d958c9e0da 100644
--- a/flinkx-kafka10/flinkx-kafka10-writer/pom.xml
+++ b/flinkx-kafka10/flinkx-kafka10-writer/pom.xml
@@ -34,9 +34,9 @@
- org.slf4j:slf4j-log4j12
- log4j:log4j
org.slf4j:slf4j-api
+ log4j:log4j
+ ch.qos.logback:*
netty-all:io.netty
com.google.guava:guava
diff --git a/flinkx-kafka11/flinkx-kafka11-reader/pom.xml b/flinkx-kafka11/flinkx-kafka11-reader/pom.xml
index 12974ef91c..707be61825 100644
--- a/flinkx-kafka11/flinkx-kafka11-reader/pom.xml
+++ b/flinkx-kafka11/flinkx-kafka11-reader/pom.xml
@@ -34,9 +34,9 @@
- org.slf4j:slf4j-log4j12
- log4j:log4j
org.slf4j:slf4j-api
+ log4j:log4j
+ ch.qos.logback:*
netty-all:io.netty
com.google.guava:guava
diff --git a/flinkx-kafka11/flinkx-kafka11-writer/pom.xml b/flinkx-kafka11/flinkx-kafka11-writer/pom.xml
index ee30c513d0..f1cdcd5a84 100644
--- a/flinkx-kafka11/flinkx-kafka11-writer/pom.xml
+++ b/flinkx-kafka11/flinkx-kafka11-writer/pom.xml
@@ -34,9 +34,9 @@
- org.slf4j:slf4j-log4j12
- log4j:log4j
org.slf4j:slf4j-api
+ log4j:log4j
+ ch.qos.logback:*
netty-all:io.netty
com.google.guava:guava
diff --git a/flinkx-mongodb/flinkx-mongodb-reader/pom.xml b/flinkx-mongodb/flinkx-mongodb-reader/pom.xml
index 7370b820ef..93b391d673 100644
--- a/flinkx-mongodb/flinkx-mongodb-reader/pom.xml
+++ b/flinkx-mongodb/flinkx-mongodb-reader/pom.xml
@@ -32,6 +32,13 @@
shade
+
+
+ org.slf4j:slf4j-api
+ log4j:log4j
+ ch.qos.logback:*
+
+
*:*
diff --git a/flinkx-mongodb/flinkx-mongodb-writer/pom.xml b/flinkx-mongodb/flinkx-mongodb-writer/pom.xml
index 13ea9450a5..f5f6c32e7c 100644
--- a/flinkx-mongodb/flinkx-mongodb-writer/pom.xml
+++ b/flinkx-mongodb/flinkx-mongodb-writer/pom.xml
@@ -32,6 +32,13 @@
shade
+
+
+ org.slf4j:slf4j-api
+ log4j:log4j
+ ch.qos.logback:*
+
+
*:*
diff --git a/flinkx-mysql/flinkx-mysql-dreader/pom.xml b/flinkx-mysql/flinkx-mysql-dreader/pom.xml
index cd93e8c74c..a928e9d8bb 100644
--- a/flinkx-mysql/flinkx-mysql-dreader/pom.xml
+++ b/flinkx-mysql/flinkx-mysql-dreader/pom.xml
@@ -39,6 +39,11 @@
+
+ org.slf4j:slf4j-api
+ log4j:log4j
+ ch.qos.logback:*
+
diff --git a/flinkx-mysql/flinkx-mysql-reader/pom.xml b/flinkx-mysql/flinkx-mysql-reader/pom.xml
index d83832683a..a69e94be40 100644
--- a/flinkx-mysql/flinkx-mysql-reader/pom.xml
+++ b/flinkx-mysql/flinkx-mysql-reader/pom.xml
@@ -39,6 +39,11 @@
+
+ org.slf4j:slf4j-api
+ log4j:log4j
+ ch.qos.logback:*
+
diff --git a/flinkx-mysql/flinkx-mysql-writer/pom.xml b/flinkx-mysql/flinkx-mysql-writer/pom.xml
index 5e06ac818e..0152092504 100644
--- a/flinkx-mysql/flinkx-mysql-writer/pom.xml
+++ b/flinkx-mysql/flinkx-mysql-writer/pom.xml
@@ -39,6 +39,11 @@
+
+ org.slf4j:slf4j-api
+ log4j:log4j
+ ch.qos.logback:*
+
diff --git a/flinkx-odps/flinkx-odps-reader/pom.xml b/flinkx-odps/flinkx-odps-reader/pom.xml
index 7fcb63629c..a34bfb78bf 100644
--- a/flinkx-odps/flinkx-odps-reader/pom.xml
+++ b/flinkx-odps/flinkx-odps-reader/pom.xml
@@ -32,6 +32,13 @@
shade
+
+
+ org.slf4j:slf4j-api
+ log4j:log4j
+ ch.qos.logback:*
+
+
*:*
diff --git a/flinkx-odps/flinkx-odps-writer/pom.xml b/flinkx-odps/flinkx-odps-writer/pom.xml
index 7b7fc17abc..c6639fb2b3 100644
--- a/flinkx-odps/flinkx-odps-writer/pom.xml
+++ b/flinkx-odps/flinkx-odps-writer/pom.xml
@@ -34,16 +34,14 @@
-
- com.dtstack.flinkx:flinkx-core
com.google.code.gson:*
-
-
- org.apache.flink:*
com.data-artisans:*
org.scala-lang:*
io.netty:*
+ org.slf4j:slf4j-api
+ log4j:log4j
+ ch.qos.logback:*
diff --git a/flinkx-oracle/flinkx-oracle-reader/pom.xml b/flinkx-oracle/flinkx-oracle-reader/pom.xml
index c8f167a0fd..634a54148d 100644
--- a/flinkx-oracle/flinkx-oracle-reader/pom.xml
+++ b/flinkx-oracle/flinkx-oracle-reader/pom.xml
@@ -39,6 +39,11 @@
+
+ org.slf4j:slf4j-api
+ log4j:log4j
+ ch.qos.logback:*
+
diff --git a/flinkx-oracle/flinkx-oracle-writer/pom.xml b/flinkx-oracle/flinkx-oracle-writer/pom.xml
index 3f956ad00a..6596230236 100644
--- a/flinkx-oracle/flinkx-oracle-writer/pom.xml
+++ b/flinkx-oracle/flinkx-oracle-writer/pom.xml
@@ -40,6 +40,11 @@
+
+ org.slf4j:slf4j-api
+ log4j:log4j
+ ch.qos.logback:*
+
diff --git a/flinkx-postgresql/flinkx-postgresql-reader/pom.xml b/flinkx-postgresql/flinkx-postgresql-reader/pom.xml
index b31c373ac8..1acb776f0e 100644
--- a/flinkx-postgresql/flinkx-postgresql-reader/pom.xml
+++ b/flinkx-postgresql/flinkx-postgresql-reader/pom.xml
@@ -40,6 +40,11 @@
+
+ org.slf4j:slf4j-api
+ log4j:log4j
+ ch.qos.logback:*
+
diff --git a/flinkx-postgresql/flinkx-postgresql-writer/pom.xml b/flinkx-postgresql/flinkx-postgresql-writer/pom.xml
index 272d6673e8..108b618b4f 100644
--- a/flinkx-postgresql/flinkx-postgresql-writer/pom.xml
+++ b/flinkx-postgresql/flinkx-postgresql-writer/pom.xml
@@ -39,6 +39,11 @@
+
+ org.slf4j:slf4j-api
+ log4j:log4j
+ ch.qos.logback:*
+
diff --git a/flinkx-rdb/flinkx-rdb-reader/src/main/java/com.dtstack.flinkx.rdb.inputformat/JdbcInputFormat.java b/flinkx-rdb/flinkx-rdb-reader/src/main/java/com.dtstack.flinkx.rdb.inputformat/JdbcInputFormat.java
index 93f07c1bef..d0d16dec0d 100644
--- a/flinkx-rdb/flinkx-rdb-reader/src/main/java/com.dtstack.flinkx.rdb.inputformat/JdbcInputFormat.java
+++ b/flinkx-rdb/flinkx-rdb-reader/src/main/java/com.dtstack.flinkx.rdb.inputformat/JdbcInputFormat.java
@@ -26,7 +26,11 @@
import com.dtstack.flinkx.rdb.type.TypeConverterInterface;
import com.dtstack.flinkx.rdb.util.DBUtil;
import com.dtstack.flinkx.reader.MetaColumn;
-import com.dtstack.flinkx.util.*;
+import com.dtstack.flinkx.util.ClassUtil;
+import com.dtstack.flinkx.util.DateUtil;
+import com.dtstack.flinkx.util.FileSystemUtil;
+import com.dtstack.flinkx.util.StringUtil;
+import com.dtstack.flinkx.util.URLUtil;
import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.accumulators.Accumulator;
@@ -39,9 +43,16 @@
import org.apache.flink.hadoop.shaded.org.apache.http.impl.client.HttpClientBuilder;
import org.apache.flink.types.Row;
import java.io.IOException;
-import java.sql.*;
-import java.util.*;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import com.dtstack.flinkx.inputformat.RichInputFormat;
import org.apache.hadoop.fs.FSDataOutputStream;
diff --git a/flinkx-redis/flinkx-redis-writer/pom.xml b/flinkx-redis/flinkx-redis-writer/pom.xml
index 0417a7cd91..3e0157cd5a 100644
--- a/flinkx-redis/flinkx-redis-writer/pom.xml
+++ b/flinkx-redis/flinkx-redis-writer/pom.xml
@@ -33,6 +33,11 @@
+
+ org.slf4j:slf4j-api
+ log4j:log4j
+ ch.qos.logback:*
+
diff --git a/flinkx-sqlserver/flinkx-sqlserver-reader/pom.xml b/flinkx-sqlserver/flinkx-sqlserver-reader/pom.xml
index c12a3b615f..47e830fc9d 100644
--- a/flinkx-sqlserver/flinkx-sqlserver-reader/pom.xml
+++ b/flinkx-sqlserver/flinkx-sqlserver-reader/pom.xml
@@ -39,6 +39,11 @@
+
+ org.slf4j:slf4j-api
+ log4j:log4j
+ ch.qos.logback:*
+
diff --git a/flinkx-sqlserver/flinkx-sqlserver-writer/pom.xml b/flinkx-sqlserver/flinkx-sqlserver-writer/pom.xml
index 41f015da9d..a27fa1b4a6 100644
--- a/flinkx-sqlserver/flinkx-sqlserver-writer/pom.xml
+++ b/flinkx-sqlserver/flinkx-sqlserver-writer/pom.xml
@@ -40,6 +40,11 @@
+
+ org.slf4j:slf4j-api
+ log4j:log4j
+ ch.qos.logback:*
+
diff --git a/flinkx-stream/flinkx-stream-reader/pom.xml b/flinkx-stream/flinkx-stream-reader/pom.xml
index cf0954e678..25a83749de 100644
--- a/flinkx-stream/flinkx-stream-reader/pom.xml
+++ b/flinkx-stream/flinkx-stream-reader/pom.xml
@@ -33,6 +33,11 @@
+
+ org.slf4j:slf4j-api
+ log4j:log4j
+ ch.qos.logback:*
+
diff --git a/flinkx-stream/flinkx-stream-writer/pom.xml b/flinkx-stream/flinkx-stream-writer/pom.xml
index 57065c8e22..ae915a9b40 100644
--- a/flinkx-stream/flinkx-stream-writer/pom.xml
+++ b/flinkx-stream/flinkx-stream-writer/pom.xml
@@ -29,6 +29,11 @@
+
+ org.slf4j:slf4j-api
+ log4j:log4j
+ ch.qos.logback:*
+
diff --git a/flinkx-test/src/main/java/com/dtstack/flinkx/test/LocalTest.java b/flinkx-test/src/main/java/com/dtstack/flinkx/test/LocalTest.java
index 762b8acf39..0a802370b7 100644
--- a/flinkx-test/src/main/java/com/dtstack/flinkx/test/LocalTest.java
+++ b/flinkx-test/src/main/java/com/dtstack/flinkx/test/LocalTest.java
@@ -261,14 +261,6 @@ private static void openCheckpointConf(StreamExecutionEnvironment env, Propertie
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
- String backendPath = properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_DATAURI_KEY);
- if(backendPath != null){
- //set checkpoint save path on file system,hdfs://, file://
- env.setStateBackend(new FsStateBackend(backendPath));
-
- LOG.info("Set StateBackend:" + backendPath);
- }
-
env.setRestartStrategy(RestartStrategies.failureRateRestart(
FAILURE_RATE,
Time.of(FAILURE_INTERVAL, TimeUnit.MINUTES),