Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ Status LoadStream::init(const POpenLoadStreamRequest* request) {
return Status::OK();
}

void LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
bool LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
std::lock_guard<bthread::Mutex> lock_guard(_lock);
SCOPED_TIMER(_close_wait_timer);
Expand All @@ -482,14 +482,15 @@ void LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_to_

if (_close_load_cnt < _total_streams) {
// do not return commit info if there is remaining streams.
return;
return false;
}

for (auto& [_, index_stream] : _index_streams_map) {
index_stream->close(_tablets_to_commit, success_tablet_ids, failed_tablets);
}
LOG(INFO) << "close load " << *this << ", success_tablet_num=" << success_tablet_ids->size()
<< ", failed_tablet_num=" << failed_tablets->size();
return true;
}

void LoadStream::_report_result(StreamId stream, const Status& status,
Expand Down Expand Up @@ -680,9 +681,25 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf*
std::vector<int64_t> success_tablet_ids;
FailedTablets failed_tablets;
std::vector<PTabletID> tablets_to_commit(hdr.tablets().begin(), hdr.tablets().end());
close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets);
bool all_closed =
close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets);
_report_result(id, Status::OK(), success_tablet_ids, failed_tablets, true);
brpc::StreamClose(id);
std::lock_guard<bthread::Mutex> lock_guard(_lock);
// if incremental stream, we need to wait for all non-incremental streams to be closed
// before closing incremental streams. We need a fencing mechanism to avoid use after closing
// across different be.
if (hdr.has_num_incremental_streams() && hdr.num_incremental_streams() > 0) {
_closing_stream_ids.push_back(id);
} else {
brpc::StreamClose(id);
}

if (all_closed) {
for (auto& closing_id : _closing_stream_ids) {
brpc::StreamClose(closing_id);
}
_closing_stream_ids.clear();
}
} break;
case PStreamHeader::GET_SCHEMA: {
_report_schema(id, hdr);
Expand Down
4 changes: 3 additions & 1 deletion be/src/runtime/load_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ class LoadStream : public brpc::StreamInputHandler {
}
}

void close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
// return true if all streams are closed, otherwise return false
bool close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablet_ids);

// callbacks called by brpc
Expand Down Expand Up @@ -177,6 +178,7 @@ class LoadStream : public brpc::StreamInputHandler {
RuntimeProfile::Counter* _close_wait_timer = nullptr;
LoadStreamMgr* _load_stream_mgr = nullptr;
std::shared_ptr<ResourceContext> _resource_ctx;
std::vector<int64_t> _closing_stream_ids;
bool _is_incremental = false;
};

Expand Down
6 changes: 5 additions & 1 deletion be/src/vec/sink/load_stream_map_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams,
_src_id(src_id),
_num_streams(num_streams),
_use_cnt(num_use),
_num_incremental_streams(0),
_pool(pool),
_tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()),
_enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()) {
Expand All @@ -42,6 +43,9 @@ std::shared_ptr<LoadStreamStubs> LoadStreamMap::get_or_create(int64_t dst_id, bo
if (streams != nullptr) {
return streams;
}
if (incremental) {
_num_incremental_streams.fetch_add(1);
}
streams = std::make_shared<LoadStreamStubs>(_num_streams, _load_id, _src_id,
_tablet_schema_for_index,
_enable_unique_mow_for_index, incremental);
Expand Down Expand Up @@ -118,7 +122,7 @@ void LoadStreamMap::close_load(bool incremental) {
tablets_to_commit.push_back(tablet);
tablets_to_commit.back().set_num_segments(_segments_for_tablet[tablet_id]);
}
auto st = streams->close_load(tablets_to_commit);
auto st = streams->close_load(tablets_to_commit, _num_incremental_streams.load());
if (!st.ok()) {
LOG(WARNING) << "close_load for " << (incremental ? "incremental" : "non-incremental")
<< " streams failed: " << st << ", load_id=" << _load_id;
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/sink/load_stream_map_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ class LoadStreamMap {
const int64_t _src_id;
const int _num_streams;
std::atomic<int> _use_cnt;
std::atomic<int> _num_incremental_streams;
std::mutex _mutex;
std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>> _streams_for_node;
LoadStreamMapPool* _pool = nullptr;
Expand Down
11 changes: 7 additions & 4 deletions be/src/vec/sink/load_stream_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64
}

// CLOSE_LOAD
Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commit) {
Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commit,
int num_incremental_streams) {
if (!_is_open.load()) {
return _status;
}
Expand All @@ -264,6 +265,7 @@ Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commi
for (const auto& tablet : tablets_to_commit) {
*header.add_tablets() = tablet;
}
header.set_num_incremental_streams(num_incremental_streams);
_status = _encode_and_send(header);
if (!_status.ok()) {
LOG(WARNING) << "stream " << _stream_id << " close failed: " << _status;
Expand Down Expand Up @@ -541,7 +543,8 @@ Status LoadStreamStubs::open(BrpcClientCache<PBackendService_Stub>* client_cache
return status;
}

Status LoadStreamStubs::close_load(const std::vector<PTabletID>& tablets_to_commit) {
Status LoadStreamStubs::close_load(const std::vector<PTabletID>& tablets_to_commit,
int num_incremental_streams) {
if (!_open_success.load()) {
return Status::InternalError("streams not open");
}
Expand All @@ -550,10 +553,10 @@ Status LoadStreamStubs::close_load(const std::vector<PTabletID>& tablets_to_comm
for (auto& stream : _streams) {
Status st;
if (first) {
st = stream->close_load(tablets_to_commit);
st = stream->close_load(tablets_to_commit, num_incremental_streams);
first = false;
} else {
st = stream->close_load({});
st = stream->close_load({}, num_incremental_streams);
}
if (!st.ok()) {
LOG(WARNING) << "close_load failed: " << st << "; stream: " << *stream;
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/sink/load_stream_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> {
int32_t segment_id, const SegmentStatistics& segment_stat);

// CLOSE_LOAD
Status close_load(const std::vector<PTabletID>& tablets_to_commit);
Status close_load(const std::vector<PTabletID>& tablets_to_commit, int num_incremental_streams);

// GET_SCHEMA
Status get_schema(const std::vector<PTabletID>& tablets);
Expand Down Expand Up @@ -320,7 +320,7 @@ class LoadStreamStubs {
}
}

Status close_load(const std::vector<PTabletID>& tablets_to_commit);
Status close_load(const std::vector<PTabletID>& tablets_to_commit, int num_incremental_streams);

std::unordered_set<int64_t> success_tablets() {
std::unordered_set<int64_t> s;
Expand Down
1 change: 1 addition & 0 deletions gensrc/proto/internal_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,7 @@ message PStreamHeader {
optional TabletSchemaPB flush_schema = 11;
optional uint64 offset = 12;
optional FileType file_type = 13;
optional int64 num_incremental_streams = 14;
}

message PGetWalQueueSizeRequest{
Expand Down
Loading