Skip to content

读取mysql单表1000万条数据失败 #13

@Liguangmin

Description

@Liguangmin

最近在使用flinkx(flink1.4.0,slot内存大小1G,每个taskmanager配置两个slot,三台服务器组成的集群)时,发现抽取mysql数据(单表1000万条,单条记录500字节左右)时出现任务卡死的现象:JdbcInputFormat.java openInternal方法中 resultSet = statement.executeQuery();执行后无法返回。我按照JdbcInputFormat.java 中jdbc读取数据的方式单独写了个java测试用例,jvm参数 -Xms1024m -Xmx1024m,出现oom异常,将jvm参数调大至2G即可正常运行。我怀疑是FetchSize设置没有生效(MySQL Connector/J 默认会把整个结果集读入内存;只有当语句为 TYPE_FORWARD_ONLY、CONCUR_READ_ONLY 且 fetchSize 为 Integer.MIN_VALUE 时才会逐行流式读取,或者在连接URL中开启 useCursorFetch=true 后使用正数 fetchSize),于是将openInternal方法中statement.setFetchSize(databaseInterface.getFetchSize());改为statement.setFetchSize(Integer.MIN_VALUE);,在1G内存下即可正常运行。
同时需要将获取descColumnTypeList的代码(DBUtil.analyzeTable,它使用同一个数据库连接)提前到statement.executeQuery();之前执行,因为流式结果集关闭之前,该连接上不能再执行其他语句,否则会抛出如下异常
java.sql.SQLException: Streaming result set com.mysql.jdbc.RowDataDynamic@6438a396 is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.

附修改后的JdbcInputFormat.java 文件

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flinkx.rdb.inputformat;

import com.dtstack.flinkx.inputformat.RichInputFormat;
import com.dtstack.flinkx.rdb.DatabaseInterface;
import com.dtstack.flinkx.rdb.type.TypeConverterInterface;
import com.dtstack.flinkx.rdb.util.DBUtil;
import com.dtstack.flinkx.util.ClassUtil;
import com.dtstack.flinkx.util.DateUtil;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.metrics.Counter;
import org.apache.flink.types.Row;

import java.io.IOException;
import java.math.BigDecimal;
import java.sql.*;
import java.sql.Date;
import java.util.*;

/**

  • InputFormat for reading data from a database and generate Rows.

  • Company: www.dtstack.com

  • @author huyifan.zju@163.com
    */
    public class JdbcInputFormat extends RichInputFormat {

    protected static final long serialVersionUID = 1L;

    protected DatabaseInterface databaseInterface;

    protected String username;

    protected String password;

    protected String drivername;

    protected String dbURL;

    protected String queryTemplate;

    protected int resultSetType;

    protected int resultSetConcurrency;

    protected List descColumnTypeList;

    protected transient Connection dbConn;

    protected transient PreparedStatement statement;

    protected transient ResultSet resultSet;

    protected boolean hasNext;

    protected Object[][] parameterValues;

    protected int columnCount;

    protected String table;

    protected TypeConverterInterface typeConverter;

    protected List column;

    public JdbcInputFormat() {
    resultSetType = ResultSet.TYPE_FORWARD_ONLY;
    resultSetConcurrency = ResultSet.CONCUR_READ_ONLY;
    }

    @OverRide
    public void configure(Configuration configuration) {

    }

    @OverRide
    public void openInternal(InputSplit inputSplit) throws IOException {
    try {
    ClassUtil.forName(drivername, getClass().getClassLoader());
    dbConn = DBUtil.getConnection(dbURL, username, password);

         if(drivername.equalsIgnoreCase("org.postgresql.Driver")){
             dbConn.setAutoCommit(false);
         }
    
         statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
    
         //提前执行 |by lgm
         if(descColumnTypeList == null) {
             descColumnTypeList = DBUtil.analyzeTable(dbConn,databaseInterface,table,column);
         }
    
         if (inputSplit != null && parameterValues != null) {
             for (int i = 0; i < parameterValues[inputSplit.getSplitNumber()].length; i++) {
                 Object param = parameterValues[inputSplit.getSplitNumber()][i];
                 DBUtil.setParameterValue(param,statement,i);
             }
             if (LOG.isDebugEnabled()) {
                 LOG.debug(String.format("Executing '%s' with parameters %s", queryTemplate, Arrays.deepToString(parameterValues[inputSplit.getSplitNumber()])));
             }
         }
    
    
         //statement.setFetchSize(databaseInterface.getFetchSize());
           //重新设定FetchSize |by lgm
         if(drivername.equalsIgnoreCase("mysqlreader")) {
             statement.setFetchSize(Integer.MIN_VALUE);
         }
         else {
             statement.setFetchSize(databaseInterface.getFetchSize());
         }
         statement.setQueryTimeout(databaseInterface.getQueryTimeout());
    
    
    
         resultSet = statement.executeQuery();
         hasNext = resultSet.next();
         columnCount = resultSet.getMetaData().getColumnCount();
    
    
     } catch (SQLException se) {
         throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
     }
    
     LOG.info("JdbcInputFormat[" + jobName + "]open: end");
    

    }

    @OverRide
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
    return cachedStatistics;
    }

    @OverRide
    public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
    if (parameterValues == null) {
    return new GenericInputSplit[]{new GenericInputSplit(0, 1)};
    }
    GenericInputSplit[] ret = new GenericInputSplit[parameterValues.length];
    for (int i = 0; i < ret.length; i++) {
    ret[i] = new GenericInputSplit(i, ret.length);
    }
    return ret;
    }

    @OverRide
    public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
    return new DefaultInputSplitAssigner(inputSplits);
    }

    @OverRide
    public boolean reachedEnd() throws IOException {
    return !hasNext;
    }

    @OverRide
    public Row nextRecordInternal(Row row) throws IOException {
    row = new Row(columnCount);
    try {
    if (!hasNext) {
    return null;
    }

         DBUtil.getRow(dbURL,row,descColumnTypeList,resultSet,typeConverter);
         //update hasNext after we've read the record
         hasNext = resultSet.next();
         return row;
     } catch (SQLException se) {
         throw new IOException("Couldn't read data - " + se.getMessage(), se);
     } catch (NullPointerException npe) {
         throw new IOException("Couldn't access resultSet", npe);
     }
    

    }

    @OverRide
    public void closeInternal() throws IOException {
    DBUtil.closeDBResources(resultSet,statement,dbConn);
    parameterValues = null;
    }

}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions