diff --git a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/sink/HBase14SinkFactory.java b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/sink/HBase14SinkFactory.java index b8d4641a4f..9804865e56 100644 --- a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/sink/HBase14SinkFactory.java +++ b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/sink/HBase14SinkFactory.java @@ -95,7 +95,7 @@ public DataStreamSink createSink(DataStream dataSet) { rowConverter = new HbaseRowConverter(hbaseSchema, nullStringLiteral); } - builder.setRowConverter(rowConverter,useAbstractBaseColumn); + builder.setRowConverter(rowConverter, useAbstractBaseColumn); return createOutput(dataSet, builder.finish()); } diff --git a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/source/HBase14SourceFactory.java b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/source/HBase14SourceFactory.java index 1e46c4bb21..61ee91a543 100644 --- a/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/source/HBase14SourceFactory.java +++ b/chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/source/HBase14SourceFactory.java @@ -97,7 +97,7 @@ public DataStream createSource() { rowConverter = new HbaseRowConverter(hbaseSchema, nullStringLiteral); } - builder.setRowConverter(rowConverter,useAbstractBaseColumn); + builder.setRowConverter(rowConverter, useAbstractBaseColumn); return createInput(builder.finish()); } } diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormat.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormat.java index abe833cecd..671badb697 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormat.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormat.java @@ -42,9 +42,9 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.math.NumberUtils; import org.apache.commons.lang3.tuple.Pair; +import java.math.BigDecimal; import java.math.BigInteger; import java.sql.Connection; import java.sql.Date; @@ -716,52 +716,53 @@ protected String buildQuerySqlBySplit(JdbcInputSplit jdbcInputSplit, List splitRangeFromDb = getSplitRangeFromDb(); - BigInteger left = NumberUtils.createBigInteger(splitRangeFromDb.getLeft()); - BigInteger right = NumberUtils.createBigInteger(splitRangeFromDb.getRight()); + if (StringUtils.isBlank(splitRangeFromDb.getLeft()) + || "null".equalsIgnoreCase(splitRangeFromDb.getLeft())) { + // 没有数据,返回空数组 + return new JdbcInputSplit[minNumSplits]; + } + BigDecimal left = new BigDecimal(splitRangeFromDb.getLeft()); + BigDecimal right = new BigDecimal(splitRangeFromDb.getRight()); LOG.info("create splitsInternal,the splitKey range is {} --> {}", left, right); - // 没有数据 返回空数组 - if (left == null || right == null) { - splits = new JdbcInputSplit[minNumSplits]; - } else { - BigInteger endAndStartGap = right.subtract(left); - - BigInteger step = endAndStartGap.divide(BigInteger.valueOf(minNumSplits)); - BigInteger remainder = endAndStartGap.remainder(BigInteger.valueOf(minNumSplits)); - if (step.compareTo(BigInteger.ZERO) == 0) { - // left = right时,step和remainder都为0 - if (remainder.compareTo(BigInteger.ZERO) == 0) { - minNumSplits = 1; - } else { - minNumSplits = remainder.intValue(); - } + BigDecimal endAndStartGap = right.subtract(left); + BigDecimal remainder = endAndStartGap.remainder(new BigDecimal(minNumSplits)); + endAndStartGap = endAndStartGap.subtract(remainder); + BigDecimal step = endAndStartGap.divide(new BigDecimal(minNumSplits)); + + if (step.compareTo(BigDecimal.ZERO) == 0) { + // left = right时,step和remainder都为0 + if (remainder.compareTo(BigDecimal.ZERO) == 0) { + minNumSplits = 1; + } else { + minNumSplits = remainder.intValue(); } + } - splits = new JdbcInputSplit[minNumSplits]; - BigInteger start; - BigInteger end = left; - for (int i = 0; i < minNumSplits; i++) { - start = end; - end = start.add(step); - end = - end.add( - (remainder.compareTo(BigInteger.valueOf(i)) > 0) - ? BigInteger.ONE - : BigInteger.ZERO); - // 分片范围是 splitPk >=start and splitPk < end 最后一个分片范围是splitPk >= start - if (i == minNumSplits - 1) { - end = null; - } - splits[i] = - new JdbcInputSplit( - i, - minNumSplits, - i, - jdbcConf.getStartLocation(), - null, - start.toString(), - Objects.isNull(end) ? null : end.toString()); + splits = new JdbcInputSplit[minNumSplits]; + BigDecimal start; + BigDecimal end = left; + for (int i = 0; i < minNumSplits; i++) { + start = end; + end = start.add(step); + if (remainder.compareTo(BigDecimal.ZERO) > 0) { + end = end.add(BigDecimal.ONE); + remainder = remainder.subtract(BigDecimal.ONE); } + // 分片范围是 splitPk >=start and splitPk < end 最后一个分片范围是splitPk >= start + if (i == minNumSplits - 1) { + end = null; + } + splits[i] = + new JdbcInputSplit( + i, + minNumSplits, + i, + jdbcConf.getStartLocation(), + null, + start.toString(), + Objects.isNull(end) ? null : end.toString()); } + return splits; } diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/util/SqlUtil.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/util/SqlUtil.java index ae9ce43c95..3f4a17de17 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/util/SqlUtil.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/util/SqlUtil.java @@ -57,7 +57,7 @@ public static String buildQuerySplitRangeSql(JdbcConf jdbcConf, JdbcDialect jdbc } else { // rowNum字段作为splitKey - if (addRowNumColumn(jdbcConf.getSplitPk())) { + if (isRowNumSplitKey(jdbcConf.getSplitPk())) { StringBuilder customTableBuilder = new StringBuilder(128) .append("SELECT ") @@ -98,7 +98,7 @@ public static String buildQuerySqlBySplit( // customSql为空 且 splitPk是ROW_NUMBER() boolean flag = StringUtils.isBlank(jdbcConf.getCustomSql()) - && SqlUtil.addRowNumColumn(jdbcConf.getSplitPk()); + && SqlUtil.isRowNumSplitKey(jdbcConf.getSplitPk()); String splitFilter = null; if (jdbcInputSplit.getTotalNumberOfSplits() > 1) { @@ -227,7 +227,7 @@ public static String buildOrderSql( } /* 是否添加自定义函数column 作为分片key ***/ - public static boolean addRowNumColumn(String splitKey) { + public static boolean isRowNumSplitKey(String splitKey) { return StringUtils.isNotBlank(splitKey) && splitKey.contains(ConstantValue.LEFT_PARENTHESIS_SYMBOL); } diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/test/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormatTest.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/test/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormatTest.java new file mode 100644 index 0000000000..211198b8b6 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/test/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormatTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.jdbc.source; + +import com.dtstack.chunjun.connector.jdbc.conf.JdbcConf; + +import org.apache.commons.lang3.tuple.Pair; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.reflect.Whitebox; +import org.slf4j.Logger; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Arrays; + +/** @author liuliu 2022/4/15 */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({JdbcInputFormat.class}) +public class JdbcInputFormatTest { + + JdbcInputFormat jdbcInputFormat; + + @Before + public void setup() { + jdbcInputFormat = PowerMockito.mock(JdbcInputFormat.class); + Logger LOG = PowerMockito.mock(Logger.class); + Whitebox.setInternalState(jdbcInputFormat, "LOG", LOG); + JdbcConf jdbcConf = PowerMockito.mock(JdbcConf.class); + Whitebox.setInternalState(jdbcInputFormat, "jdbcConf", jdbcConf); + PowerMockito.when(jdbcConf.getStartLocation()).thenReturn("10"); + } + + @Test + public void createSplitsInternalBySplitRangeTest() + throws InvocationTargetException, IllegalAccessException { + PowerMockito.when(jdbcInputFormat.createSplitsInternalBySplitRange(Mockito.anyInt())) + .thenCallRealMethod(); + Method getSplitRangeFromDb = + PowerMockito.method(JdbcInputFormat.class, "getSplitRangeFromDb"); + Mockito.when(getSplitRangeFromDb.invoke(jdbcInputFormat)) + .thenReturn(Pair.of("12.123", "345534.12")); + JdbcInputSplit[] splitsInternalBySplitRange = + jdbcInputFormat.createSplitsInternalBySplitRange(3); + Arrays.stream(splitsInternalBySplitRange).forEach(split -> System.out.println(split)); + assert splitsInternalBySplitRange.length == 3; + } +} diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/element/column/ArrayColumn.java b/chunjun-core/src/main/java/com/dtstack/chunjun/element/column/ArrayColumn.java index c4ddfde39c..c9e5ffb215 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/element/column/ArrayColumn.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/element/column/ArrayColumn.java @@ -42,8 +42,7 @@ public class ArrayColumn extends AbstractBaseColumn { protected Array data; public ArrayColumn(final Array data) { - super(data); - this.data = data; + super(data, data.toString().length()); } @Override