diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisConf.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisConf.java index ef3d5d302e..ccfb3d5f13 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisConf.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisConf.java @@ -52,6 +52,9 @@ public class DorisConf extends JdbcConf { /** * default value is 3 */ private Integer maxRetries = 3; + /** retry load sleep timeout* */ + private long waitRetryMills = 18000; + /** 是否配置了NameMapping, true, RowData中将携带名称匹配后的数据库和表名, sink端配置的database和table失效* */ private boolean nameMapped; @@ -59,6 +62,14 @@ public class DorisConf extends JdbcConf { private Properties loadProperties; + public long getWaitRetryMills() { + return waitRetryMills; + } + + public void setWaitRetryMills(long waitRetryMills) { + this.waitRetryMills = waitRetryMills; + } + public String getDatabase() { return database; } diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisConfBuilder.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisConfBuilder.java index b6730fcfe3..f707522971 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisConfBuilder.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisConfBuilder.java @@ -88,6 +88,16 @@ public DorisConfBuilder setFlushIntervalMills(long flushIntervalMills) { return this; } + public DorisConfBuilder setMaxRetries(int maxRetries) { + this.dorisConf.setMaxRetries(maxRetries); + return this; + } + + public DorisConfBuilder setWaitRetryMills(long waitRetryMills) { + this.dorisConf.setWaitRetryMills(waitRetryMills); + return this; + } + public DorisConf build() { StringJoiner errorMessage = new StringJoiner("\n"); diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisKeys.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisKeys.java index ef0620ab91..9a1742d4d6 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisKeys.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisKeys.java @@ -36,6 +36,8 @@ public final class DorisKeys { public static final String FLUSH_INTERNAL_MS_KEY = "flushIntervalMills"; + public static final String WAITRETRIES_MS_KEY = "waitRetryMills"; + public static final String MAX_RETRIES_KEY = "maxRetries"; public static final String DATABASE_KEY = "database"; diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/DorisLoadClient.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/DorisLoadClient.java index 8601a731cd..e2843bb754 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/DorisLoadClient.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/DorisLoadClient.java @@ -21,6 +21,7 @@ package com.dtstack.chunjun.connector.doris.rest; import com.dtstack.chunjun.conf.FieldConf; +import com.dtstack.chunjun.connector.doris.DorisUtil; import com.dtstack.chunjun.connector.doris.options.DorisConf; import com.dtstack.chunjun.converter.AbstractRowConverter; import com.dtstack.chunjun.element.ColumnRowData; @@ -36,7 +37,6 @@ import javax.annotation.Nonnull; -import java.io.IOException; import java.io.Serializable; import java.util.Arrays; import java.util.HashMap; @@ -63,8 +63,6 @@ public class DorisLoadClient implements Serializable { private final Set metaHeader = Stream.of("schema", "table", "type", "opTime", "ts", "scn") .collect(Collectors.toCollection(HashSet::new)); - - private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load?"; private static final String KEY_SCHEMA = "schema"; private static final String KEY_TABLE = "table"; private static final String KEY_POINT = "."; @@ -73,16 +71,13 @@ public class DorisLoadClient implements Serializable { private final DorisStreamLoad dorisStreamLoad; private final boolean nameMapped; - private String hostPort; private final DorisConf conf; - public DorisLoadClient(DorisStreamLoad dorisStreamLoad, DorisConf conf, String hostPort) { + public DorisLoadClient(DorisStreamLoad dorisStreamLoad, DorisConf conf) { this.dorisStreamLoad = dorisStreamLoad; - this.hostPort = hostPort; this.conf = conf; this.nameMapped = conf.isNameMapped(); } - /** * Each time a RowData is processed, a Carrier is obtained and then returned. * @@ -218,13 +213,15 @@ private void processWithGenericRowData( * @param carrier data carrier * @throws WriteRecordException */ - public void flush(Carrier carrier) throws WriteRecordException { + public void flush(final Carrier carrier) throws WriteRecordException { try { - dorisStreamLoad.load( + DorisUtil.doRetry( + dorisStreamLoad::load, + dorisStreamLoad::replaceBackend, carrier, - String.format( - LOAD_URL_PATTERN, hostPort, carrier.getDatabase(), carrier.getTable())); - } catch (IOException e) { + conf.getMaxRetries(), + conf.getWaitRetryMills()); + } catch (Exception e) { String errorMessage = "write record failed."; throw new WriteRecordException(errorMessage, e, -1, carrier.toString()); } diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/DorisStreamLoad.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/DorisStreamLoad.java index 3da2174dfd..60514db37b 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/DorisStreamLoad.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/DorisStreamLoad.java @@ -63,10 +63,14 @@ public class DorisStreamLoad implements Serializable { private static final ObjectMapper OM = new ObjectMapper(); private static final List DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList("Success", "Publish Timeout")); - private final String authEncoding; - private final Properties streamLoadProp; + private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load?"; + private String authEncoding; + private Properties streamLoadProp; + private String hostPort; + private DorisConf options; public DorisStreamLoad(DorisConf options) { + this.options = options; this.authEncoding = Base64.getEncoder() .encodeToString( @@ -75,6 +79,14 @@ public DorisStreamLoad(DorisConf options) { this.streamLoadProp = options.getLoadProperties(); } + public void setHostPort(String hostPort) { + this.hostPort = hostPort; + } + + public void setOptions(DorisConf options) { + this.options = options; + } + /** * Generate Http Put request. * @@ -138,15 +150,33 @@ public String toString() { } } + public void replaceBackend() throws IOException { + String backend = getBackend(); + this.setHostPort(backend); + LOG.info("replace backend node to {}", backend); + } + + private String getBackend() throws IOException { + try { + // get be url from fe + return FeRestService.randomBackend(options); + } catch (IOException e) { + LOG.error("get backends info fail"); + throw new IOException(e); + } + } + /** * Doris load data via stream. * * @param carrier data carrier. - * @param loadUrlStr doris load url. * @throws IOException io exception. */ - public void load(Carrier carrier, String loadUrlStr) throws IOException { + public void load(Carrier carrier) throws IOException { List columnNames = carrier.getColumns(); + String loadUrlStr = + String.format( + LOAD_URL_PATTERN, hostPort, carrier.getDatabase(), carrier.getTable()); String json = OM.writeValueAsString(carrier.getInsertContent()); String mergeConditions = carrier.getDeleteContent(); LoadResponse loadResponse = loadBatch(columnNames, json, mergeConditions, loadUrlStr); @@ -196,7 +226,7 @@ private String generateLabel() { String formatDate = sdf.format(new Date()); label = String.format( - "flinkx_connector_%s_%s", + "chunjun_connector_%s_%s", formatDate, UUID.randomUUID().toString().replaceAll("-", "")); } return label; diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisHttpOutputFormat.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisHttpOutputFormat.java index 4db296e2f5..e79bf3a81b 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisHttpOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisHttpOutputFormat.java @@ -70,7 +70,8 @@ private String getBackend() throws IOException { @Override public void open(int taskNumber, int numTasks) throws IOException { DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(options); - client = new DorisLoadClient(dorisStreamLoad, options, getBackend()); + dorisStreamLoad.replaceBackend(); + client = new DorisLoadClient(dorisStreamLoad, options); super.open(taskNumber, numTasks); } diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisSinkFactory.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisSinkFactory.java index 5942109d09..4ae72ba7a0 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisSinkFactory.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisSinkFactory.java @@ -53,6 +53,7 @@ import static com.dtstack.chunjun.connector.doris.options.DorisKeys.FLUSH_INTERNAL_MS_KEY; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.LOAD_OPTIONS_KEY; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.LOAD_PROPERTIES_KEY; +import static com.dtstack.chunjun.connector.doris.options.DorisKeys.MAX_RETRIES_KEY; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.PASSWORD_KEY; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.REQUEST_BATCH_SIZE_KEY; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.REQUEST_CONNECT_TIMEOUT_MS_KEY; @@ -62,6 +63,7 @@ import static com.dtstack.chunjun.connector.doris.options.DorisKeys.REQUEST_TABLET_SIZE_KEY; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.TABLE_KEY; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.USER_NAME_KEY; +import static com.dtstack.chunjun.connector.doris.options.DorisKeys.WAITRETRIES_MS_KEY; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.WRITE_MODE_KEY; /** @@ -145,6 +147,8 @@ public DorisSinkFactory(SyncConf syncConf) { .setUsername(parameter.getStringVal(USER_NAME_KEY)) .setBatchSize(parameter.getIntVal(BATCH_SIZE_KEY, 1000)) .setFlushIntervalMills(parameter.getLongVal(FLUSH_INTERNAL_MS_KEY, 10000L)) + .setMaxRetries(parameter.getIntVal(MAX_RETRIES_KEY, 1)) + .setWaitRetryMills(parameter.getLongVal(WAITRETRIES_MS_KEY, 18000L)) .build(); options.setColumn(syncConf.getWriter().getFieldList()); super.initCommonConf(options);