Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 31 additions & 27 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ Status SegmentWriter::init(const vectorized::Block* block) {
return init(column_ids, true, block);
}

// Decides whether column writers have to be created from a dynamic-table block,
// i.e. a block with extended columns that is written directly from the delta writer.
// The Compaction/SchemaChange path uses the latest schema version of the rowset
// as its schema, so its block does not come from the dynamic table load procedure.
// Only a dynamic table load procedure carries auto generated columns to handle.
bool SegmentWriter::_should_create_writers_with_dynamic_block(size_t num_columns_in_block) {
    // A non-dynamic schema never takes the dynamic-block path.
    if (!_tablet_schema->is_dynamic_schema()) {
        return false;
    }
    // Neither does a write that is not flagged as a direct write.
    if (!_opts.is_direct_write) {
        return false;
    }
    // The block must hold more columns than the tablet schema does.
    return num_columns_in_block > _tablet_schema->columns().size();
}

Status SegmentWriter::init(const std::vector<uint32_t>& col_ids, bool has_key,
const vectorized::Block* block) {
DCHECK(_column_writers.empty());
Expand Down Expand Up @@ -193,8 +202,8 @@ Status SegmentWriter::init(const std::vector<uint32_t>& col_ids, bool has_key,
return Status::OK();
};

if (block) {
RETURN_IF_ERROR(_create_writers_with_block(block, create_column_writer));
if (block && _should_create_writers_with_dynamic_block(block->columns())) {
RETURN_IF_ERROR(_create_writers_with_dynamic_block(block, create_column_writer));
} else {
RETURN_IF_ERROR(_create_writers(create_column_writer));
}
Expand Down Expand Up @@ -227,38 +236,33 @@ Status SegmentWriter::_create_writers(
return Status::OK();
}

Status SegmentWriter::_create_writers_with_block(
// A dynamic block consists of two parts: the static columns followed by the dynamic columns
// static dynamic
// | ----- | ------- |
// the static ones are the original _tablet_schema columns
// the dynamic ones are auto generated and extended from file scan
Status SegmentWriter::_create_writers_with_dynamic_block(
const vectorized::Block* block,
std::function<Status(uint32_t, const TabletColumn&)> create_column_writer) {
// generate writers from schema and extended schema info
_olap_data_convertor->reserve(block->columns());
// new columns added, query column info from Master
vectorized::schema_util::FullBaseSchemaView schema_view;
if (block->columns() > _tablet_schema->num_columns()) {
schema_view.table_id = _tablet_schema->table_id();
RETURN_IF_ERROR(
vectorized::schema_util::send_fetch_full_base_schema_view_rpc(&schema_view));
}
for (size_t i = 0; i < block->columns(); ++i) {
// NOTE(review): this asserts the block has more columns than the tablet schema;
// CHECK presumably aborts the process when the condition is false — confirm that
// every caller gates on _should_create_writers_with_dynamic_block() first.
CHECK(block->columns() > _tablet_schema->num_columns());
schema_view.table_id = _tablet_schema->table_id();
RETURN_IF_ERROR(vectorized::schema_util::send_fetch_full_base_schema_view_rpc(&schema_view));
// create writers with static columns
for (size_t i = 0; i < _tablet_schema->columns().size(); ++i) {
// NOTE(review): the Status returned by create_column_writer is not checked here,
// while the other calls in this function wrap it in RETURN_IF_ERROR — confirm
// whether a failure should be propagated.
create_column_writer(i, _tablet_schema->column(i));
}
// create writers with auto generated columns
for (size_t i = _tablet_schema->columns().size(); i < block->columns(); ++i) {
const auto& column_type_name = block->get_by_position(i);
auto idx = _tablet_schema->field_index(column_type_name.name);
if (idx >= 0) {
RETURN_IF_ERROR(create_column_writer(i, _tablet_schema->column(idx)));
} else {
if (schema_view.column_name_to_column.count(column_type_name.name) == 0) {
// expr columns, may happen in queries like `insert into table1 select function(column1), column2 from table2`
// the first column name may become `function(column1)`, so we use column offset to get columns info
// TODO here we could optimize to col_unique_id in the future
RETURN_IF_ERROR(create_column_writer(i, _tablet_schema->column(i)));
continue;
}
// extended columns
const auto& tcolumn = schema_view.column_name_to_column[column_type_name.name];
TabletColumn new_column(tcolumn);
RETURN_IF_ERROR(create_column_writer(i, new_column));
_opts.rowset_ctx->schema_change_recorder->add_extended_columns(
new_column, schema_view.schema_version);
}
// NOTE(review): this lookup uses operator[] with no count()/find() check in front of it;
// if column_name_to_column behaves like std::map, an unknown name would yield a
// default-constructed entry — TODO confirm the name is always present in schema_view.
const auto& tcolumn = schema_view.column_name_to_column[column_type_name.name];
TabletColumn new_column(tcolumn);
RETURN_IF_ERROR(create_column_writer(i, new_column));
// Record the extended column together with the schema version fetched above.
_opts.rowset_ctx->schema_change_recorder->add_extended_columns(new_column,
schema_view.schema_version);
}
return Status::OK();
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/segment_v2/segment_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class SegmentWriter {
// Reports whether the tablet schema's key type is UNIQUE_KEYS.
bool is_unique_key() {
    const auto schema_keys_type = _tablet_schema->keys_type();
    return schema_keys_type == UNIQUE_KEYS;
}

private:
Status _create_writers_with_block(
Status _create_writers_with_dynamic_block(
const vectorized::Block* block,
std::function<Status(uint32_t, const TabletColumn&)> writer_creator);
Status _create_writers(std::function<Status(uint32_t, const TabletColumn&)> writer_creator);
Expand All @@ -134,6 +134,7 @@ class SegmentWriter {
void set_min_max_key(const Slice& key);
void set_min_key(const Slice& key);
void set_max_key(const Slice& key);
bool _should_create_writers_with_dynamic_block(size_t num_columns_in_block);

void clear();

Expand Down
13 changes: 13 additions & 0 deletions regression-test/data/load_p0/insert/test_insert.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql1 --
false -2147483647 7 7
false 103 9 16
false 1002 2 18
false 5014 5 23
false 2147483647 8 31
true -2147483647 4 4
true 3021 1 15
true 3021 10 15
true 25699 6 21
true 2147483647 3 24

30 changes: 30 additions & 0 deletions regression-test/suites/load_p0/insert/test_insert.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,34 @@
// Regression suite "test_insert": recreates a target table, fills it through an
// INSERT ... SELECT over two source tables, and then queries the stored rows.
suite("test_insert") {
// todo: test insert, such as insert values, insert select, insert txn
sql "show load"
// Fully qualified source tables read by the INSERT ... SELECT below.
def test_baseall = "test_query_db.baseall";
def test_bigtable = "test_query_db.bigtable";
// Target table; it is dropped and recreated on every run.
def insert_tbl = "test_insert_tbl";

sql """ DROP TABLE IF EXISTS ${insert_tbl}"""
sql """
CREATE TABLE ${insert_tbl} (
`k1` char(5) NULL,
`k2` int(11) NULL,
`k3` tinyint(4) NULL,
`k4` int(11) NULL
) ENGINE=OLAP
DUPLICATE KEY(`k1`, `k2`, `k3`, `k4`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`k1`) BUCKETS 5
PROPERTIES (
"replication_num"="1"
);
"""

// Insert the result of a join between the two source tables; the fourth selected
// expression is a window function (sum ... OVER) aliased as w_sum.
sql """
INSERT INTO ${insert_tbl}
SELECT a.k6, a.k3, b.k1
, sum(b.k1) OVER (PARTITION BY a.k6 ORDER BY a.k3) AS w_sum
FROM ${test_baseall} a
JOIN ${test_bigtable} b ON a.k1 = b.k1 + 5
ORDER BY a.k6, a.k3, a.k1, w_sum
"""

// Tagged query "sql1", ordered by all four columns; presumably compared against the
// `-- !sql1 --` section of the generated test_insert.out — verify against the framework.
qt_sql1 "select * from ${insert_tbl} order by 1, 2, 3, 4"
}