diff --git a/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/inputformat/JdbcInputFormat.java b/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/inputformat/JdbcInputFormat.java index 50b6935d12..221c780879 100644 --- a/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/inputformat/JdbcInputFormat.java +++ b/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/inputformat/JdbcInputFormat.java @@ -96,17 +96,22 @@ public void configure(Configuration configuration) { @Override public void openInternal(InputSplit inputSplit) throws IOException { - try { - ClassUtil.forName(drivername, getClass().getClassLoader()); - dbConn = DBUtil.getConnection(dbURL, username, password); + try { + ClassUtil.forName(drivername, getClass().getClassLoader()); + dbConn = DBUtil.getConnection(dbURL, username, password); - if(drivername.equalsIgnoreCase("org.postgresql.Driver")){ - dbConn.setAutoCommit(false); - } + if(drivername.equalsIgnoreCase("org.postgresql.Driver")){ + dbConn.setAutoCommit(false); + } + + statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency); - statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency); + //提前执行, 否则会由于此连接select数据没有完全返回造成异常|by lgm + if(descColumnTypeList == null) { + descColumnTypeList = DBUtil.analyzeTable(dbConn,databaseInterface,table,column); + } - if (inputSplit != null && parameterValues != null) { + if (inputSplit != null && parameterValues != null) { for (int i = 0; i < parameterValues[inputSplit.getSplitNumber()].length; i++) { Object param = parameterValues[inputSplit.getSplitNumber()][i]; DBUtil.setParameterValue(param,statement,i); @@ -114,22 +119,31 @@ public void openInternal(InputSplit inputSplit) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug(String.format("Executing '%s' with parameters %s", queryTemplate, Arrays.deepToString(parameterValues[inputSplit.getSplitNumber()]))); } - } + } + + + //statement.setFetchSize(databaseInterface.getFetchSize()); + //重新设定FetchSize |by lgm + if(drivername.equalsIgnoreCase("mysqlreader")) { + statement.setFetchSize(Integer.MIN_VALUE); + } + else { + statement.setFetchSize(databaseInterface.getFetchSize()); + } + statement.setQueryTimeout(databaseInterface.getQueryTimeout()); + - statement.setFetchSize(databaseInterface.getFetchSize()); - statement.setQueryTimeout(databaseInterface.getQueryTimeout()); - resultSet = statement.executeQuery(); - hasNext = resultSet.next(); - columnCount = resultSet.getMetaData().getColumnCount(); - if(descColumnTypeList == null) { - descColumnTypeList = DBUtil.analyzeTable(dbConn,databaseInterface,table,column); + resultSet = statement.executeQuery(); + hasNext = resultSet.next(); + columnCount = resultSet.getMetaData().getColumnCount(); + + + } catch (SQLException se) { + throw new IllegalArgumentException("open() failed." + se.getMessage(), se); } - } catch (SQLException se) { - throw new IllegalArgumentException("open() failed." + se.getMessage(), se); - } - LOG.info("JdbcInputFormat[" + jobName + "]open: end"); + LOG.info("JdbcInputFormat[" + jobName + "]open: end"); }