Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
7c50b91
Merge remote-tracking branch 'origin/1.5_v3.7.3' into 1.5_v3.8.0_beta…
lijiangbo Sep 17, 2019
0d3cf72
update
lijiangbo Sep 20, 2019
e94d446
Merge branch '1.8_dev' into 1.8_kerberos_local_test
lijiangbo Sep 21, 2019
37e4470
实时采集sink到hive,hdfs 增加parquet格式
kanata163 Sep 24, 2019
88b14bb
Merge branch 'feature_1.8_dev_parquet' into '1.8_dev'
lijiangbo Sep 24, 2019
6ad5406
exclude log4j logback
jiemotongxue Sep 24, 2019
4e9126e
hdfs carbon kafka09直接依赖了log4j
jiemotongxue Sep 24, 2019
b9f6873
Merge branch '1.8_dev_skiplog4jAndlogback' into '1.8_dev'
lijiangbo Sep 24, 2019
f728e84
hdfs carbon kafka09直接依赖了log4j
jiemotongxue Sep 24, 2019
1c23b42
hdfs carbon kafka09直接依赖了log4j
jiemotongxue Sep 24, 2019
64a1f1a
hbase 依赖 netty
jiemotongxue Sep 24, 2019
539e489
Merge branch '1.8_dev_skiplog4jAndlogback' into '1.8_dev'
lijiangbo Sep 24, 2019
a657c1b
去掉固定的stateBackend
lijiangbo Sep 24, 2019
b343187
add logback
jiemotongxue Sep 25, 2019
417d77c
Merge branch '1.8_dev_skiplog4jAndlogback' into '1.8_dev'
lijiangbo Sep 25, 2019
240413c
Merge branch 'feature_1.8_stateBackend' into '1.8_dev'
kanata163 Sep 25, 2019
96304b6
LocalTest类去掉FsStateBackend
lijiangbo Sep 25, 2019
1a40a98
Merge remote-tracking branch 'origin/1.8_dev' into 1.8_kerberos_local…
lijiangbo Sep 25, 2019
d27cb2f
flinkx-core/pom.xml include
jiemotongxue Sep 25, 2019
817bd82
1.txt skiip
jiemotongxue Sep 25, 2019
aa63b29
Merge branch '1.8_dev_skiplog4jAndlogback' into '1.8_dev'
lijiangbo Sep 25, 2019
45a5ef0
Merge branch '1.8_dev' into 1.8_kerberos_local_test
lijiangbo Sep 25, 2019
bbe23a6
优化错误日志记录
kanata163 Sep 25, 2019
ac9b114
Merge branch 'fix_1.5_dev_19035' into '1.5_dev'
lijiangbo Sep 25, 2019
92d93c8
Merge branch '1.5_dev' into 1.8_dev
kanata163 Sep 25, 2019
867f748
修改kerberos登录方式
lijiangbo Sep 25, 2019
fe569b8
Merge branch '1.8_kerberos_local_test' into '1.8_dev'
lijiangbo Sep 25, 2019
666536a
【fix NPE】【数据同步 临时运行任务,报超时异常】【19035】
kanata163 Sep 26, 2019
26fcb5c
[脏数据空指针错误]
lijiangbo Sep 27, 2019
50070d4
Merge remote-tracking branch 'origin/1.5_v3.8.2' into 1.5_dev
lijiangbo Sep 27, 2019
5463060
Merge remote-tracking branch 'origin/1.5_dev' into 1.8_dev
lijiangbo Sep 27, 2019
a3e1ff8
添加新插件文档
lijiangbo Oct 5, 2019
aa7c4c4
Merge branch 'feature_add_doc' into '1.5_dev'
lijiangbo Oct 5, 2019
b1e33fc
Merge remote-tracking branch 'origin/1.5_dev' into 1.8_dev
lijiangbo Oct 5, 2019
166493c
添加Kerberos文档
lijiangbo Oct 5, 2019
0912a73
添加统计指标文档
lijiangbo Oct 5, 2019
adc60ae
Merge branch 'kerberos_doc' into '1.8_dev'
lijiangbo Oct 5, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ reader和writer包括name和parameter,分别表示插件名称和插件参数
* [MongoDB读取插件](docs/mongodbreader.md)
* [Stream读取插件](docs/streamreader.md)
* [Carbondata读取插件](docs/carbondatareader.md)
* [MySQL binlog读取插件](docs/binlog.md)
* [KafKa读取插件](docs/kafkareader.md)

