From b5f9eef033d257d163a95a58333b6dde79692962 Mon Sep 17 00:00:00 2001 From: otxiyin Date: Tue, 13 Sep 2022 11:01:09 +0800 Subject: [PATCH 1/2] [feat-#1244][sybase] add sybase reader plugin --- .../chunjun-connector-sybase/pom.xml | 93 +++++ .../converter/SybaseColumnConverter.java | 137 ++++++ .../converter/SybaseRawTypeConverter.java | 91 ++++ .../sybase/dialect/SybaseDialect.java | 80 ++++ .../sybase/source/SybaseSourceFactory.java | 42 ++ .../table/SybaseDynamicTableFactory.java | 38 ++ .../org.apache.flink.table.factories.Factory | 19 + chunjun-connectors/pom.xml | 1 + .../chunjun/metrics/AccumulatorCollector.java | 2 +- .../com/dtstack/chunjun/util/TelnetUtil.java | 7 + .../json/sybase/sybase_stream.json | 187 +++++++++ chunjun-local-test/pom.xml | 5 + .../sybase/sybase-source.md" | 395 ++++++++++++++++++ 13 files changed, 1096 insertions(+), 1 deletion(-) create mode 100644 chunjun-connectors/chunjun-connector-sybase/pom.xml create mode 100644 chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/converter/SybaseColumnConverter.java create mode 100644 chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/converter/SybaseRawTypeConverter.java create mode 100644 chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/dialect/SybaseDialect.java create mode 100644 chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/source/SybaseSourceFactory.java create mode 100644 chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/table/SybaseDynamicTableFactory.java create mode 100644 chunjun-connectors/chunjun-connector-sybase/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory create mode 100644 chunjun-examples/json/sybase/sybase_stream.json create mode 100644 "docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/sybase/sybase-source.md" diff 
--git a/chunjun-connectors/chunjun-connector-sybase/pom.xml b/chunjun-connectors/chunjun-connector-sybase/pom.xml new file mode 100644 index 0000000000..c91adcfcb0 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-sybase/pom.xml @@ -0,0 +1,93 @@ + + + + chunjun-connectors + com.dtstack.chunjun + 1.12-SNAPSHOT + + 4.0.0 + + chunjun-connector-sybase + ChunJun : Connectors : sybase + + + + com.dtstack.chunjun + chunjun-connector-jdbc-base + ${project.version} + + + jdbc.sybase + jconnect + 7.7 + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.1.0 + + + package + + shade + + + false + + + org.slf4j:slf4j-api + log4j:log4j + ch.qos.logback:* + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + + + maven-antrun-plugin + + + copy-resources + + package + + run + + + + + + + + + + + + + + + + + diff --git a/chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/converter/SybaseColumnConverter.java b/chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/converter/SybaseColumnConverter.java new file mode 100644 index 0000000000..b0ae684503 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/converter/SybaseColumnConverter.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.sybase.converter; + +import com.dtstack.chunjun.conf.ChunJunCommonConf; +import com.dtstack.chunjun.connector.jdbc.converter.JdbcColumnConverter; +import com.dtstack.chunjun.converter.IDeserializationConverter; +import com.dtstack.chunjun.element.AbstractBaseColumn; +import com.dtstack.chunjun.element.column.BigDecimalColumn; +import com.dtstack.chunjun.element.column.BooleanColumn; +import com.dtstack.chunjun.element.column.BytesColumn; +import com.dtstack.chunjun.element.column.SqlDateColumn; +import com.dtstack.chunjun.element.column.StringColumn; +import com.dtstack.chunjun.element.column.TimeColumn; +import com.dtstack.chunjun.element.column.TimestampColumn; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.YearMonthIntervalType; + +import com.sybase.jdbc4.tds.SybTimestamp; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; + +/** @Author OT @Date 2022/6/16 17:52 @Version 1.0 */ +public class SybaseColumnConverter extends JdbcColumnConverter { + public SybaseColumnConverter(RowType rowType, ChunJunCommonConf commonConf) { + super(rowType, commonConf); + } + + @Override + protected IDeserializationConverter createInternalConverter(LogicalType type) { + switch (type.getTypeRoot()) { + case BOOLEAN: + return val -> { + // compatible with BIT(>1) + if (val 
instanceof byte[]) { + return new BytesColumn((byte[]) val); + } else { + return new BooleanColumn(Boolean.parseBoolean(val.toString())); + } + }; + case TINYINT: + return val -> new BigDecimalColumn(((Integer) val).byteValue()); + case SMALLINT: + case INTEGER: + return val -> new BigDecimalColumn((Integer) val); + case INTERVAL_YEAR_MONTH: + return (IDeserializationConverter) + val -> { + YearMonthIntervalType yearMonthIntervalType = + (YearMonthIntervalType) type; + switch (yearMonthIntervalType.getResolution()) { + case YEAR: + return new BigDecimalColumn( + Integer.parseInt(String.valueOf(val).substring(0, 4))); + case MONTH: + case YEAR_TO_MONTH: + default: + throw new UnsupportedOperationException( + "jdbc converter only support YEAR"); + } + }; + case FLOAT: + return val -> { + if (val instanceof Double) { + BigDecimal b = new BigDecimal(String.valueOf(val)); + return new BigDecimalColumn(b.doubleValue()); + } + return new BigDecimalColumn((Float) val); + }; + case DOUBLE: + return val -> new BigDecimalColumn((Double) val); + case BIGINT: + return val -> { + if (val instanceof Integer) { + return new BigDecimalColumn((Integer) val); + } + return new BigDecimalColumn((Long) val); + }; + case DECIMAL: + return val -> { + if (val instanceof BigInteger) { + return new BigDecimalColumn((BigInteger) val); + } + return new BigDecimalColumn((BigDecimal) val); + }; + case CHAR: + case VARCHAR: + return val -> new StringColumn((String) val); + case DATE: + return val -> new SqlDateColumn((Date) val); + case TIME_WITHOUT_TIME_ZONE: + return val -> { + if (val instanceof SybTimestamp) { + Time time = + Time.valueOf(((SybTimestamp) val).toLocalDateTime().toLocalTime()); + return new TimeColumn(time); + } + return new TimeColumn((Time) val); + }; + case TIMESTAMP_WITH_TIME_ZONE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + return (IDeserializationConverter) + val -> + new TimestampColumn( + (Timestamp) val, ((TimestampType) (type)).getPrecision()); + + case BINARY: + case 
VARBINARY: + return val -> new BytesColumn((byte[]) val); + default: + throw new UnsupportedOperationException("Unsupported type:" + type); + } + } +} diff --git a/chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/converter/SybaseRawTypeConverter.java b/chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/converter/SybaseRawTypeConverter.java new file mode 100644 index 0000000000..7f4263eee4 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/converter/SybaseRawTypeConverter.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.dtstack.chunjun.connector.sybase.converter; + +import com.dtstack.chunjun.throwable.UnsupportedTypeException; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; + +import java.util.Locale; + +/** @Author OT @Date 2022/6/13 17:59 @Version 1.0 */ +public class SybaseRawTypeConverter { + public static DataType apply(String type) { + switch (type.toUpperCase(Locale.ENGLISH)) { + case "BIGINT": + case "UNSIGNED INT": + return DataTypes.BIGINT(); + case "INT": + case "INTEGER": + case "UNSIGNED SMALLINT": + return DataTypes.INT(); + case "SMALLINT": + return DataTypes.SMALLINT(); + case "TINYINT": + return DataTypes.TINYINT(); + case "UNSIGNED BIGINT": + return DataTypes.DECIMAL(20, 0); + case "NUMERIC": + case "DECIMAL": + return DataTypes.DECIMAL(38, 18); + case "NUMERIC IDENTITY": + return DataTypes.DECIMAL(38, 0); + case "FLOAT": + case "REAL": + return DataTypes.FLOAT(); + case "DOUBLE": + return DataTypes.DOUBLE(); + case "SMALLMONEY": + return DataTypes.DECIMAL(10, 4); + case "MONEY": + return DataTypes.DECIMAL(19, 4); + case "SMALLDATETIME": + return DataTypes.TIMESTAMP(0); + case "DATETIME": + case "BIGDATETIME": + return DataTypes.TIMESTAMP(3); + case "DATE": + return DataTypes.DATE(); + case "TIME": + case "BIGTIME": + return DataTypes.TIME(); + case "CHAR": + case "VARCHAR": + case "UNICHAR": + case "UNIVARCHAR": + case "NCHAR": + case "NVARCHAR": + case "TEXT": + case "UNITEXT": + case "LONGSYSNAME": + case "STRING": + return DataTypes.STRING(); + case "BINARY": + case "TIMESTAMP": + case "VARBINARY": + case "IMAGE": + return DataTypes.BYTES(); + case "BIT": + return DataTypes.BOOLEAN(); + default: + throw new UnsupportedTypeException(type); + } + } +} diff --git a/chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/dialect/SybaseDialect.java 
b/chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/dialect/SybaseDialect.java new file mode 100644 index 0000000000..21d11c6e7f --- /dev/null +++ b/chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/dialect/SybaseDialect.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.dtstack.chunjun.connector.sybase.dialect; + +import com.dtstack.chunjun.conf.ChunJunCommonConf; +import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect; +import com.dtstack.chunjun.connector.jdbc.source.JdbcInputSplit; +import com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatement; +import com.dtstack.chunjun.connector.sybase.converter.SybaseColumnConverter; +import com.dtstack.chunjun.connector.sybase.converter.SybaseRawTypeConverter; +import com.dtstack.chunjun.converter.AbstractRowConverter; +import com.dtstack.chunjun.converter.RawTypeConverter; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import io.vertx.core.json.JsonArray; + +import java.sql.ResultSet; +import java.util.Optional; + +/** @Author OT @Date 2022/6/16 13:54 @Version 1.0 */ +public class SybaseDialect implements JdbcDialect { + private static final String DIALECT_NAME = "Sybase"; + private static final String DRIVER_NAME = "com.sybase.jdbc4.jdbc.SybDriver"; + + @Override + public String dialectName() { + return DIALECT_NAME; + } + + @Override + public boolean canHandle(String url) { + return url.startsWith("jdbc:sybase:Tds:"); + } + + @Override + public RawTypeConverter getRawTypeConverter() { + return SybaseRawTypeConverter::apply; + } + + @Override + public Optional defaultDriverName() { + return Optional.of(DRIVER_NAME); + } + + @Override + public String quoteIdentifier(String identifier) { + return "\"" + identifier + "\""; + } + + @Override + public AbstractRowConverter + getColumnConverter(RowType rowType, ChunJunCommonConf commonConf) { + return new SybaseColumnConverter(rowType, commonConf); + } + + @Override + public String getSplitModFilter(JdbcInputSplit split, String splitPkName) { + return String.format( + "%s %% %s = %s", + quoteIdentifier(splitPkName), split.getTotalNumberOfSplits(), split.getMod()); + } +} diff --git 
a/chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/source/SybaseSourceFactory.java b/chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/source/SybaseSourceFactory.java new file mode 100644 index 0000000000..c57860feb8 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/source/SybaseSourceFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.dtstack.chunjun.connector.sybase.source; + +import com.dtstack.chunjun.conf.SyncConf; +import com.dtstack.chunjun.connector.jdbc.source.JdbcSourceFactory; +import com.dtstack.chunjun.connector.jdbc.util.JdbcUtil; +import com.dtstack.chunjun.connector.sybase.dialect.SybaseDialect; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.apache.commons.lang3.StringUtils; + +/** @Author OT @Date 2022/6/13 17:42 @Version 1.0 */ +public class SybaseSourceFactory extends JdbcSourceFactory { + public SybaseSourceFactory(SyncConf syncConf, StreamExecutionEnvironment env) { + super(syncConf, env, new SybaseDialect()); + // avoid result.next blocking + if (jdbcConf.isPolling() + && StringUtils.isEmpty(jdbcConf.getStartLocation()) + && jdbcConf.getFetchSize() == 0) { + jdbcConf.setFetchSize(1000); + } + JdbcUtil.putExtParam(jdbcConf); + } +} diff --git a/chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/table/SybaseDynamicTableFactory.java b/chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/table/SybaseDynamicTableFactory.java new file mode 100644 index 0000000000..3309c938fe --- /dev/null +++ b/chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/table/SybaseDynamicTableFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.sybase.table; + +import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect; +import com.dtstack.chunjun.connector.jdbc.table.JdbcDynamicTableFactory; +import com.dtstack.chunjun.connector.sybase.dialect.SybaseDialect; + +/** @Author OT @Date 2022/6/16 15:25 @Version 1.0 */ +public class SybaseDynamicTableFactory extends JdbcDynamicTableFactory { + private static final String IDENTIFIER = "sybase-x"; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + protected JdbcDialect getDialect() { + return new SybaseDialect(); + } +} diff --git a/chunjun-connectors/chunjun-connector-sybase/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/chunjun-connectors/chunjun-connector-sybase/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 0000000000..0ebaf9f6be --- /dev/null +++ b/chunjun-connectors/chunjun-connector-sybase/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +com.dtstack.chunjun.connector.sybase.table.SybaseDynamicTableFactory diff --git a/chunjun-connectors/pom.xml b/chunjun-connectors/pom.xml index 1e3a7f4abd..00e9e21a68 100644 --- a/chunjun-connectors/pom.xml +++ b/chunjun-connectors/pom.xml @@ -42,6 +42,7 @@ chunjun-connector-starrocks chunjun-connector-oceanbase chunjun-connector-cassandra + chunjun-connector-sybase chunjun-connector-file diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/metrics/AccumulatorCollector.java b/chunjun-core/src/main/java/com/dtstack/chunjun/metrics/AccumulatorCollector.java index 5f7ebd61e6..e71e2046ea 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/metrics/AccumulatorCollector.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/metrics/AccumulatorCollector.java @@ -149,7 +149,7 @@ public void collectAccumulator() { public long getAccumulatorValue(String name, boolean needWaited) { if (needWaited) { try { - TimeUnit.MILLISECONDS.wait(this.period); + TimeUnit.MILLISECONDS.sleep(this.period); } catch (InterruptedException e) { LOG.warn( "Interrupted when waiting for valueAccumulatorMap, e = {}", diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/util/TelnetUtil.java b/chunjun-core/src/main/java/com/dtstack/chunjun/util/TelnetUtil.java index 9c6b2a6656..2a87cc951c 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/util/TelnetUtil.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/util/TelnetUtil.java @@ -36,8 +36,13 @@ public class TelnetUtil { private static final Pattern JDBC_PATTERN = 
Pattern.compile("(?[^:@/]+):(?\\d+).*"); public static final String PHOENIX_PREFIX = "jdbc:phoenix"; + + private static final String SYBASE_PREFIX = "jdbc:sybase:Tds"; private static final Pattern PHOENIX_PATTERN = Pattern.compile("jdbc:phoenix:(?\\S+):(?\\d+).*"); + + private static final Pattern SYBASE_PATTERN = + Pattern.compile("jdbc:sybase:Tds:(?\\S+):(?\\d+).*"); private static final String HOST_KEY = "host"; private static final String PORT_KEY = "port"; private static final String SPLIT_KEY = ","; @@ -85,6 +90,8 @@ public static void telnet(String url) { Matcher matcher = null; if (StringUtils.startsWith(url, PHOENIX_PREFIX)) { matcher = PHOENIX_PATTERN.matcher(url); + } else if (StringUtils.startsWith(url, SYBASE_PREFIX)) { + matcher = SYBASE_PATTERN.matcher(url); } else { matcher = JDBC_PATTERN.matcher(url); } diff --git a/chunjun-examples/json/sybase/sybase_stream.json b/chunjun-examples/json/sybase/sybase_stream.json new file mode 100644 index 0000000000..57eacf5894 --- /dev/null +++ b/chunjun-examples/json/sybase/sybase_stream.json @@ -0,0 +1,187 @@ +{ + "job": { + "content": [ + { + "reader": { + "parameter": { + "password": "********", + "column": [ + { + "name": "field_id", + "type": "int" + }, + { + "name": "field_bit", + "type": "bit" + }, + { + "name": "field_tinyint", + "type": "tinyint" + }, + { + "name": "field_smallint", + "type": "smallint" + }, + { + "name": "field_unsigned_smallint", + "type": "unsigned smallint" + }, + { + "name": "field_int", + "type": "int" + }, + { + "name": "field_unsigned_int", + "type": "unsigned int" + }, + { + "name": "field_bigint", + "type": "bigint" + }, + { + "name": "field_unsigned_bigint", + "type": "unsigned bigint" + }, + { + "name": "field_decimal", + "type": "decimal" + }, + { + "name": "field_numeric", + "type": "numeric" + }, + { + "name": "field_float", + "type": "float" + }, + { + "name": "field_double_precision", + "type": "double" + }, + { + "name": "field_real", + "type": "real" + }, + { + "name": 
"field_smallmoney", + "type": "smallmoney" + }, + { + "name": "field_money", + "type": "money" + }, + { + "name": "field_date", + "type": "date" + }, + { + "name": "field_time", + "type": "time" + }, + { + "name": "field_bigtime", + "type": "bigtime" + }, + { + "name": "field_smalldatetime", + "type": "smalldatetime" + }, + { + "name": "field_datetime", + "type": "datetime" + }, + { + "name": "field_bigdatetime", + "type": "bigdatetime" + }, + { + "name": "field_timestamp", + "type": "timestamp" + }, + { + "name": "field_char", + "type": "char" + }, + { + "name": "field_nchar", + "type": "nchar" + }, + { + "name": "field_unichar", + "type": "unichar" + }, + { + "name": "field_varchar", + "type": "varchar" + }, + { + "name": "field_nvarchar", + "type": "nvarchar" + }, + { + "name": "field_univarchar", + "type": "univarchar" + }, + { + "name": "field_text", + "type": "text" + }, + { + "name": "field_unitext", + "type": "unitext" + }, + { + "name": "field_longsysname", + "type": "longsysname" + }, + { + "name": "field_binary", + "type": "binary" + }, + { + "name": "field_varbinary", + "type": "varbinary" + }, + { + "name": "field_image", + "type": "image" + } + ], + "connection": [ + { + "jdbcUrl": [ + "jdbc:sybase:Tds:host:port/tempdb" + ], + "schema": "guest", + "table": [ + "test1" + ] + } + ], + "username": "**" + }, + "name": "sybasereader" + }, + "writer": { + "parameter": { + "print": true + }, + "name": "streamwriter" + } + } + ], + "setting": { + "restore": { + "isRestore": false, + "isStream": false + }, + "errorLimit": { + "record": 0 + }, + "speed": { + "bytes": -1048576, + "channel": 1 + } + } + } +} diff --git a/chunjun-local-test/pom.xml b/chunjun-local-test/pom.xml index 7589f5be16..1f202e9a9a 100644 --- a/chunjun-local-test/pom.xml +++ b/chunjun-local-test/pom.xml @@ -211,6 +211,11 @@ + + com.dtstack.chunjun + chunjun-connector-sybase + ${project.version} + diff --git "a/docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/sybase/sybase-source.md" 
"b/docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/sybase/sybase-source.md" new file mode 100644 index 0000000000..4e6e3246c6 --- /dev/null +++ "b/docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/sybase/sybase-source.md" @@ -0,0 +1,395 @@ +# Sybase Source + +## 一、介绍 + +支持从 sybase离线读取,支持 sybase实时间隔轮询读取 + +## 二、支持版本 + +sybase 15.7 + +## 三、插件名称 + +| 插件模式 | 插件名称 | +| -------- | ------------ | +| sync | sybasereader | + +## 四、参数说明 + +### 1、Sync + +- **connection** + + - 描述:数据库连接参数,包含 jdbcUrl、schema、table 等参数 + - 必选:是 + - 参数类型:List + - 默认值:无 + ```json + "connection": [{ + "jdbcUrl": ["jdbc:sybase:Tds:hostname:port/database"], + "table": ["table"], + "schema":"public" + }] + ``` +
+ +- **jdbcUrl** + + - 描述:针对关系型数据库的 jdbc 连接字符串,jdbcUrl。 + - 必选:是 + - 参数类型:string + - 默认值:无 +
+ +- **schema** + + - 描述:数据库 schema 名 + - 必选:否 + - 参数类型:string + - 默认值:无 +
+ +- **table** + + - 描述:要读取的表名称。目前只支持配置单个表,后续会支持多表 + - 必选:是 + - 参数类型:List + - 默认值:无 +
+ +- **username** + + - 描述:数据源的用户名 + - 必选:是 + - 参数类型:String + - 默认值:无 +
+ +- **password** + + - 描述:数据源指定用户名的密码 + - 必选:是 + - 参数类型:String + - 默认值:无 +
+ +- **fetchSize** + + - 描述:一次性从数据库中读取多少条数据,Sybase 默认一次将所有结果都读取到内存中,在数据量很大时可能会造成 OOM,设置这个参数可以控制每次读取 fetchSize 条数据,而不是默认的把所有数据一次读取出来;开启 fetchSize 需要满足:连接参数 useCursorFetch=true。 + 注意:此参数的值不可设置过大,否则会读取超时,导致任务失败。 + - 必选:否 + - 参数类型:int + - 默认值:1024 +
+ +- **where** + + - 描述:筛选条件,reader 插件根据指定的 column、table、where 条件拼接 SQL,并根据这个 SQL 进行数据抽取。在实际业务场景中,往往会选择当天的数据进行同步,可以将 where 条件指定为 gmt_create > time。 + - 注意:不可以将 where 条件指定为 limit 10,limit 不是 SQL 的合法 where 子句。 + - 必选:否 + - 参数类型:String + - 默认值:无 +
+ +- **splitPk** + + - 描述:当 speed 配置中的 channel 大于 1 时指定此参数,Reader 插件根据并发数和此参数指定的字段拼接 sql,使每个并发读取不同的数据,提升读取速率。 + - 注意: + - 推荐 splitPk 使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点。 + - 目前 splitPk 仅支持整型数据切分,不支持浮点、字符串、日期等其他类型。如果用户指定其他非支持类型,chunjun 将报错。 + - 如果 channel 大于 1 但是没有配置此参数,任务将置为失败。 + - 仅支持数值型 + - 必选:否 + - 参数类型:String + - 默认值:无 +
+ +- **splitStrategy** + + - 描述:分片策略,当speed 配置中的 channel 大于 1 时此参数才生效 + - 所有选项: + - range + - 分片时获取splitPk在表中的最大值和最小值之差,尽可能均匀地分配给各个分片 + - splitPk=id,最小值=1,最大值=7,channel=3 + - channel-0:id >= 1 and id < 3 + - channel-1: id >= 3 and id < 5 + - channel-2: id >= 5 + - mod + - 分片时根据分片数量对splitPk做mod操作 + - splitPk=id,channel=3 + - channel-0:id mod 3 = 0 + - channel-1: id mod 3 = 1 + - channel-2: id mod 3 = 2 + - 注意: + - 目前增量模式下仅支持mod + - 必选:否 + - 参数类型:String + - 默认值: + - 增量模式:mod + - 其他: range +
+ +- **queryTimeOut** + + - 描述:查询超时时间,单位秒。 + - 注意:当数据量很大,或者从视图查询,或者自定义 sql 查询时,可通过此参数指定超时时间。 + - 必选:否 + - 参数类型:int + - 默认值:1000 +
+ +- **customSql** + + - 描述:自定义的查询语句,如果只指定字段不能满足需求时,可通过此参数指定查询的 sql,可以是任意复杂的查询语句。 + - 注意: + - 只能是查询语句,否则会导致任务失败; + - 查询语句返回的字段需要和 column 列表里的字段对应; + - 当指定了此参数时,connection 里指定的 table 无效; + - 当指定此参数时,column 必须指定具体字段信息,不能以\*号代替; + - 必选:否 + - 参数类型:String + - 默认值:无 +
+ +- **column** + + - 描述:需要读取的字段。 + - 格式:支持 3 种格式 +
1. 读取全部字段,如果字段数量很多,可以使用下面的写法: + ```bash + "column":["*"] + ``` + 2. 只指定字段名称: + ``` + "column":["id","name"] + ``` + 3. 指定具体信息: + + ```JSON + "column": { + "name": "col", + "type": "datetime", + "format": "yyyy-MM-dd hh:mm:ss", + "value": "value" + } + ``` + + - 属性说明: + - name:字段名称 + - type:字段类型,可以和数据库里的字段类型不一样,程序会做一次类型转换 + - format:如果字段是时间字符串,可以指定时间的格式,将字段类型转为日期格式返回 + - value:如果数据库里不存在指定的字段,则会把 value 的值作为常量列返回,如果指定的字段存在,当指定字段的值为 null 时,会以此 value 值作为默认值返回 + - 必选:是 + - 默认值:无 +
+ +- **polling** + + - 描述:是否开启间隔轮询,开启后会根据 pollingInterval 轮询间隔时间周期性的从数据库拉取数据。开启间隔轮询还需配置参数 pollingInterval,increColumn,可以选择配置参数 startLocation。若不配置参数 startLocation,任务启动时将会从数据库中查询增量字段最大值作为轮询的起始位置。 + - 必选:否 + - 参数类型:Boolean + - 默认值:false +
+ +- **pollingInterval** + + - 描述:轮询间隔时间,从数据库中拉取数据的间隔时间,默认为 5000 毫秒。 + - 必选:否 + - 参数类型:long + - 默认值:5000 +
+ +- **increColumn** + + - 描述:增量字段,可以是对应的增量字段名,也可以是纯数字,表示增量字段在 column 中的顺序位置(从 0 开始) + - 必选:否 + - 参数类型:String 或 int + - 默认值:无 +
+ +- **orderByColumn** + + - 描述:排序字段,用于拼接sql语句中的order by语句 + - 必选:否 + - 参数类型:String + - 注意:在增量模式中不生效,增量模式始终使用increColumn做order by + - 默认值:无 +
+ +- **startLocation** + + - 描述:增量查询起始位置 + - 必选:否 + - 参数类型:String + - 默认值:无 +
+ +- **useMaxFunc** + + - 描述:用于标记是否保存 endLocation 位置的一条或多条数据,true:不保存,false(默认):保存, 某些情况下可能出现最后几条数据被重复记录的情况,可以将此参数配置为 true + - 必选:否 + - 参数类型:Boolean + - 默认值:false +
+ +- **requestAccumulatorInterval** + - 描述:发送查询累加器请求的间隔时间。 + - 必选:否 + - 参数类型:int + - 默认值:2 +
+ +### 2、SQL + +- **connector** + + - 描述:sybase-x + - 必选:是 + - 参数类型:String + - 默认值:无 +
+ +- **url** + + - 描述:jdbc:sybase:Tds:hostname:port/database + - 必选:是 + - 参数类型:String + - 默认值:无 +
+ +- **schema** + + - 描述:数据库 schema 名 + - 必选:否 + - 参数类型:string + - 默认值:无 +
+ +- **table-name** + + - 描述:表名 + - 必选:是 + - 参数类型:String + - 默认值:无 +
+ +- **username** + + - 描述:username + - 必选:是 + - 参数类型:String + - 默认值:无 +
+ +- **password** + + - 描述:password + - 必选:是 + - 参数类型:String + - 默认值:无 +
+ +- **scan.polling-interval** + + - 描述:间隔轮询时间。非必填(不填为离线任务),无默认 + - 必选:否 + - 参数类型:String + - 默认值:无 +
+ +- **scan.parallelism** + + - 描述:并行度 + - 必选:否 + - 参数类型:String + - 默认值:无 +
+ +- **scan.fetch-size** + + - 描述:每次从数据库中 fetch 大小,单位:条。 + - 必选:否 + - 参数类型:String + - 默认值:1024 +
+ +- **scan.query-timeout** + + - 描述:查询超时时间,单位:秒。 + - 必选:否 + - 参数类型:String + - 默认值:1 +
+ +- **scan.partition.column** + + - 描述:多并行度读取的切分字段 + - 必选:否 + - 参数类型:String + - 默认值:无 +
+ +- **scan.partition.strategy** + + - 描述:数据分片策略 + - 必选:否 + - 参数类型:String + - 默认值:range +
+ +- **scan.increment.column** + + - 描述:增量字段名称 + - 必选:否 + - 参数类型:String + - 默认值:无 +
+ +- **scan.increment.column-type** + - 描述:增量字段类型 + - 必选:否 + - 参数类型:String + - 默认值:无 +
+ +- **scan.order-by.column** + - 描述:排序字段,用于拼接sql语句中的order by语句 + - 必选:否 + - 参数类型:String + - 注意:在增量模式中不生效,增量模式始终使用increColumn做order by + - 默认值:无 +
+ +- **scan.start-location** + + - 描述:增量字段开始位置,如果不指定则先同步所有,然后再增量 + - 必选:否 + - 参数类型:String + - 默认值:无 +
+ +- **scan.restore.columnname** + + - 描述:开启了 cp,任务从 sp/cp 续跑字段名称。如果续跑,则会覆盖 scan.start-location 开始位置,从续跑点开始 + - 必选:否 + - 参数类型:String + - 默认值:无 +
+ +- **scan.restore.columntype** + - 描述:开启了 cp,任务从 sp/cp 续跑字段类型 + - 必选:否 + - 参数类型:String + - 默认值:无 +
+ +## 五、数据类型 + +| 是否支持 | 类型名称 | +| :------: | :----------------------------------------------------------: | +| 支持 | INT、BIT、TINYINT、SMALLINT、UNSIGNED SMALLINT、INT、UNSIGNED INT、BIGINT、UNSIGNED BIGINT、DECIMAL、NUMERIC、FLOAT、DOUBLE、REAL、SMALLMONEY、MONEY、DATE、TIME、BIGTIME、SMALLDATETIME、DATETIME、BIGDATETIME、CHAR、UNICHAR、VARCHAR、UNIVARCHAR、TEXT、UNITEXT、BINARY、VARBINARY、IMAGE、NUMERIC IDENTITY | +| 不支持 | ARRAY、MAP、STRUCT、UNION 等 | + +## 六、脚本示例 + +见项目内`chunjun-examples`文件夹。 From 1c75792ef05f50afe43d72a2e1f37db2dd68371d Mon Sep 17 00:00:00 2001 From: otxiyin Date: Tue, 13 Sep 2022 14:15:05 +0800 Subject: [PATCH 2/2] [feat-#1244][sybase] change sybase jdbc driver --- .../chunjun-connector-sybase/pom.xml | 6 +- .../converter/SybaseColumnConverter.java | 11 +- .../sybase/dialect/SybaseDialect.java | 4 +- .../com/dtstack/chunjun/util/TelnetUtil.java | 7 - .../json/sybase/sybase_stream.json | 146 +----------------- .../sybase/sybase-source.md" | 4 +- 6 files changed, 15 insertions(+), 163 deletions(-) diff --git a/chunjun-connectors/chunjun-connector-sybase/pom.xml b/chunjun-connectors/chunjun-connector-sybase/pom.xml index c91adcfcb0..e049324709 100644 --- a/chunjun-connectors/chunjun-connector-sybase/pom.xml +++ b/chunjun-connectors/chunjun-connector-sybase/pom.xml @@ -19,9 +19,9 @@ ${project.version}
- jdbc.sybase - jconnect - 7.7 + net.sourceforge.jtds + jtds + 1.3.1 diff --git a/chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/converter/SybaseColumnConverter.java b/chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/converter/SybaseColumnConverter.java index b0ae684503..91c324e267 100644 --- a/chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/converter/SybaseColumnConverter.java +++ b/chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/converter/SybaseColumnConverter.java @@ -35,8 +35,6 @@ import org.apache.flink.table.types.logical.TimestampType; import org.apache.flink.table.types.logical.YearMonthIntervalType; -import com.sybase.jdbc4.tds.SybTimestamp; - import java.math.BigDecimal; import java.math.BigInteger; import java.sql.Date; @@ -112,14 +110,7 @@ protected IDeserializationConverter createInternalConverter(LogicalType type) { case DATE: return val -> new SqlDateColumn((Date) val); case TIME_WITHOUT_TIME_ZONE: - return val -> { - if (val instanceof SybTimestamp) { - Time time = - Time.valueOf(((SybTimestamp) val).toLocalDateTime().toLocalTime()); - return new TimeColumn(time); - } - return new TimeColumn((Time) val); - }; + return val -> new TimeColumn((Time) val); case TIMESTAMP_WITH_TIME_ZONE: case TIMESTAMP_WITHOUT_TIME_ZONE: return (IDeserializationConverter) diff --git a/chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/dialect/SybaseDialect.java b/chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/dialect/SybaseDialect.java index 21d11c6e7f..0528e00953 100644 --- a/chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/dialect/SybaseDialect.java +++ 
b/chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/dialect/SybaseDialect.java @@ -38,7 +38,7 @@ /** @Author OT @Date 2022/6/16 13:54 @Version 1.0 */ public class SybaseDialect implements JdbcDialect { private static final String DIALECT_NAME = "Sybase"; - private static final String DRIVER_NAME = "com.sybase.jdbc4.jdbc.SybDriver"; + private static final String DRIVER_NAME = "net.sourceforge.jtds.jdbc.Driver"; @Override public String dialectName() { @@ -47,7 +47,7 @@ public String dialectName() { @Override public boolean canHandle(String url) { - return url.startsWith("jdbc:sybase:Tds:"); + return url.startsWith("jdbc:jtds:sybase:"); } @Override diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/util/TelnetUtil.java b/chunjun-core/src/main/java/com/dtstack/chunjun/util/TelnetUtil.java index 2a87cc951c..9c6b2a6656 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/util/TelnetUtil.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/util/TelnetUtil.java @@ -36,13 +36,8 @@ public class TelnetUtil { private static final Pattern JDBC_PATTERN = Pattern.compile("(?[^:@/]+):(?\\d+).*"); public static final String PHOENIX_PREFIX = "jdbc:phoenix"; - - private static final String SYBASE_PREFIX = "jdbc:sybase:Tds"; private static final Pattern PHOENIX_PATTERN = Pattern.compile("jdbc:phoenix:(?\\S+):(?\\d+).*"); - - private static final Pattern SYBASE_PATTERN = - Pattern.compile("jdbc:sybase:Tds:(?\\S+):(?\\d+).*"); private static final String HOST_KEY = "host"; private static final String PORT_KEY = "port"; private static final String SPLIT_KEY = ","; @@ -90,8 +85,6 @@ public static void telnet(String url) { Matcher matcher = null; if (StringUtils.startsWith(url, PHOENIX_PREFIX)) { matcher = PHOENIX_PATTERN.matcher(url); - } else if (StringUtils.startsWith(url, SYBASE_PREFIX)) { - matcher = SYBASE_PATTERN.matcher(url); } else { matcher = JDBC_PATTERN.matcher(url); } diff --git 
a/chunjun-examples/json/sybase/sybase_stream.json b/chunjun-examples/json/sybase/sybase_stream.json index 57eacf5894..a4eda6633d 100644 --- a/chunjun-examples/json/sybase/sybase_stream.json +++ b/chunjun-examples/json/sybase/sybase_stream.json @@ -4,161 +4,29 @@ { "reader": { "parameter": { - "password": "********", + "password": "tester", "column": [ { - "name": "field_id", + "name": "i", "type": "int" }, { - "name": "field_bit", - "type": "bit" - }, - { - "name": "field_tinyint", - "type": "tinyint" - }, - { - "name": "field_smallint", - "type": "smallint" - }, - { - "name": "field_unsigned_smallint", - "type": "unsigned smallint" - }, - { - "name": "field_int", - "type": "int" - }, - { - "name": "field_unsigned_int", - "type": "unsigned int" - }, - { - "name": "field_bigint", - "type": "bigint" - }, - { - "name": "field_unsigned_bigint", - "type": "unsigned bigint" - }, - { - "name": "field_decimal", - "type": "decimal" - }, - { - "name": "field_numeric", - "type": "numeric" - }, - { - "name": "field_float", - "type": "float" - }, - { - "name": "field_double_precision", - "type": "double" - }, - { - "name": "field_real", - "type": "real" - }, - { - "name": "field_smallmoney", - "type": "smallmoney" - }, - { - "name": "field_money", - "type": "money" - }, - { - "name": "field_date", - "type": "date" - }, - { - "name": "field_time", - "type": "time" - }, - { - "name": "field_bigtime", - "type": "bigtime" - }, - { - "name": "field_smalldatetime", - "type": "smalldatetime" - }, - { - "name": "field_datetime", - "type": "datetime" - }, - { - "name": "field_bigdatetime", - "type": "bigdatetime" - }, - { - "name": "field_timestamp", - "type": "timestamp" - }, - { - "name": "field_char", - "type": "char" - }, - { - "name": "field_nchar", - "type": "nchar" - }, - { - "name": "field_unichar", - "type": "unichar" - }, - { - "name": "field_varchar", + "name": "n", "type": "varchar" - }, - { - "name": "field_nvarchar", - "type": "nvarchar" - }, - { - "name": 
"field_univarchar", - "type": "univarchar" - }, - { - "name": "field_text", - "type": "text" - }, - { - "name": "field_unitext", - "type": "unitext" - }, - { - "name": "field_longsysname", - "type": "longsysname" - }, - { - "name": "field_binary", - "type": "binary" - }, - { - "name": "field_varbinary", - "type": "varbinary" - }, - { - "name": "field_image", - "type": "image" } ], "connection": [ { "jdbcUrl": [ - "jdbc:sybase:Tds:host:port/tempdb" + "jdbc:jtds:sybase://localhost/tester" ], - "schema": "guest", + "schema": "tester", "table": [ - "test1" + "tester" ] } ], - "username": "**" + "username": "tester" }, "name": "sybasereader" }, diff --git "a/docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/sybase/sybase-source.md" "b/docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/sybase/sybase-source.md" index 4e6e3246c6..d7541e87c3 100644 --- "a/docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/sybase/sybase-source.md" +++ "b/docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/sybase/sybase-source.md" @@ -26,7 +26,7 @@ sybase 15.7 - 默认值:无 ```json "connection": [{ - "jdbcUrl": ["jdbc:sybase:Tds:hostname:port/database"], + "jdbcUrl": ["jdbc:jtds:sybase://hostname:port/database"], "table": ["table"], "schema":"public" }] @@ -251,7 +251,7 @@ sybase 15.7 - **url** - - 描述:jdbc:sybase:Tds:hostname:port/database + - 描述:jdbc:jtds:sybase://hostname:port/database - 必选:是 - 参数类型:String - 默认值:无