diff --git a/chunjun-connectors/chunjun-connector-starrocks/pom.xml b/chunjun-connectors/chunjun-connector-starrocks/pom.xml
index ee2e5544e3..fc695fb905 100644
--- a/chunjun-connectors/chunjun-connector-starrocks/pom.xml
+++ b/chunjun-connectors/chunjun-connector-starrocks/pom.xml
@@ -12,56 +12,42 @@
chunjun-connector-starrocks
ChunJun : Connectors : StarRocks
+
+
+ 8
+ 8
+ 5.0.0
+
+
+
+ mysql
+ mysql-connector-java
+ 5.1.49
+
+
+
+ com.alibaba
+ fastjson
+ 1.2.79
+
+
com.starrocks
- flink-connector-starrocks
- 1.1.13_flink-1.12
- provided
-
-
- org.apache.flink
- flink-table-common
-
-
- org.apache.flink
- flink-table-api
-
-
- org.apache.flink
- flink-table-api-java
-
-
- flink-clients_2.11
- org.apache.flink
-
-
- flink-table-api-java-bridge_2.11
- org.apache.flink
-
-
- flink-table-api-scala-bridge_2.11
- org.apache.flink
-
-
- flink-table-api-scala_2.11
- org.apache.flink
-
-
- flink-table-planner-blink_2.11
- org.apache.flink
-
-
- flink-table-planner_2.11
- org.apache.flink
-
-
+ starrocks-thrift-sdk
+ 1.0.1
+
+
+
+ org.apache.arrow
+ arrow-vector
+ ${arrow.version}
+
- com.dtstack.chunjun
- chunjun-connector-mysql
- ${project.version}
- provided
+ org.apache.arrow
+ arrow-memory-netty
+ ${arrow.version}
@@ -96,6 +82,20 @@
+
+
+ io.netty
+ com.dtstack.chunjun.connector.starrocks.shaded.io.netty
+
+
+ org.apache.arrow
+ com.dtstack.chunjun.connector.starrocks.shaded.org.apache.arrow
+
+
+ com.google.flatbuffers
+ com.dtstack.chunjun.connector.starrocks.shaded.com.google.flatbuffers
+
+
diff --git a/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/conf/LoadConf.java b/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/conf/LoadConf.java
new file mode 100644
index 0000000000..f29cbdc371
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/conf/LoadConf.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.starrocks.conf;
+
+import com.dtstack.chunjun.connector.starrocks.options.ConstantValue;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/** @author liuliu 2022/7/12 */
+public class LoadConf implements Serializable {
+
+ private static final Long serialVersionUID = 1L;
+
+ /** Timeout duration of the HTTP connection when checking the connectivity with StarRocks */
+ private Integer httpCheckTimeoutMs = ConstantValue.HTTP_CHECK_TIMEOUT_DEFAULT;
+
+ private Integer queueOfferTimeoutMs = ConstantValue.QUEUE_OFFER_TIMEOUT_DEFAULT;
+
+ private Integer queuePollTimeoutMs = ConstantValue.QUEUE_POLL_TIMEOUT_DEFAULT;
+
+ private Long batchMaxSize = ConstantValue.SINK_BATCH_MAX_BYTES_DEFAULT;
+
+ private Long batchMaxRows = ConstantValue.SINK_BATCH_MAX_ROWS_DEFAULT;
+
+ private Map headProperties = new HashMap<>();
+
+ public Integer getHttpCheckTimeoutMs() {
+ return httpCheckTimeoutMs;
+ }
+
+ public void setHttpCheckTimeoutMs(Integer httpCheckTimeoutMs) {
+ this.httpCheckTimeoutMs = httpCheckTimeoutMs;
+ }
+
+ public Integer getQueueOfferTimeoutMs() {
+ return queueOfferTimeoutMs;
+ }
+
+ public void setQueueOfferTimeoutMs(Integer queueOfferTimeoutMs) {
+ this.queueOfferTimeoutMs = queueOfferTimeoutMs;
+ }
+
+ public Integer getQueuePollTimeoutMs() {
+ return queuePollTimeoutMs;
+ }
+
+ public void setQueuePollTimeoutMs(Integer queuePollTimeoutMs) {
+ this.queuePollTimeoutMs = queuePollTimeoutMs;
+ }
+
+ public Long getBatchMaxSize() {
+ return batchMaxSize;
+ }
+
+ public void setBatchMaxSize(Long batchMaxSize) {
+ this.batchMaxSize = batchMaxSize;
+ }
+
+ public Long getBatchMaxRows() {
+ return batchMaxRows;
+ }
+
+ public void setBatchMaxRows(Long batchMaxRows) {
+ this.batchMaxRows = batchMaxRows;
+ }
+
+ public Map getHeadProperties() {
+ return headProperties;
+ }
+
+ public void setHeadProperties(Map headProperties) {
+ this.headProperties = headProperties;
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/conf/LoadConfBuilder.java b/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/conf/LoadConfBuilder.java
new file mode 100644
index 0000000000..be3ac6d491
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/conf/LoadConfBuilder.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.starrocks.conf;
+
+import java.util.Map;
+
+/** @author liuliu 2022/7/12 */
+public class LoadConfBuilder {
+ private final LoadConf loadConf;
+
+ public LoadConfBuilder() {
+ this.loadConf = new LoadConf();
+ }
+
+ public LoadConf build() {
+ return this.loadConf;
+ }
+
+ public LoadConfBuilder setBatchMaxSize(Long batchMaxSize) {
+ loadConf.setBatchMaxSize(batchMaxSize);
+ return this;
+ }
+
+ public LoadConfBuilder setBatchMaxRows(Long batMaxRows) {
+ loadConf.setBatchMaxRows(batMaxRows);
+ return this;
+ }
+
+ public LoadConfBuilder setHttpCheckTimeoutMs(int httpCheckTimeout) {
+ loadConf.setHttpCheckTimeoutMs(httpCheckTimeout);
+ return this;
+ }
+
+ public LoadConfBuilder setQueueOfferTimeoutMs(int queueOfferTimeoutMs) {
+ loadConf.setQueueOfferTimeoutMs(queueOfferTimeoutMs);
+ return this;
+ }
+
+ public LoadConfBuilder setQueuePollTimeoutMs(int queuePollTimeoutMs) {
+ loadConf.setQueuePollTimeoutMs(queuePollTimeoutMs);
+ return this;
+ }
+
+ public LoadConfBuilder setHeadProperties(Map loadProperties) {
+ loadConf.setHeadProperties(loadProperties);
+ return this;
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/conf/StarRocksConf.java b/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/conf/StarRocksConf.java
index 65da278f91..b305dacfd1 100644
--- a/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/conf/StarRocksConf.java
+++ b/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/conf/StarRocksConf.java
@@ -17,40 +17,222 @@
*/
package com.dtstack.chunjun.connector.starrocks.conf;
-import com.dtstack.chunjun.connector.jdbc.conf.JdbcConf;
+import com.dtstack.chunjun.conf.ChunJunCommonConf;
-import java.util.ArrayList;
+import org.apache.flink.table.types.DataType;
+
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* @author lihongwei
* @date 2022/04/11
*/
-public class StarRocksConf extends JdbcConf {
+public class StarRocksConf extends ChunJunCommonConf {
+
+ // common
+ private String url;
+
+ private List feNodes;
+
+ private String database;
+
+ private String table;
+
+ private String username;
+
+ private String password;
+
+ private String writeMode;
+ /** * default value is 3 */
+ private Integer maxRetries = 3;
+
+ // sink
+ /** The time to sleep when the tablet version is too large */
+ private long waitRetryMills = 18000;
+
+ /** 是否配置了NameMapping, true, RowData中将携带名称匹配后的数据库和表名, sink端配置的database和table失效* */
+ private boolean nameMapped;
+
+ private LoadConf loadConf = new LoadConf();
+
+ // source
+ private String[] fieldNames;
- /** fe_ip:http_port 多个地址分号连接 */
- private String loadUrl;
- /** 主键模型表需要传入主键列表 */
- private List primaryKey = new ArrayList<>();
+ private DataType[] dataTypes;
+
+ private String filterStatement;
+
+ private int beClientKeepLiveMin = 10;
+
+ private int beQueryTimeoutSecond = 600;
+
+ private int beClientTimeout = 3000;
+
+ private int beFetchRows = 1024;
+
+ private long beFetchMaxBytes = 1024 * 1024 * 1024;
+
+ private Map beSocketProperties = new HashMap<>();
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public List getFeNodes() {
+ return feNodes;
+ }
+
+ public void setFeNodes(List feNodes) {
+ this.feNodes = feNodes;
+ }
+
+ public String getDatabase() {
+ return database;
+ }
+
+ public void setDatabase(String database) {
+ this.database = database;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public void setTable(String table) {
+ this.table = table;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getWriteMode() {
+ return writeMode;
+ }
+
+ public void setWriteMode(String writeMode) {
+ this.writeMode = writeMode;
+ }
+
+ public Integer getMaxRetries() {
+ return maxRetries;
+ }
+
+ public void setMaxRetries(Integer maxRetries) {
+ this.maxRetries = maxRetries;
+ }
+
+ public long getWaitRetryMills() {
+ return waitRetryMills;
+ }
+
+ public void setWaitRetryMills(long waitRetryMills) {
+ this.waitRetryMills = waitRetryMills;
+ }
+
+ public boolean isNameMapped() {
+ return nameMapped;
+ }
+
+ public void setNameMapped(boolean nameMapped) {
+ this.nameMapped = nameMapped;
+ }
+
+ public LoadConf getLoadConf() {
+ return loadConf;
+ }
+
+ public void setLoadConf(LoadConf loadConf) {
+ this.loadConf = loadConf;
+ }
+
+ public String[] getFieldNames() {
+ return fieldNames;
+ }
+
+ public void setFieldNames(String[] fieldNames) {
+ this.fieldNames = fieldNames;
+ }
+
+ public DataType[] getDataTypes() {
+ return dataTypes;
+ }
+
+ public void setDataTypes(DataType[] dataTypes) {
+ this.dataTypes = dataTypes;
+ }
+
+ public String getFilterStatement() {
+ return filterStatement;
+ }
+
+ public void setFilterStatement(String filterStatement) {
+ this.filterStatement = filterStatement;
+ }
+
+ public int getBeClientKeepLiveMin() {
+ return beClientKeepLiveMin;
+ }
+
+ public void setBeClientKeepLiveMin(int beClientKeepLiveMin) {
+ this.beClientKeepLiveMin = beClientKeepLiveMin;
+ }
+
+ public int getBeQueryTimeoutSecond() {
+ return beQueryTimeoutSecond;
+ }
+
+ public void setBeQueryTimeoutSecond(int beQueryTimeoutSecond) {
+ this.beQueryTimeoutSecond = beQueryTimeoutSecond;
+ }
+
+ public int getBeClientTimeout() {
+ return beClientTimeout;
+ }
+
+ public void setBeClientTimeout(int beClientTimeout) {
+ this.beClientTimeout = beClientTimeout;
+ }
+
+ public int getBeFetchRows() {
+ return beFetchRows;
+ }
- public String getLoadUrl() {
- return loadUrl;
+ public void setBeFetchRows(int beFetchRows) {
+ this.beFetchRows = beFetchRows;
}
- public void setLoadUrl(String loadUrl) {
- this.loadUrl = loadUrl;
+ public long getBeFetchMaxBytes() {
+ return beFetchMaxBytes;
}
- public List getPrimaryKey() {
- return primaryKey;
+ public void setBeFetchMaxBytes(long beFetchMaxBytes) {
+ this.beFetchMaxBytes = beFetchMaxBytes;
}
- public void setPrimaryKey(List primaryKey) {
- this.primaryKey = primaryKey;
+ public Map getBeSocketProperties() {
+ return beSocketProperties;
}
- @Override
- public String toString() {
- return "StarRocksConf{" + "loadUrl='" + loadUrl + '\'' + ", primaryKey=" + primaryKey + '}';
+ public void setBeSocketProperties(Map beSocketProperties) {
+ this.beSocketProperties = beSocketProperties;
}
}
diff --git a/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/dialect/StarRocksDialect.java b/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/connection/StarRocksJdbcConnectionIProvider.java
similarity index 65%
rename from chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/dialect/StarRocksDialect.java
rename to chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/connection/StarRocksJdbcConnectionIProvider.java
index 0a591f1a3f..70d44bc80d 100644
--- a/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/dialect/StarRocksDialect.java
+++ b/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/connection/StarRocksJdbcConnectionIProvider.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,18 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package com.dtstack.chunjun.connector.starrocks.dialect;
-import com.dtstack.chunjun.connector.mysql.dialect.MysqlDialect;
+package com.dtstack.chunjun.connector.starrocks.connection;
-/**
- * @author lihongwei
- * @date 2022/04/11
- */
-public class StarRocksDialect extends MysqlDialect {
+import org.apache.flink.annotation.Internal;
+
+import java.sql.Connection;
+
+/** connection provider. */
+@Internal
+public interface StarRocksJdbcConnectionIProvider {
+
+ Connection getConnection() throws Exception;
+
+ Connection reestablishConnection() throws Exception;
- @Override
- public String dialectName() {
- return "StarRocks";
- }
+ void close();
}
diff --git a/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/connection/StarRocksJdbcConnectionOptions.java b/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/connection/StarRocksJdbcConnectionOptions.java
new file mode 100644
index 0000000000..d6829bbc7c
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/connection/StarRocksJdbcConnectionOptions.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.starrocks.connection;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+import static com.dtstack.chunjun.connector.starrocks.options.ConstantValue.CJ_DRIVER_CLASS_NAME;
+import static com.dtstack.chunjun.connector.starrocks.options.ConstantValue.DRIVER_CLASS_NAME;
+
+/** JDBC connection options. */
+@PublicEvolving
+public class StarRocksJdbcConnectionOptions implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ protected final String url;
+ @Nullable protected final String username;
+ @Nullable protected final String password;
+
+ public StarRocksJdbcConnectionOptions(String url, String username, String password) {
+ this.url = Preconditions.checkNotNull(url, "jdbc url is empty");
+ this.username = username;
+ this.password = password;
+ }
+
+ public String getDbURL() {
+ return url;
+ }
+
+ public String getCjDriverName() {
+ return CJ_DRIVER_CLASS_NAME;
+ }
+
+ public String getDriverName() {
+ return DRIVER_CLASS_NAME;
+ }
+
+ public Optional getUsername() {
+ return Optional.ofNullable(username);
+ }
+
+ public Optional getPassword() {
+ return Optional.ofNullable(password);
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/connection/StarRocksJdbcConnectionProvider.java b/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/connection/StarRocksJdbcConnectionProvider.java
new file mode 100644
index 0000000000..2ee2021256
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/connection/StarRocksJdbcConnectionProvider.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.starrocks.connection;
+
+import com.dtstack.chunjun.util.ClassUtil;
+import com.dtstack.chunjun.util.RetryUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Properties;
+
+/** Simple JDBC connection provider. */
+public class StarRocksJdbcConnectionProvider
+ implements StarRocksJdbcConnectionIProvider, Serializable {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StarRocksJdbcConnectionProvider.class);
+
+ private static final long serialVersionUID = 1L;
+
+ private final StarRocksJdbcConnectionOptions jdbcOptions;
+
+ private transient volatile Connection connection;
+
+ public StarRocksJdbcConnectionProvider(StarRocksJdbcConnectionOptions jdbcOptions) {
+ this.jdbcOptions = jdbcOptions;
+ }
+
+ @Override
+ public Connection getConnection() throws ClassNotFoundException {
+ if (connection == null) {
+ synchronized (this) {
+ if (connection == null) {
+ try {
+ Class.forName(jdbcOptions.getCjDriverName());
+ } catch (ClassNotFoundException ex) {
+ Class.forName(jdbcOptions.getDriverName());
+ }
+ Properties prop = new Properties();
+ if (org.apache.commons.lang3.StringUtils.isNotBlank(
+ jdbcOptions.getUsername().orElse(null))) {
+ prop.put("user", jdbcOptions.getUsername().get());
+ }
+ if (org.apache.commons.lang3.StringUtils.isNotBlank(
+ jdbcOptions.getPassword().orElse(null))) {
+ prop.put("password", jdbcOptions.getPassword().get());
+ }
+ synchronized (ClassUtil.LOCK_STR) {
+ connection =
+ RetryUtil.executeWithRetry(
+ () ->
+ DriverManager.getConnection(
+ jdbcOptions.getDbURL(), prop),
+ 3,
+ 2000,
+ false);
+ }
+ }
+ }
+ }
+ return connection;
+ }
+
+ public void checkValid() throws SQLException, ClassNotFoundException {
+ if (connection == null || !connection.isValid(10)) {
+ connection = null;
+ getConnection();
+ }
+ }
+
+ @Override
+ public Connection reestablishConnection() throws SQLException, ClassNotFoundException {
+ close();
+ connection = getConnection();
+ return connection;
+ }
+
+ @Override
+ public void close() {
+ if (connection == null) {
+ return;
+ }
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ LOG.error("JDBC connection close failed.", e);
+ } finally {
+ connection = null;
+ }
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/converter/StarRocksColumnConverter.java b/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/converter/StarRocksColumnConverter.java
new file mode 100644
index 0000000000..c16c2eae61
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/converter/StarRocksColumnConverter.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.starrocks.converter;
+
+import com.dtstack.chunjun.conf.ChunJunCommonConf;
+import com.dtstack.chunjun.conf.FieldConf;
+import com.dtstack.chunjun.connector.starrocks.streamload.StarRocksSinkOP;
+import com.dtstack.chunjun.converter.AbstractRowConverter;
+import com.dtstack.chunjun.converter.IDeserializationConverter;
+import com.dtstack.chunjun.converter.ISerializationConverter;
+import com.dtstack.chunjun.element.AbstractBaseColumn;
+import com.dtstack.chunjun.element.ColumnRowData;
+import com.dtstack.chunjun.element.column.BigDecimalColumn;
+import com.dtstack.chunjun.element.column.BooleanColumn;
+import com.dtstack.chunjun.element.column.ByteColumn;
+import com.dtstack.chunjun.element.column.SqlDateColumn;
+import com.dtstack.chunjun.element.column.StringColumn;
+import com.dtstack.chunjun.element.column.TimestampColumn;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static com.dtstack.chunjun.connector.starrocks.util.StarRocksUtil.addStrForNum;
+
+/** @author liuliu 2022/7/12 */
+public class StarRocksColumnConverter
+ extends AbstractRowConverter