Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@
*/
public class DorisConf extends ChunJunCommonConf {

private String fieldDelimiter;

private String lineDelimiter;

private String database;

private String table;
Expand All @@ -58,22 +54,6 @@ public class DorisConf extends ChunJunCommonConf {

private Properties loadProperties;

public String getFieldDelimiter() {
return fieldDelimiter;
}

public void setFieldDelimiter(String fieldDelimiter) {
this.fieldDelimiter = fieldDelimiter;
}

public String getLineDelimiter() {
return lineDelimiter;
}

public void setLineDelimiter(String lineDelimiter) {
this.lineDelimiter = lineDelimiter;
}

public String getDatabase() {
return database;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,6 @@ public DorisConfBuilder setFeNodes(List<String> feNodes) {
return this;
}

public DorisConfBuilder setFieldDelimiter(String fieldDelimiter) {
this.dorisConf.setFieldDelimiter(fieldDelimiter);
return this;
}

public DorisConfBuilder setLineDelimiter(String lineDelimiter) {
this.dorisConf.setLineDelimiter(lineDelimiter);
return this;
}

public DorisConfBuilder setUsername(String username) {
this.dorisConf.setUsername(username);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;
import java.util.stream.IntStream;

/**
* Company:www.dtstack.com.
Expand All @@ -17,18 +20,16 @@
*/
public class Carrier implements Serializable {
private static final long serialVersionUID = 1L;
private final StringJoiner insertContent;
private final List<Map<String, Object>> insertContent;
private final StringJoiner deleteContent;
private final String fieldDelimiter;
private int batch = 0;
private String database;
private String table;
private List<String> columns;
private final Set<Integer> rowDataIndexes = new HashSet<>();

public Carrier(String fieldDelimiter, String lineDelimiter) {
this.fieldDelimiter = fieldDelimiter;
insertContent = new StringJoiner(lineDelimiter);
public Carrier() {
insertContent = new ArrayList<>();
deleteContent = new StringJoiner(" OR ");
}

Expand All @@ -48,8 +49,8 @@ public void setTable(String table) {
this.table = table;
}

public String getInsertContent() {
return insertContent.toString();
public List<Map<String, Object>> getInsertContent() {
return insertContent;
}

public String getDeleteContent() {
Expand Down Expand Up @@ -78,14 +79,20 @@ public void addInsertContent(List<String> insertV) {
// It is certain that in this case, the size
// of insertV is twice the size of column
List<String> forward = insertV.subList(0, columns.size());
String forwardV = StringUtils.join(forward, fieldDelimiter);
final Map<String, Object> forwardV = new HashMap<>(columns.size());
IntStream.range(0, columns.size())
.forEach(i -> forwardV.put(columns.get(i), forward.get(i)));
insertContent.add(forwardV);
List<String> behind = insertV.subList(columns.size(), insertV.size());
String behindV = StringUtils.join(behind, fieldDelimiter);
final Map<String, Object> behindV = new HashMap<>(columns.size());
IntStream.range(0, columns.size())
.forEach(i -> behindV.put(columns.get(i), behind.get(i)));
insertContent.add(behindV);
} else {
String s = StringUtils.join(insertV, fieldDelimiter);
insertContent.add(s);
final Map<String, Object> values = new HashMap<>(columns.size());
IntStream.range(0, columns.size())
.forEach(i -> values.put(columns.get(i), insertV.get(i)));
insertContent.add(values);
}
}
}
Expand All @@ -110,7 +117,9 @@ private String buildMergeOnConditions(List<String> columns, List<String> values)
List<String> deleteOnStr = new ArrayList<>();
for (int i = 0, size = columns.size(); i < size; i++) {
String s =
columns.get(i)
"`"
+ columns.get(i)
+ "`"
+ "<=>"
+ "'"
+ ((values.get(i)) == null ? "" : values.get(i))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,13 @@ public class DorisLoadClient implements Serializable {
.collect(Collectors.toCollection(HashSet::new));

private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load?";
private static final String NULL_VALUE = "\\N";
private static final String KEY_SCHEMA = "schema";
private static final String KEY_TABLE = "table";
private static final String KEY_POINT = ".";
public static final String KEY_BEFORE = "before_";
public static final String KEY_AFTER = "after_";

private final DorisStreamLoad dorisStreamLoad;
private final String fieldDelimiter;
private final String lineDelimiter;
private final boolean nameMapped;
private String hostPort;
private final DorisConf conf;
Expand All @@ -80,8 +77,6 @@ public DorisLoadClient(DorisStreamLoad dorisStreamLoad, DorisConf conf, String h
this.hostPort = hostPort;
this.conf = conf;
this.nameMapped = conf.isNameMapped();
this.fieldDelimiter = conf.getFieldDelimiter();
this.lineDelimiter = conf.getLineDelimiter();
}

public void setHostPort(String hostPort) {
Expand Down Expand Up @@ -271,15 +266,15 @@ private void wrapValuesFromRowData(
if (column.equalsIgnoreCase(trueCol)) {
insertV.add(convert(value, i));
deleteV.add(convert(value, i));
break;
continue;
}
}
// case 2, need to insert.
if (headers[i].startsWith(KEY_AFTER)) {
String trueCol = headers[i].substring(6);
if (column.equalsIgnoreCase(trueCol)) {
insertV.add(convert(value, i));
break;
continue;
}
}
// case 3. column name is obvious.
Expand All @@ -288,7 +283,6 @@ private void wrapValuesFromRowData(
if (delete) {
deleteV.add(convert(value, i));
}
break;
}
}
}
Expand All @@ -301,7 +295,7 @@ private Carrier initCarrier(
List<String> deleteV,
String schema,
String table) {
Carrier carrier = new Carrier(fieldDelimiter, lineDelimiter);
Carrier carrier = new Carrier();
carrier.setColumns(columns);
carrier.setDatabase(schema);
carrier.setTable(table);
Expand Down Expand Up @@ -347,6 +341,6 @@ private List<String> getColumnName(List<FieldConf> fields) {

private String convert(@Nonnull ColumnRowData rowData, int index) {
Object value = rowData.getField(index);
return (value == null || "".equals(value.toString())) ? NULL_VALUE : value.toString();
return (value == null || "".equals(value.toString())) ? null : value.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@
import java.util.Arrays;
import java.util.Base64;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;

/**
* @author tiezhu@dtstack.com
Expand All @@ -63,8 +65,6 @@ public class DorisStreamLoad implements Serializable {
new ArrayList<>(Arrays.asList("Success", "Publish Timeout"));
private final String authEncoding;
private final Properties streamLoadProp;
private final String fieldDelimiter;
private final String lineDelimiter;

public DorisStreamLoad(DorisConf options) {
this.authEncoding =
Expand All @@ -73,8 +73,6 @@ public DorisStreamLoad(DorisConf options) {
String.format("%s:%s", options.getUsername(), options.getPassword())
.getBytes(StandardCharsets.UTF_8));
this.streamLoadProp = options.getLoadProperties();
this.fieldDelimiter = options.getFieldDelimiter();
this.lineDelimiter = options.getLineDelimiter();
}

/**
Expand All @@ -94,23 +92,34 @@ private HttpPut generatePut(
httpPut.setHeader("Expect", "100-continue");
httpPut.setHeader("Content-Type", "text/plain; charset=UTF-8");
httpPut.setHeader("label", label);
httpPut.setHeader("columns", StringUtils.join(columnNames, ","));
httpPut.setHeader("format", "json");
// if body is list type ,strip_outer_array should be true
httpPut.setHeader("strip_outer_array", "true");
List<String> columns =
columnNames.stream()
.map(this::quoteColumn)
.collect(Collectors.toCollection(LinkedList::new));
httpPut.setHeader("columns", StringUtils.join(columns, ","));
if (StringUtils.isNotBlank(mergeConditions)) {
httpPut.setHeader("merge_type", "MERGE");
httpPut.setHeader("delete", mergeConditions);
} else {
httpPut.setHeader("merge_type", "APPEND");
}
httpPut.setHeader("column_separator", fieldDelimiter);
if (!"\n".equals(lineDelimiter)) {
httpPut.setHeader("line_delimiter", lineDelimiter);
}
// httpPut.setHeader("column_separator", fieldDelimiter);
// if (!"\n".equals(lineDelimiter)) {
// httpPut.setHeader("line_delimiter", lineDelimiter);
// }
for (Map.Entry<Object, Object> entry : streamLoadProp.entrySet()) {
httpPut.setHeader(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
}
return httpPut;
}

private String quoteColumn(String column) {
return "`" + column + "`";
}

public static class LoadResponse {
public int status;
public String respContent;
Expand Down Expand Up @@ -138,9 +147,9 @@ public String toString() {
*/
public void load(Carrier carrier, String loadUrlStr) throws IOException {
List<String> columnNames = carrier.getColumns();
String value = carrier.getInsertContent();
String json = OM.writeValueAsString(carrier.getInsertContent());
String mergeConditions = carrier.getDeleteContent();
LoadResponse loadResponse = loadBatch(columnNames, value, mergeConditions, loadUrlStr);
LoadResponse loadResponse = loadBatch(columnNames, json, mergeConditions, loadUrlStr);
LOG.debug("StreamLoad Response:{}", loadResponse);
if (loadResponse.status != 200) {
throw new ConnectException("stream load error, detail : " + loadResponse);
Expand Down Expand Up @@ -187,7 +196,7 @@ private String generateLabel() {
String formatDate = sdf.format(new Date());
label =
String.format(
"chunjun_connector_%s_%s",
"flinkx_connector_%s_%s",
formatDate, UUID.randomUUID().toString().replaceAll("-", ""));
}
return label;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,7 @@
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DORIS_WRITE_MODE_DEFAULT;
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.EXEC_MEM_LIMIT_KEY;
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.FE_NODES_KEY;
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.FIELD_DELIMITER;
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.FIELD_DELIMITER_KEY;
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.FLUSH_INTERNAL_MS_KEY;
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.LINE_DELIMITER;
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.LINE_DELIMITER_KEY;
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.LOAD_OPTIONS_KEY;
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.LOAD_PROPERTIES_KEY;
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.PASSWORD_KEY;
Expand Down Expand Up @@ -136,10 +132,6 @@ public DorisSinkFactory(SyncConf syncConf) {
.setDatabase(parameter.getStringVal(DATABASE_KEY))
.setTable(parameter.getStringVal(TABLE_KEY))
.setFeNodes((List<String>) parameter.getVal(FE_NODES_KEY))
.setFieldDelimiter(
parameter.getStringVal(FIELD_DELIMITER_KEY, FIELD_DELIMITER))
.setLineDelimiter(
parameter.getStringVal(LINE_DELIMITER_KEY, LINE_DELIMITER))
.setLoadOptions(loadConf)
.setLoadProperties(
parameter.getProperties(LOAD_PROPERTIES_KEY, new Properties()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ public class DorisDynamicTableFactory implements DynamicTableSinkFactory {
DorisOptions.EXEC_MEM_LIMIT,
DorisOptions.DESERIALIZE_QUEUE_SIZE,
DorisOptions.DESERIALIZE_ARROW_ASYNC,
DorisOptions.FIELD_DELIMITER,
DorisOptions.LINE_DELIMITER,
DorisOptions.MAX_RETRIES,
DorisOptions.WRITE_MODE,
DorisOptions.BATCH_SIZE)
Expand Down Expand Up @@ -112,8 +110,6 @@ private static DorisConf getConfByOptions(ReadableConfig config) {

LoadConf loadConf = getLoadConf(config);
dorisConf.setLoadConf(loadConf);
dorisConf.setFieldDelimiter(config.get(DorisOptions.FIELD_DELIMITER));
dorisConf.setLineDelimiter(config.get(DorisOptions.LINE_DELIMITER));
dorisConf.setLoadProperties(new Properties());
dorisConf.setMaxRetries(config.get(DorisOptions.MAX_RETRIES));
dorisConf.setWriteMode(config.get(DorisOptions.WRITE_MODE));
Expand Down