### 5.2 写入插件

Expand All @@ -244,10 +246,23 @@ reader和writer包括name和parameter,分别表示插件名称和插件参数
* [Redis写入插件](docs/rediswriter.md)
* [Stream写入插件](docs/streamwriter.md)
* [Carbondata写入插件](docs/carbondatawriter.md)
* [Kafka写入插件](docs/kafkawriter.md)
* [Hive写入插件](docs/hivewriter.md)

[断点续传和实时采集功能介绍](docs/restore.md)

[数据源开启Kerberos](docs/kerberos.md)

[统计指标说明](docs/statistics.md)

## 6.版本说明

1.flinkx的分支版本跟flink的版本对应,比如:flinkx v1.4.0 对应 flink1.4.0,现在支持flink1.4和1.5
1.flinkx的分支版本跟flink的版本对应,比如:flinkx v1.5.0 对应 flink1.5.0,版本说明:

| 插件版本 | flink版本 |
| ----- | ------- |
| 1.5.x | 1.5.4 |
| 1.8.x | 1.8.1 |

## 7.招聘信息

Expand Down
191 changes: 191 additions & 0 deletions docs/binlog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
# MySQL binlog读取插件(*reader)

## 1. 配置样例

```json
{
"job": {
"content": [{
"reader": {
"parameter": {
"jdbcUrl" : "jdbc:mysql://127.0.0.1:3306/test?charset=utf8",
"username" : "username",
"password" : "password",
"host" : "127.0.0.1",
"port": 3306,
"table" : [ "test_sink" ],
"filter" : "",
"cat" : "insert,update,delete",
"start" : {
"journalName" : "bin.000004",
"timestamp" : 123123
},
"pavingData" : false,
"bufferSize" : 1024
},
"name": "binlogreader"
},
"writer": {

}
}]
},
"setting": {

}
}
```

## 2. 参数说明

* **name**

* 描述:插件名,此处填写插件名称。

* 必选:是

* 默认值:无

* **jdbcUrl**

