diff --git a/flinkx-core/pom.xml b/flinkx-core/pom.xml index 8660b01683..d8ae634452 100644 --- a/flinkx-core/pom.xml +++ b/flinkx-core/pom.xml @@ -63,6 +63,12 @@ org.apache.flink flink-streaming-java_2.11 ${flink.version} + + + org.xerial.snappy + snappy-java + + @@ -75,6 +81,12 @@ org.apache.flink flink-hadoop-compatibility_2.11 ${flink.version} + + + org.xerial.snappy + snappy-java + + diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/common/ColumnType.java b/flinkx-core/src/main/java/com/dtstack/flinkx/common/ColumnType.java index e5f976f05d..b0919806da 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/common/ColumnType.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/common/ColumnType.java @@ -49,8 +49,8 @@ public static ColumnType fromString(String type) { throw new RuntimeException("null ColumnType!"); } - if(type.toUpperCase().startsWith("DECIMAL")) { - return DECIMAL; + if(type.contains("(")){ + type = type.substring(0, type.indexOf("(")); } return valueOf(type.toUpperCase()); diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/inputformat/RichInputFormat.java b/flinkx-core/src/main/java/com/dtstack/flinkx/inputformat/RichInputFormat.java index 54e983599e..bd4d6d2152 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/inputformat/RichInputFormat.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/inputformat/RichInputFormat.java @@ -19,9 +19,8 @@ package com.dtstack.flinkx.inputformat; import com.dtstack.flinkx.constants.Metrics; -import com.dtstack.flinkx.metrics.InputMetric; +import com.dtstack.flinkx.metrics.BaseMetric; import com.dtstack.flinkx.reader.ByteRateLimiter; -import com.dtstack.flinkx.util.SysUtil; import org.apache.commons.lang.StringUtils; import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.io.DefaultInputSplitAssigner; @@ -51,7 +50,7 @@ public abstract class RichInputFormat extends org.apache.flink.api.common.io.Ric protected long bytes; protected 
ByteRateLimiter byteRateLimiter; - protected transient InputMetric inputMetric; + protected transient BaseMetric inputMetric; protected abstract void openInternal(InputSplit inputSplit) throws IOException; @@ -64,7 +63,8 @@ public void open(InputSplit inputSplit) throws IOException { numReadCounter = getRuntimeContext().getLongCounter(Metrics.NUM_READS); - inputMetric = new InputMetric(getRuntimeContext(), numReadCounter); + inputMetric = new BaseMetric(getRuntimeContext(), "reader"); + inputMetric.addMetric(Metrics.NUM_READS, numReadCounter); openInternal(inputSplit); @@ -94,8 +94,8 @@ public void close() throws IOException { try{ closeInternal(); - if (inputMetric.getDelayPeriodMill() != 0){ - SysUtil.sleep(inputMetric.getDelayPeriodMill()); + if(inputMetric != null){ + inputMetric.waitForReportMetrics(); } }catch (Exception e){ throw new RuntimeException(e); diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/metrics/BaseMetric.java b/flinkx-core/src/main/java/com/dtstack/flinkx/metrics/BaseMetric.java new file mode 100644 index 0000000000..cd6075b265 --- /dev/null +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/metrics/BaseMetric.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package com.dtstack.flinkx.metrics; + +import com.dtstack.flinkx.constants.Metrics; +import com.dtstack.flinkx.util.SysUtil; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.metrics.MetricGroup; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * @author jiangbo + * @date 2019/6/5 + */ +public class BaseMetric { + + protected final Logger LOG = LoggerFactory.getLogger(getClass()); + + private final static Long DEFAULT_PERIOD_MILLISECONDS = 10000L; + + private Long delayPeriodMill; + + private MetricGroup flinkxOutput; + + private String sourceName; + + private long totalWaitMill = 0; + + private long maxWaitMill; + + public BaseMetric(RuntimeContext runtimeContext, String sourceName) { + this.sourceName = sourceName; + maxWaitMill = TaskManagerOptions.TASK_CANCELLATION_INTERVAL.defaultValue(); + flinkxOutput = runtimeContext.getMetricGroup().addGroup(Metrics.METRIC_GROUP_KEY_FLINKX, Metrics.METRIC_GROUP_VALUE_OUTPUT); + + if(sourceName.contains("writer")){ + delayPeriodMill = (long)(DEFAULT_PERIOD_MILLISECONDS * 2.5); + } else { + delayPeriodMill = (long)(DEFAULT_PERIOD_MILLISECONDS * 1.2); + } + + LOG.info("delayPeriodMill:[{}]", delayPeriodMill); + } + + public void addMetric(String metricName, LongCounter counter){ + flinkxOutput.gauge(metricName, new SimpleAccumulatorGauge(counter)); + } + + public void waitForReportMetrics(){ + if(delayPeriodMill == 0){ + return; + } + + if(totalWaitMill + delayPeriodMill > maxWaitMill){ + return; + } + + try { + Thread.sleep(delayPeriodMill); + totalWaitMill += delayPeriodMill; + LOG.info("wait [{}] mill for source [{}]", totalWaitMill, sourceName); + } catch (InterruptedException e){ + SysUtil.sleep(delayPeriodMill); + totalWaitMill += delayPeriodMill; + LOG.info("Task [{}] thread is interrupted,wait [{}] mill for source [{}]", sourceName, 
totalWaitMill, sourceName); + } + } +} diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/metrics/InputMetric.java b/flinkx-core/src/main/java/com/dtstack/flinkx/metrics/InputMetric.java deleted file mode 100644 index f052f86a3a..0000000000 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/metrics/InputMetric.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dtstack.flinkx.metrics; - -import com.dtstack.flinkx.constants.Metrics; -import org.apache.flink.api.common.accumulators.LongCounter; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; -import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.concurrent.duration.FiniteDuration; - -import java.lang.reflect.Field; -import java.util.concurrent.RunnableScheduledFuture; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -/** - * company: www.dtstack.com - * - * @author: toutian - * create: 2019/3/18 - */ -public class InputMetric { - protected final Logger LOG = LoggerFactory.getLogger(getClass()); - - private RuntimeContext runtimeContext; - - private final static Long DEFAULT_PERIOD_MILLISECONDS = 10000L; - - private Long delayPeriodMill = 12000L; - - public InputMetric(RuntimeContext runtimeContext, LongCounter numRead) { - this.runtimeContext = runtimeContext; - - final MetricGroup flinkxInput = getRuntimeContext().getMetricGroup().addGroup(Metrics.METRIC_GROUP_KEY_FLINKX, Metrics.METRIC_GROUP_VALUE_INPUT); - - flinkxInput.gauge(Metrics.NUM_READS, new SimpleAccumulatorGauge(numRead)); - - initPeriod(); - } - - private RuntimeContext getRuntimeContext() { - return runtimeContext; - } - - public Long getDelayPeriodMill() { - return delayPeriodMill; - } - - public void initPeriod() { - try { - MetricGroup mgObj = 
runtimeContext.getMetricGroup(); - Class amgCls = (Class) mgObj.getClass().getSuperclass().getSuperclass(); - Field registryField = amgCls.getDeclaredField("registry"); - registryField.setAccessible(true); - MetricRegistryImpl registryImplObj = (MetricRegistryImpl) registryField.get(mgObj); - if (registryImplObj.getReporters().isEmpty()) { - return; - } - Field executorField = registryImplObj.getClass().getDeclaredField("executor"); - executorField.setAccessible(true); - ScheduledExecutorService executor = (ScheduledExecutorService) executorField.get(registryImplObj); - Field scheduleField = (executor.getClass().getSuperclass().getDeclaredField("e")); - scheduleField.setAccessible(true); - ScheduledThreadPoolExecutor scheduleObj = (ScheduledThreadPoolExecutor) scheduleField.get(executor); - Runnable runableObj = scheduleObj.getQueue().iterator().next(); - RunnableScheduledFuture runableFuture = (RunnableScheduledFuture) runableObj; - Field outerTaskField = runableFuture.getClass().getDeclaredField("outerTask"); - outerTaskField.setAccessible(true); - Object scheduledFutureTask = outerTaskField.get(runableFuture); - Field periodField = scheduledFutureTask.getClass().getDeclaredField("period"); - periodField.setAccessible(true); - long schedulePeriod = (long) periodField.get(scheduledFutureTask); - long schedulePeriodMill = -1 * new FiniteDuration(schedulePeriod, TimeUnit.NANOSECONDS).toMillis(); - - LOG.info("InputMetric.scheduledFutureTask.schedulePeriodMill:{} ...", schedulePeriodMill); - - if (schedulePeriodMill > DEFAULT_PERIOD_MILLISECONDS) { - this.delayPeriodMill = (long) (schedulePeriodMill * 1.2); - } - } catch (Exception e) { - LOG.error("{}", e); - } - - LOG.info("InputMetric.delayPeriodMill:{} ...", delayPeriodMill); - } -} diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/metrics/OutputMetric.java b/flinkx-core/src/main/java/com/dtstack/flinkx/metrics/OutputMetric.java deleted file mode 100644 index 79408ebc15..0000000000 --- 
a/flinkx-core/src/main/java/com/dtstack/flinkx/metrics/OutputMetric.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dtstack.flinkx.metrics; - -import com.dtstack.flinkx.constants.Metrics; -import org.apache.flink.api.common.accumulators.IntCounter; -import org.apache.flink.api.common.accumulators.LongCounter; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.metrics.MetricGroup; - -/** - * company: www.dtstack.com - * - * @author: toutian - * create: 2019/3/18 - */ -public class OutputMetric { - - private transient RuntimeContext runtimeContext; - - public OutputMetric(RuntimeContext runtimeContext, IntCounter numErrors, IntCounter numNullErrors, - IntCounter numDuplicateErrors, IntCounter numConversionErrors, IntCounter numOtherErrors, LongCounter numWrite) { - this.runtimeContext = runtimeContext; - - final MetricGroup flinkxOutput = getRuntimeContext().getMetricGroup().addGroup(Metrics.METRIC_GROUP_KEY_FLINKX, Metrics.METRIC_GROUP_VALUE_OUTPUT); - - flinkxOutput.gauge(Metrics.NUM_ERRORS, new SimpleAccumulatorGauge(numErrors)); - flinkxOutput.gauge(Metrics.NUM_NULL_ERRORS, new SimpleAccumulatorGauge(numNullErrors)); - 
flinkxOutput.gauge(Metrics.NUM_DUPLICATE_ERRORS, new SimpleAccumulatorGauge(numDuplicateErrors)); - flinkxOutput.gauge(Metrics.NUM_CONVERSION_ERRORS, new SimpleAccumulatorGauge(numConversionErrors)); - flinkxOutput.gauge(Metrics.NUM_OTHER_ERRORS, new SimpleAccumulatorGauge(numOtherErrors)); - flinkxOutput.gauge(Metrics.NUM_WRITES, new SimpleAccumulatorGauge(numWrite)); - } - - private RuntimeContext getRuntimeContext() { - return runtimeContext; - } -} diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/outputformat/RichOutputFormat.java b/flinkx-core/src/main/java/com/dtstack/flinkx/outputformat/RichOutputFormat.java index d587d130fc..5d62a836a0 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/outputformat/RichOutputFormat.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/outputformat/RichOutputFormat.java @@ -23,11 +23,10 @@ import com.dtstack.flinkx.latch.Latch; import com.dtstack.flinkx.latch.LocalLatch; import com.dtstack.flinkx.latch.MetricLatch; -import com.dtstack.flinkx.metrics.OutputMetric; +import com.dtstack.flinkx.metrics.BaseMetric; import com.dtstack.flinkx.writer.DirtyDataManager; import com.dtstack.flinkx.writer.ErrorLimiter; import org.apache.commons.lang.StringUtils; -import org.apache.flink.api.common.accumulators.IntCounter; import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; @@ -72,19 +71,19 @@ public abstract class RichOutputFormat extends org.apache.flink.api.common.io.Ri protected LongCounter numWriteCounter; /** 错误记录数 */ - protected IntCounter errCounter; + protected LongCounter errCounter; /** Number of null pointer errors */ - protected IntCounter nullErrCounter; + protected LongCounter nullErrCounter; /** Number of primary key conflict errors */ - protected IntCounter duplicateErrCounter; + protected LongCounter duplicateErrCounter; /** Number of type conversion errors */ - protected 
IntCounter conversionErrCounter; + protected LongCounter conversionErrCounter; /** Number of other errors */ - protected IntCounter otherErrCounter; + protected LongCounter otherErrCounter; /** 错误限制 */ protected ErrorLimiter errorLimiter; @@ -112,7 +111,7 @@ public abstract class RichOutputFormat extends org.apache.flink.api.common.io.Ri protected String jobId; - protected transient OutputMetric outputMetric; + protected transient BaseMetric outputMetric; public DirtyDataManager getDirtyDataManager() { return dirtyDataManager; @@ -167,16 +166,20 @@ public void open(int taskNumber, int numTasks) throws IOException { this.numTasks = numTasks; //错误记录数 - errCounter = context.getIntCounter(Metrics.NUM_ERRORS); - nullErrCounter = context.getIntCounter(Metrics.NUM_NULL_ERRORS); - duplicateErrCounter = context.getIntCounter(Metrics.NUM_DUPLICATE_ERRORS); - conversionErrCounter = context.getIntCounter(Metrics.NUM_CONVERSION_ERRORS); - otherErrCounter = context.getIntCounter(Metrics.NUM_OTHER_ERRORS); - - //总记录数 + errCounter = context.getLongCounter(Metrics.NUM_ERRORS); + nullErrCounter = context.getLongCounter(Metrics.NUM_NULL_ERRORS); + duplicateErrCounter = context.getLongCounter(Metrics.NUM_DUPLICATE_ERRORS); + conversionErrCounter = context.getLongCounter(Metrics.NUM_CONVERSION_ERRORS); + otherErrCounter = context.getLongCounter(Metrics.NUM_OTHER_ERRORS); numWriteCounter = context.getLongCounter(Metrics.NUM_WRITES); - outputMetric = new OutputMetric(context, errCounter, nullErrCounter, duplicateErrCounter, conversionErrCounter, otherErrCounter, numWriteCounter); + outputMetric = new BaseMetric(context, "writer"); + outputMetric.addMetric(Metrics.NUM_ERRORS, errCounter); + outputMetric.addMetric(Metrics.NUM_NULL_ERRORS, nullErrCounter); + outputMetric.addMetric(Metrics.NUM_DUPLICATE_ERRORS, duplicateErrCounter); + outputMetric.addMetric(Metrics.NUM_CONVERSION_ERRORS, conversionErrCounter); + outputMetric.addMetric(Metrics.NUM_OTHER_ERRORS, otherErrCounter); + 
outputMetric.addMetric(Metrics.NUM_WRITES, numWriteCounter); Map vars = context.getMetricGroup().getAllVariables(); @@ -316,6 +319,11 @@ public void close() throws IOException { if(rows.size() != 0) { writeRecordInternal(); } + + if(outputMetric != null){ + outputMetric.waitForReportMetrics(); + } + if(needWaitBeforeCloseInternal()) { Latch latch = newLatch("#3"); beforeCloseInternal(); @@ -347,7 +355,7 @@ public void close() throws IOException { errorLimiter.updateErrorInfo(); } catch (Exception e){ - LOG.warn("Update error info error when task closing:{}", e); + LOG.warn("Update error info error when task closing: ", e); } errorLimiter.acquire(); diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/util/StringUtil.java b/flinkx-core/src/main/java/com/dtstack/flinkx/util/StringUtil.java index 1fa9aac713..c148221088 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/util/StringUtil.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/util/StringUtil.java @@ -207,15 +207,4 @@ public static String row2string(Row row, List columnTypes, String delimi return sb.toString(); } - - /** - * Split the string according to the specified separator,ignore separators in quotes and parentheses - * @param str String to be split - * @param delimter Separator - * @return Result array - */ - public static String[] splitIgnoreQuotaBrackets(String str, String delimter){ - String splitPatternStr = delimter + "(?![^()]*+\\))(?![^{}]*+})(?![^\\[\\]]*+\\])(?=(?:[^\"]|\"[^\"]*\")*$)"; - return str.split(splitPatternStr); - } } diff --git a/flinkx-db2/flinkx-db2-core/src/main/java/com/dtstack/flinkx/db2/Db2DatabaseMeta.java b/flinkx-db2/flinkx-db2-core/src/main/java/com/dtstack/flinkx/db2/Db2DatabaseMeta.java index 12178d0a16..0a15418e27 100644 --- a/flinkx-db2/flinkx-db2-core/src/main/java/com/dtstack/flinkx/db2/Db2DatabaseMeta.java +++ b/flinkx-db2/flinkx-db2-core/src/main/java/com/dtstack/flinkx/db2/Db2DatabaseMeta.java @@ -72,12 +72,12 @@ protected String makeValues(List 
column) { @Override public String getSQLQueryFields(String tableName) { - return "SELECT * FROM " + tableName + " LIMIT 0"; + return "SELECT * FROM " + tableName + " FETCH FIRST 1 ROWS ONLY"; } @Override public String getSQLQueryColumnFields(List column, String table) { - return "SELECT " + quoteColumns(column) + " FROM " + quoteTable(table) + " LIMIT 0"; + return "SELECT " + quoteColumns(column) + " FROM " + quoteTable(table) + " FETCH FIRST 1 ROWS ONLY"; } @Override diff --git a/flinkx-ftp/flinkx-ftp-core/src/main/java/com/dtstack/flinkx/ftp/FtpConfigConstants.java b/flinkx-ftp/flinkx-ftp-core/src/main/java/com/dtstack/flinkx/ftp/FtpConfigConstants.java index ff5cd23957..03f0253d22 100644 --- a/flinkx-ftp/flinkx-ftp-core/src/main/java/com/dtstack/flinkx/ftp/FtpConfigConstants.java +++ b/flinkx-ftp/flinkx-ftp-core/src/main/java/com/dtstack/flinkx/ftp/FtpConfigConstants.java @@ -30,7 +30,7 @@ public class FtpConfigConstants { public static final int DEFAULT_SFTP_PORT = 22; - public static final int DEFAULT_TIMEOUT = 60000; + public static final int DEFAULT_TIMEOUT = 5000; public static final String DEFAULT_FTP_CONNECT_PATTERN = "PASV"; diff --git a/flinkx-ftp/flinkx-ftp-core/src/main/java/com/dtstack/flinkx/ftp/FtpConfigKeys.java b/flinkx-ftp/flinkx-ftp-core/src/main/java/com/dtstack/flinkx/ftp/FtpConfigKeys.java index e47e52a9d4..df727c2825 100644 --- a/flinkx-ftp/flinkx-ftp-core/src/main/java/com/dtstack/flinkx/ftp/FtpConfigKeys.java +++ b/flinkx-ftp/flinkx-ftp-core/src/main/java/com/dtstack/flinkx/ftp/FtpConfigKeys.java @@ -48,4 +48,6 @@ public class FtpConfigKeys { public static final String KEY_IS_FIRST_HEADER = "isFirstLineHeader"; + public static final String KEY_TIMEOUT = "timeout"; + } diff --git a/flinkx-ftp/flinkx-ftp-core/src/main/java/com/dtstack/flinkx/ftp/StandardFtpHandler.java b/flinkx-ftp/flinkx-ftp-core/src/main/java/com/dtstack/flinkx/ftp/StandardFtpHandler.java index 0c02c9f8e6..c3e8cb387f 100644 --- 
a/flinkx-ftp/flinkx-ftp-core/src/main/java/com/dtstack/flinkx/ftp/StandardFtpHandler.java +++ b/flinkx-ftp/flinkx-ftp-core/src/main/java/com/dtstack/flinkx/ftp/StandardFtpHandler.java @@ -46,6 +46,9 @@ public class StandardFtpHandler implements FtpHandler { private FTPClient ftpClient = null; + public FTPClient getFtpClient() { + return ftpClient; + } @Override public void loginFtpServer(String host, String username, String password, int port, int timeout, String connectMode) { @@ -275,7 +278,6 @@ public void deleteAllFilesInDir(String dir) { public InputStream getInputStream(String filePath) { try { InputStream is = ftpClient.retrieveFileStream(new String(filePath.getBytes(),FTP.DEFAULT_CONTROL_ENCODING)); - ftpClient.getReply(); return is; } catch (IOException e) { String message = String.format("读取文件 : [%s] 时出错,请确认文件:[%s]存在且配置的用户有权限读取", filePath, filePath); diff --git a/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpInputFormat.java b/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpInputFormat.java index 377989213f..022c92f78f 100644 --- a/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpInputFormat.java +++ b/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpInputFormat.java @@ -59,7 +59,7 @@ public class FtpInputFormat extends RichInputFormat { protected String protocol; - protected Integer timeout = 60000; + protected Integer timeout; protected String connectMode = FtpConfigConstants.DEFAULT_FTP_CONNECT_PATTERN; diff --git a/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpInputFormatBuilder.java b/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpInputFormatBuilder.java index 2af194870f..c225a6f627 100644 --- a/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpInputFormatBuilder.java +++ 
b/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpInputFormatBuilder.java @@ -61,6 +61,10 @@ public void setEncoding(String encoding) { } } + public void setTimeout(Integer timeout){ + format.timeout = timeout; + } + public void setMetaColumn(List metaColumns) { format.metaColumns = metaColumns; } diff --git a/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpReader.java b/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpReader.java index 1d6a1b40fe..d4405b02d4 100644 --- a/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpReader.java +++ b/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpReader.java @@ -50,6 +50,7 @@ public class FtpReader extends DataReader { private String encoding; private boolean isFirstLineHeader; private List metaColumns; + private Integer timeout; public FtpReader(DataTransferConfig config, StreamExecutionEnvironment env) { @@ -75,6 +76,7 @@ public FtpReader(DataTransferConfig config, StreamExecutionEnvironment env) { password = readerConfig.getParameter().getStringVal(KEY_PASSWORD); encoding = readerConfig.getParameter().getStringVal(KEY_ENCODING); isFirstLineHeader = readerConfig.getParameter().getBooleanVal(KEY_IS_FIRST_HEADER,false); + timeout = readerConfig.getParameter().getIntVal(KEY_TIMEOUT, FtpConfigConstants.DEFAULT_TIMEOUT); List columns = readerConfig.getParameter().getColumn(); metaColumns = MetaColumn.getMetaColumns(columns); @@ -94,6 +96,7 @@ public DataStream readData() { builder.setProtocol(protocol); builder.setUsername(username); builder.setIsFirstLineHeader(isFirstLineHeader); + builder.setTimeout(timeout); return createInput(builder.finish(), "ftpreader"); } diff --git a/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpSeqBufferedReader.java b/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpSeqBufferedReader.java index 
e8ee15fb5e..5983393aff 100644 --- a/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpSeqBufferedReader.java +++ b/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpSeqBufferedReader.java @@ -19,6 +19,7 @@ package com.dtstack.flinkx.ftp.reader; import com.dtstack.flinkx.ftp.FtpHandler; +import com.dtstack.flinkx.ftp.StandardFtpHandler; import java.io.*; import java.util.Iterator; @@ -54,7 +55,7 @@ public String readLine() throws IOException{ if(br != null){ String line = br.readLine(); if (line == null){ - br = null; + close(); return readLine(); } @@ -65,8 +66,6 @@ public String readLine() throws IOException{ } private void nextStream() throws IOException{ - close(); - if(iter.hasNext()){ String file = iter.next(); InputStream in = ftpHandler.getInputStream(file); @@ -88,6 +87,10 @@ public void close() throws IOException { if (br != null){ br.close(); br = null; + + if (ftpHandler instanceof StandardFtpHandler){ + ((StandardFtpHandler) ftpHandler).getFtpClient().completePendingCommand(); + } } } diff --git a/flinkx-ftp/flinkx-ftp-writer/src/main/java/com/dtstack/flinkx/ftp/writer/FtpOutputFormat.java b/flinkx-ftp/flinkx-ftp-writer/src/main/java/com/dtstack/flinkx/ftp/writer/FtpOutputFormat.java index 5359ad720f..3a366ee580 100644 --- a/flinkx-ftp/flinkx-ftp-writer/src/main/java/com/dtstack/flinkx/ftp/writer/FtpOutputFormat.java +++ b/flinkx-ftp/flinkx-ftp-writer/src/main/java/com/dtstack/flinkx/ftp/writer/FtpOutputFormat.java @@ -64,7 +64,7 @@ public class FtpOutputFormat extends RichOutputFormat { protected String protocol; - protected Integer timeout = 60000; + protected Integer timeout; protected String connectMode = FtpConfigConstants.DEFAULT_FTP_CONNECT_PATTERN; diff --git a/flinkx-ftp/flinkx-ftp-writer/src/main/java/com/dtstack/flinkx/ftp/writer/FtpOutputFormatBuilder.java b/flinkx-ftp/flinkx-ftp-writer/src/main/java/com/dtstack/flinkx/ftp/writer/FtpOutputFormatBuilder.java index 0ea49bc349..4bcdd07040 
100644 --- a/flinkx-ftp/flinkx-ftp-writer/src/main/java/com/dtstack/flinkx/ftp/writer/FtpOutputFormatBuilder.java +++ b/flinkx-ftp/flinkx-ftp-writer/src/main/java/com/dtstack/flinkx/ftp/writer/FtpOutputFormatBuilder.java @@ -100,6 +100,10 @@ public void setConnectPattern(String connectPattern) { } } + public void setTimeout(Integer timeout){ + format.timeout = timeout; + } + @Override protected void checkFormat() { diff --git a/flinkx-ftp/flinkx-ftp-writer/src/main/java/com/dtstack/flinkx/ftp/writer/FtpWriter.java b/flinkx-ftp/flinkx-ftp-writer/src/main/java/com/dtstack/flinkx/ftp/writer/FtpWriter.java index 403557364d..855cd5c016 100644 --- a/flinkx-ftp/flinkx-ftp-writer/src/main/java/com/dtstack/flinkx/ftp/writer/FtpWriter.java +++ b/flinkx-ftp/flinkx-ftp-writer/src/main/java/com/dtstack/flinkx/ftp/writer/FtpWriter.java @@ -20,6 +20,7 @@ import com.dtstack.flinkx.config.DataTransferConfig; import com.dtstack.flinkx.config.WriterConfig; +import com.dtstack.flinkx.ftp.FtpConfigConstants; import com.dtstack.flinkx.util.StringUtil; import com.dtstack.flinkx.writer.DataWriter; import org.apache.flink.streaming.api.datastream.DataStream; @@ -55,6 +56,7 @@ public class FtpWriter extends DataWriter{ private List columnName; private List columnType; + private Integer timeout; public FtpWriter(DataTransferConfig config) { super(config); @@ -74,6 +76,7 @@ public FtpWriter(DataTransferConfig config) { encoding = writerConfig.getParameter().getStringVal(KEY_ENCODING); connectPattern = writerConfig.getParameter().getStringVal(KEY_CONNECT_PATTERN, DEFAULT_FTP_CONNECT_PATTERN); path = writerConfig.getParameter().getStringVal(KEY_PATH); + timeout = writerConfig.getParameter().getIntVal(KEY_TIMEOUT, FtpConfigConstants.DEFAULT_TIMEOUT); fieldDelimiter = writerConfig.getParameter().getStringVal(KEY_FIELD_DELIMITER, DEFAULT_FIELD_DELIMITER); if(!fieldDelimiter.equals(DEFAULT_FIELD_DELIMITER)) { @@ -112,6 +115,7 @@ public DataStreamSink writeData(DataStream dataSet) { 
builder.setDirtyPath(dirtyPath); builder.setDirtyHadoopConfig(dirtyHadoopConfig); builder.setSrcCols(srcCols); + builder.setTimeout(timeout); OutputFormatSinkFunction sinkFunction = new OutputFormatSinkFunction(builder.finish()); DataStreamSink dataStreamSink = dataSet.addSink(sinkFunction); diff --git a/flinkx-hdfs/flinkx-hdfs-core/pom.xml b/flinkx-hdfs/flinkx-hdfs-core/pom.xml index d8afd2fb7c..4966c063b2 100644 --- a/flinkx-hdfs/flinkx-hdfs-core/pom.xml +++ b/flinkx-hdfs/flinkx-hdfs-core/pom.xml @@ -51,6 +51,10 @@ under the License. derby org.apache.derby + + org.xerial.snappy + snappy-java + @@ -63,31 +67,29 @@ under the License. org.apache.hadoop hadoop-yarn-api + + org.xerial.snappy + snappy-java + - - - - - - - - - - - - - - - - - - parquet-hadoop org.apache.parquet 1.8.3 + + + org.xerial.snappy + snappy-java + + + + + + org.xerial.snappy + snappy-java + 1.1.4 diff --git a/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsOrcInputFormat.java b/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsOrcInputFormat.java index 0edde3ff8e..4615385bba 100644 --- a/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsOrcInputFormat.java +++ b/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsOrcInputFormat.java @@ -34,10 +34,7 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Properties; +import java.util.*; /** * The subclass of HdfsInputFormat which handles orc files @@ -57,6 +54,8 @@ public class HdfsOrcInputFormat extends HdfsInputFormat { private transient List fields; + private static final String COMPLEX_FIELD_TYPE_SYMBOL_REGEX = ".*(<|>|\\{|}|[|]).*"; + @Override protected void configureAnythingElse() { orcSerde = new OrcSerde(); @@ -105,13 +104,17 @@ protected void configureAnythingElse() { int endIndex = 
typeStruct.lastIndexOf(">"); typeStruct = typeStruct.substring(startIndex, endIndex); - String[] cols = StringUtil.splitIgnoreQuotaBrackets(typeStruct,","); + if(typeStruct.matches(COMPLEX_FIELD_TYPE_SYMBOL_REGEX)){ + throw new RuntimeException("Field types such as array, map, and struct are not supported."); + } + + List cols = parseColumnAndType(typeStruct); - fullColNames = new String[cols.length]; - fullColTypes = new String[cols.length]; + fullColNames = new String[cols.size()]; + fullColTypes = new String[cols.size()]; - for(int i = 0; i < cols.length; ++i) { - String[] temp = cols[i].split(":"); + for(int i = 0; i < cols.size(); ++i) { + String[] temp = cols.get(i).split(":"); fullColNames[i] = temp[0]; fullColTypes[i] = temp[1]; } @@ -133,6 +136,24 @@ protected void configureAnythingElse() { } } + private List parseColumnAndType(String typeStruct){ + List cols = new ArrayList<>(); + List splits = Arrays.asList(typeStruct.split(",")); + Iterator it = splits.iterator(); + while (it.hasNext()){ + String current = it.next(); + if(current.contains("(")){ + if(current.contains("(")){ + String next = it.next(); + cols.add(current + "," + next); + } + } else { + cols.add(current); + } + } + + return cols; + } @Override public HdfsOrcInputSplit[] createInputSplits(int minNumSplits) throws IOException { diff --git a/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsParquetInputFormat.java b/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsParquetInputFormat.java index f1f1c3d210..87b664a3eb 100644 --- a/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsParquetInputFormat.java +++ b/flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsParquetInputFormat.java @@ -18,6 +18,7 @@ package com.dtstack.flinkx.hdfs.reader; +import com.dtstack.flinkx.common.ColumnType; import com.dtstack.flinkx.hdfs.HdfsUtil; import com.dtstack.flinkx.reader.MetaColumn; import 
com.google.common.collect.Lists; @@ -171,13 +172,15 @@ public boolean reachedEnd() throws IOException { private Object getData(Group currentLine,String type,int index){ Object data = null; + ColumnType columnType = ColumnType.fromString(type); + try{ if (index == -1){ return null; } Type colSchemaType = currentLine.getType().getType(index); - switch (type){ + switch (columnType.name().toLowerCase()){ case "tinyint" : case "smallint" : case "int" : data = currentLine.getInteger(index,0);break; @@ -278,15 +281,15 @@ private List getAllPartitionPath(String tableLocation) throws IOExceptio return pathList; } - if(fsStatus[0].isDirectory()){ - for(FileStatus status : fsStatus){ + for (FileStatus status : fsStatus) { + if(status.isDirectory()){ pathList.addAll(getAllPartitionPath(status.getPath().toString())); + } else { + pathList.add(status.getPath().toString()); } - return pathList; - }else{ - pathList.add(tableLocation); - return pathList; } + + return pathList; } finally { if (fs != null){ fs.close(); diff --git a/flinkx-launcher/src/main/java/com/dtstack/flinkx/launcher/ClusterClientFactory.java b/flinkx-launcher/src/main/java/com/dtstack/flinkx/launcher/ClusterClientFactory.java index a01cc56432..69367529be 100644 --- a/flinkx-launcher/src/main/java/com/dtstack/flinkx/launcher/ClusterClientFactory.java +++ b/flinkx-launcher/src/main/java/com/dtstack/flinkx/launcher/ClusterClientFactory.java @@ -23,13 +23,12 @@ import org.apache.flink.client.deployment.StandaloneClusterId; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.rest.RestClusterClient; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.*; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.runtime.akka.AkkaUtils; +import 
org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.util.LeaderConnectionInfo; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.yarn.AbstractYarnClusterDescriptor; import org.apache.flink.yarn.LegacyYarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterDescriptor; @@ -126,8 +125,12 @@ public static ClusterClient createYarnClient(LauncherOptions launcherOptions) { throw new RuntimeException("No flink session found on yarn cluster."); } - AbstractYarnClusterDescriptor clusterDescriptor = new LegacyYarnClusterDescriptor(config, yarnConf, ".", yarnClient, false); - ClusterClient clusterClient = clusterDescriptor.retrieve(applicationId); + HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(config); + if(highAvailabilityMode.equals(HighAvailabilityMode.ZOOKEEPER) && applicationId!=null){ + config.setString(HighAvailabilityOptions.HA_CLUSTER_ID,applicationId.toString()); + } + YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(config, yarnConf, "", yarnClient, false); + ClusterClient clusterClient = yarnClusterDescriptor.retrieve(applicationId); clusterClient.setDetached(true); return clusterClient; } catch(Exception e) { diff --git a/jars/db2jcc-3.72.44.jar b/jars/db2jcc-3.72.44.jar new file mode 100644 index 0000000000..16fbffb3c8 Binary files /dev/null and b/jars/db2jcc-3.72.44.jar differ diff --git a/jars/ojdbc8-12.2.0.1.jar b/jars/ojdbc8-12.2.0.1.jar new file mode 100644 index 0000000000..95b03406ab Binary files /dev/null and b/jars/ojdbc8-12.2.0.1.jar differ diff --git a/jars/readme.md b/jars/readme.md new file mode 100644 index 0000000000..c319e27781 --- /dev/null +++ b/jars/readme.md @@ -0,0 +1,17 @@ +# 打包找不到db2和oracle相关驱动包临时解决办法 + +下载这两个驱动包,上传到本地仓库: + +db2:[下载](db2jcc-3.72.44.jar) + +oracle:[下载](ojdbc8-12.2.0.1.jar) + +然后上传到本地仓库: + +``` +mvn install:install-file -DgroupId=com.ibm.db2 -DartifactId=db2jcc -Dversion=3.72.44
-Dpackaging=jar -Dfile=db2jcc-3.72.44.jar + +mvn install:install-file -DgroupId=com.github.noraui -DartifactId=ojdbc8 -Dversion=12.2.0.1 -Dpackaging=jar -Dfile=ojdbc8-12.2.0.1.jar +``` + +说明:这两个驱动包在我们自己搭建的仓库里有,并且这两个版本的驱动包已经在生产环境中使用,所以不能很快修改版本,需要做相关测试,我们会在后期的版本中修改这两个驱动包的版本,可以先暂时下载安装驱动来解决。