diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/outputformat/RichOutputFormat.java b/flinkx-core/src/main/java/com/dtstack/flinkx/outputformat/RichOutputFormat.java index 5fc04e879e..915b8754f5 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/outputformat/RichOutputFormat.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/outputformat/RichOutputFormat.java @@ -283,7 +283,6 @@ public void close() throws IOException { if(rows.size() != 0) { writeRecordInternal(); } - if(needWaitBeforeCloseInternal()) { Latch latch = newLatch("#3"); beforeCloseInternal(); @@ -292,26 +291,24 @@ public void close() throws IOException { latch.waitUntil(numTasks); System.out.println("hyf waitUtil end"); } - - closeInternal(); - - if(needWaitAfterCloseInternal()) { - Latch latch = newLatch("#4"); - latch.addOne(); - latch.waitUntil(numTasks); - } }finally { - afterCloseInternal(); - - if(dirtyDataManager != null) { - dirtyDataManager.close(); - } - - if(errorLimiter != null) { - errorLimiter.acquire(); - errorLimiter.stop(); + try{ + closeInternal(); + if(needWaitAfterCloseInternal()) { + Latch latch = newLatch("#4"); + latch.addOne(); + latch.waitUntil(numTasks); + } + afterCloseInternal(); + }finally { + if(dirtyDataManager != null) { + dirtyDataManager.close(); + } + if(errorLimiter != null) { + errorLimiter.acquire(); + errorLimiter.stop(); + } } - LOG.info("subtask[" + taskNumber + "] close() finished"); } } diff --git a/flinkx-hdfs/flinkx-hdfs-writer/src/main/java/com/dtstack/flinkx/hdfs/writer/HdfsWriter.java b/flinkx-hdfs/flinkx-hdfs-writer/src/main/java/com/dtstack/flinkx/hdfs/writer/HdfsWriter.java index 058dd2d270..f298206b28 100644 --- a/flinkx-hdfs/flinkx-hdfs-writer/src/main/java/com/dtstack/flinkx/hdfs/writer/HdfsWriter.java +++ b/flinkx-hdfs/flinkx-hdfs-writer/src/main/java/com/dtstack/flinkx/hdfs/writer/HdfsWriter.java @@ -97,8 +97,7 @@ public HdfsWriter(DataTransferConfig config) { password = writerConfig.getParameter().getStringVal(KEY_PASSWORD); if(StringUtil.isNotEmpty(partition)) { - try { - Connection dbConn = HiveUtil.getConnection(dbUrl, username, password); + try (Connection dbConn = HiveUtil.getConnection(dbUrl, username, password);){ HiveUtil.addPartitionsIfNotExists(dbConn, table, partition); } catch (SQLException e) { throw new RuntimeException(e); diff --git a/flinkx-hdfs/flinkx-hdfs-writer/src/main/java/com/dtstack/flinkx/hdfs/writer/HiveUtil.java b/flinkx-hdfs/flinkx-hdfs-writer/src/main/java/com/dtstack/flinkx/hdfs/writer/HiveUtil.java index 991d483705..c3b319c8ba 100644 --- a/flinkx-hdfs/flinkx-hdfs-writer/src/main/java/com/dtstack/flinkx/hdfs/writer/HiveUtil.java +++ b/flinkx-hdfs/flinkx-hdfs-writer/src/main/java/com/dtstack/flinkx/hdfs/writer/HiveUtil.java @@ -24,7 +24,6 @@ import org.apache.flink.hadoop.shaded.com.google.common.collect.Sets; import java.sql.Connection; -import java.sql.DriverManager; import java.sql.SQLException; import java.util.ArrayList; import java.util.HashMap; @@ -45,19 +44,15 @@ public class HiveUtil { private static String COMMA = ","; private static String ASSIGN = "="; private static String SINGLE_QUOTE = "'"; + private static String hiveDriverClass = "org.apache.hive.jdbc.HiveDriver"; private static String quotedString(String str) { return QUOTE + str + QUOTE; } public static Connection getConnection(String url, String username, String password) throws SQLException { - Connection dbConn = null; - ClassUtil.forName("org.apache.hive.jdbc.HiveDriver", HiveUtil.class.getClassLoader()); - if (username == null) { - dbConn = DriverManager.getConnection(url); - } else { - dbConn = DriverManager.getConnection(url, username, password); - } + ClassUtil.forName(hiveDriverClass, HiveUtil.class.getClassLoader()); + Connection dbConn = DBUtil.getConnection(url, username, password); return dbConn; } @@ -145,6 +140,4 @@ public static List showPartitions(Connection dbConn, String table) { } return partitions; } - - } diff --git a/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/datareader/JdbcDataReader.java b/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/datareader/JdbcDataReader.java index b9a64b9171..1b1470ab8a 100644 --- a/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/datareader/JdbcDataReader.java +++ b/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/datareader/JdbcDataReader.java @@ -23,6 +23,7 @@ import com.dtstack.flinkx.rdb.DatabaseInterface; import com.dtstack.flinkx.rdb.inputformat.JdbcInputFormatBuilder; import com.dtstack.flinkx.inputformat.RichInputFormat; +import com.dtstack.flinkx.rdb.util.DBUtil; import com.dtstack.flinkx.reader.DataReader; import com.dtstack.flinkx.util.ClassUtil; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; @@ -242,10 +243,9 @@ protected String getQuerySql() { } protected Connection getConnection() { - try { ClassUtil.forName(databaseInterface.getDriverClass(), this.getClass().getClassLoader()); - connection = DriverManager.getConnection(dbUrl, username, password); + connection = DBUtil.getConnection(dbUrl, username, password); } catch (Throwable e) { throw new RuntimeException(e); } diff --git a/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/datawriter/JdbcDataWriter.java b/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/datawriter/JdbcDataWriter.java index 33342598d6..6d1919bbff 100644 --- a/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/datawriter/JdbcDataWriter.java +++ b/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/datawriter/JdbcDataWriter.java @@ -22,6 +22,7 @@ import com.dtstack.flinkx.config.WriterConfig; import com.dtstack.flinkx.rdb.DatabaseInterface; import com.dtstack.flinkx.rdb.outputformat.JdbcOutputFormatBuilder; +import com.dtstack.flinkx.rdb.util.DBUtil; import com.dtstack.flinkx.util.ClassUtil; import com.dtstack.flinkx.writer.DataWriter; import org.apache.flink.streaming.api.datastream.DataStream; @@ -29,7 +30,6 @@ import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction; import org.apache.flink.types.Row; import java.sql.Connection; -import java.sql.DriverManager; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -145,7 +145,7 @@ public DataStreamSink writeData(DataStream dataSet) { protected Connection getConnection() { try { ClassUtil.forName(databaseInterface.getDriverClass(), this.getClass().getClassLoader()); - connection = DriverManager.getConnection(dbUrl, username, password); + connection = DBUtil.getConnection(dbUrl, username, password); } catch (Throwable e) { throw new RuntimeException(e); } diff --git a/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/inputformat/JdbcInputFormat.java b/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/inputformat/JdbcInputFormat.java index 759927aca7..1623c3faec 100644 --- a/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/inputformat/JdbcInputFormat.java +++ b/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/inputformat/JdbcInputFormat.java @@ -264,41 +264,8 @@ public Row nextRecordInternal(Row row) throws IOException { @Override public void closeInternal() throws IOException { - LOG.info("-------invoke close fun------------"); - - //called once per inputFormat (on stop) - LOG.info("-----------close input format--------"); - try { - if(statement != null) { - statement.close(); - } - } catch (SQLException se) { - LOG.error("Inputformat Statement couldn't be closed - " + se.getMessage()); - } finally { - statement = null; - } - - try { - if(dbConn != null) { - dbConn.close(); - } - } catch (SQLException se) { - LOG.error("Inputformat couldn't be closed - " + se.getMessage()); - } finally { - dbConn = null; - } - + DBUtil.closeDBResources(resultSet,statement,dbConn); parameterValues = null; - - if(resultSet == null) { - return; - } - try { - resultSet.close(); - } catch (SQLException se) { - LOG.error("Inputformat ResultSet couldn't be closed - " + se.getMessage()); - } - } }