From 148d8bd7b37600ea8eec8bbde13325713fc5e1a8 Mon Sep 17 00:00:00 2001 From: Mryange <2319153948@qq.com> Date: Tue, 12 Mar 2024 21:46:10 +0800 Subject: [PATCH] upd --- be/src/exec/exec_node.cpp | 26 +++++++++++----- be/src/exec/exec_node.h | 2 ++ be/src/pipeline/exec/join_probe_operator.cpp | 20 ++++++++++--- be/src/pipeline/exec/join_probe_operator.h | 7 ++++- .../exec/nested_loop_join_probe_operator.cpp | 4 ++- .../exec/nested_loop_join_probe_operator.h | 2 +- be/src/pipeline/pipeline_x/operator.cpp | 30 +++++++++++++------ be/src/pipeline/pipeline_x/operator.h | 2 ++ be/src/vec/exec/join/vjoin_node_base.cpp | 20 ++++++++++--- be/src/vec/exec/join/vjoin_node_base.h | 7 ++++- .../vec/exec/join/vnested_loop_join_node.cpp | 1 + be/src/vec/exec/join/vnested_loop_join_node.h | 2 +- 12 files changed, 94 insertions(+), 29 deletions(-) diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 368e94562a264f..ed032d0976700e 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -514,6 +514,24 @@ std::string ExecNode::get_name() { Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Block* output_block) { SCOPED_TIMER(_exec_timer); SCOPED_TIMER(_projection_timer); + auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) { + if (to->is_nullable() && !from->is_nullable()) { + if (_keep_origin || !from->is_exclusive()) { + auto& null_column = reinterpret_cast(*to); + null_column.get_nested_column().insert_range_from(*from, 0, rows); + null_column.get_null_map_column().get_data().resize_fill(rows, 0); + } else { + to = make_nullable(from, false)->assume_mutable(); + } + } else { + if (_keep_origin || !from->is_exclusive()) { + to->insert_range_from(*from, 0, rows); + } else { + to = from->assume_mutable(); + } + } + }; + using namespace vectorized; MutableBlock mutable_block = VectorizedUtils::build_mutable_mem_reuse_block(output_block, *_output_row_descriptor); @@ -535,13 +553,7 @@ Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Blo auto column_ptr = origin_block->get_by_position(result_column_id) .column->convert_to_full_column_if_const(); //TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it - if (mutable_columns[i]->is_nullable() xor column_ptr->is_nullable()) { - DCHECK(mutable_columns[i]->is_nullable() && !column_ptr->is_nullable()); - reinterpret_cast(mutable_columns[i].get()) - ->insert_range_from_not_nullable(*column_ptr, 0, rows); - } else { - mutable_columns[i]->insert_range_from(*column_ptr, 0, rows); - } + insert_column_datas(mutable_columns[i], column_ptr, rows); } DCHECK(mutable_block.rows() == rows); output_block->set_columns(std::move(mutable_columns)); diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index 1dd8979f5b36af..05e32528e9e58d 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -325,6 +325,8 @@ class ExecNode { std::shared_ptr _query_statistics = nullptr; + bool _keep_origin = false; + private: static Status create_tree_helper(RuntimeState* state, ObjectPool* pool, const std::vector& tnodes, diff --git a/be/src/pipeline/exec/join_probe_operator.cpp b/be/src/pipeline/exec/join_probe_operator.cpp index 03b20fdb4d4d35..814d7c0fbf628e 100644 --- a/be/src/pipeline/exec/join_probe_operator.cpp +++ b/be/src/pipeline/exec/join_probe_operator.cpp @@ -82,6 +82,14 @@ template Status JoinProbeLocalState::_build_output_block( vectorized::Block* origin_block, vectorized::Block* output_block, bool keep_origin) { auto& p = Base::_parent->template cast(); + if (!Base::_projections.empty()) { + *output_block = *origin_block; + if (p._is_outer_join) { + DCHECK(output_block->columns() >= 2); + output_block->erase_tail(output_block->columns() - 2); + } + return Status::OK(); + } SCOPED_TIMER(_build_output_block_timer); auto is_mem_reuse = output_block->mem_reuse(); vectorized::MutableBlock mutable_block = @@ -197,14 +205,18 @@ JoinProbeOperatorX::JoinProbeOperatorX(ObjectPool* pool, const T _intermediate_row_desc.reset(new RowDescriptor( descs, tnode.hash_join_node.vintermediate_tuple_id_list, std::vector(tnode.hash_join_node.vintermediate_tuple_id_list.size()))); - _output_row_desc.reset( - new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false})); + if (!Base::_output_row_descriptor) { + _output_row_desc.reset( + new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false})); + } } else if (tnode.__isset.nested_loop_join_node) { _intermediate_row_desc.reset(new RowDescriptor( descs, tnode.nested_loop_join_node.vintermediate_tuple_id_list, std::vector(tnode.nested_loop_join_node.vintermediate_tuple_id_list.size()))); - _output_row_desc.reset( - new RowDescriptor(descs, {tnode.nested_loop_join_node.voutput_tuple_id}, {false})); + if (!Base::_output_row_descriptor) { + _output_row_desc.reset(new RowDescriptor( + descs, {tnode.nested_loop_join_node.voutput_tuple_id}, {false})); + } } else { // Iff BE has been upgraded and FE has not yet, we should keep origin logics for CROSS JOIN. DCHECK_EQ(_join_op, TJoinOp::CROSS_JOIN); diff --git a/be/src/pipeline/exec/join_probe_operator.h b/be/src/pipeline/exec/join_probe_operator.h index 9bb716ff36dfe3..be68e547de0f68 100644 --- a/be/src/pipeline/exec/join_probe_operator.h +++ b/be/src/pipeline/exec/join_probe_operator.h @@ -70,7 +70,12 @@ class JoinProbeOperatorX : public StatefulOperatorX { Status init(const TPlanNode& tnode, RuntimeState* state) override; Status open(doris::RuntimeState* state) override; - [[nodiscard]] const RowDescriptor& row_desc() const override { return *_output_row_desc; } + [[nodiscard]] const RowDescriptor& row_desc() const override { + if (Base::_output_row_descriptor) { + return *Base::_output_row_descriptor; + } + return *_output_row_desc; + } [[nodiscard]] const RowDescriptor& intermediate_row_desc() const override { return *_intermediate_row_desc; diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp index 9272418ca0ad60..35d5bc6085ee41 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp @@ -436,7 +436,9 @@ NestedLoopJoinProbeOperatorX::NestedLoopJoinProbeOperatorX(ObjectPool* pool, con : JoinProbeOperatorX(pool, tnode, operator_id, descs), _is_output_left_side_only(tnode.nested_loop_join_node.__isset.is_output_left_side_only && tnode.nested_loop_join_node.is_output_left_side_only), - _old_version_flag(!tnode.__isset.nested_loop_join_node) {} + _old_version_flag(!tnode.__isset.nested_loop_join_node) { + _keep_origin = _is_output_left_side_only; +} Status NestedLoopJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(JoinProbeOperatorX::init(tnode, state)); diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h b/be/src/pipeline/exec/nested_loop_join_probe_operator.h index 770289f397f2ba..7a8be87d922b90 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h @@ -228,7 +228,7 @@ class NestedLoopJoinProbeOperatorX final const RowDescriptor& row_desc() const override { return _old_version_flag ? (_output_row_descriptor ? *_output_row_descriptor : _row_descriptor) - : *_output_row_desc; + : (_output_row_descriptor ? *_output_row_descriptor : *_output_row_desc); } bool need_more_input_data(RuntimeState* state) const override; diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 0c890b83041cc8..b4b66b1e7ff590 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -171,9 +171,28 @@ void PipelineXLocalStateBase::clear_origin_block() { Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* origin_block, vectorized::Block* output_block) const { - auto local_state = state->get_local_state(operator_id()); + auto* local_state = state->get_local_state(operator_id()); SCOPED_TIMER(local_state->exec_time_counter()); SCOPED_TIMER(local_state->_projection_timer); + + auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) { + if (to->is_nullable() && !from->is_nullable()) { + if (_keep_origin || !from->is_exclusive()) { + auto& null_column = reinterpret_cast(*to); + null_column.get_nested_column().insert_range_from(*from, 0, rows); + null_column.get_null_map_column().get_data().resize_fill(rows, 0); + } else { + to = make_nullable(from, false)->assume_mutable(); + } + } else { + if (_keep_origin || !from->is_exclusive()) { + to->insert_range_from(*from, 0, rows); + } else { + to = from->assume_mutable(); + } + } + }; + using namespace vectorized; vectorized::MutableBlock mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block, @@ -188,14 +207,7 @@ Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* ori RETURN_IF_ERROR(local_state->_projections[i]->execute(origin_block, &result_column_id)); auto column_ptr = origin_block->get_by_position(result_column_id) .column->convert_to_full_column_if_const(); - //TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it - if (mutable_columns[i]->is_nullable() xor column_ptr->is_nullable()) { - DCHECK(mutable_columns[i]->is_nullable() && !column_ptr->is_nullable()); - reinterpret_cast(mutable_columns[i].get()) - ->insert_range_from_not_nullable(*column_ptr, 0, rows); - } else { - mutable_columns[i]->insert_range_from(*column_ptr, 0, rows); - } + insert_column_datas(mutable_columns[i], column_ptr, rows); } DCHECK(mutable_block.rows() == rows); output_block->set_columns(std::move(mutable_columns)); diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 06ba93a36f3832..18395229f9117b 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -326,6 +326,8 @@ class OperatorXBase : public OperatorBase { std::string _op_name; bool _ignore_data_distribution = false; int _parallel_tasks = 0; + + bool _keep_origin = false; }; template diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp b/be/src/vec/exec/join/vjoin_node_base.cpp index 6fab6b8b91f759..79ca34e8326d9b 100644 --- a/be/src/vec/exec/join/vjoin_node_base.cpp +++ b/be/src/vec/exec/join/vjoin_node_base.cpp @@ -95,14 +95,18 @@ VJoinNodeBase::VJoinNodeBase(ObjectPool* pool, const TPlanNode& tnode, const Des } if (tnode.__isset.hash_join_node) { - _output_row_desc.reset( - new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false})); + if (!_output_row_descriptor) { + _output_row_desc.reset( + new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false})); + } _intermediate_row_desc.reset(new RowDescriptor( descs, tnode.hash_join_node.vintermediate_tuple_id_list, std::vector(tnode.hash_join_node.vintermediate_tuple_id_list.size()))); } else if (tnode.__isset.nested_loop_join_node) { - _output_row_desc.reset( - new RowDescriptor(descs, {tnode.nested_loop_join_node.voutput_tuple_id}, {false})); + if (!_output_row_descriptor) { + _output_row_desc.reset(new RowDescriptor( + descs, {tnode.nested_loop_join_node.voutput_tuple_id}, {false})); + } _intermediate_row_desc.reset(new RowDescriptor( descs, tnode.nested_loop_join_node.vintermediate_tuple_id_list, std::vector(tnode.nested_loop_join_node.vintermediate_tuple_id_list.size()))); @@ -166,6 +170,14 @@ void VJoinNodeBase::_construct_mutable_join_block() { Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* output_block, bool keep_origin) { SCOPED_TIMER(_build_output_block_timer); + if (!_projections.empty()) { + *output_block = *origin_block; + if (_is_outer_join) { + DCHECK(output_block->columns() >= 2); + output_block->erase_tail(output_block->columns() - 2); + } + return Status::OK(); + } auto is_mem_reuse = output_block->mem_reuse(); MutableBlock mutable_block = is_mem_reuse diff --git a/be/src/vec/exec/join/vjoin_node_base.h b/be/src/vec/exec/join/vjoin_node_base.h index 0e6ac3c9837db8..449d58b6fb14da 100644 --- a/be/src/vec/exec/join/vjoin_node_base.h +++ b/be/src/vec/exec/join/vjoin_node_base.h @@ -63,7 +63,12 @@ class VJoinNodeBase : public ExecNode { Status open(RuntimeState* state) override; - const RowDescriptor& row_desc() const override { return *_output_row_desc; } + const RowDescriptor& row_desc() const override { + if (_output_row_descriptor) { + return *_output_row_descriptor; + } + return *_output_row_desc; + } const RowDescriptor& intermediate_row_desc() const override { return *_intermediate_row_desc; } diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index 3548680bf4998e..77eef4d4da5a00 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -102,6 +102,7 @@ Status VNestedLoopJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { if (tnode.nested_loop_join_node.__isset.is_output_left_side_only) { _is_output_left_side_only = tnode.nested_loop_join_node.is_output_left_side_only; + _keep_origin = _is_output_left_side_only; } if (tnode.nested_loop_join_node.__isset.join_conjuncts && diff --git a/be/src/vec/exec/join/vnested_loop_join_node.h b/be/src/vec/exec/join/vnested_loop_join_node.h index fd31b651bd3a13..18bc901222f3fb 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.h +++ b/be/src/vec/exec/join/vnested_loop_join_node.h @@ -92,7 +92,7 @@ class VNestedLoopJoinNode final : public VJoinNodeBase { const RowDescriptor& row_desc() const override { return _old_version_flag ? (_output_row_descriptor ? *_output_row_descriptor : _row_descriptor) - : *_output_row_desc; + : (_output_row_descriptor ? *_output_row_descriptor : *_output_row_desc); } std::shared_ptr get_left_block() { return _left_block; }