Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,6 @@ public void close() throws IOException {
if(rows.size() != 0) {
writeRecordInternal();
}

if(needWaitBeforeCloseInternal()) {
Latch latch = newLatch("#3");
beforeCloseInternal();
Expand All @@ -292,26 +291,24 @@ public void close() throws IOException {
latch.waitUntil(numTasks);
System.out.println("hyf waitUtil end");
}

closeInternal();

if(needWaitAfterCloseInternal()) {
Latch latch = newLatch("#4");
latch.addOne();
latch.waitUntil(numTasks);
}
}finally {
afterCloseInternal();

if(dirtyDataManager != null) {
dirtyDataManager.close();
}

if(errorLimiter != null) {
errorLimiter.acquire();
errorLimiter.stop();
try{
closeInternal();
if(needWaitAfterCloseInternal()) {
Latch latch = newLatch("#4");
latch.addOne();
latch.waitUntil(numTasks);
}
afterCloseInternal();
}finally {
if(dirtyDataManager != null) {
dirtyDataManager.close();
}
if(errorLimiter != null) {
errorLimiter.acquire();
errorLimiter.stop();
}
}

LOG.info("subtask[" + taskNumber + "] close() finished");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,7 @@ public HdfsWriter(DataTransferConfig config) {
password = writerConfig.getParameter().getStringVal(KEY_PASSWORD);

if(StringUtil.isNotEmpty(partition)) {
try {
Connection dbConn = HiveUtil.getConnection(dbUrl, username, password);
try (Connection dbConn = HiveUtil.getConnection(dbUrl, username, password);){
HiveUtil.addPartitionsIfNotExists(dbConn, table, partition);
} catch (SQLException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.hadoop.shaded.com.google.common.collect.Sets;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -45,19 +44,15 @@ public class HiveUtil {
private static String COMMA = ",";
private static String ASSIGN = "=";
private static String SINGLE_QUOTE = "'";
private static String hiveDriverClass = "org.apache.hive.jdbc.HiveDriver";

private static String quotedString(String str) {
return QUOTE + str + QUOTE;
}

public static Connection getConnection(String url, String username, String password) throws SQLException {
Connection dbConn = null;
ClassUtil.forName("org.apache.hive.jdbc.HiveDriver", HiveUtil.class.getClassLoader());
if (username == null) {
dbConn = DriverManager.getConnection(url);
} else {
dbConn = DriverManager.getConnection(url, username, password);
}
ClassUtil.forName(hiveDriverClass, HiveUtil.class.getClassLoader());
Connection dbConn = DBUtil.getConnection(url, username, password);
return dbConn;
}

Expand Down Expand Up @@ -145,6 +140,4 @@ public static List<String> showPartitions(Connection dbConn, String table) {
}
return partitions;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.dtstack.flinkx.rdb.DatabaseInterface;
import com.dtstack.flinkx.rdb.inputformat.JdbcInputFormatBuilder;
import com.dtstack.flinkx.inputformat.RichInputFormat;
import com.dtstack.flinkx.rdb.util.DBUtil;
import com.dtstack.flinkx.reader.DataReader;
import com.dtstack.flinkx.util.ClassUtil;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
Expand Down Expand Up @@ -242,10 +243,9 @@ protected String getQuerySql() {
}

protected Connection getConnection() {

try {
ClassUtil.forName(databaseInterface.getDriverClass(), this.getClass().getClassLoader());
connection = DriverManager.getConnection(dbUrl, username, password);
connection = DBUtil.getConnection(dbUrl, username, password);
} catch (Throwable e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
import com.dtstack.flinkx.config.WriterConfig;
import com.dtstack.flinkx.rdb.DatabaseInterface;
import com.dtstack.flinkx.rdb.outputformat.JdbcOutputFormatBuilder;
import com.dtstack.flinkx.rdb.util.DBUtil;
import com.dtstack.flinkx.util.ClassUtil;
import com.dtstack.flinkx.writer.DataWriter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
import org.apache.flink.types.Row;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -145,7 +145,7 @@ public DataStreamSink<?> writeData(DataStream<Row> dataSet) {
protected Connection getConnection() {
try {
ClassUtil.forName(databaseInterface.getDriverClass(), this.getClass().getClassLoader());
connection = DriverManager.getConnection(dbUrl, username, password);
connection = DBUtil.getConnection(dbUrl, username, password);
} catch (Throwable e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,41 +264,8 @@ public Row nextRecordInternal(Row row) throws IOException {

@Override
public void closeInternal() throws IOException {
LOG.info("-------invoke close fun------------");

//called once per inputFormat (on stop)
LOG.info("-----------close input format--------");
try {
if(statement != null) {
statement.close();
}
} catch (SQLException se) {
LOG.error("Inputformat Statement couldn't be closed - " + se.getMessage());
} finally {
statement = null;
}

try {
if(dbConn != null) {
dbConn.close();
}
} catch (SQLException se) {
LOG.error("Inputformat couldn't be closed - " + se.getMessage());
} finally {
dbConn = null;
}

DBUtil.closeDBResources(resultSet,statement,dbConn);
parameterValues = null;

if(resultSet == null) {
return;
}
try {
resultSet.close();
} catch (SQLException se) {
LOG.error("Inputformat ResultSet couldn't be closed - " + se.getMessage());
}

}

}