From e666fa3eba92991c048c36150e5d86fc0e7cdee2 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Tue, 16 Sep 2025 23:36:35 +0800 Subject: [PATCH 1/9] [fix](load_stream) close brpc stream after load stream is closed Otherwise, auto partition on multi bes may lead to segment num mismatch problem. --- be/src/runtime/load_stream.cpp | 16 ++++++++++++---- be/src/runtime/load_stream.h | 4 +++- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index d042e4ea7520ef..e620ca01604fc5 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -463,7 +463,7 @@ Status LoadStream::init(const POpenLoadStreamRequest* request) { return Status::OK(); } -void LoadStream::close(int64_t src_id, const std::vector& tablets_to_commit, + bool LoadStream::close(int64_t src_id, const std::vector& tablets_to_commit, std::vector* success_tablet_ids, FailedTablets* failed_tablets) { std::lock_guard lock_guard(_lock); SCOPED_TIMER(_close_wait_timer); @@ -482,7 +482,7 @@ void LoadStream::close(int64_t src_id, const std::vector& tablets_to_ if (_close_load_cnt < _total_streams) { // do not return commit info if there is remaining streams. - return; + return false; } for (auto& [_, index_stream] : _index_streams_map) { @@ -490,6 +490,7 @@ void LoadStream::close(int64_t src_id, const std::vector& tablets_to_ } LOG(INFO) << "close load " << *this << ", success_tablet_num=" << success_tablet_ids->size() << ", failed_tablet_num=" << failed_tablets->size(); + return true; } void LoadStream::_report_result(StreamId stream, const Status& status, @@ -680,9 +681,16 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* std::vector success_tablet_ids; FailedTablets failed_tablets; std::vector tablets_to_commit(hdr.tablets().begin(), hdr.tablets().end()); - close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets); + bool all_closed = close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets); _report_result(id, Status::OK(), success_tablet_ids, failed_tablets, true); - brpc::StreamClose(id); + std::lock_guard lock_guard(_lock); + _closing_stream_ids.push_back(id); + if (all_closed) { + for (auto& closing_id : _closing_stream_ids) { + brpc::StreamClose(closing_id); + _closing_stream_ids.clear(); + } + } } break; case PStreamHeader::GET_SCHEMA: { _report_schema(id, hdr); diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h index 6e84a3f12f8b5c..9aefa3d9093ea4 100644 --- a/be/src/runtime/load_stream.h +++ b/be/src/runtime/load_stream.h @@ -129,7 +129,8 @@ class LoadStream : public brpc::StreamInputHandler { } } - void close(int64_t src_id, const std::vector& tablets_to_commit, + // return true if all streams are closed, otherwise return false + bool close(int64_t src_id, const std::vector& tablets_to_commit, std::vector* success_tablet_ids, FailedTablets* failed_tablet_ids); // callbacks called by brpc @@ -177,6 +178,7 @@ class LoadStream : public brpc::StreamInputHandler { RuntimeProfile::Counter* _close_wait_timer = nullptr; LoadStreamMgr* _load_stream_mgr = nullptr; std::shared_ptr _resource_ctx; + std::vector _closing_stream_ids; bool _is_incremental = false; }; From a0610c16c2e2e10216777391b8417de4132f305a Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Tue, 16 Sep 2025 23:41:55 +0800 Subject: [PATCH 2/9] fix --- be/src/runtime/load_stream.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index e620ca01604fc5..900056711f476b 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -463,7 +463,7 @@ Status LoadStream::init(const POpenLoadStreamRequest* request) { return Status::OK(); } - bool LoadStream::close(int64_t src_id, const std::vector& tablets_to_commit, +bool LoadStream::close(int64_t src_id, const std::vector& tablets_to_commit, std::vector* success_tablet_ids, FailedTablets* failed_tablets) { std::lock_guard lock_guard(_lock); SCOPED_TIMER(_close_wait_timer); @@ -681,7 +681,8 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* std::vector success_tablet_ids; FailedTablets failed_tablets; std::vector tablets_to_commit(hdr.tablets().begin(), hdr.tablets().end()); - bool all_closed = close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets); + bool all_closed = + close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets); _report_result(id, Status::OK(), success_tablet_ids, failed_tablets, true); std::lock_guard lock_guard(_lock); _closing_stream_ids.push_back(id); From 875e4dfd566519d9fda78ebf98177200e6e34f77 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Wed, 17 Sep 2025 09:57:48 +0800 Subject: [PATCH 3/9] fix --- be/src/runtime/load_stream.cpp | 10 +++++++++- be/src/vec/sink/load_stream_map_pool.cpp | 6 +++++- be/src/vec/sink/load_stream_map_pool.h | 1 + be/src/vec/sink/load_stream_stub.cpp | 4 +++- gensrc/proto/internal_service.proto | 1 + 5 files changed, 19 insertions(+), 3 deletions(-) diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 900056711f476b..185dd19d2b9ea5 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -685,7 +685,15 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets); _report_result(id, Status::OK(), success_tablet_ids, failed_tablets, true); std::lock_guard lock_guard(_lock); - _closing_stream_ids.push_back(id); + // if incremental stream, we need to wait for all non-incremental streams to be closed + // before closing incremental streams. We need a fencing mechanism to avoid use after closing + // across different be. + if (hdr.has_num_incremental_streams() && hdr.num_incremental_streams() > 0) { + _closing_stream_ids.push_back(id); + } else { + brpc::StreamClose(id); + } + if (all_closed) { for (auto& closing_id : _closing_stream_ids) { brpc::StreamClose(closing_id); diff --git a/be/src/vec/sink/load_stream_map_pool.cpp b/be/src/vec/sink/load_stream_map_pool.cpp index 24a1cb77a489cc..e68977ea64efd4 100644 --- a/be/src/vec/sink/load_stream_map_pool.cpp +++ b/be/src/vec/sink/load_stream_map_pool.cpp @@ -29,6 +29,7 @@ LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, _src_id(src_id), _num_streams(num_streams), _use_cnt(num_use), + _num_incremental_streams(0), _pool(pool), _tablet_schema_for_index(std::make_shared()), _enable_unique_mow_for_index(std::make_shared()) { @@ -42,6 +43,9 @@ std::shared_ptr LoadStreamMap::get_or_create(int64_t dst_id, bo if (streams != nullptr) { return streams; } + if (incremental) { + _num_incremental_streams.fetch_add(1); + } streams = std::make_shared(_num_streams, _load_id, _src_id, _tablet_schema_for_index, _enable_unique_mow_for_index, incremental); @@ -118,7 +122,7 @@ void LoadStreamMap::close_load(bool incremental) { tablets_to_commit.push_back(tablet); tablets_to_commit.back().set_num_segments(_segments_for_tablet[tablet_id]); } - auto st = streams->close_load(tablets_to_commit); + auto st = streams->close_load(tablets_to_commit, _num_incremental_streams.load()); if (!st.ok()) { LOG(WARNING) << "close_load for " << (incremental ? "incremental" : "non-incremental") << " streams failed: " << st << ", load_id=" << _load_id; diff --git a/be/src/vec/sink/load_stream_map_pool.h b/be/src/vec/sink/load_stream_map_pool.h index ab8a40d0c9148d..6186aa85c860dd 100644 --- a/be/src/vec/sink/load_stream_map_pool.h +++ b/be/src/vec/sink/load_stream_map_pool.h @@ -112,6 +112,7 @@ class LoadStreamMap { const int64_t _src_id; const int _num_streams; std::atomic _use_cnt; + std::atomic _num_incremental_streams; std::mutex _mutex; std::unordered_map> _streams_for_node; LoadStreamMapPool* _pool = nullptr; diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 17ae49eb5706dc..77a4deb4b0680a 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -253,7 +253,8 @@ Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64 } // CLOSE_LOAD -Status LoadStreamStub::close_load(const std::vector& tablets_to_commit) { +Status LoadStreamStub::close_load(const std::vector& tablets_to_commit, + int num_incremental_streams) { if (!_is_open.load()) { return _status; } @@ -264,6 +265,7 @@ Status LoadStreamStub::close_load(const std::vector& tablets_to_commi for (const auto& tablet : tablets_to_commit) { *header.add_tablets() = tablet; } + header.set_num_incremental_streams(num_incremental_streams); _status = _encode_and_send(header); if (!_status.ok()) { LOG(WARNING) << "stream " << _stream_id << " close failed: " << _status; diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index e070743d252cd9..d73d01ab318b9c 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -990,6 +990,7 @@ message PStreamHeader { optional TabletSchemaPB flush_schema = 11; optional uint64 offset = 12; optional FileType file_type = 13; + optional int64 num_incremental_streams = 14; } message PGetWalQueueSizeRequest{ From 904862f6678d51629856f71a308620b5240f426b Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Wed, 17 Sep 2025 11:52:32 +0800 Subject: [PATCH 4/9] fix --- be/src/vec/sink/load_stream_stub.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index 066805087a6dd4..9e8f90c87000f8 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -147,7 +147,7 @@ class LoadStreamStub : public std::enable_shared_from_this { int32_t segment_id, const SegmentStatistics& segment_stat); // CLOSE_LOAD - Status close_load(const std::vector& tablets_to_commit); + Status close_load(const std::vector& tablets_to_commit, int num_incremental_streams); // GET_SCHEMA Status get_schema(const std::vector& tablets); From 93e129b97be1a3c5e7c66aa8406596a7036a97af Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Wed, 17 Sep 2025 14:51:49 +0800 Subject: [PATCH 5/9] fix --- be/src/vec/sink/load_stream_stub.cpp | 6 +++--- be/src/vec/sink/load_stream_stub.h | 2 +- .../suites/compaction/test_config_prune_delete_sign.groovy | 1 - 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 77a4deb4b0680a..28e0980bdd8dae 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -543,7 +543,7 @@ Status LoadStreamStubs::open(BrpcClientCache* client_cache return status; } -Status LoadStreamStubs::close_load(const std::vector& tablets_to_commit) { +Status LoadStreamStubs::close_load(const std::vector& tablets_to_commit, int num_incremental_streams) { if (!_open_success.load()) { return Status::InternalError("streams not open"); } @@ -552,10 +552,10 @@ Status LoadStreamStubs::close_load(const std::vector& tablets_to_comm for (auto& stream : _streams) { Status st; if (first) { - st = stream->close_load(tablets_to_commit); + st = stream->close_load(tablets_to_commit, num_incremental_streams); first = false; } else { - st = stream->close_load({}); + st = stream->close_load({}, num_incremental_streams); } if (!st.ok()) { LOG(WARNING) << "close_load failed: " << st << "; stream: " << *stream; diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index 9e8f90c87000f8..15e51cdd8e67da 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -320,7 +320,7 @@ class LoadStreamStubs { } } - Status close_load(const std::vector& tablets_to_commit); + Status close_load(const std::vector& tablets_to_commit, int num_incremental_streams); std::unordered_set success_tablets() { std::unordered_set s; diff --git a/regression-test/suites/compaction/test_config_prune_delete_sign.groovy b/regression-test/suites/compaction/test_config_prune_delete_sign.groovy index 92f89f1066fdf8..f08d99fc05f6f1 100644 --- a/regression-test/suites/compaction/test_config_prune_delete_sign.groovy +++ b/regression-test/suites/compaction/test_config_prune_delete_sign.groovy @@ -75,7 +75,6 @@ suite("test_config_prune_delete_sign", "nonConcurrent") { // the base compacton would report -808, which means the base compaction is not triggered. // so we need to insert a row to make sure the base compaction happens. sql "insert into ${table1} values(60,60,60);" - trigger_and_wait_compaction(table1, "cumulative") def tablets = sql_return_maparray """ show tablets from ${table1}; """ logger.info("tablets: ${tablets}") From 085d67026befac50bd8953374d919ea29e0257a5 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Wed, 17 Sep 2025 16:15:15 +0800 Subject: [PATCH 6/9] fix --- be/src/vec/sink/load_stream_stub.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 28e0980bdd8dae..7f6ef126fd7665 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -543,7 +543,8 @@ Status LoadStreamStubs::open(BrpcClientCache* client_cache return status; } -Status LoadStreamStubs::close_load(const std::vector& tablets_to_commit, int num_incremental_streams) { +Status LoadStreamStubs::close_load(const std::vector& tablets_to_commit, + int num_incremental_streams) { if (!_open_success.load()) { return Status::InternalError("streams not open"); } From 20ef3738ac87f0cbe42d8518489bf3c5426b4c59 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Wed, 17 Sep 2025 17:36:24 +0800 Subject: [PATCH 7/9] fix --- .../suites/compaction/test_config_prune_delete_sign.groovy | 1 + 1 file changed, 1 insertion(+) diff --git a/regression-test/suites/compaction/test_config_prune_delete_sign.groovy b/regression-test/suites/compaction/test_config_prune_delete_sign.groovy index f08d99fc05f6f1..92f89f1066fdf8 100644 --- a/regression-test/suites/compaction/test_config_prune_delete_sign.groovy +++ b/regression-test/suites/compaction/test_config_prune_delete_sign.groovy @@ -75,6 +75,7 @@ suite("test_config_prune_delete_sign", "nonConcurrent") { // the base compacton would report -808, which means the base compaction is not triggered. // so we need to insert a row to make sure the base compaction happens. sql "insert into ${table1} values(60,60,60);" + trigger_and_wait_compaction(table1, "cumulative") def tablets = sql_return_maparray """ show tablets from ${table1}; """ logger.info("tablets: ${tablets}") From 2c23e8d629755dcc58c40fc0be87611cc08ed051 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Thu, 18 Sep 2025 09:03:28 +0800 Subject: [PATCH 8/9] Update be/src/runtime/load_stream.cpp Co-authored-by: Xin Liao --- be/src/runtime/load_stream.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 185dd19d2b9ea5..3caed84d61f52c 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -696,9 +696,9 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* if (all_closed) { for (auto& closing_id : _closing_stream_ids) { - brpc::StreamClose(closing_id); - _closing_stream_ids.clear(); + brpc::StreamClose(closing_id); } + _closing_stream_ids.clear(); } } break; case PStreamHeader::GET_SCHEMA: { From 66060fb14401c36633d91de6e55cbc82a9f7c191 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Thu, 18 Sep 2025 10:11:28 +0800 Subject: [PATCH 9/9] Fix formatting in load_stream.cpp --- be/src/runtime/load_stream.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 3caed84d61f52c..42ac15ea51ac43 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -696,7 +696,7 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* if (all_closed) { for (auto& closing_id : _closing_stream_ids) { - brpc::StreamClose(closing_id); + brpc::StreamClose(closing_id); } _closing_stream_ids.clear(); }