Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 48 additions & 26 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,12 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
uint32_t new_segid = mapping->at(segid);
DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
butil::IOBuf buf = data->movable();
auto self = shared_from_this();
auto flush_func = [self, new_segid, eos, buf, header, file_type]() mutable {
signal::set_signal_task_id(self->_load_id);
auto flush_func = [this, new_segid, eos, buf, header, file_type]() mutable {
signal::set_signal_task_id(_load_id);
g_load_stream_flush_running_threads << -1;
auto st =
self->_load_stream_writer->append_data(new_segid, header.offset(), buf, file_type);
auto st = _load_stream_writer->append_data(new_segid, header.offset(), buf, file_type);
if (!st.ok() && !config::is_cloud_mode()) {
auto res = ExecEnv::get_tablet(self->_id);
auto res = ExecEnv::get_tablet(_id);
TabletSharedPtr tablet =
res.has_value() ? std::dynamic_pointer_cast<Tablet>(res.value()) : nullptr;
if (tablet) {
Expand All @@ -166,7 +164,7 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
DBUG_EXECUTE_IF("TabletStream.append_data.unknown_file_type",
{ file_type = static_cast<FileType>(-1); });
if (file_type == FileType::SEGMENT_FILE || file_type == FileType::INVERTED_INDEX_FILE) {
st = self->_load_stream_writer->close_writer(new_segid, file_type);
st = _load_stream_writer->close_writer(new_segid, file_type);
} else {
st = Status::InternalError(
"appent data failed, file type error, file type = {}, "
Expand All @@ -177,8 +175,8 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
DBUG_EXECUTE_IF("TabletStream.append_data.append_failed",
{ st = Status::InternalError("fault injection"); });
if (!st.ok()) {
self->_status.update(st);
LOG(WARNING) << "write data failed " << st << ", " << *self;
_status.update(st);
LOG(WARNING) << "write data failed " << st << ", " << *this;
}
};
auto load_stream_flush_token_max_tasks = config::load_stream_flush_token_max_tasks;
Expand Down Expand Up @@ -250,15 +248,14 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
}
DCHECK(new_segid != std::numeric_limits<uint32_t>::max());

auto self = shared_from_this();
auto add_segment_func = [self, new_segid, stat]() {
signal::set_signal_task_id(self->_load_id);
auto st = self->_load_stream_writer->add_segment(new_segid, stat);
auto add_segment_func = [this, new_segid, stat]() {
signal::set_signal_task_id(_load_id);
auto st = _load_stream_writer->add_segment(new_segid, stat);
DBUG_EXECUTE_IF("TabletStream.add_segment.add_segment_failed",
{ st = Status::InternalError("fault injection"); });
if (!st.ok()) {
self->_status.update(st);
LOG(INFO) << "add segment failed " << *self;
_status.update(st);
LOG(INFO) << "add segment failed " << *this;
}
};
Status st = Status::OK();
Expand All @@ -278,9 +275,8 @@ Status TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) {
std::unique_lock<bthread::Mutex> lock(mu);
bthread::ConditionVariable cv;
auto st = Status::OK();
auto self = shared_from_this();
auto func = [self, &mu, &cv, &st, &fn] {
signal::set_signal_task_id(self->_load_id);
auto func = [this, &mu, &cv, &st, &fn] {
signal::set_signal_task_id(_load_id);
st = fn();
std::lock_guard<bthread::Mutex> lock(mu);
cv.notify_one();
Expand All @@ -294,21 +290,37 @@ Status TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) {
return st;
}

void TabletStream::pre_close() {
void TabletStream::wait_for_flush_tasks() {
{
std::lock_guard lock_guard(_lock);
if (_flush_tasks_done) {
return;
}
_flush_tasks_done = true;
}

if (!_status.ok()) {
// cancel all pending tasks and wait for all running tasks to finish
_flush_token->shutdown();
return;
}

SCOPED_TIMER(_close_wait_timer);
_status.update(_run_in_heavy_work_pool([this]() {
// Use heavy_work_pool to avoid blocking bthread
auto st = _run_in_heavy_work_pool([this]() {
_flush_token->wait();
Comment thread
liaoxin01 marked this conversation as resolved.
return Status::OK();
}));
// It is necessary to check the status after wait_func, because
// create_rowset could fail during add_segment when loading to a MOW table;
// in that case, close should be skipped to avoid submit_calc_delete_bitmap_task, which could cause a coredump.
});
if (!st.ok()) {
// If heavy_work_pool is unavailable, fall back to shutdown
// which will cancel pending tasks and wait for running tasks
_flush_token->shutdown();
_status.update(st);
}
}
Comment thread
liaoxin01 marked this conversation as resolved.

void TabletStream::pre_close() {
SCOPED_TIMER(_close_wait_timer);
wait_for_flush_tasks();

if (!_status.ok()) {
return;
}
Expand Down Expand Up @@ -347,6 +359,16 @@ IndexStream::IndexStream(const PUniqueId& load_id, int64_t id, int64_t txn_id,
_close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
}

IndexStream::~IndexStream() {
    // Drain every owned TabletStream before this IndexStream goes away.
    // The regular close() path is expected to have already run pre_close() on
    // each tablet stream, but destruction can also happen without close()
    // (e.g. on_idle_timeout). wait_for_flush_tasks() is safe to call more than
    // once, so invoking it unconditionally here covers both paths.
    for (auto& entry : _tablet_streams_map) {
        // Only the mapped stream matters here; the key is not used.
        entry.second->wait_for_flush_tasks();
    }
}

Status IndexStream::append_data(const PStreamHeader& header, butil::IOBuf* data) {
SCOPED_TIMER(_append_data_timer);
int64_t tablet_id = header.tablet_id();
Expand Down
7 changes: 6 additions & 1 deletion be/src/runtime/load_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class OlapTableSchemaParam;
// origin_segid(index) -> new_segid(value in vector)
using SegIdMapping = std::vector<uint32_t>;
using FailedTablets = std::vector<std::pair<int64_t, Status>>;
class TabletStream : public std::enable_shared_from_this<TabletStream> {
class TabletStream {
Comment thread
liaoxin01 marked this conversation as resolved.
public:
TabletStream(const PUniqueId& load_id, int64_t id, int64_t txn_id,
LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile);
Expand All @@ -54,6 +54,9 @@ class TabletStream : public std::enable_shared_from_this<TabletStream> {
Status add_segment(const PStreamHeader& header, butil::IOBuf* data);
void add_num_segments(int64_t num_segments) { _num_segments += num_segments; }
void disable_num_segments_check() { _check_num_segments = false; }
// Wait for all pending flush tasks to complete and shut down the flush token.
// Safe to call multiple times.
void wait_for_flush_tasks();
void pre_close();
Status close();
int64_t id() const { return _id; }
Expand All @@ -70,6 +73,7 @@ class TabletStream : public std::enable_shared_from_this<TabletStream> {
std::atomic<uint32_t> _next_segid;
int64_t _num_segments = 0;
bool _check_num_segments = true;
bool _flush_tasks_done = false;
bthread::Mutex _lock;
AtomicStatus _status;
PUniqueId _load_id;
Expand All @@ -88,6 +92,7 @@ class IndexStream {
IndexStream(const PUniqueId& load_id, int64_t id, int64_t txn_id,
std::shared_ptr<OlapTableSchemaParam> schema, LoadStreamMgr* load_stream_mgr,
RuntimeProfile* profile);
~IndexStream();

Status append_data(const PStreamHeader& header, butil::IOBuf* data);

Expand Down
Loading