From 2a1ef376fadd28084325d272d1a559cb8ca1b0fc Mon Sep 17 00:00:00 2001 From: liaoxin Date: Tue, 27 Jan 2026 22:41:06 +0800 Subject: [PATCH] [fix](load_stream) Fix flush token deadlock by ensuring wait_for_flush_tasks is called before destruction Problem: When TabletStream is destroyed without pre_close() being called (e.g., on_idle_timeout scenario), the _flush_token destructor calls shutdown() which triggers deadlock detection if called from the pool thread. Root cause: - on_idle_timeout() directly calls brpc::StreamClose() without calling LoadStream::close() - This triggers the destruction chain without calling pre_close() on TabletStreams - If flush tasks are still running, TabletStream may be destroyed in pool thread Solution: - Add IndexStream::~IndexStream() to ensure wait_for_flush_tasks() is called on all TabletStreams - Add TabletStream::wait_for_flush_tasks() to wait for all flush tasks to complete - This ensures _flush_token is properly handled before TabletStream destruction - Revert commit c18ef17f7e4 (shared_from_this) as it is no longer needed --- be/src/runtime/load_stream.cpp | 77 ++++++++++++++++++++++------------ be/src/runtime/load_stream.h | 7 +++- 2 files changed, 57 insertions(+), 27 deletions(-) diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 2a324f6a6cefad..f94e7864377542 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -147,14 +147,12 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data uint32_t new_segid = mapping->at(segid); DCHECK(new_segid != std::numeric_limits::max()); butil::IOBuf buf = data->movable(); - auto self = shared_from_this(); - auto flush_func = [self, new_segid, eos, buf, header, file_type]() mutable { - signal::set_signal_task_id(self->_load_id); + auto flush_func = [this, new_segid, eos, buf, header, file_type]() mutable { + signal::set_signal_task_id(_load_id); g_load_stream_flush_running_threads << -1; - auto st = - self->_load_stream_writer->append_data(new_segid, header.offset(), buf, file_type); + auto st = _load_stream_writer->append_data(new_segid, header.offset(), buf, file_type); if (!st.ok() && !config::is_cloud_mode()) { - auto res = ExecEnv::get_tablet(self->_id); + auto res = ExecEnv::get_tablet(_id); TabletSharedPtr tablet = res.has_value() ? std::dynamic_pointer_cast(res.value()) : nullptr; if (tablet) { @@ -165,7 +163,7 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data DBUG_EXECUTE_IF("TabletStream.append_data.unknown_file_type", { file_type = static_cast(-1); }); if (file_type == FileType::SEGMENT_FILE || file_type == FileType::INVERTED_INDEX_FILE) { - st = self->_load_stream_writer->close_writer(new_segid, file_type); + st = _load_stream_writer->close_writer(new_segid, file_type); } else { st = Status::InternalError( "appent data failed, file type error, file type = {}, " @@ -176,8 +174,8 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data DBUG_EXECUTE_IF("TabletStream.append_data.append_failed", { st = Status::InternalError("fault injection"); }); if (!st.ok()) { - self->_status.update(st); - LOG(WARNING) << "write data failed " << st << ", " << *self; + _status.update(st); + LOG(WARNING) << "write data failed " << st << ", " << *this; } }; auto load_stream_flush_token_max_tasks = config::load_stream_flush_token_max_tasks; @@ -249,15 +247,14 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data } DCHECK(new_segid != std::numeric_limits::max()); - auto self = shared_from_this(); - auto add_segment_func = [self, new_segid, stat]() { - signal::set_signal_task_id(self->_load_id); - auto st = self->_load_stream_writer->add_segment(new_segid, stat); + auto add_segment_func = [this, new_segid, stat]() { + signal::set_signal_task_id(_load_id); + auto st = _load_stream_writer->add_segment(new_segid, stat); DBUG_EXECUTE_IF("TabletStream.add_segment.add_segment_failed", { st = Status::InternalError("fault injection"); }); if (!st.ok()) { - self->_status.update(st); - LOG(INFO) << "add segment failed " << *self; + _status.update(st); + LOG(INFO) << "add segment failed " << *this; } }; Status st = Status::OK(); @@ -277,9 +274,8 @@ Status TabletStream::_run_in_heavy_work_pool(std::function fn) { std::unique_lock lock(mu); bthread::ConditionVariable cv; auto st = Status::OK(); - auto self = shared_from_this(); - auto func = [self, &mu, &cv, &st, &fn] { - signal::set_signal_task_id(self->_load_id); + auto func = [this, &mu, &cv, &st, &fn] { + signal::set_signal_task_id(_load_id); st = fn(); std::lock_guard lock(mu); cv.notify_one(); @@ -293,21 +289,40 @@ Status TabletStream::_run_in_heavy_work_pool(std::function fn) { return st; } -void TabletStream::pre_close() { +void TabletStream::wait_for_flush_tasks() { + { + std::lock_guard lock_guard(_lock); + if (_flush_tasks_done) { + return; + } + _flush_tasks_done = true; + } + if (!_status.ok()) { - // cancel all pending tasks, wait all running tasks to finish _flush_token->shutdown(); return; } - SCOPED_TIMER(_close_wait_timer); - _status.update(_run_in_heavy_work_pool([this]() { + // Note: Do not use SCOPED_TIMER here because this function may be called + // from IndexStream::~IndexStream() during LoadStream destruction, at which + // point the RuntimeProfile (and _close_wait_timer) may already be destroyed. + // Use heavy_work_pool to avoid blocking bthread + auto st = _run_in_heavy_work_pool([this]() { _flush_token->wait(); return Status::OK(); - })); - // it is necessary to check status after wait_func, - // for create_rowset could fail during add_segment when loading to MOW table, - // in this case, should skip close to avoid submit_calc_delete_bitmap_task which could cause coredump. + }); + if (!st.ok()) { + // If heavy_work_pool is unavailable, fall back to shutdown + // which will cancel pending tasks and wait for running tasks + _flush_token->shutdown(); + _status.update(st); + } +} + +void TabletStream::pre_close() { + SCOPED_TIMER(_close_wait_timer); + wait_for_flush_tasks(); + if (!_status.ok()) { return; } @@ -346,6 +361,16 @@ IndexStream::IndexStream(const PUniqueId& load_id, int64_t id, int64_t txn_id, _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime"); } +IndexStream::~IndexStream() { + // Ensure all TabletStreams have their flush tokens properly handled before destruction. + // In normal flow, close() should have called pre_close() on all tablet streams. + // But if IndexStream is destroyed without close() being called (e.g., on_idle_timeout), + // we need to wait for flush tasks here to ensure flush tokens are properly shut down. + for (auto& [_, tablet_stream] : _tablet_streams_map) { + tablet_stream->wait_for_flush_tasks(); + } +} + Status IndexStream::append_data(const PStreamHeader& header, butil::IOBuf* data) { SCOPED_TIMER(_append_data_timer); int64_t tablet_id = header.tablet_id(); diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h index d3f6e02558ed30..be147f9415c77f 100644 --- a/be/src/runtime/load_stream.h +++ b/be/src/runtime/load_stream.h @@ -42,7 +42,7 @@ class OlapTableSchemaParam; // origin_segid(index) -> new_segid(value in vector) using SegIdMapping = std::vector; using FailedTablets = std::vector>; -class TabletStream : public std::enable_shared_from_this { +class TabletStream { public: TabletStream(const PUniqueId& load_id, int64_t id, int64_t txn_id, LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile); @@ -54,6 +54,9 @@ class TabletStream : public std::enable_shared_from_this { Status add_segment(const PStreamHeader& header, butil::IOBuf* data); void add_num_segments(int64_t num_segments) { _num_segments += num_segments; } void disable_num_segments_check() { _check_num_segments = false; } + // Wait for all pending flush tasks to complete and shut down the flush token. + // Safe to call multiple times. + void wait_for_flush_tasks(); void pre_close(); Status close(); int64_t id() const { return _id; } @@ -70,6 +73,7 @@ class TabletStream : public std::enable_shared_from_this { std::atomic _next_segid; int64_t _num_segments = 0; bool _check_num_segments = true; + bool _flush_tasks_done = false; bthread::Mutex _lock; AtomicStatus _status; PUniqueId _load_id; @@ -88,6 +92,7 @@ class IndexStream { IndexStream(const PUniqueId& load_id, int64_t id, int64_t txn_id, std::shared_ptr schema, LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile); + ~IndexStream(); Status append_data(const PStreamHeader& header, butil::IOBuf* data);