diff --git a/be/src/exec/runtime_filter/runtime_filter_merger.h b/be/src/exec/runtime_filter/runtime_filter_merger.h index 9405d2405a52fb..267ffc05e6675b 100644 --- a/be/src/exec/runtime_filter/runtime_filter_merger.h +++ b/be/src/exec/runtime_filter/runtime_filter_merger.h @@ -46,6 +46,7 @@ class RuntimeFilterMerger : public RuntimeFilter { } std::string debug_string() override { + std::unique_lock l(_rmtx); return fmt::format( "Merger: ({}, expected_producer_num: {}, received_producer_num: {}, " "received_rf_size_num: {}, received_sum_size: {})", @@ -54,12 +55,15 @@ class RuntimeFilterMerger : public RuntimeFilter { } // If input is a disabled predicate, the final result is a disabled predicate. - Status merge_from(const RuntimeFilter* other) { + // Returns true only for the call that makes the merger ready. + Status merge_from(const RuntimeFilter* other, bool* ready) { + std::unique_lock l(_rmtx); _received_producer_num++; if (_expected_producer_num < _received_producer_num) { return Status::InternalError( "runtime filter merger input product more than expected, {}", debug_string()); } + *ready = _received_producer_num == _expected_producer_num; if (_received_producer_num == _expected_producer_num) { _rf_state = State::READY; } @@ -72,6 +76,7 @@ class RuntimeFilterMerger : public RuntimeFilter { } void set_expected_producer_num(int num) { + std::unique_lock l(_rmtx); if (_received_producer_num > 0 || _received_rf_size_num > 0) { throw Exception(ErrorCode::INTERNAL_ERROR, "runtime filter merger set expected producer after receive data, {}", @@ -80,9 +85,13 @@ class RuntimeFilterMerger : public RuntimeFilter { _expected_producer_num = num; } - int get_expected_producer_num() const { return _expected_producer_num; } + int get_expected_producer_num() { + std::unique_lock l(_rmtx); + return _expected_producer_num; + } bool add_rf_size(uint64_t size) { + std::unique_lock l(_rmtx); _received_rf_size_num++; if (_expected_producer_num < _received_rf_size_num) { throw Exception(ErrorCode::INTERNAL_ERROR, @@ -93,7 +102,10 @@ class RuntimeFilterMerger : public RuntimeFilter { return (_received_rf_size_num == _expected_producer_num); } - uint64_t get_received_sum_size() const { return _received_sum_size; } + uint64_t get_received_sum_size() { + std::unique_lock l(_rmtx); + return _received_sum_size; + } bool ready() const { return _rf_state == State::READY; } diff --git a/be/src/exec/runtime_filter/runtime_filter_mgr.cpp b/be/src/exec/runtime_filter/runtime_filter_mgr.cpp index f8e687f09b7f7b..bcb61358253b18 100644 --- a/be/src/exec/runtime_filter/runtime_filter_mgr.cpp +++ b/be/src/exec/runtime_filter/runtime_filter_mgr.cpp @@ -75,7 +75,7 @@ Status RuntimeFilterMgr::register_consumer_filter( return Status::OK(); } -Status RuntimeFilterMgr::register_local_merger_producer_filter( +Status RuntimeFilterMgr::register_local_merge_producer_filter( const QueryContext* query_ctx, const TRuntimeFilterDesc& desc, std::shared_ptr producer) { if (!_is_global) [[unlikely]] { @@ -91,57 +91,53 @@ Status RuntimeFilterMgr::register_local_merger_producer_filter( SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; - LocalMergeContext* context; - { - std::lock_guard l(_lock); - context = &_local_merge_map[key]; // may inplace construct default object + std::lock_guard l(_lock); + auto& context = _local_merge_map[key]; + if (!context || producer->stage() > context->stage) { + auto new_context = std::make_shared(); + RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, &desc, &new_context->merger)); + new_context->stage = producer->stage(); + context = new_context; } - RETURN_IF_ERROR(context->register_producer(query_ctx, &desc, producer)); - return Status::OK(); -} - -Status LocalMergeContext::register_producer(const QueryContext* query_ctx, - const TRuntimeFilterDesc* desc, - std::shared_ptr producer) { - std::lock_guard l(mtx); - if (producer->stage() > stage) { - // New recursive CTE round: discard stale merger and producers from - // the previous round and recreate the merger for the new round. - merger.reset(); - producers.clear(); - stage = producer->stage(); - } - if (!merger) { - RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, desc, &merger)); - } - producers.emplace_back(producer); - merger->set_expected_producer_num(cast_set(producers.size())); + context->producers.emplace_back(producer); + context->merger->set_expected_producer_num(cast_set(context->producers.size())); // Sync the local merger's stage from the producer so that outgoing merge RPCs // (via _push_to_remote) carry the correct recursive CTE round number. - merger->set_stage(producer->stage()); + context->merger->set_stage(producer->stage()); return Status::OK(); } -Status RuntimeFilterMgr::get_local_merge_producer_filters(int filter_id, - LocalMergeContext** local_merge_filters) { +std::string LocalMergeContext::debug_string() const { + std::string result = + fmt::format("stage: {}, {}\n", stage, + merger ? merger->debug_string() : "local merge context merger is nullptr"); + for (const auto& producer : producers) { + result += fmt::format("{}\n", producer->debug_string()); + } + return result; +} + +Status RuntimeFilterMgr::get_local_merge_context(int filter_id, uint32_t expected_stage, + std::shared_ptr* context) { if (!_is_global) [[unlikely]] { return Status::InternalError( "A local merge filter can not be registered in Local RuntimeFilterMgr"); } + context->reset(); std::lock_guard l(_lock); auto iter = _local_merge_map.find(filter_id); if (iter == _local_merge_map.end()) { - // Filter may have been removed during a recursive CTE stage reset. // Return OK with nullptr to let the caller skip gracefully. - *local_merge_filters = nullptr; return Status::OK(); } - *local_merge_filters = &iter->second; - if (!iter->second.merger) { - return Status::InternalError("local merge context merger is nullptr for filter_id: {}", - filter_id); + if (!iter->second) { + return Status::InternalError("local merge context is nullptr for filter_id: {}", filter_id); } + if (expected_stage != iter->second->stage) { + return Status::OK(); + } + *context = iter->second; return Status::OK(); } @@ -304,13 +300,13 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptrfilter_id(), &local_merge_filters)); - if (local_merge_filters == nullptr) { + std::shared_ptr context; + RETURN_IF_ERROR(get_local_merge_context(request->filter_id(), request->stage(), &context)); + if (!context) { // Filter was removed during a recursive CTE stage reset; discard stale request. return Status::OK(); } - for (auto producer : local_merge_filters->producers) { + for (const auto& producer : context->producers) { producer->set_synced_size(request->filter_size()); } return Status::OK(); @@ -318,18 +314,27 @@ Status RuntimeFilterMgr::sync_filter_size(const PSyncFilterSizeRequest* request) std::string RuntimeFilterMgr::debug_string() { std::string result = "Local Merger Info:\n"; - std::lock_guard l(_lock); - for (const auto& [filter_id, ctx] : _local_merge_map) { - result += fmt::format("{}\n", ctx.merger->debug_string()); - for (const auto& producer : ctx.producers) { - result += fmt::format("{}\n", producer->debug_string()); + std::vector> local_merge_contexts; + std::vector> consumers; + { + std::lock_guard l(_lock); + for (auto& [filter_id, ctx] : _local_merge_map) { + if (ctx) { + local_merge_contexts.emplace_back(filter_id, *ctx); + } else { + result += fmt::format("filter_id: {}, local merge context is nullptr\n", filter_id); + } + } + for (const auto& [filter_id, filter_consumers] : _consumer_map) { + consumers.insert(consumers.end(), filter_consumers.begin(), filter_consumers.end()); } } + for (const auto& [filter_id, ctx] : local_merge_contexts) { + result += fmt::format("filter_id: {}, {}", filter_id, ctx.debug_string()); + } result += "Consumer Info:\n"; - for (const auto& [filter_id, consumers] : _consumer_map) { - for (const auto& consumer : consumers) { - result += fmt::format("{}\n", consumer->debug_string()); - } + for (const auto& consumer : consumers) { + result += fmt::format("{}\n", consumer->debug_string()); } return result; } @@ -373,10 +378,9 @@ Status RuntimeFilterMergeControllerEntity::merge(std::shared_ptr q RETURN_IF_ERROR(tmp_filter->assign(*request, attach_data)); - RETURN_IF_ERROR(cnt_val.merger->merge_from(tmp_filter.get())); + RETURN_IF_ERROR(cnt_val.merger->merge_from(tmp_filter.get(), &is_ready)); cnt_val.arrive_id.insert(UniqueId(request->fragment_instance_id())); - is_ready = cnt_val.merger->ready(); // update is_ready in locked scope } if (is_ready) { diff --git a/be/src/exec/runtime_filter/runtime_filter_mgr.h b/be/src/exec/runtime_filter/runtime_filter_mgr.h index 536eb63e152caa..a5170c5b85d142 100644 --- a/be/src/exec/runtime_filter/runtime_filter_mgr.h +++ b/be/src/exec/runtime_filter/runtime_filter_mgr.h @@ -56,15 +56,14 @@ class HandleErrorBrpcCallback; class SyncSizeCallback; struct LocalMergeContext { - std::mutex mtx; + std::string debug_string() const; + std::shared_ptr merger; std::vector> producers; // Tracks the recursive CTE round. When a producer from a newer round - // registers, the context is reset (merger recreated, old producers dropped). + // registers, RuntimeFilterMgr replaces the whole context and old in-flight + // users keep the previous context alive through shared_ptr. uint32_t stage = 0; - - Status register_producer(const QueryContext* query_ctx, const TRuntimeFilterDesc* desc, - std::shared_ptr producer); }; struct GlobalMergeContext { @@ -98,11 +97,12 @@ class RuntimeFilterMgr { int node_id, std::shared_ptr* consumer_filter); - Status register_local_merger_producer_filter(const QueryContext* query_ctx, - const TRuntimeFilterDesc& desc, - std::shared_ptr producer); + Status register_local_merge_producer_filter(const QueryContext* query_ctx, + const TRuntimeFilterDesc& desc, + std::shared_ptr producer); - Status get_local_merge_producer_filters(int filter_id, LocalMergeContext** local_merge_filters); + Status get_local_merge_context(int filter_id, uint32_t expected_stage, + std::shared_ptr* context); // Create local producer. This producer is hold by RuntimeFilterProducerHelper. Status register_producer_filter(const QueryContext* query_ctx, const TRuntimeFilterDesc& desc, @@ -118,8 +118,8 @@ class RuntimeFilterMgr { void remove_filter(int32_t filter_id) { std::lock_guard l(_lock); _consumer_map.erase(filter_id); - // NOTE: _local_merge_map is NOT erased here. It is reset lazily in - // LocalMergeContext::register_producer when a producer from a newer + // NOTE: _local_merge_map is NOT erased here. It is replaced lazily in + // register_local_merge_producer_filter when a producer from a newer // recursive CTE round registers. Erasing eagerly here would race with // multi-fragment REBUILD: a consumer-only fragment's remove_filter could // delete the entry that the producer fragment just re-registered. @@ -144,7 +144,7 @@ class RuntimeFilterMgr { // key: "filter-id" std::map>> _consumer_map; std::set _producer_id_set; - std::map _local_merge_map; + std::map> _local_merge_map; std::unique_ptr _tracker; diff --git a/be/src/exec/runtime_filter/runtime_filter_producer.cpp b/be/src/exec/runtime_filter/runtime_filter_producer.cpp index 7da8c6871121f6..5a3429116272b1 100644 --- a/be/src/exec/runtime_filter/runtime_filter_producer.cpp +++ b/be/src/exec/runtime_filter/runtime_filter_producer.cpp @@ -53,16 +53,16 @@ Status RuntimeFilterProducer::publish(RuntimeState* state, bool build_hash_table // when global consumer not exist, send_to_local_targets will do nothing, so merge rf is useless return Status::OK(); } - LocalMergeContext* context = nullptr; - RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_producer_filters( - _wrapper->filter_id(), &context)); - if (context == nullptr) { + std::shared_ptr context; + RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_context( + _wrapper->filter_id(), _stage, &context)); + if (!context) { // Filter was removed during a recursive CTE stage reset; this producer is stale. return Status::OK(); } - std::lock_guard l(context->mtx); - RETURN_IF_ERROR(context->merger->merge_from(this)); - if (context->merger->ready()) { + bool ready = false; + RETURN_IF_ERROR(context->merger->merge_from(this, &ready)); + if (ready) { if (_has_remote_target) { RETURN_IF_ERROR(_send_to_remote_targets(state, context->merger.get())); } else { @@ -123,26 +123,26 @@ Status RuntimeFilterProducer::send_size(RuntimeState* state, uint64_t local_filt set_state(State::WAITING_FOR_SYNCED_SIZE); if (_need_do_merge(state)) { - LocalMergeContext* merger_context = nullptr; - RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_producer_filters( - _wrapper->filter_id(), &merger_context)); - if (merger_context == nullptr) { + std::shared_ptr context; + RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_context( + _wrapper->filter_id(), _stage, &context)); + if (!context) { // Filter was removed during a recursive CTE stage reset; this producer is stale. return Status::OK(); } - std::lock_guard merger_lock(merger_context->mtx); - if (merger_context->merger->add_rf_size(local_filter_size)) { - if (!_has_remote_target) { - for (auto filter : merger_context->producers) { - filter->set_synced_size(merger_context->merger->get_received_sum_size()); - } - return Status::OK(); - } else { - local_filter_size = merger_context->merger->get_received_sum_size(); + uint64_t received_sum_size = 0; + bool ready_to_sync = context->merger->add_rf_size(local_filter_size); + if (!ready_to_sync) { + return Status::OK(); + } + received_sum_size = context->merger->get_received_sum_size(); + if (!_has_remote_target) { + for (const auto& filter : context->producers) { + filter->set_synced_size(received_sum_size); } - } else { return Status::OK(); } + local_filter_size = received_sum_size; } else if (!_has_remote_target) { set_synced_size(local_filter_size); diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 9e173acab7e190..9b5a9b81b2c141 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -499,7 +499,7 @@ Status RuntimeState::register_producer_runtime_filter( DORIS_CHECK(pfc); (*producer_filter)->set_stage(pfc->rec_cte_stage()); } - RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merger_producer_filter( + RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merge_producer_filter( _query_ctx, desc, *producer_filter)); return Status::OK(); } diff --git a/be/test/exec/runtime_filter/runtime_filter_merger_test.cpp b/be/test/exec/runtime_filter/runtime_filter_merger_test.cpp index da9d5b66291795..4dba455cd78e54 100644 --- a/be/test/exec/runtime_filter/runtime_filter_merger_test.cpp +++ b/be/test/exec/runtime_filter/runtime_filter_merger_test.cpp @@ -39,11 +39,13 @@ class RuntimeFilterMergerTest : public RuntimeFilterTest { ASSERT_FALSE(merger->ready()); ASSERT_EQ(merger->_wrapper->_state, RuntimeFilterWrapper::State::UNINITED); + bool ready = false; std::shared_ptr producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( _runtime_states[0]->register_producer_runtime_filter(desc, &producer)); producer->set_wrapper_state_and_ready_to_publish(first_product_state); - FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get())); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get(), &ready)); + ASSERT_FALSE(ready); ASSERT_FALSE(merger->ready()); ASSERT_EQ(merger->_wrapper->_state, first_expected_state); @@ -51,7 +53,8 @@ class RuntimeFilterMergerTest : public RuntimeFilterTest { FAIL_IF_ERROR_OR_CATCH_EXCEPTION( _runtime_states[1]->register_producer_runtime_filter(desc, &producer2)); producer2->set_wrapper_state_and_ready_to_publish(second_product_state); - FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer2.get())); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer2.get(), &ready)); + ASSERT_TRUE(ready); ASSERT_TRUE(merger->ready()); ASSERT_EQ(merger->_wrapper->_state, second_expected_state); } @@ -66,12 +69,14 @@ class RuntimeFilterMergerTest : public RuntimeFilterTest { merger->set_expected_producer_num(1); ASSERT_FALSE(merger->ready()); + bool ready = false; std::shared_ptr producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( _runtime_states[0]->register_producer_runtime_filter(desc, &producer)); FAIL_IF_ERROR_OR_CATCH_EXCEPTION(producer->init(123)); producer->set_wrapper_state_and_ready_to_publish(state); - FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get())); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get(), &ready)); + ASSERT_TRUE(ready); ASSERT_TRUE(merger->ready()); PMergeFilterRequest request; @@ -122,18 +127,21 @@ TEST_F(RuntimeFilterMergerTest, invalid_merge) { ASSERT_FALSE(merger->ready()); ASSERT_EQ(merger->_wrapper->_state, RuntimeFilterWrapper::State::UNINITED); + bool ready = false; std::shared_ptr producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( _runtime_states[0]->register_producer_runtime_filter(desc, &producer)); producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY); - FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get())); // ready wrapper + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get(), &ready)); + ASSERT_TRUE(ready); + ASSERT_TRUE(merger->ready()); ASSERT_EQ(merger->_wrapper->_state, RuntimeFilterWrapper::State::READY); std::shared_ptr producer2; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( _runtime_states[1]->register_producer_runtime_filter(desc, &producer2)); producer2->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY); - auto st = merger->merge_from(producer2.get()); + auto st = merger->merge_from(producer2.get(), &ready); ASSERT_EQ(st.code(), ErrorCode::INTERNAL_ERROR); } diff --git a/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp b/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp index d6ccc080961333..5a50e762c2527f 100644 --- a/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp +++ b/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp @@ -77,12 +77,12 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) { // producer_filter should not be nullptr EXPECT_FALSE( global_runtime_filter_mgr - ->register_local_merger_producer_filter(ctx.get(), desc, producer_filter) + ->register_local_merge_producer_filter(ctx.get(), desc, producer_filter) .ok()); // local merge filter should not be registered in local mgr EXPECT_FALSE( local_runtime_filter_mgr - ->register_local_merger_producer_filter(ctx.get(), desc, producer_filter) + ->register_local_merge_producer_filter(ctx.get(), desc, producer_filter) .ok()); // producer should not registered in global mgr EXPECT_FALSE(global_runtime_filter_mgr @@ -103,28 +103,29 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) { .ok()); EXPECT_NE(producer_filter, nullptr); - LocalMergeContext* local_merge_filters = nullptr; + std::shared_ptr context; // filter_id not yet registered: global mgr returns OK with nullptr // (graceful skip for recursive CTE stage reset). EXPECT_TRUE(global_runtime_filter_mgr - ->get_local_merge_producer_filters(filter_id, &local_merge_filters) + ->get_local_merge_context(filter_id, producer_filter->stage(), &context) .ok()); - EXPECT_EQ(local_merge_filters, nullptr); + EXPECT_EQ(context, nullptr); // local mgr always returns error (not supported) - EXPECT_FALSE(local_runtime_filter_mgr - ->get_local_merge_producer_filters(filter_id, &local_merge_filters) - .ok()); - // Register local merge filter - EXPECT_TRUE( - global_runtime_filter_mgr - ->register_local_merger_producer_filter(ctx.get(), desc, producer_filter) + EXPECT_FALSE( + local_runtime_filter_mgr + ->get_local_merge_context(filter_id, producer_filter->stage(), &context) .ok()); + // Register local merge filter + EXPECT_TRUE(global_runtime_filter_mgr + ->register_local_merge_producer_filter(ctx.get(), desc, producer_filter) + .ok()); EXPECT_TRUE(global_runtime_filter_mgr - ->get_local_merge_producer_filters(filter_id, &local_merge_filters) + ->get_local_merge_context(filter_id, producer_filter->stage(), &context) .ok()); - EXPECT_NE(local_merge_filters, nullptr); - EXPECT_EQ(local_merge_filters->producers.size(), 1); - local_merge_filters->producers.front()->_rf_state = + EXPECT_NE(context, nullptr); + EXPECT_NE(context->merger, nullptr); + EXPECT_EQ(context->producers.size(), 1); + context->producers.front()->_rf_state = RuntimeFilterProducer::State ::WAITING_FOR_SYNCED_SIZE; } {