* 描述:MySQL数据库的jdbc连接字符串,参考文档:[Mysql官方文档](http://dev.mysql.com/doc/connector-j/en/connector-j-reference-configuration-properties.html)

* 必选:是

* 默认值:无

* **username**

* 描述:数据源的用户名

* 必选:是

* 默认值:无

* **password**

* 描述:数据源指定用户名的密码

* 必选:是

* 默认值:无

* **host**

* 描述:启动MySQL slave的机器ip。

* 必选:是

* 默认值:无

* **port**

* 描述:启动MySQL slave的端口

* 必选:否

* 默认值:3306

* **table**

* 描述:需要解析的数据表。

* 注意:指定此参数后filter参数将无效。

* 必选:否

* 默认值:无

* **filter**

* 描述:过滤表名的Perl正则表达式。

* 例子:

* 所有表:.* or .*\\..*

* canal schema下所有表: canal\\..*

* canal下的以canal打头的表:canal\\.canal.*

* canal schema下的一张表:canal\\.test1

* 必选:否

* 默认值:无

* **cat**

* 描述:需要解析的数据更新类型,包括insert、update、delete三种。

* 注意:以英文逗号分割的格式填写。

* 必选:否

* 默认值:null

* **start**

* 描述:要读取的binlog文件的开始位置。

* 参数:

* journalName:采集起点按文件开始时的文件名称;

* timestamp:采集七点按时间开始时的时间戳;

* 默认值:无

* **pavingData**

* 描述:是否将解析出的json数据拍平

* 示例:假设解析的表为tb1,数据库为test,对tb1中的id字段做update操作,id原来的值为1,更新后为2,则pavingData为true时数据格式为:

```json
{
"type":"update",
"schema":"test",
"table":"tb1",
"ts":1231232,
"ingestion":123213,
"before_id":1,
"after_id":2
}
```

pavingData为false时:

```json
{
"message":{
"type":"update",
"schema":"test",
"table":"tb1",
"ts":1231232,
"ingestion":123213,
"before_id":{
"id":1
},
"after_id":{
"id":2
}
}
}
```

其中”ts“是数据变更时间,ingestion是插件解析这条数据的纳秒时间

* 必选:否

* 默认值:false

* **bufferSize**

* 描述:并发缓存大小

* 注意:必须为2的幂

* 必选:否

* 默认值:1024
78 changes: 42 additions & 36 deletions docs/hbasereader.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,50 +2,56 @@

## 1. 配置样例

```
```json
{
"job": {
"setting": {},
"content": [{
"reader": {
"name": "hbasereader",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.property.clientPort": "2181",
"hbase.rootdir": "hdfs://ns1/hbase",
"hbase.cluster.distributed": "true",
"hbase.zookeeper.quorum": "host1,host2,host3",
"zookeeper.znode.parent": "/hbase"
},
"table": "tableTest",
"encodig": "utf-8",
"column": [{
"name": "rowkey",
"type": "string"
},
{
"name": "cf1:id",
"type": "string"
}
],
"range": {
"startRowkey": "",
"endRowkey": "",
"isBinaryRowkey": true
}
}
},
"writer": {}
}]
}
"job": {
"setting": {},
"content": [{
"reader": {
"name": "hbasereader",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.property.clientPort": "2181",
"hbase.rootdir": "hdfs://ns1/hbase",
"hbase.cluster.distributed": "true",
"hbase.zookeeper.quorum": "host1,host2,host3",
"zookeeper.znode.parent": "/hbase",
"hbase.security.authentication":"Kerberos",
"hbase.security.authorization":true,
"hbase.master.kerberos.principal":"hbase/node1@TEST.COM",
"hbase.master.keytab.file":"hbase.keytab",
"hbase.regionserver.keytab.file":"hbase.keytab",
"hbase.regionserver.kerberos.principal":"hbase/node1@TEST.COM"
},
"table": "tableTest",
"encodig": "utf-8",
"column": [{
"name": "rowkey",
"type": "string"
},
{
"name": "cf1:id",
"type": "string"
}
],
"range": {
"startRowkey": "",
"endRowkey": "",
"isBinaryRowkey": true
}
}
},
"writer": {}
}]
}
}
```

## 2. 参数说明

* **hbaseConfig**

* 描述:hbase的连接配置,以json的形式组织 (见hbase-site.xml)
* 描述:hbase的连接配置,以json的形式组织 (见hbase-site.xml),开启kerberos的话参考文档[数据源开启Kerberos](kerberos.md)

* 必选:是

Expand Down
12 changes: 9 additions & 3 deletions docs/hbasewriter.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## 1. 配置样例

```
```json
{
"job": {
"setting": {
Expand All @@ -17,7 +17,13 @@
"hbase.rootdir": "hdfs://ns1/hbase",
"hbase.cluster.distributed": "true",
"hbase.zookeeper.quorum": "host1,host2,host3",
"zookeeper.znode.parent": "/hbase"
"zookeeper.znode.parent": "/hbase",
"hbase.security.authentication":"Kerberos",
"hbase.security.authorization":true,
"hbase.master.kerberos.principal":"hbase/node1@TEST.COM",
"hbase.master.keytab.file":"hbase.keytab",
"hbase.regionserver.keytab.file":"hbase.keytab",
"hbase.regionserver.kerberos.principal":"hbase/node1@TEST.COM"
},
"table": "tableTest",
"rowkeyColumn": [{
Expand Down Expand Up @@ -50,7 +56,7 @@

* **hbaseConfig**

* 描述:hbase的连接配置,以json的形式组织 (见hbase-site.xml)
* 描述:hbase的连接配置,以json的形式组织 (见hbase-site.xml),开启kerberos的话参考文档[数据源开启Kerberos](kerberos.md)

* 必选:是

Expand Down
2 changes: 1 addition & 1 deletion docs/hdfsreader.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

* **path**

* 描述:要读取的文件路径,多个路径可以用逗号隔开
* 描述:要读取的文件路径,多个路径可以用逗号隔开,开启kerberos的话参考文档[数据源开启Kerberos](kerberos.md)。

* 必选:是 <br />

Expand Down
2 changes: 1 addition & 1 deletion docs/hdfswriter.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@

* **defaultFS**

* 描述:Hadoop hdfs文件系统namenode节点地址。格式:hdfs://ip:端口;例如:hdfs://127.0.0.1:9000<br />
* 描述:Hadoop hdfs文件系统namenode节点地址。格式:hdfs://ip:端口;例如:hdfs://127.0.0.1:9

* 必选:是 <br />

Expand Down
Loading