From fd50f53c38660e10540fd3368f3e2f1a418c3186 Mon Sep 17 00:00:00 2001 From: Xin Liao Date: Fri, 23 Jan 2026 16:04:04 +0800 Subject: [PATCH] [fix](load_stream) Fix use-after-free in TabletStream async lambdas (#60148) The async lambdas in TabletStream captured raw 'this' pointer, which could become dangling when the TabletStream object is destroyed before the async task completes (e.g., when thrift connection is broken). Fix by using shared_from_this() to capture shared_ptr instead of raw pointer, ensuring the object stays alive until all async tasks complete. --- be/src/runtime/load_stream.cpp | 32 ++++++++++++++++++-------------- be/src/runtime/load_stream.h | 2 +- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 42ac15ea51ac43..2a324f6a6cefad 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -147,12 +147,14 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data uint32_t new_segid = mapping->at(segid); DCHECK(new_segid != std::numeric_limits::max()); butil::IOBuf buf = data->movable(); - auto flush_func = [this, new_segid, eos, buf, header, file_type]() mutable { - signal::set_signal_task_id(_load_id); + auto self = shared_from_this(); + auto flush_func = [self, new_segid, eos, buf, header, file_type]() mutable { + signal::set_signal_task_id(self->_load_id); g_load_stream_flush_running_threads << -1; - auto st = _load_stream_writer->append_data(new_segid, header.offset(), buf, file_type); + auto st = + self->_load_stream_writer->append_data(new_segid, header.offset(), buf, file_type); if (!st.ok() && !config::is_cloud_mode()) { - auto res = ExecEnv::get_tablet(_id); + auto res = ExecEnv::get_tablet(self->_id); TabletSharedPtr tablet = res.has_value() ? std::dynamic_pointer_cast(res.value()) : nullptr; if (tablet) { @@ -163,7 +165,7 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data DBUG_EXECUTE_IF("TabletStream.append_data.unknown_file_type", { file_type = static_cast(-1); }); if (file_type == FileType::SEGMENT_FILE || file_type == FileType::INVERTED_INDEX_FILE) { - st = _load_stream_writer->close_writer(new_segid, file_type); + st = self->_load_stream_writer->close_writer(new_segid, file_type); } else { st = Status::InternalError( "appent data failed, file type error, file type = {}, " @@ -174,8 +176,8 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data DBUG_EXECUTE_IF("TabletStream.append_data.append_failed", { st = Status::InternalError("fault injection"); }); if (!st.ok()) { - _status.update(st); - LOG(WARNING) << "write data failed " << st << ", " << *this; + self->_status.update(st); + LOG(WARNING) << "write data failed " << st << ", " << *self; } }; auto load_stream_flush_token_max_tasks = config::load_stream_flush_token_max_tasks; @@ -247,14 +249,15 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data } DCHECK(new_segid != std::numeric_limits::max()); - auto add_segment_func = [this, new_segid, stat]() { - signal::set_signal_task_id(_load_id); - auto st = _load_stream_writer->add_segment(new_segid, stat); + auto self = shared_from_this(); + auto add_segment_func = [self, new_segid, stat]() { + signal::set_signal_task_id(self->_load_id); + auto st = self->_load_stream_writer->add_segment(new_segid, stat); DBUG_EXECUTE_IF("TabletStream.add_segment.add_segment_failed", { st = Status::InternalError("fault injection"); }); if (!st.ok()) { - _status.update(st); - LOG(INFO) << "add segment failed " << *this; + self->_status.update(st); + LOG(INFO) << "add segment failed " << *self; } }; Status st = Status::OK(); @@ -274,8 +277,9 @@ Status TabletStream::_run_in_heavy_work_pool(std::function fn) { std::unique_lock lock(mu); bthread::ConditionVariable cv; auto st = Status::OK(); - auto func = [this, &mu, &cv, &st, &fn] { - signal::set_signal_task_id(_load_id); + auto self = shared_from_this(); + auto func = [self, &mu, &cv, &st, &fn] { + signal::set_signal_task_id(self->_load_id); st = fn(); std::lock_guard lock(mu); cv.notify_one(); diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h index 9aefa3d9093ea4..d3f6e02558ed30 100644 --- a/be/src/runtime/load_stream.h +++ b/be/src/runtime/load_stream.h @@ -42,7 +42,7 @@ class OlapTableSchemaParam; // origin_segid(index) -> new_segid(value in vector) using SegIdMapping = std::vector; using FailedTablets = std::vector>; -class TabletStream { +class TabletStream : public std::enable_shared_from_this { public: TabletStream(const PUniqueId& load_id, int64_t id, int64_t txn_id, LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile);