Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,24 @@ std::string ExecNode::get_name() {
Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Block* output_block) {
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_projection_timer);
auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) {
if (to->is_nullable() && !from->is_nullable()) {
if (_keep_origin || !from->is_exclusive()) {
auto& null_column = reinterpret_cast<vectorized::ColumnNullable&>(*to);
null_column.get_nested_column().insert_range_from(*from, 0, rows);
null_column.get_null_map_column().get_data().resize_fill(rows, 0);
} else {
to = make_nullable(from, false)->assume_mutable();
}
} else {
if (_keep_origin || !from->is_exclusive()) {
to->insert_range_from(*from, 0, rows);
} else {
to = from->assume_mutable();
}
}
};

using namespace vectorized;
MutableBlock mutable_block =
VectorizedUtils::build_mutable_mem_reuse_block(output_block, *_output_row_descriptor);
Expand All @@ -535,13 +553,7 @@ Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Blo
auto column_ptr = origin_block->get_by_position(result_column_id)
.column->convert_to_full_column_if_const();
//TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it
if (mutable_columns[i]->is_nullable() xor column_ptr->is_nullable()) {
DCHECK(mutable_columns[i]->is_nullable() && !column_ptr->is_nullable());
reinterpret_cast<ColumnNullable*>(mutable_columns[i].get())
->insert_range_from_not_nullable(*column_ptr, 0, rows);
} else {
mutable_columns[i]->insert_range_from(*column_ptr, 0, rows);
}
insert_column_datas(mutable_columns[i], column_ptr, rows);
}
DCHECK(mutable_block.rows() == rows);
output_block->set_columns(std::move(mutable_columns));
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/exec_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ class ExecNode {

std::shared_ptr<QueryStatistics> _query_statistics = nullptr;

bool _keep_origin = false;

private:
static Status create_tree_helper(RuntimeState* state, ObjectPool* pool,
const std::vector<TPlanNode>& tnodes,
Expand Down
20 changes: 16 additions & 4 deletions be/src/pipeline/exec/join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ template <typename SharedStateArg, typename Derived>
Status JoinProbeLocalState<SharedStateArg, Derived>::_build_output_block(
vectorized::Block* origin_block, vectorized::Block* output_block, bool keep_origin) {
auto& p = Base::_parent->template cast<typename Derived::Parent>();
if (!Base::_projections.empty()) {
*output_block = *origin_block;
if (p._is_outer_join) {
DCHECK(output_block->columns() >= 2);
output_block->erase_tail(output_block->columns() - 2);
}
return Status::OK();
}
SCOPED_TIMER(_build_output_block_timer);
auto is_mem_reuse = output_block->mem_reuse();
vectorized::MutableBlock mutable_block =
Expand Down Expand Up @@ -197,14 +205,18 @@ JoinProbeOperatorX<LocalStateType>::JoinProbeOperatorX(ObjectPool* pool, const T
_intermediate_row_desc.reset(new RowDescriptor(
descs, tnode.hash_join_node.vintermediate_tuple_id_list,
std::vector<bool>(tnode.hash_join_node.vintermediate_tuple_id_list.size())));
_output_row_desc.reset(
new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false}));
if (!Base::_output_row_descriptor) {
_output_row_desc.reset(
new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false}));
}
} else if (tnode.__isset.nested_loop_join_node) {
_intermediate_row_desc.reset(new RowDescriptor(
descs, tnode.nested_loop_join_node.vintermediate_tuple_id_list,
std::vector<bool>(tnode.nested_loop_join_node.vintermediate_tuple_id_list.size())));
_output_row_desc.reset(
new RowDescriptor(descs, {tnode.nested_loop_join_node.voutput_tuple_id}, {false}));
if (!Base::_output_row_descriptor) {
_output_row_desc.reset(new RowDescriptor(
descs, {tnode.nested_loop_join_node.voutput_tuple_id}, {false}));
}
} else {
// Iff BE has been upgraded and FE has not yet, we should keep origin logics for CROSS JOIN.
DCHECK_EQ(_join_op, TJoinOp::CROSS_JOIN);
Expand Down
7 changes: 6 additions & 1 deletion be/src/pipeline/exec/join_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ class JoinProbeOperatorX : public StatefulOperatorX<LocalStateType> {
Status init(const TPlanNode& tnode, RuntimeState* state) override;

Status open(doris::RuntimeState* state) override;
[[nodiscard]] const RowDescriptor& row_desc() const override { return *_output_row_desc; }
[[nodiscard]] const RowDescriptor& row_desc() const override {
if (Base::_output_row_descriptor) {
return *Base::_output_row_descriptor;
}
return *_output_row_desc;
}

[[nodiscard]] const RowDescriptor& intermediate_row_desc() const override {
return *_intermediate_row_desc;
Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,9 @@ NestedLoopJoinProbeOperatorX::NestedLoopJoinProbeOperatorX(ObjectPool* pool, con
: JoinProbeOperatorX<NestedLoopJoinProbeLocalState>(pool, tnode, operator_id, descs),
_is_output_left_side_only(tnode.nested_loop_join_node.__isset.is_output_left_side_only &&
tnode.nested_loop_join_node.is_output_left_side_only),
_old_version_flag(!tnode.__isset.nested_loop_join_node) {}
_old_version_flag(!tnode.__isset.nested_loop_join_node) {
_keep_origin = _is_output_left_side_only;
}

Status NestedLoopJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(JoinProbeOperatorX<NestedLoopJoinProbeLocalState>::init(tnode, state));
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/nested_loop_join_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ class NestedLoopJoinProbeOperatorX final
const RowDescriptor& row_desc() const override {
return _old_version_flag
? (_output_row_descriptor ? *_output_row_descriptor : _row_descriptor)
: *_output_row_desc;
: (_output_row_descriptor ? *_output_row_descriptor : *_output_row_desc);
}

bool need_more_input_data(RuntimeState* state) const override;
Expand Down
30 changes: 21 additions & 9 deletions be/src/pipeline/pipeline_x/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,28 @@ void PipelineXLocalStateBase::clear_origin_block() {

Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* origin_block,
vectorized::Block* output_block) const {
auto local_state = state->get_local_state(operator_id());
auto* local_state = state->get_local_state(operator_id());
SCOPED_TIMER(local_state->exec_time_counter());
SCOPED_TIMER(local_state->_projection_timer);

auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) {
if (to->is_nullable() && !from->is_nullable()) {
if (_keep_origin || !from->is_exclusive()) {
auto& null_column = reinterpret_cast<vectorized::ColumnNullable&>(*to);
null_column.get_nested_column().insert_range_from(*from, 0, rows);
null_column.get_null_map_column().get_data().resize_fill(rows, 0);
} else {
to = make_nullable(from, false)->assume_mutable();
}
} else {
if (_keep_origin || !from->is_exclusive()) {
to->insert_range_from(*from, 0, rows);
} else {
to = from->assume_mutable();
}
}
};

using namespace vectorized;
vectorized::MutableBlock mutable_block =
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block,
Expand All @@ -188,14 +207,7 @@ Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* ori
RETURN_IF_ERROR(local_state->_projections[i]->execute(origin_block, &result_column_id));
auto column_ptr = origin_block->get_by_position(result_column_id)
.column->convert_to_full_column_if_const();
//TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it
if (mutable_columns[i]->is_nullable() xor column_ptr->is_nullable()) {
DCHECK(mutable_columns[i]->is_nullable() && !column_ptr->is_nullable());
reinterpret_cast<ColumnNullable*>(mutable_columns[i].get())
->insert_range_from_not_nullable(*column_ptr, 0, rows);
} else {
mutable_columns[i]->insert_range_from(*column_ptr, 0, rows);
}
insert_column_datas(mutable_columns[i], column_ptr, rows);
}
DCHECK(mutable_block.rows() == rows);
output_block->set_columns(std::move(mutable_columns));
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/pipeline_x/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,8 @@ class OperatorXBase : public OperatorBase {
std::string _op_name;
bool _ignore_data_distribution = false;
int _parallel_tasks = 0;

bool _keep_origin = false;
};

template <typename LocalStateType>
Expand Down
20 changes: 16 additions & 4 deletions be/src/vec/exec/join/vjoin_node_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,18 @@ VJoinNodeBase::VJoinNodeBase(ObjectPool* pool, const TPlanNode& tnode, const Des
}

if (tnode.__isset.hash_join_node) {
_output_row_desc.reset(
new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false}));
if (!_output_row_descriptor) {
_output_row_desc.reset(
new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false}));
}
_intermediate_row_desc.reset(new RowDescriptor(
descs, tnode.hash_join_node.vintermediate_tuple_id_list,
std::vector<bool>(tnode.hash_join_node.vintermediate_tuple_id_list.size())));
} else if (tnode.__isset.nested_loop_join_node) {
_output_row_desc.reset(
new RowDescriptor(descs, {tnode.nested_loop_join_node.voutput_tuple_id}, {false}));
if (!_output_row_descriptor) {
_output_row_desc.reset(new RowDescriptor(
descs, {tnode.nested_loop_join_node.voutput_tuple_id}, {false}));
}
_intermediate_row_desc.reset(new RowDescriptor(
descs, tnode.nested_loop_join_node.vintermediate_tuple_id_list,
std::vector<bool>(tnode.nested_loop_join_node.vintermediate_tuple_id_list.size())));
Expand Down Expand Up @@ -166,6 +170,14 @@ void VJoinNodeBase::_construct_mutable_join_block() {
Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* output_block,
bool keep_origin) {
SCOPED_TIMER(_build_output_block_timer);
if (!_projections.empty()) {
*output_block = *origin_block;
if (_is_outer_join) {
DCHECK(output_block->columns() >= 2);
output_block->erase_tail(output_block->columns() - 2);
}
return Status::OK();
}
auto is_mem_reuse = output_block->mem_reuse();
MutableBlock mutable_block =
is_mem_reuse
Expand Down
7 changes: 6 additions & 1 deletion be/src/vec/exec/join/vjoin_node_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,12 @@ class VJoinNodeBase : public ExecNode {

Status open(RuntimeState* state) override;

const RowDescriptor& row_desc() const override { return *_output_row_desc; }
const RowDescriptor& row_desc() const override {
if (_output_row_descriptor) {
return *_output_row_descriptor;
}
return *_output_row_desc;
}

const RowDescriptor& intermediate_row_desc() const override { return *_intermediate_row_desc; }

Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/join/vnested_loop_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ Status VNestedLoopJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {

if (tnode.nested_loop_join_node.__isset.is_output_left_side_only) {
_is_output_left_side_only = tnode.nested_loop_join_node.is_output_left_side_only;
_keep_origin = _is_output_left_side_only;
}

if (tnode.nested_loop_join_node.__isset.join_conjuncts &&
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/join/vnested_loop_join_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class VNestedLoopJoinNode final : public VJoinNodeBase {
const RowDescriptor& row_desc() const override {
return _old_version_flag
? (_output_row_descriptor ? *_output_row_descriptor : _row_descriptor)
: *_output_row_desc;
: (_output_row_descriptor ? *_output_row_descriptor : *_output_row_desc);
}

std::shared_ptr<Block> get_left_block() { return _left_block; }
Expand Down