Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
81 commits
Select commit Hold shift + click to select a range
64ac39c
Fix build query sql
lijiangbo Mar 7, 2019
4320cea
Support string type for MaximumAccumulator
lijiangbo Mar 7, 2019
aab174a
过滤条件空串判断
lijiangbo Apr 11, 2019
1f32b8d
Merge branch 'feature_1.5_v3.3.5_filter' into '1.5_v3.3.5'
lijiangbo Apr 11, 2019
fbfa577
过滤条件空串判断
lijiangbo Apr 11, 2019
4b8ba53
过滤条件空串判断
lijiangbo Apr 11, 2019
765a860
自定义sql
lijiangbo Apr 16, 2019
0b77682
自定义sql
lijiangbo Apr 16, 2019
c26b09d
writerConfig里jdbcUrl格式兼容
lijiangbo Apr 11, 2019
ef5ce52
fix读取parquet表bug
lijiangbo Apr 19, 2019
fee19bc
es连接参数可配置
lijiangbo Apr 9, 2019
4cdb9bd
timeout单位改为秒
lijiangbo Apr 9, 2019
c8dfd1e
对批量写入es结果进行处理
lijiangbo Apr 10, 2019
249c931
fix hdfs读取text格式文件多"\u0000"的bug
lijiangbo Apr 18, 2019
0741a0d
fix禁用取max值功能
lijiangbo Apr 25, 2019
e55e3a2
fix npe
lijiangbo Apr 25, 2019
cc9ba86
fix npe
lijiangbo Apr 25, 2019
00dcaed
fix
lijiangbo Apr 26, 2019
c2535d2
Merge remote-tracking branch 'origin/1.5_v3.5.3' into 1.5_v3.5.4
lijiangbo Apr 26, 2019
952ea7d
fix禁用取max值功能
lijiangbo Apr 25, 2019
431a086
fix npe
lijiangbo Apr 25, 2019
f06e4bd
fix npe
lijiangbo Apr 25, 2019
68b16df
fix
lijiangbo Apr 26, 2019
bfc67b3
fix useMaxFunc为false时sql拼接问题
lijiangbo Apr 26, 2019
2455bfa
Merge branch 'feature_1.5_v3.3.6_useMaxFunc' into '1.5_v3.3.6'
zoudaokoulife Apr 26, 2019
2f6afcc
useMaxFunc为false时不读取边界值
lijiangbo Apr 26, 2019
75f47df
Merge branch 'feature_1.5_v3.3.6_useMaxFunc_falae' into '1.5_v3.3.6'
zoudaokoulife Apr 26, 2019
28a4dec
oracle增量字段区分date和timestamp类型
lijiangbo Apr 29, 2019
e59c98d
Merge branch 'feature_oracleDateTs' into '1.5_v3.3.6'
zoudaokoulife May 5, 2019
ea81da4
修改oracle增量字段date类型的日期格式
lijiangbo May 5, 2019
12235f8
Merge branch 'date_format' into '1.5_v3.3.6'
zoudaokoulife May 5, 2019
07bbfa3
fix useMaxFunc为false时sql拼接问题
lijiangbo Apr 26, 2019
48b13ee
useMaxFunc为false时不读取边界值
lijiangbo Apr 26, 2019
4e49386
oracle增量字段区分date和timestamp类型
lijiangbo Apr 29, 2019
194df88
修改oracle增量字段date类型的日期格式
lijiangbo May 5, 2019
45f3620
上传jar包
lijiangbo May 11, 2019
5de73c6
add readme file
lijiangbo May 11, 2019
91d3b31
db2不使用limit
lijiangbo May 14, 2019
88def12
db2不使用limit
lijiangbo May 14, 2019
e13163b
fix read file from ftp
lijiangbo May 17, 2019
4f1afb2
Merge branch 'fix_read_ftp' into '1.5_v3.6.0_beta_1.0'
zoudaokoulife May 17, 2019
0a34415
Merge remote-tracking branch 'origin/1.5_v3.6.0_beta_1.0' into 1.5_dev
lijiangbo May 17, 2019
8091e44
add log
lijiangbo Jun 5, 2019
8713d2b
fix任务取消时指标输出问题
lijiangbo Jun 5, 2019
fb8621d
设置任务关闭时默认等待时间
lijiangbo Jun 6, 2019
4344f72
fix读取hdfs字段信息报错
lijiangbo Jun 11, 2019
daac168
fix读取hdfs字段信息报错
lijiangbo Jun 11, 2019
b9e9106
fix指标输出等待时间
lijiangbo Jun 11, 2019
6e6f758
fix
lijiangbo Jun 11, 2019
f688b6d
Merge remote-tracking branch 'origin/1.5_v3.3.6' into 1.5_v3.5.6
lijiangbo Jun 12, 2019
089913c
Merge remote-tracking branch 'origin/1.5_v3.5.6' into 1.5_dev
lijiangbo Jun 12, 2019
13a637f
merge 3.5.6 into current
lijiangbo Jun 12, 2019
b554be1
任务结束等待指标输出时间设为固定值
lijiangbo Jun 12, 2019
edffa16
Revert "fix读取hdfs字段信息报错"
lijiangbo Jun 13, 2019
f499ff2
fix读取orc字段报错
lijiangbo Jun 13, 2019
8d4d430
fix读取orc字段报错
lijiangbo Jun 13, 2019
df7eb6b
Merge remote-tracking branch 'origin/1.5_v3.3.7' into 1.5_v3.5.7
lijiangbo Jun 13, 2019
1f3040b
Merge remote-tracking branch 'origin/1.5_v3.5.7' into 1.5_dev
lijiangbo Jun 13, 2019
b33f383
排除多余的snappy依赖
lijiangbo Jun 17, 2019
eddbdc9
decimal类型获取错误
lijiangbo Jun 17, 2019
a3ef645
Merge remote-tracking branch 'origin/1.5_v3.3.7' into 1.5_v3.5.7
lijiangbo Jun 17, 2019
5a3735b
Merge remote-tracking branch 'origin/1.5_v3.5.7' into 1.5_dev
lijiangbo Jun 17, 2019
34d5bde
snappy重复依赖排除
lijiangbo Jun 17, 2019
d790078
snappy重复依赖排除
lijiangbo Jun 17, 2019
d02da8c
Merge remote-tracking branch 'origin/1.5_v3.3.7' into 1.5_v3.5.7
lijiangbo Jun 17, 2019
3d278b1
Merge remote-tracking branch 'origin/1.5_v3.5.7' into 1.5_dev
lijiangbo Jun 17, 2019
e028b6b
fix读取parquet表创建分片bug
lijiangbo Jun 19, 2019
0c49c45
fix npe
lijiangbo Jun 21, 2019
3d1bf75
fix npe
lijiangbo Jun 21, 2019
317c5e1
fix读取parquet数据重复问题
lijiangbo Jun 21, 2019
979cf23
fix ftp读取多文件问题
lijiangbo Jun 22, 2019
798a6a3
fix ftp读取多文件问题
lijiangbo Jun 22, 2019
5c70c0a
fix ftp读取多文件问题
lijiangbo Jun 24, 2019
15e0769
fix ftp读取多文件问题
lijiangbo Jun 22, 2019
6460775
fix ftp读取多文件问题
lijiangbo Jun 24, 2019
adab1d2
ftp连接超时时间可配置
lijiangbo Jun 24, 2019
ac7d9dd
Merge remote-tracking branch 'origin/1.5_v3.3.7' into 1.5_v3.5.7
lijiangbo Jun 24, 2019
61196b5
Merge remote-tracking branch 'origin/1.5_v3.5.7' into 1.5_dev
lijiangbo Jun 24, 2019
098d067
修复flinkx on yarn模式运行报错问题
Jun 26, 2019
8d27f38
Merge pull request #63 from gk0916/v1.5.1
yangsishu Jun 27, 2019
ec24feb
Merge branch 'v1.5.1' of https://github.com/DTStack/flinkx into v1.5.2
lijiangbo Jun 28, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions flinkx-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand All @@ -75,6 +81,12 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public static ColumnType fromString(String type) {
throw new RuntimeException("null ColumnType!");
}

