diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 42ac15ea51ac43..2a324f6a6cefad 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -147,12 +147,14 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data uint32_t new_segid = mapping->at(segid); DCHECK(new_segid != std::numeric_limits::max()); butil::IOBuf buf = data->movable(); - auto flush_func = [this, new_segid, eos, buf, header, file_type]() mutable { - signal::set_signal_task_id(_load_id); + auto self = shared_from_this(); + auto flush_func = [self, new_segid, eos, buf, header, file_type]() mutable { + signal::set_signal_task_id(self->_load_id); g_load_stream_flush_running_threads << -1; - auto st = _load_stream_writer->append_data(new_segid, header.offset(), buf, file_type); + auto st = + self->_load_stream_writer->append_data(new_segid, header.offset(), buf, file_type); if (!st.ok() && !config::is_cloud_mode()) { - auto res = ExecEnv::get_tablet(_id); + auto res = ExecEnv::get_tablet(self->_id); TabletSharedPtr tablet = res.has_value() ? std::dynamic_pointer_cast(res.value()) : nullptr; if (tablet) { @@ -163,7 +165,7 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data DBUG_EXECUTE_IF("TabletStream.append_data.unknown_file_type", { file_type = static_cast(-1); }); if (file_type == FileType::SEGMENT_FILE || file_type == FileType::INVERTED_INDEX_FILE) { - st = _load_stream_writer->close_writer(new_segid, file_type); + st = self->_load_stream_writer->close_writer(new_segid, file_type); } else { st = Status::InternalError( "appent data failed, file type error, file type = {}, " @@ -174,8 +176,8 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data DBUG_EXECUTE_IF("TabletStream.append_data.append_failed", { st = Status::InternalError("fault injection"); }); if (!st.ok()) { - _status.update(st); - LOG(WARNING) << "write data failed " << st << ", " << *this; + self->_status.update(st); + LOG(WARNING) << "write data failed " << st << ", " << *self; } }; auto load_stream_flush_token_max_tasks = config::load_stream_flush_token_max_tasks; @@ -247,14 +249,15 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data } DCHECK(new_segid != std::numeric_limits::max()); - auto add_segment_func = [this, new_segid, stat]() { - signal::set_signal_task_id(_load_id); - auto st = _load_stream_writer->add_segment(new_segid, stat); + auto self = shared_from_this(); + auto add_segment_func = [self, new_segid, stat]() { + signal::set_signal_task_id(self->_load_id); + auto st = self->_load_stream_writer->add_segment(new_segid, stat); DBUG_EXECUTE_IF("TabletStream.add_segment.add_segment_failed", { st = Status::InternalError("fault injection"); }); if (!st.ok()) { - _status.update(st); - LOG(INFO) << "add segment failed " << *this; + self->_status.update(st); + LOG(INFO) << "add segment failed " << *self; } }; Status st = Status::OK(); @@ -274,8 +277,9 @@ Status TabletStream::_run_in_heavy_work_pool(std::function fn) { std::unique_lock lock(mu); bthread::ConditionVariable cv; auto st = Status::OK(); - auto func = [this, &mu, &cv, &st, &fn] { - signal::set_signal_task_id(_load_id); + auto self = shared_from_this(); + auto func = [self, &mu, &cv, &st, &fn] { + signal::set_signal_task_id(self->_load_id); st = fn(); std::lock_guard lock(mu); cv.notify_one(); diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h index 9aefa3d9093ea4..d3f6e02558ed30 100644 --- a/be/src/runtime/load_stream.h +++ b/be/src/runtime/load_stream.h @@ -42,7 +42,7 @@ class OlapTableSchemaParam; // origin_segid(index) -> new_segid(value in vector) using SegIdMapping = std::vector; using FailedTablets = std::vector>; -class TabletStream { +class TabletStream : public std::enable_shared_from_this { public: TabletStream(const PUniqueId& load_id, int64_t id, int64_t txn_id, LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile);