From 1cdf44902ed42e446d9c346b6a62c721ee11ff59 Mon Sep 17 00:00:00 2001 From: liaoxin Date: Thu, 22 Jan 2026 16:54:05 +0800 Subject: [PATCH] [fix](load_stream) Fix use-after-free in TabletStream async lambdas The async lambdas in TabletStream captured raw 'this' pointer, which could become dangling when the TabletStream object is destroyed before the async task completes (e.g., when thrift connection is broken). Fix by using shared_from_this() to capture shared_ptr instead of raw pointer, ensuring the object stays alive until all async tasks complete. --- be/src/runtime/load_stream.cpp | 32 ++++++++++++++++++-------------- be/src/runtime/load_stream.h | 2 +- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index b00681427bb7c4..6a291d143ff1e3 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -148,12 +148,14 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data uint32_t new_segid = mapping->at(segid); DCHECK(new_segid != std::numeric_limits::max()); butil::IOBuf buf = data->movable(); - auto flush_func = [this, new_segid, eos, buf, header, file_type]() mutable { - signal::set_signal_task_id(_load_id); + auto self = shared_from_this(); + auto flush_func = [self, new_segid, eos, buf, header, file_type]() mutable { + signal::set_signal_task_id(self->_load_id); g_load_stream_flush_running_threads << -1; - auto st = _load_stream_writer->append_data(new_segid, header.offset(), buf, file_type); + auto st = + self->_load_stream_writer->append_data(new_segid, header.offset(), buf, file_type); if (!st.ok() && !config::is_cloud_mode()) { - auto res = ExecEnv::get_tablet(_id); + auto res = ExecEnv::get_tablet(self->_id); TabletSharedPtr tablet = res.has_value() ? std::dynamic_pointer_cast(res.value()) : nullptr; if (tablet) { @@ -164,7 +166,7 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data DBUG_EXECUTE_IF("TabletStream.append_data.unknown_file_type", { file_type = static_cast(-1); }); if (file_type == FileType::SEGMENT_FILE || file_type == FileType::INVERTED_INDEX_FILE) { - st = _load_stream_writer->close_writer(new_segid, file_type); + st = self->_load_stream_writer->close_writer(new_segid, file_type); } else { st = Status::InternalError( "appent data failed, file type error, file type = {}, " @@ -175,8 +177,8 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data DBUG_EXECUTE_IF("TabletStream.append_data.append_failed", { st = Status::InternalError("fault injection"); }); if (!st.ok()) { - _status.update(st); - LOG(WARNING) << "write data failed " << st << ", " << *this; + self->_status.update(st); + LOG(WARNING) << "write data failed " << st << ", " << *self; } }; auto load_stream_flush_token_max_tasks = config::load_stream_flush_token_max_tasks; @@ -248,14 +250,15 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data } DCHECK(new_segid != std::numeric_limits::max()); - auto add_segment_func = [this, new_segid, stat]() { - signal::set_signal_task_id(_load_id); - auto st = _load_stream_writer->add_segment(new_segid, stat); + auto self = shared_from_this(); + auto add_segment_func = [self, new_segid, stat]() { + signal::set_signal_task_id(self->_load_id); + auto st = self->_load_stream_writer->add_segment(new_segid, stat); DBUG_EXECUTE_IF("TabletStream.add_segment.add_segment_failed", { st = Status::InternalError("fault injection"); }); if (!st.ok()) { - _status.update(st); - LOG(INFO) << "add segment failed " << *this; + self->_status.update(st); + LOG(INFO) << "add segment failed " << *self; } }; Status st = Status::OK(); @@ -275,8 +278,9 @@ Status TabletStream::_run_in_heavy_work_pool(std::function fn) { std::unique_lock lock(mu); bthread::ConditionVariable cv; auto st = Status::OK(); - auto func = [this, &mu, &cv, &st, &fn] { - signal::set_signal_task_id(_load_id); + auto self = shared_from_this(); + auto func = [self, &mu, &cv, &st, &fn] { + signal::set_signal_task_id(self->_load_id); st = fn(); std::lock_guard lock(mu); cv.notify_one(); diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h index 20d9cc63254ed1..1dd45d7465095e 100644 --- a/be/src/runtime/load_stream.h +++ b/be/src/runtime/load_stream.h @@ -42,7 +42,7 @@ class OlapTableSchemaParam; // origin_segid(index) -> new_segid(value in vector) using SegIdMapping = std::vector; using FailedTablets = std::vector>; -class TabletStream { +class TabletStream : public std::enable_shared_from_this { public: TabletStream(const PUniqueId& load_id, int64_t id, int64_t txn_id, LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile);