From 931224834fd4f7e16e8e35d42aacf6f6f41f6015 Mon Sep 17 00:00:00 2001
From: BiteTheDDDDt
Date: Thu, 25 Jun 2026 22:47:45 +0800
Subject: [PATCH 1/4] [fix](be) Avoid local runtime filter merge deadlock

### What problem does this PR solve?

Issue Number: None

Related PR: None

Problem Summary:

Local runtime filter merge can deadlock when one join build instance
publishes a local-merge runtime filter while another instance sends its
runtime filter size. The old local merge context lock protected both the
merger and the producer list, so one path could hold a producer runtime
filter lock and then wait for the context lock while another path held the
context lock and then waited for a producer lock.

This change gives RuntimeFilterMerger its own internal synchronization and
makes LocalMergeContext expose a snapshot of the merger and producers.
Publish, send-size, and sync-size paths take the context lock only while
copying that snapshot, then merge filters or update producer sizes outside
the context lock. RuntimeFilterMerger returns the ready transition from
merge_from directly, removing the separate unlocked ready check.

### Release note None ### Check List (For Author) - Test: Unit Test - build-support/clang-format.sh be/src/exec/runtime_filter/runtime_filter_merger.h be/src/exec/runtime_filter/runtime_filter_mgr.cpp be/src/exec/runtime_filter/runtime_filter_mgr.h be/src/exec/runtime_filter/runtime_filter_producer.cpp be/test/exec/runtime_filter/runtime_filter_merger_test.cpp be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp - git diff --cached --check - ./run-be-ut.sh --run --filter=RuntimeFilterMgrTest.* - ./run-be-ut.sh --run --filter=RuntimeFilterMergerTest.* - Behavior changed: No - Does this need documentation: No --- .../runtime_filter/runtime_filter_merger.h | 26 ++--- .../runtime_filter/runtime_filter_mgr.cpp | 103 ++++++++++++------ .../exec/runtime_filter/runtime_filter_mgr.h | 22 +++- .../runtime_filter_producer.cpp | 46 ++++---- .../runtime_filter_merger_test.cpp | 24 ++-- .../runtime_filter_mgr_test.cpp | 31 +++--- 6 files changed, 148 insertions(+), 104 deletions(-) diff --git a/be/src/exec/runtime_filter/runtime_filter_merger.h b/be/src/exec/runtime_filter/runtime_filter_merger.h index 9405d2405a52fb..c18d261ecc34ce 100644 --- a/be/src/exec/runtime_filter/runtime_filter_merger.h +++ b/be/src/exec/runtime_filter/runtime_filter_merger.h @@ -46,6 +46,7 @@ class RuntimeFilterMerger : public RuntimeFilter { } std::string debug_string() override { + std::unique_lock l(_rmtx); return fmt::format( "Merger: ({}, expected_producer_num: {}, received_producer_num: {}, " "received_rf_size_num: {}, received_sum_size: {})", @@ -54,12 +55,15 @@ class RuntimeFilterMerger : public RuntimeFilter { } // If input is a disabled predicate, the final result is a disabled predicate. - Status merge_from(const RuntimeFilter* other) { + // Returns true only for the call that makes the merger ready. 
+ Status merge_from(const RuntimeFilter* other, bool* ready) { + std::unique_lock l(_rmtx); _received_producer_num++; if (_expected_producer_num < _received_producer_num) { return Status::InternalError( "runtime filter merger input product more than expected, {}", debug_string()); } + *ready = _received_producer_num == _expected_producer_num; if (_received_producer_num == _expected_producer_num) { _rf_state = State::READY; } @@ -72,6 +76,7 @@ class RuntimeFilterMerger : public RuntimeFilter { } void set_expected_producer_num(int num) { + std::unique_lock l(_rmtx); if (_received_producer_num > 0 || _received_rf_size_num > 0) { throw Exception(ErrorCode::INTERNAL_ERROR, "runtime filter merger set expected producer after receive data, {}", @@ -80,9 +85,13 @@ class RuntimeFilterMerger : public RuntimeFilter { _expected_producer_num = num; } - int get_expected_producer_num() const { return _expected_producer_num; } + int get_expected_producer_num() { + std::unique_lock l(_rmtx); + return _expected_producer_num; + } bool add_rf_size(uint64_t size) { + std::unique_lock l(_rmtx); _received_rf_size_num++; if (_expected_producer_num < _received_rf_size_num) { throw Exception(ErrorCode::INTERNAL_ERROR, @@ -93,18 +102,9 @@ class RuntimeFilterMerger : public RuntimeFilter { return (_received_rf_size_num == _expected_producer_num); } - uint64_t get_received_sum_size() const { return _received_sum_size; } - - bool ready() const { return _rf_state == State::READY; } - - void set_wrapper_state_and_ready_to_apply(RuntimeFilterWrapper::State state, - std::string reason = "") { + uint64_t get_received_sum_size() { std::unique_lock l(_rmtx); - if (_rf_state == State::READY) { - return; - } - _wrapper->set_state(state, reason); - _rf_state = State::READY; + return _received_sum_size; } private: diff --git a/be/src/exec/runtime_filter/runtime_filter_mgr.cpp b/be/src/exec/runtime_filter/runtime_filter_mgr.cpp index f8e687f09b7f7b..7b39239251ab3b 100644 --- 
a/be/src/exec/runtime_filter/runtime_filter_mgr.cpp +++ b/be/src/exec/runtime_filter/runtime_filter_mgr.cpp @@ -104,27 +104,60 @@ Status RuntimeFilterMgr::register_local_merger_producer_filter( Status LocalMergeContext::register_producer(const QueryContext* query_ctx, const TRuntimeFilterDesc* desc, std::shared_ptr producer) { - std::lock_guard l(mtx); - if (producer->stage() > stage) { + std::lock_guard l(_mtx); + if (producer->stage() > _stage) { // New recursive CTE round: discard stale merger and producers from // the previous round and recreate the merger for the new round. - merger.reset(); - producers.clear(); - stage = producer->stage(); + _merger.reset(); + _producers.clear(); + _stage = producer->stage(); } - if (!merger) { - RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, desc, &merger)); + if (!_merger) { + RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, desc, &_merger)); } - producers.emplace_back(producer); - merger->set_expected_producer_num(cast_set(producers.size())); + _producers.emplace_back(producer); + _merger->set_expected_producer_num(cast_set(_producers.size())); // Sync the local merger's stage from the producer so that outgoing merge RPCs // (via _push_to_remote) carry the correct recursive CTE round number. 
- merger->set_stage(producer->stage()); + _merger->set_stage(producer->stage()); return Status::OK(); } -Status RuntimeFilterMgr::get_local_merge_producer_filters(int filter_id, - LocalMergeContext** local_merge_filters) { +Status LocalMergeContext::snapshot(uint32_t expected_stage, LocalMergeContextSnapshot* snapshot) { + std::lock_guard l(_mtx); + if (expected_stage != _stage) { + return Status::OK(); + } + if (!_merger) { + return Status::InternalError("local merge context merger is nullptr"); + } + snapshot->merger = _merger; + snapshot->producers = _producers; + return Status::OK(); +} + +std::string LocalMergeContext::debug_string() { + std::shared_ptr merger; + std::vector> producers; + uint32_t stage = 0; + { + std::lock_guard l(_mtx); + merger = _merger; + producers = _producers; + stage = _stage; + } + + std::string result = + fmt::format("stage: {}, {}\n", stage, + merger ? merger->debug_string() : "local merge context merger is nullptr"); + for (const auto& producer : producers) { + result += fmt::format("{}\n", producer->debug_string()); + } + return result; +} + +Status RuntimeFilterMgr::get_local_merge_snapshot(int filter_id, uint32_t expected_stage, + LocalMergeContextSnapshot* snapshot) { if (!_is_global) [[unlikely]] { return Status::InternalError( "A local merge filter can not be registered in Local RuntimeFilterMgr"); @@ -132,17 +165,10 @@ Status RuntimeFilterMgr::get_local_merge_producer_filters(int filter_id, std::lock_guard l(_lock); auto iter = _local_merge_map.find(filter_id); if (iter == _local_merge_map.end()) { - // Filter may have been removed during a recursive CTE stage reset. // Return OK with nullptr to let the caller skip gracefully. 
- *local_merge_filters = nullptr; return Status::OK(); } - *local_merge_filters = &iter->second; - if (!iter->second.merger) { - return Status::InternalError("local merge context merger is nullptr for filter_id: {}", - filter_id); - } - return Status::OK(); + return iter->second.snapshot(expected_stage, snapshot); } Status RuntimeFilterMgr::register_producer_filter( @@ -260,6 +286,7 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptradd_rf_size(request->filter_size())) { + uint64_t received_sum_size = cnt_val.merger->get_received_sum_size(); cnt_val.sync_size_callbacks.resize(cnt_val.source_addrs.size()); for (size_t i = 0; i < cnt_val.source_addrs.size(); ++i) { auto& addr = cnt_val.source_addrs[i]; @@ -294,7 +321,7 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptrrequest_->set_filter_id(filter_id); - closure->request_->set_filter_size(cnt_val.merger->get_received_sum_size()); + closure->request_->set_filter_size(received_sum_size); stub->sync_filter_size(closure->cntl_.get(), closure->request_.get(), closure->response_.get(), closure.get()); closure.release(); @@ -304,13 +331,13 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptrfilter_id(), &local_merge_filters)); - if (local_merge_filters == nullptr) { + LocalMergeContextSnapshot snapshot; + RETURN_IF_ERROR(get_local_merge_snapshot(request->filter_id(), request->stage(), &snapshot)); + if (!snapshot.merger) { // Filter was removed during a recursive CTE stage reset; discard stale request. 
return Status::OK(); } - for (auto producer : local_merge_filters->producers) { + for (const auto& producer : snapshot.producers) { producer->set_synced_size(request->filter_size()); } return Status::OK(); @@ -318,18 +345,23 @@ Status RuntimeFilterMgr::sync_filter_size(const PSyncFilterSizeRequest* request) std::string RuntimeFilterMgr::debug_string() { std::string result = "Local Merger Info:\n"; - std::lock_guard l(_lock); - for (const auto& [filter_id, ctx] : _local_merge_map) { - result += fmt::format("{}\n", ctx.merger->debug_string()); - for (const auto& producer : ctx.producers) { - result += fmt::format("{}\n", producer->debug_string()); + std::vector> local_merge_contexts; + std::vector> consumers; + { + std::lock_guard l(_lock); + for (auto& [filter_id, ctx] : _local_merge_map) { + local_merge_contexts.emplace_back(filter_id, &ctx); + } + for (const auto& [filter_id, filter_consumers] : _consumer_map) { + consumers.insert(consumers.end(), filter_consumers.begin(), filter_consumers.end()); } } + for (const auto& [filter_id, ctx] : local_merge_contexts) { + result += fmt::format("filter_id: {}, {}", filter_id, ctx->debug_string()); + } result += "Consumer Info:\n"; - for (const auto& [filter_id, consumers] : _consumer_map) { - for (const auto& consumer : consumers) { - result += fmt::format("{}\n", consumer->debug_string()); - } + for (const auto& consumer : consumers) { + result += fmt::format("{}\n", consumer->debug_string()); } return result; } @@ -373,10 +405,9 @@ Status RuntimeFilterMergeControllerEntity::merge(std::shared_ptr q RETURN_IF_ERROR(tmp_filter->assign(*request, attach_data)); - RETURN_IF_ERROR(cnt_val.merger->merge_from(tmp_filter.get())); + RETURN_IF_ERROR(cnt_val.merger->merge_from(tmp_filter.get(), &is_ready)); cnt_val.arrive_id.insert(UniqueId(request->fragment_instance_id())); - is_ready = cnt_val.merger->ready(); // update is_ready in locked scope } if (is_ready) { diff --git a/be/src/exec/runtime_filter/runtime_filter_mgr.h 
b/be/src/exec/runtime_filter/runtime_filter_mgr.h index 536eb63e152caa..bf45ee249046e3 100644 --- a/be/src/exec/runtime_filter/runtime_filter_mgr.h +++ b/be/src/exec/runtime_filter/runtime_filter_mgr.h @@ -55,16 +55,25 @@ template class HandleErrorBrpcCallback; class SyncSizeCallback; -struct LocalMergeContext { - std::mutex mtx; +struct LocalMergeContextSnapshot { std::shared_ptr merger; std::vector> producers; - // Tracks the recursive CTE round. When a producer from a newer round - // registers, the context is reset (merger recreated, old producers dropped). - uint32_t stage = 0; +}; +class LocalMergeContext { +public: Status register_producer(const QueryContext* query_ctx, const TRuntimeFilterDesc* desc, std::shared_ptr producer); + Status snapshot(uint32_t expected_stage, LocalMergeContextSnapshot* snapshot); + std::string debug_string(); + +private: + std::mutex _mtx; + std::shared_ptr _merger; + std::vector> _producers; + // Tracks the recursive CTE round. When a producer from a newer round + // registers, the context is reset (merger recreated, old producers dropped). + uint32_t _stage = 0; }; struct GlobalMergeContext { @@ -102,7 +111,8 @@ class RuntimeFilterMgr { const TRuntimeFilterDesc& desc, std::shared_ptr producer); - Status get_local_merge_producer_filters(int filter_id, LocalMergeContext** local_merge_filters); + Status get_local_merge_snapshot(int filter_id, uint32_t expected_stage, + LocalMergeContextSnapshot* snapshot); // Create local producer. This producer is hold by RuntimeFilterProducerHelper. 
Status register_producer_filter(const QueryContext* query_ctx, const TRuntimeFilterDesc& desc, diff --git a/be/src/exec/runtime_filter/runtime_filter_producer.cpp b/be/src/exec/runtime_filter/runtime_filter_producer.cpp index 7da8c6871121f6..dcdad765d8402c 100644 --- a/be/src/exec/runtime_filter/runtime_filter_producer.cpp +++ b/be/src/exec/runtime_filter/runtime_filter_producer.cpp @@ -53,20 +53,20 @@ Status RuntimeFilterProducer::publish(RuntimeState* state, bool build_hash_table // when global consumer not exist, send_to_local_targets will do nothing, so merge rf is useless return Status::OK(); } - LocalMergeContext* context = nullptr; - RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_producer_filters( - _wrapper->filter_id(), &context)); - if (context == nullptr) { + LocalMergeContextSnapshot snapshot; + RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_snapshot( + _wrapper->filter_id(), _stage, &snapshot)); + if (!snapshot.merger) { // Filter was removed during a recursive CTE stage reset; this producer is stale. 
return Status::OK(); } - std::lock_guard l(context->mtx); - RETURN_IF_ERROR(context->merger->merge_from(this)); - if (context->merger->ready()) { + bool ready = false; + RETURN_IF_ERROR(snapshot.merger->merge_from(this, &ready)); + if (ready) { if (_has_remote_target) { - RETURN_IF_ERROR(_send_to_remote_targets(state, context->merger.get())); + RETURN_IF_ERROR(_send_to_remote_targets(state, snapshot.merger.get())); } else { - RETURN_IF_ERROR(_send_to_local_targets(state, context->merger.get(), true)); + RETURN_IF_ERROR(_send_to_local_targets(state, snapshot.merger.get(), true)); } } return Status::OK(); @@ -123,26 +123,26 @@ Status RuntimeFilterProducer::send_size(RuntimeState* state, uint64_t local_filt set_state(State::WAITING_FOR_SYNCED_SIZE); if (_need_do_merge(state)) { - LocalMergeContext* merger_context = nullptr; - RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_producer_filters( - _wrapper->filter_id(), &merger_context)); - if (merger_context == nullptr) { + LocalMergeContextSnapshot snapshot; + RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_snapshot( + _wrapper->filter_id(), _stage, &snapshot)); + if (!snapshot.merger) { // Filter was removed during a recursive CTE stage reset; this producer is stale. 
return Status::OK(); } - std::lock_guard merger_lock(merger_context->mtx); - if (merger_context->merger->add_rf_size(local_filter_size)) { - if (!_has_remote_target) { - for (auto filter : merger_context->producers) { - filter->set_synced_size(merger_context->merger->get_received_sum_size()); - } - return Status::OK(); - } else { - local_filter_size = merger_context->merger->get_received_sum_size(); + uint64_t received_sum_size = 0; + bool ready_to_sync = snapshot.merger->add_rf_size(local_filter_size); + if (!ready_to_sync) { + return Status::OK(); + } + received_sum_size = snapshot.merger->get_received_sum_size(); + if (!_has_remote_target) { + for (const auto& filter : snapshot.producers) { + filter->set_synced_size(received_sum_size); } - } else { return Status::OK(); } + local_filter_size = received_sum_size; } else if (!_has_remote_target) { set_synced_size(local_filter_size); diff --git a/be/test/exec/runtime_filter/runtime_filter_merger_test.cpp b/be/test/exec/runtime_filter/runtime_filter_merger_test.cpp index da9d5b66291795..e340d160a9948a 100644 --- a/be/test/exec/runtime_filter/runtime_filter_merger_test.cpp +++ b/be/test/exec/runtime_filter/runtime_filter_merger_test.cpp @@ -36,23 +36,23 @@ class RuntimeFilterMergerTest : public RuntimeFilterTest { FAIL_IF_ERROR_OR_CATCH_EXCEPTION( RuntimeFilterMerger::create(_query_ctx.get(), &desc, &merger)); merger->set_expected_producer_num(2); - ASSERT_FALSE(merger->ready()); ASSERT_EQ(merger->_wrapper->_state, RuntimeFilterWrapper::State::UNINITED); + bool ready = false; std::shared_ptr producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( _runtime_states[0]->register_producer_runtime_filter(desc, &producer)); producer->set_wrapper_state_and_ready_to_publish(first_product_state); - FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get())); - ASSERT_FALSE(merger->ready()); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get(), &ready)); + ASSERT_FALSE(ready); ASSERT_EQ(merger->_wrapper->_state, 
first_expected_state); std::shared_ptr producer2; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( _runtime_states[1]->register_producer_runtime_filter(desc, &producer2)); producer2->set_wrapper_state_and_ready_to_publish(second_product_state); - FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer2.get())); - ASSERT_TRUE(merger->ready()); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer2.get(), &ready)); + ASSERT_TRUE(ready); ASSERT_EQ(merger->_wrapper->_state, second_expected_state); } @@ -64,15 +64,15 @@ class RuntimeFilterMergerTest : public RuntimeFilterTest { FAIL_IF_ERROR_OR_CATCH_EXCEPTION( RuntimeFilterMerger::create(_query_ctx.get(), &desc, &merger)); merger->set_expected_producer_num(1); - ASSERT_FALSE(merger->ready()); + bool ready = false; std::shared_ptr producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( _runtime_states[0]->register_producer_runtime_filter(desc, &producer)); FAIL_IF_ERROR_OR_CATCH_EXCEPTION(producer->init(123)); producer->set_wrapper_state_and_ready_to_publish(state); - FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get())); - ASSERT_TRUE(merger->ready()); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get(), &ready)); + ASSERT_TRUE(ready); PMergeFilterRequest request; void* data = nullptr; @@ -104,7 +104,6 @@ TEST_F(RuntimeFilterMergerTest, add_rf_size) { ASSERT_FALSE(merger->add_rf_size(123)); ASSERT_TRUE(merger->add_rf_size(1)); ASSERT_EQ(merger->get_received_sum_size(), 124); - ASSERT_FALSE(merger->ready()); try { ASSERT_TRUE(merger->add_rf_size(1)); @@ -119,21 +118,22 @@ TEST_F(RuntimeFilterMergerTest, invalid_merge) { auto desc = TRuntimeFilterDescBuilder().build(); FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterMerger::create(_query_ctx.get(), &desc, &merger)); merger->set_expected_producer_num(1); - ASSERT_FALSE(merger->ready()); ASSERT_EQ(merger->_wrapper->_state, RuntimeFilterWrapper::State::UNINITED); + bool ready = false; std::shared_ptr producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( 
_runtime_states[0]->register_producer_runtime_filter(desc, &producer)); producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY); - FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get())); // ready wrapper + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get(), &ready)); + ASSERT_TRUE(ready); ASSERT_EQ(merger->_wrapper->_state, RuntimeFilterWrapper::State::READY); std::shared_ptr producer2; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( _runtime_states[1]->register_producer_runtime_filter(desc, &producer2)); producer2->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY); - auto st = merger->merge_from(producer2.get()); + auto st = merger->merge_from(producer2.get(), &ready); ASSERT_EQ(st.code(), ErrorCode::INTERNAL_ERROR); } diff --git a/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp b/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp index d6ccc080961333..599cf9e35e2231 100644 --- a/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp +++ b/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp @@ -103,28 +103,31 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) { .ok()); EXPECT_NE(producer_filter, nullptr); - LocalMergeContext* local_merge_filters = nullptr; + LocalMergeContextSnapshot snapshot; // filter_id not yet registered: global mgr returns OK with nullptr // (graceful skip for recursive CTE stage reset). 
- EXPECT_TRUE(global_runtime_filter_mgr - ->get_local_merge_producer_filters(filter_id, &local_merge_filters) - .ok()); - EXPECT_EQ(local_merge_filters, nullptr); + EXPECT_TRUE( + global_runtime_filter_mgr + ->get_local_merge_snapshot(filter_id, producer_filter->stage(), &snapshot) + .ok()); + EXPECT_EQ(snapshot.merger, nullptr); // local mgr always returns error (not supported) - EXPECT_FALSE(local_runtime_filter_mgr - ->get_local_merge_producer_filters(filter_id, &local_merge_filters) - .ok()); + EXPECT_FALSE( + local_runtime_filter_mgr + ->get_local_merge_snapshot(filter_id, producer_filter->stage(), &snapshot) + .ok()); // Register local merge filter EXPECT_TRUE( global_runtime_filter_mgr ->register_local_merger_producer_filter(ctx.get(), desc, producer_filter) .ok()); - EXPECT_TRUE(global_runtime_filter_mgr - ->get_local_merge_producer_filters(filter_id, &local_merge_filters) - .ok()); - EXPECT_NE(local_merge_filters, nullptr); - EXPECT_EQ(local_merge_filters->producers.size(), 1); - local_merge_filters->producers.front()->_rf_state = + EXPECT_TRUE( + global_runtime_filter_mgr + ->get_local_merge_snapshot(filter_id, producer_filter->stage(), &snapshot) + .ok()); + EXPECT_NE(snapshot.merger, nullptr); + EXPECT_EQ(snapshot.producers.size(), 1); + snapshot.producers.front()->_rf_state = RuntimeFilterProducer::State ::WAITING_FOR_SYNCED_SIZE; } { From c3d64bdfdd9350cfa3c83fb466e5b6988b269a35 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Fri, 26 Jun 2026 16:24:20 +0800 Subject: [PATCH 2/4] update --- .../runtime_filter/runtime_filter_mgr.cpp | 88 +++++++------------ .../exec/runtime_filter/runtime_filter_mgr.h | 33 +++---- .../runtime_filter_producer.cpp | 28 +++--- .../runtime_filter_mgr_test.cpp | 27 +++--- 4 files changed, 76 insertions(+), 100 deletions(-) diff --git a/be/src/exec/runtime_filter/runtime_filter_mgr.cpp b/be/src/exec/runtime_filter/runtime_filter_mgr.cpp index 7b39239251ab3b..80acc157643d2a 100644 --- 
a/be/src/exec/runtime_filter/runtime_filter_mgr.cpp +++ b/be/src/exec/runtime_filter/runtime_filter_mgr.cpp @@ -91,84 +91,57 @@ Status RuntimeFilterMgr::register_local_merger_producer_filter( SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; - LocalMergeContext* context; - { - std::lock_guard l(_lock); - context = &_local_merge_map[key]; // may inplace construct default object + std::lock_guard l(_lock); + auto& context = _local_merge_map[key]; + if (!context || producer->stage() > context->stage()) { + std::shared_ptr merger; + RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, &desc, &merger)); + context = std::make_shared(producer->stage(), std::move(merger)); } - RETURN_IF_ERROR(context->register_producer(query_ctx, &desc, producer)); + context->register_producer(producer); return Status::OK(); } -Status LocalMergeContext::register_producer(const QueryContext* query_ctx, - const TRuntimeFilterDesc* desc, - std::shared_ptr producer) { - std::lock_guard l(_mtx); - if (producer->stage() > _stage) { - // New recursive CTE round: discard stale merger and producers from - // the previous round and recreate the merger for the new round. - _merger.reset(); - _producers.clear(); - _stage = producer->stage(); - } - if (!_merger) { - RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, desc, &_merger)); - } +void LocalMergeContext::register_producer(std::shared_ptr producer) { _producers.emplace_back(producer); _merger->set_expected_producer_num(cast_set(_producers.size())); // Sync the local merger's stage from the producer so that outgoing merge RPCs // (via _push_to_remote) carry the correct recursive CTE round number. 
_merger->set_stage(producer->stage()); - return Status::OK(); -} - -Status LocalMergeContext::snapshot(uint32_t expected_stage, LocalMergeContextSnapshot* snapshot) { - std::lock_guard l(_mtx); - if (expected_stage != _stage) { - return Status::OK(); - } - if (!_merger) { - return Status::InternalError("local merge context merger is nullptr"); - } - snapshot->merger = _merger; - snapshot->producers = _producers; - return Status::OK(); } std::string LocalMergeContext::debug_string() { - std::shared_ptr merger; - std::vector> producers; - uint32_t stage = 0; - { - std::lock_guard l(_mtx); - merger = _merger; - producers = _producers; - stage = _stage; - } - - std::string result = - fmt::format("stage: {}, {}\n", stage, - merger ? merger->debug_string() : "local merge context merger is nullptr"); - for (const auto& producer : producers) { + std::string result = fmt::format( + "stage: {}, {}\n", _stage, + _merger ? _merger->debug_string() : "local merge context merger is nullptr"); + for (const auto& producer : _producers) { result += fmt::format("{}\n", producer->debug_string()); } return result; } -Status RuntimeFilterMgr::get_local_merge_snapshot(int filter_id, uint32_t expected_stage, - LocalMergeContextSnapshot* snapshot) { +Status RuntimeFilterMgr::get_local_merge_context(int filter_id, uint32_t expected_stage, + std::shared_ptr* context) { if (!_is_global) [[unlikely]] { return Status::InternalError( "A local merge filter can not be registered in Local RuntimeFilterMgr"); } + context->reset(); std::lock_guard l(_lock); auto iter = _local_merge_map.find(filter_id); if (iter == _local_merge_map.end()) { // Return OK with nullptr to let the caller skip gracefully. 
return Status::OK(); } - return iter->second.snapshot(expected_stage, snapshot); + if (!iter->second) { + return Status::InternalError("local merge context is nullptr for filter_id: {}", filter_id); + } + if (expected_stage != iter->second->stage()) { + return Status::OK(); + } + *context = iter->second; + return Status::OK(); } Status RuntimeFilterMgr::register_producer_filter( @@ -331,13 +304,13 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptrfilter_id(), request->stage(), &snapshot)); - if (!snapshot.merger) { + std::shared_ptr context; + RETURN_IF_ERROR(get_local_merge_context(request->filter_id(), request->stage(), &context)); + if (!context) { // Filter was removed during a recursive CTE stage reset; discard stale request. return Status::OK(); } - for (const auto& producer : snapshot.producers) { + for (const auto& producer : context->producers()) { producer->set_synced_size(request->filter_size()); } return Status::OK(); @@ -345,19 +318,20 @@ Status RuntimeFilterMgr::sync_filter_size(const PSyncFilterSizeRequest* request) std::string RuntimeFilterMgr::debug_string() { std::string result = "Local Merger Info:\n"; - std::vector> local_merge_contexts; + std::vector>> local_merge_contexts; std::vector> consumers; { std::lock_guard l(_lock); for (auto& [filter_id, ctx] : _local_merge_map) { - local_merge_contexts.emplace_back(filter_id, &ctx); + local_merge_contexts.emplace_back(filter_id, ctx); } for (const auto& [filter_id, filter_consumers] : _consumer_map) { consumers.insert(consumers.end(), filter_consumers.begin(), filter_consumers.end()); } } for (const auto& [filter_id, ctx] : local_merge_contexts) { - result += fmt::format("filter_id: {}, {}", filter_id, ctx->debug_string()); + result += fmt::format("filter_id: {}, {}", filter_id, + ctx ? 
ctx->debug_string() : "local merge context is nullptr\n"); } result += "Consumer Info:\n"; for (const auto& consumer : consumers) { diff --git a/be/src/exec/runtime_filter/runtime_filter_mgr.h b/be/src/exec/runtime_filter/runtime_filter_mgr.h index bf45ee249046e3..a163781665522f 100644 --- a/be/src/exec/runtime_filter/runtime_filter_mgr.h +++ b/be/src/exec/runtime_filter/runtime_filter_mgr.h @@ -28,6 +28,7 @@ #include #include #include +#include #include #include "common/status.h" @@ -55,24 +56,26 @@ template class HandleErrorBrpcCallback; class SyncSizeCallback; -struct LocalMergeContextSnapshot { - std::shared_ptr merger; - std::vector> producers; -}; - class LocalMergeContext { public: - Status register_producer(const QueryContext* query_ctx, const TRuntimeFilterDesc* desc, - std::shared_ptr producer); - Status snapshot(uint32_t expected_stage, LocalMergeContextSnapshot* snapshot); + LocalMergeContext(uint32_t stage, std::shared_ptr merger) + : _merger(std::move(merger)), _stage(stage) {} + + void register_producer(std::shared_ptr producer); std::string debug_string(); + uint32_t stage() const { return _stage; } + const std::shared_ptr& merger() const { return _merger; } + const std::vector>& producers() const { + return _producers; + } + private: - std::mutex _mtx; std::shared_ptr _merger; std::vector> _producers; // Tracks the recursive CTE round. When a producer from a newer round - // registers, the context is reset (merger recreated, old producers dropped). + // registers, RuntimeFilterMgr replaces the whole context and old in-flight + // users keep the previous context alive through shared_ptr. 
uint32_t _stage = 0; }; @@ -111,8 +114,8 @@ class RuntimeFilterMgr { const TRuntimeFilterDesc& desc, std::shared_ptr producer); - Status get_local_merge_snapshot(int filter_id, uint32_t expected_stage, - LocalMergeContextSnapshot* snapshot); + Status get_local_merge_context(int filter_id, uint32_t expected_stage, + std::shared_ptr* context); // Create local producer. This producer is hold by RuntimeFilterProducerHelper. Status register_producer_filter(const QueryContext* query_ctx, const TRuntimeFilterDesc& desc, @@ -128,8 +131,8 @@ class RuntimeFilterMgr { void remove_filter(int32_t filter_id) { std::lock_guard l(_lock); _consumer_map.erase(filter_id); - // NOTE: _local_merge_map is NOT erased here. It is reset lazily in - // LocalMergeContext::register_producer when a producer from a newer + // NOTE: _local_merge_map is NOT erased here. It is replaced lazily in + // register_local_merger_producer_filter when a producer from a newer // recursive CTE round registers. Erasing eagerly here would race with // multi-fragment REBUILD: a consumer-only fragment's remove_filter could // delete the entry that the producer fragment just re-registered. 
@@ -154,7 +157,7 @@ class RuntimeFilterMgr { // key: "filter-id" std::map>> _consumer_map; std::set _producer_id_set; - std::map _local_merge_map; + std::map> _local_merge_map; std::unique_ptr _tracker; diff --git a/be/src/exec/runtime_filter/runtime_filter_producer.cpp b/be/src/exec/runtime_filter/runtime_filter_producer.cpp index dcdad765d8402c..fcff6f4405fa07 100644 --- a/be/src/exec/runtime_filter/runtime_filter_producer.cpp +++ b/be/src/exec/runtime_filter/runtime_filter_producer.cpp @@ -53,20 +53,20 @@ Status RuntimeFilterProducer::publish(RuntimeState* state, bool build_hash_table // when global consumer not exist, send_to_local_targets will do nothing, so merge rf is useless return Status::OK(); } - LocalMergeContextSnapshot snapshot; - RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_snapshot( - _wrapper->filter_id(), _stage, &snapshot)); - if (!snapshot.merger) { + std::shared_ptr context; + RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_context( + _wrapper->filter_id(), _stage, &context)); + if (!context) { // Filter was removed during a recursive CTE stage reset; this producer is stale. 
return Status::OK(); } bool ready = false; - RETURN_IF_ERROR(snapshot.merger->merge_from(this, &ready)); + RETURN_IF_ERROR(context->merger()->merge_from(this, &ready)); if (ready) { if (_has_remote_target) { - RETURN_IF_ERROR(_send_to_remote_targets(state, snapshot.merger.get())); + RETURN_IF_ERROR(_send_to_remote_targets(state, context->merger().get())); } else { - RETURN_IF_ERROR(_send_to_local_targets(state, snapshot.merger.get(), true)); + RETURN_IF_ERROR(_send_to_local_targets(state, context->merger().get(), true)); } } return Status::OK(); @@ -123,21 +123,21 @@ Status RuntimeFilterProducer::send_size(RuntimeState* state, uint64_t local_filt set_state(State::WAITING_FOR_SYNCED_SIZE); if (_need_do_merge(state)) { - LocalMergeContextSnapshot snapshot; - RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_snapshot( - _wrapper->filter_id(), _stage, &snapshot)); - if (!snapshot.merger) { + std::shared_ptr context; + RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_context( + _wrapper->filter_id(), _stage, &context)); + if (!context) { // Filter was removed during a recursive CTE stage reset; this producer is stale. 
return Status::OK(); } uint64_t received_sum_size = 0; - bool ready_to_sync = snapshot.merger->add_rf_size(local_filter_size); + bool ready_to_sync = context->merger()->add_rf_size(local_filter_size); if (!ready_to_sync) { return Status::OK(); } - received_sum_size = snapshot.merger->get_received_sum_size(); + received_sum_size = context->merger()->get_received_sum_size(); if (!_has_remote_target) { - for (const auto& filter : snapshot.producers) { + for (const auto& filter : context->producers()) { filter->set_synced_size(received_sum_size); } return Status::OK(); diff --git a/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp b/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp index 599cf9e35e2231..e9c2df288fbbe8 100644 --- a/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp +++ b/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp @@ -103,31 +103,30 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) { .ok()); EXPECT_NE(producer_filter, nullptr); - LocalMergeContextSnapshot snapshot; + std::shared_ptr context; // filter_id not yet registered: global mgr returns OK with nullptr // (graceful skip for recursive CTE stage reset). 
- EXPECT_TRUE( - global_runtime_filter_mgr - ->get_local_merge_snapshot(filter_id, producer_filter->stage(), &snapshot) - .ok()); - EXPECT_EQ(snapshot.merger, nullptr); + EXPECT_TRUE(global_runtime_filter_mgr + ->get_local_merge_context(filter_id, producer_filter->stage(), &context) + .ok()); + EXPECT_EQ(context, nullptr); // local mgr always returns error (not supported) EXPECT_FALSE( local_runtime_filter_mgr - ->get_local_merge_snapshot(filter_id, producer_filter->stage(), &snapshot) + ->get_local_merge_context(filter_id, producer_filter->stage(), &context) .ok()); // Register local merge filter EXPECT_TRUE( global_runtime_filter_mgr ->register_local_merger_producer_filter(ctx.get(), desc, producer_filter) .ok()); - EXPECT_TRUE( - global_runtime_filter_mgr - ->get_local_merge_snapshot(filter_id, producer_filter->stage(), &snapshot) - .ok()); - EXPECT_NE(snapshot.merger, nullptr); - EXPECT_EQ(snapshot.producers.size(), 1); - snapshot.producers.front()->_rf_state = + EXPECT_TRUE(global_runtime_filter_mgr + ->get_local_merge_context(filter_id, producer_filter->stage(), &context) + .ok()); + EXPECT_NE(context, nullptr); + EXPECT_NE(context->merger(), nullptr); + EXPECT_EQ(context->producers().size(), 1); + context->producers().front()->_rf_state = RuntimeFilterProducer::State ::WAITING_FOR_SYNCED_SIZE; } { From e41df92c573a1d85a6590ea49bff4d83ac4bdd9b Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Fri, 26 Jun 2026 16:46:49 +0800 Subject: [PATCH 3/4] update --- .../runtime_filter/runtime_filter_merger.h | 12 +++++++ .../runtime_filter/runtime_filter_mgr.cpp | 36 +++++++++---------- .../exec/runtime_filter/runtime_filter_mgr.h | 21 +++-------- .../runtime_filter_producer.cpp | 12 +++---- .../runtime_filter_merger_test.cpp | 8 +++++ .../runtime_filter_mgr_test.cpp | 6 ++-- 6 files changed, 49 insertions(+), 46 deletions(-) diff --git a/be/src/exec/runtime_filter/runtime_filter_merger.h b/be/src/exec/runtime_filter/runtime_filter_merger.h index 
c18d261ecc34ce..267ffc05e6675b 100644 --- a/be/src/exec/runtime_filter/runtime_filter_merger.h +++ b/be/src/exec/runtime_filter/runtime_filter_merger.h @@ -107,6 +107,18 @@ class RuntimeFilterMerger : public RuntimeFilter { return _received_sum_size; } + bool ready() const { return _rf_state == State::READY; } + + void set_wrapper_state_and_ready_to_apply(RuntimeFilterWrapper::State state, + std::string reason = "") { + std::unique_lock l(_rmtx); + if (_rf_state == State::READY) { + return; + } + _wrapper->set_state(state, reason); + _rf_state = State::READY; + } + private: RuntimeFilterMerger(const QueryContext* query_ctx, const TRuntimeFilterDesc* desc) : RuntimeFilter(desc), _rf_state(State::WAITING_FOR_PRODUCT) {} diff --git a/be/src/exec/runtime_filter/runtime_filter_mgr.cpp b/be/src/exec/runtime_filter/runtime_filter_mgr.cpp index 80acc157643d2a..1545333e3481d8 100644 --- a/be/src/exec/runtime_filter/runtime_filter_mgr.cpp +++ b/be/src/exec/runtime_filter/runtime_filter_mgr.cpp @@ -93,29 +93,26 @@ Status RuntimeFilterMgr::register_local_merger_producer_filter( std::lock_guard l(_lock); auto& context = _local_merge_map[key]; - if (!context || producer->stage() > context->stage()) { - std::shared_ptr merger; - RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, &desc, &merger)); - context = std::make_shared(producer->stage(), std::move(merger)); + if (!context || producer->stage() > context->stage) { + auto new_context = std::make_shared(); + RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, &desc, &new_context->merger)); + new_context->stage = producer->stage(); + context = new_context; } - context->register_producer(producer); - return Status::OK(); -} - -void LocalMergeContext::register_producer(std::shared_ptr producer) { - _producers.emplace_back(producer); - _merger->set_expected_producer_num(cast_set(_producers.size())); + context->producers.emplace_back(producer); + context->merger->set_expected_producer_num(cast_set(context->producers.size())); 
// Sync the local merger's stage from the producer so that outgoing merge RPCs // (via _push_to_remote) carry the correct recursive CTE round number. - _merger->set_stage(producer->stage()); + context->merger->set_stage(producer->stage()); + return Status::OK(); } std::string LocalMergeContext::debug_string() { - std::string result = fmt::format( - "stage: {}, {}\n", _stage, - _merger ? _merger->debug_string() : "local merge context merger is nullptr"); - for (const auto& producer : _producers) { + std::string result = + fmt::format("stage: {}, {}\n", stage, + merger ? merger->debug_string() : "local merge context merger is nullptr"); + for (const auto& producer : producers) { result += fmt::format("{}\n", producer->debug_string()); } return result; @@ -137,7 +134,7 @@ Status RuntimeFilterMgr::get_local_merge_context(int filter_id, uint32_t expecte if (!iter->second) { return Status::InternalError("local merge context is nullptr for filter_id: {}", filter_id); } - if (expected_stage != iter->second->stage()) { + if (expected_stage != iter->second->stage) { return Status::OK(); } *context = iter->second; @@ -259,7 +256,6 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptradd_rf_size(request->filter_size())) { - uint64_t received_sum_size = cnt_val.merger->get_received_sum_size(); cnt_val.sync_size_callbacks.resize(cnt_val.source_addrs.size()); for (size_t i = 0; i < cnt_val.source_addrs.size(); ++i) { auto& addr = cnt_val.source_addrs[i]; @@ -294,7 +290,7 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptrrequest_->set_filter_id(filter_id); - closure->request_->set_filter_size(received_sum_size); + closure->request_->set_filter_size(cnt_val.merger->get_received_sum_size()); stub->sync_filter_size(closure->cntl_.get(), closure->request_.get(), closure->response_.get(), closure.get()); closure.release(); @@ -310,7 +306,7 @@ Status RuntimeFilterMgr::sync_filter_size(const PSyncFilterSizeRequest* request) // Filter 
was removed during a recursive CTE stage reset; discard stale request. return Status::OK(); } - for (const auto& producer : context->producers()) { + for (const auto& producer : context->producers) { producer->set_synced_size(request->filter_size()); } return Status::OK(); diff --git a/be/src/exec/runtime_filter/runtime_filter_mgr.h b/be/src/exec/runtime_filter/runtime_filter_mgr.h index a163781665522f..b0b05289e36e5c 100644 --- a/be/src/exec/runtime_filter/runtime_filter_mgr.h +++ b/be/src/exec/runtime_filter/runtime_filter_mgr.h @@ -28,7 +28,6 @@ #include #include #include -#include #include #include "common/status.h" @@ -56,27 +55,15 @@ template class HandleErrorBrpcCallback; class SyncSizeCallback; -class LocalMergeContext { -public: - LocalMergeContext(uint32_t stage, std::shared_ptr merger) - : _merger(std::move(merger)), _stage(stage) {} - - void register_producer(std::shared_ptr producer); +struct LocalMergeContext { std::string debug_string(); - uint32_t stage() const { return _stage; } - const std::shared_ptr& merger() const { return _merger; } - const std::vector>& producers() const { - return _producers; - } - -private: - std::shared_ptr _merger; - std::vector> _producers; + std::shared_ptr merger; + std::vector> producers; // Tracks the recursive CTE round. When a producer from a newer round // registers, RuntimeFilterMgr replaces the whole context and old in-flight // users keep the previous context alive through shared_ptr. 
- uint32_t _stage = 0; + uint32_t stage = 0; }; struct GlobalMergeContext { diff --git a/be/src/exec/runtime_filter/runtime_filter_producer.cpp b/be/src/exec/runtime_filter/runtime_filter_producer.cpp index fcff6f4405fa07..5a3429116272b1 100644 --- a/be/src/exec/runtime_filter/runtime_filter_producer.cpp +++ b/be/src/exec/runtime_filter/runtime_filter_producer.cpp @@ -61,12 +61,12 @@ Status RuntimeFilterProducer::publish(RuntimeState* state, bool build_hash_table return Status::OK(); } bool ready = false; - RETURN_IF_ERROR(context->merger()->merge_from(this, &ready)); + RETURN_IF_ERROR(context->merger->merge_from(this, &ready)); if (ready) { if (_has_remote_target) { - RETURN_IF_ERROR(_send_to_remote_targets(state, context->merger().get())); + RETURN_IF_ERROR(_send_to_remote_targets(state, context->merger.get())); } else { - RETURN_IF_ERROR(_send_to_local_targets(state, context->merger().get(), true)); + RETURN_IF_ERROR(_send_to_local_targets(state, context->merger.get(), true)); } } return Status::OK(); @@ -131,13 +131,13 @@ Status RuntimeFilterProducer::send_size(RuntimeState* state, uint64_t local_filt return Status::OK(); } uint64_t received_sum_size = 0; - bool ready_to_sync = context->merger()->add_rf_size(local_filter_size); + bool ready_to_sync = context->merger->add_rf_size(local_filter_size); if (!ready_to_sync) { return Status::OK(); } - received_sum_size = context->merger()->get_received_sum_size(); + received_sum_size = context->merger->get_received_sum_size(); if (!_has_remote_target) { - for (const auto& filter : context->producers()) { + for (const auto& filter : context->producers) { filter->set_synced_size(received_sum_size); } return Status::OK(); diff --git a/be/test/exec/runtime_filter/runtime_filter_merger_test.cpp b/be/test/exec/runtime_filter/runtime_filter_merger_test.cpp index e340d160a9948a..4dba455cd78e54 100644 --- a/be/test/exec/runtime_filter/runtime_filter_merger_test.cpp +++ 
b/be/test/exec/runtime_filter/runtime_filter_merger_test.cpp @@ -36,6 +36,7 @@ class RuntimeFilterMergerTest : public RuntimeFilterTest { FAIL_IF_ERROR_OR_CATCH_EXCEPTION( RuntimeFilterMerger::create(_query_ctx.get(), &desc, &merger)); merger->set_expected_producer_num(2); + ASSERT_FALSE(merger->ready()); ASSERT_EQ(merger->_wrapper->_state, RuntimeFilterWrapper::State::UNINITED); bool ready = false; @@ -45,6 +46,7 @@ class RuntimeFilterMergerTest : public RuntimeFilterTest { producer->set_wrapper_state_and_ready_to_publish(first_product_state); FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get(), &ready)); ASSERT_FALSE(ready); + ASSERT_FALSE(merger->ready()); ASSERT_EQ(merger->_wrapper->_state, first_expected_state); std::shared_ptr producer2; @@ -53,6 +55,7 @@ class RuntimeFilterMergerTest : public RuntimeFilterTest { producer2->set_wrapper_state_and_ready_to_publish(second_product_state); FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer2.get(), &ready)); ASSERT_TRUE(ready); + ASSERT_TRUE(merger->ready()); ASSERT_EQ(merger->_wrapper->_state, second_expected_state); } @@ -64,6 +67,7 @@ class RuntimeFilterMergerTest : public RuntimeFilterTest { FAIL_IF_ERROR_OR_CATCH_EXCEPTION( RuntimeFilterMerger::create(_query_ctx.get(), &desc, &merger)); merger->set_expected_producer_num(1); + ASSERT_FALSE(merger->ready()); bool ready = false; std::shared_ptr producer; @@ -73,6 +77,7 @@ class RuntimeFilterMergerTest : public RuntimeFilterTest { producer->set_wrapper_state_and_ready_to_publish(state); FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get(), &ready)); ASSERT_TRUE(ready); + ASSERT_TRUE(merger->ready()); PMergeFilterRequest request; void* data = nullptr; @@ -104,6 +109,7 @@ TEST_F(RuntimeFilterMergerTest, add_rf_size) { ASSERT_FALSE(merger->add_rf_size(123)); ASSERT_TRUE(merger->add_rf_size(1)); ASSERT_EQ(merger->get_received_sum_size(), 124); + ASSERT_FALSE(merger->ready()); try { ASSERT_TRUE(merger->add_rf_size(1)); @@ -118,6 
+124,7 @@ TEST_F(RuntimeFilterMergerTest, invalid_merge) { auto desc = TRuntimeFilterDescBuilder().build(); FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterMerger::create(_query_ctx.get(), &desc, &merger)); merger->set_expected_producer_num(1); + ASSERT_FALSE(merger->ready()); ASSERT_EQ(merger->_wrapper->_state, RuntimeFilterWrapper::State::UNINITED); bool ready = false; @@ -127,6 +134,7 @@ TEST_F(RuntimeFilterMergerTest, invalid_merge) { producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY); FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get(), &ready)); ASSERT_TRUE(ready); + ASSERT_TRUE(merger->ready()); ASSERT_EQ(merger->_wrapper->_state, RuntimeFilterWrapper::State::READY); std::shared_ptr producer2; diff --git a/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp b/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp index e9c2df288fbbe8..536fb156aa8204 100644 --- a/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp +++ b/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp @@ -124,9 +124,9 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) { ->get_local_merge_context(filter_id, producer_filter->stage(), &context) .ok()); EXPECT_NE(context, nullptr); - EXPECT_NE(context->merger(), nullptr); - EXPECT_EQ(context->producers().size(), 1); - context->producers().front()->_rf_state = + EXPECT_NE(context->merger, nullptr); + EXPECT_EQ(context->producers.size(), 1); + context->producers.front()->_rf_state = RuntimeFilterProducer::State ::WAITING_FOR_SYNCED_SIZE; } { From 7d55640cc36903ac0028744526361fccf067a6ec Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Fri, 26 Jun 2026 19:41:38 +0800 Subject: [PATCH 4/4] update --- be/src/exec/runtime_filter/runtime_filter_mgr.cpp | 15 +++++++++------ be/src/exec/runtime_filter/runtime_filter_mgr.h | 10 +++++----- be/src/runtime/runtime_state.cpp | 2 +- .../runtime_filter/runtime_filter_mgr_test.cpp | 11 +++++------ 4 files changed, 20 insertions(+), 18 deletions(-) 
diff --git a/be/src/exec/runtime_filter/runtime_filter_mgr.cpp b/be/src/exec/runtime_filter/runtime_filter_mgr.cpp index 1545333e3481d8..bcb61358253b18 100644 --- a/be/src/exec/runtime_filter/runtime_filter_mgr.cpp +++ b/be/src/exec/runtime_filter/runtime_filter_mgr.cpp @@ -75,7 +75,7 @@ Status RuntimeFilterMgr::register_consumer_filter( return Status::OK(); } -Status RuntimeFilterMgr::register_local_merger_producer_filter( +Status RuntimeFilterMgr::register_local_merge_producer_filter( const QueryContext* query_ctx, const TRuntimeFilterDesc& desc, std::shared_ptr producer) { if (!_is_global) [[unlikely]] { @@ -108,7 +108,7 @@ Status RuntimeFilterMgr::register_local_merger_producer_filter( return Status::OK(); } -std::string LocalMergeContext::debug_string() { +std::string LocalMergeContext::debug_string() const { std::string result = fmt::format("stage: {}, {}\n", stage, merger ? merger->debug_string() : "local merge context merger is nullptr"); @@ -314,20 +314,23 @@ Status RuntimeFilterMgr::sync_filter_size(const PSyncFilterSizeRequest* request) std::string RuntimeFilterMgr::debug_string() { std::string result = "Local Merger Info:\n"; - std::vector>> local_merge_contexts; + std::vector> local_merge_contexts; std::vector> consumers; { std::lock_guard l(_lock); for (auto& [filter_id, ctx] : _local_merge_map) { - local_merge_contexts.emplace_back(filter_id, ctx); + if (ctx) { + local_merge_contexts.emplace_back(filter_id, *ctx); + } else { + result += fmt::format("filter_id: {}, local merge context is nullptr\n", filter_id); + } } for (const auto& [filter_id, filter_consumers] : _consumer_map) { consumers.insert(consumers.end(), filter_consumers.begin(), filter_consumers.end()); } } for (const auto& [filter_id, ctx] : local_merge_contexts) { - result += fmt::format("filter_id: {}, {}", filter_id, - ctx ? 
ctx->debug_string() : "local merge context is nullptr\n"); + result += fmt::format("filter_id: {}, {}", filter_id, ctx.debug_string()); } result += "Consumer Info:\n"; for (const auto& consumer : consumers) { diff --git a/be/src/exec/runtime_filter/runtime_filter_mgr.h b/be/src/exec/runtime_filter/runtime_filter_mgr.h index b0b05289e36e5c..a5170c5b85d142 100644 --- a/be/src/exec/runtime_filter/runtime_filter_mgr.h +++ b/be/src/exec/runtime_filter/runtime_filter_mgr.h @@ -56,7 +56,7 @@ class HandleErrorBrpcCallback; class SyncSizeCallback; struct LocalMergeContext { - std::string debug_string(); + std::string debug_string() const; std::shared_ptr merger; std::vector> producers; @@ -97,9 +97,9 @@ class RuntimeFilterMgr { int node_id, std::shared_ptr* consumer_filter); - Status register_local_merger_producer_filter(const QueryContext* query_ctx, - const TRuntimeFilterDesc& desc, - std::shared_ptr producer); + Status register_local_merge_producer_filter(const QueryContext* query_ctx, + const TRuntimeFilterDesc& desc, + std::shared_ptr producer); Status get_local_merge_context(int filter_id, uint32_t expected_stage, std::shared_ptr* context); @@ -119,7 +119,7 @@ class RuntimeFilterMgr { std::lock_guard l(_lock); _consumer_map.erase(filter_id); // NOTE: _local_merge_map is NOT erased here. It is replaced lazily in - // register_local_merger_producer_filter when a producer from a newer + // register_local_merge_producer_filter when a producer from a newer // recursive CTE round registers. Erasing eagerly here would race with // multi-fragment REBUILD: a consumer-only fragment's remove_filter could // delete the entry that the producer fragment just re-registered. 
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 9e173acab7e190..9b5a9b81b2c141 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -499,7 +499,7 @@ Status RuntimeState::register_producer_runtime_filter( DORIS_CHECK(pfc); (*producer_filter)->set_stage(pfc->rec_cte_stage()); } - RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merger_producer_filter( + RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merge_producer_filter( _query_ctx, desc, *producer_filter)); return Status::OK(); } diff --git a/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp b/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp index 536fb156aa8204..5a50e762c2527f 100644 --- a/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp +++ b/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp @@ -77,12 +77,12 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) { // producer_filter should not be nullptr EXPECT_FALSE( global_runtime_filter_mgr - ->register_local_merger_producer_filter(ctx.get(), desc, producer_filter) + ->register_local_merge_producer_filter(ctx.get(), desc, producer_filter) .ok()); // local merge filter should not be registered in local mgr EXPECT_FALSE( local_runtime_filter_mgr - ->register_local_merger_producer_filter(ctx.get(), desc, producer_filter) + ->register_local_merge_producer_filter(ctx.get(), desc, producer_filter) .ok()); // producer should not registered in global mgr EXPECT_FALSE(global_runtime_filter_mgr @@ -116,10 +116,9 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) { ->get_local_merge_context(filter_id, producer_filter->stage(), &context) .ok()); // Register local merge filter - EXPECT_TRUE( - global_runtime_filter_mgr - ->register_local_merger_producer_filter(ctx.get(), desc, producer_filter) - .ok()); + EXPECT_TRUE(global_runtime_filter_mgr + ->register_local_merge_producer_filter(ctx.get(), desc, producer_filter) + .ok()); 
EXPECT_TRUE(global_runtime_filter_mgr ->get_local_merge_context(filter_id, producer_filter->stage(), &context) .ok());