if(type.toUpperCase().startsWith("DECIMAL")) {
return DECIMAL;
if(type.contains("(")){
type = type.substring(0, type.indexOf("("));
}

return valueOf(type.toUpperCase());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@
package com.dtstack.flinkx.inputformat;

import com.dtstack.flinkx.constants.Metrics;
import com.dtstack.flinkx.metrics.InputMetric;
import com.dtstack.flinkx.metrics.BaseMetric;
import com.dtstack.flinkx.reader.ByteRateLimiter;
import com.dtstack.flinkx.util.SysUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
Expand Down Expand Up @@ -51,7 +50,7 @@ public abstract class RichInputFormat extends org.apache.flink.api.common.io.Ric
protected long bytes;
protected ByteRateLimiter byteRateLimiter;

protected transient InputMetric inputMetric;
protected transient BaseMetric inputMetric;

protected abstract void openInternal(InputSplit inputSplit) throws IOException;

Expand All @@ -64,7 +63,8 @@ public void open(InputSplit inputSplit) throws IOException {

numReadCounter = getRuntimeContext().getLongCounter(Metrics.NUM_READS);

inputMetric = new InputMetric(getRuntimeContext(), numReadCounter);
inputMetric = new BaseMetric(getRuntimeContext(), "reader");
inputMetric.addMetric(Metrics.NUM_READS, numReadCounter);

openInternal(inputSplit);

Expand Down Expand Up @@ -94,8 +94,8 @@ public void close() throws IOException {
try{
closeInternal();

if (inputMetric.getDelayPeriodMill() != 0){
SysUtil.sleep(inputMetric.getDelayPeriodMill());
if(inputMetric != null){
inputMetric.waitForReportMetrics();
}
}catch (Exception e){
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


package com.dtstack.flinkx.metrics;

import com.dtstack.flinkx.constants.Metrics;
import com.dtstack.flinkx.util.SysUtil;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.metrics.MetricGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* @author jiangbo
* @date 2019/6/5
*/
public class BaseMetric {

protected final Logger LOG = LoggerFactory.getLogger(getClass());

private final static Long DEFAULT_PERIOD_MILLISECONDS = 10000L;

private Long delayPeriodMill;

private MetricGroup flinkxOutput;

private String sourceName;

private long totalWaitMill = 0;

private long maxWaitMill;

public BaseMetric(RuntimeContext runtimeContext, String sourceName) {
this.sourceName = sourceName;
maxWaitMill = TaskManagerOptions.TASK_CANCELLATION_INTERVAL.defaultValue();
flinkxOutput = runtimeContext.getMetricGroup().addGroup(Metrics.METRIC_GROUP_KEY_FLINKX, Metrics.METRIC_GROUP_VALUE_OUTPUT);

if(sourceName.contains("writer")){
delayPeriodMill = (long)(DEFAULT_PERIOD_MILLISECONDS * 2.5);
} else {
delayPeriodMill = (long)(DEFAULT_PERIOD_MILLISECONDS * 1.2);
}

LOG.info("delayPeriodMill:[{}]", delayPeriodMill);
}

public void addMetric(String metricName, LongCounter counter){
flinkxOutput.gauge(metricName, new SimpleAccumulatorGauge<Long>(counter));
}

public void waitForReportMetrics(){
if(delayPeriodMill == 0){
return;
}

if(totalWaitMill + delayPeriodMill > maxWaitMill){
return;
}

try {
Thread.sleep(delayPeriodMill);
totalWaitMill += delayPeriodMill;
LOG.info("wait [{}] mill for source [{}]", totalWaitMill, sourceName);
} catch (InterruptedException e){
SysUtil.sleep(delayPeriodMill);
totalWaitMill += delayPeriodMill;
LOG.info("Task [{}] thread is interrupted,wait [{}] mill for source [{}]", sourceName, totalWaitMill, sourceName);
}
}
}
107 changes: 0 additions & 107 deletions flinkx-core/src/main/java/com/dtstack/flinkx/metrics/InputMetric.java

This file was deleted.

This file was deleted.

Loading