Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 48 additions & 12 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,17 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
// replace existed rowset with `to_add` rowset. This may occur when:
// 1. schema change converts rowsets which have been double written to new tablet
// 2. cumu compaction picks single overlapping input rowset to perform compaction
if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) {
// add existed rowset to unused_rowsets to remove delete bitmap
if (auto find_it = _rs_version_map.find(rs->version());
find_it != _rs_version_map.end()) {
DCHECK(find_it->second->rowset_id() != rs->rowset_id())
<< "tablet_id=" << tablet_id()
<< ", rowset_id=" << rs->rowset_id().to_string()
<< ", existed rowset=" << find_it->second->rowset_id().to_string();
_unused_rowsets.emplace(find_it->second->rowset_id(), find_it->second);

@swjtu-zhanglei swjtu-zhanglei Jun 6, 2025

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does here need std::lock_guard < std::mutex > lock(_gc_mutex);?

}
}
_tablet_meta->delete_rs_meta_by_version(rs->version(), nullptr);
_rs_version_map[rs->version()] = rs;
_tablet_meta->add_rowsets_unchecked({rs});
Expand Down Expand Up @@ -459,6 +470,14 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() {

if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write() &&
!deleted_stale_rowsets.empty()) {
// record expired rowsets in unused rowsets
{
std::lock_guard<std::mutex> lock(_gc_mutex);
for (const auto& rowset : expired_rowsets) {
_unused_rowsets.emplace(rowset->rowset_id(), rowset);
}
}

// agg delete bitmap for pre rowsets; record unused delete bitmap key ranges
OlapStopWatch watch;
for (const auto& [version, unused_rowsets] : deleted_stale_rowsets) {
Expand All @@ -467,9 +486,13 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() {
agg_delete_bitmap_for_stale_rowsets(version, remove_delete_bitmap_key_ranges);
// add remove delete bitmap
if (!remove_delete_bitmap_key_ranges.empty()) {
std::vector<RowsetId> rowset_ids;
for (const auto& rs : unused_rowsets) {
rowset_ids.push_back(rs->rowset_id());
}
std::lock_guard<std::mutex> lock(_gc_mutex);
_unused_delete_bitmap.push_back(
std::make_pair(unused_rowsets, remove_delete_bitmap_key_ranges));
std::make_pair(rowset_ids, remove_delete_bitmap_key_ranges));
}
}
LOG(INFO) << "agg pre rowsets delete bitmap. tablet_id=" << tablet_id()
Expand All @@ -479,23 +502,34 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() {
return expired_rowsets.size();
}

bool CloudTablet::need_remove_pre_rowset_delete_bitmap() {
bool CloudTablet::need_remove_unused_rowsets() {
std::lock_guard<std::mutex> lock(_gc_mutex);
return !_unused_delete_bitmap.empty();
return !_unused_rowsets.empty() || !_unused_delete_bitmap.empty();
}

void CloudTablet::remove_pre_rowset_delete_bitmap() {
void CloudTablet::remove_unused_rowsets() {
std::lock_guard<std::mutex> lock(_gc_mutex);
// 1. remove unused rowsets and delete bitmap
for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) {
auto&& rs = it->second;
if (rs.use_count() > 1) {
LOG(WARNING) << "Rowset " << rs->rowset_id() << " has " << rs.use_count()
<< " references. Can not remove delete bitmap.";
++it;
continue;
}
tablet_meta()->remove_rowset_delete_bitmap(rs->rowset_id(), rs->version());
it = _unused_rowsets.erase(it);
}

// 2. remove delete bitmap of pre rowsets
for (auto it = _unused_delete_bitmap.begin(); it != _unused_delete_bitmap.end();) {
auto& rowsets = std::get<0>(*it);
auto& rowset_ids = std::get<0>(*it);
bool find_unused_rowset = false;
for (const auto& rowset : rowsets) {
if (rowset.use_count() > 1) {
for (const auto& rowset_id : rowset_ids) {
if (_unused_rowsets.find(rowset_id) != _unused_rowsets.end()) {
LOG(INFO) << "can not remove pre rowset delete bitmap because rowset is in use"
<< ", tablet_id=" << tablet_id()
<< ", rowset_id=" << rowset->rowset_id().to_string()
<< ", version=" << rowset->version().to_string()
<< ", use_count=" << rowset.use_count();
<< ", tablet_id=" << tablet_id() << ", rowset_id=" << rowset_id;
find_unused_rowset = true;
break;
}
Expand All @@ -508,8 +542,10 @@ void CloudTablet::remove_pre_rowset_delete_bitmap() {
tablet_meta()->delete_bitmap().remove(key_ranges);
it = _unused_delete_bitmap.erase(it);
}
if (!_unused_delete_bitmap.empty()) {

if (!_unused_rowsets.empty() || !_unused_delete_bitmap.empty()) {
LOG(INFO) << "tablet_id=" << tablet_id()
<< ", unused_rowset size=" << _unused_rowsets.size()
<< ", unused_delete_bitmap size=" << _unused_delete_bitmap.size();
}
}
Expand Down
8 changes: 4 additions & 4 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,8 @@ class CloudTablet final : public BaseTablet {
DeleteBitmapPtr& new_delete_bitmap,
std::map<std::string, int64_t>& pre_rowset_to_versions);

bool need_remove_pre_rowset_delete_bitmap();
void remove_pre_rowset_delete_bitmap();
bool need_remove_unused_rowsets();
void remove_unused_rowsets();

private:
// FIXME(plat1ko): No need to record base size if rowsets are ordered by version
Expand Down Expand Up @@ -340,8 +340,8 @@ class CloudTablet final : public BaseTablet {

// unused_rowsets, [start_version, end_version]
std::mutex _gc_mutex;
std::vector<std::pair<std::vector<RowsetSharedPtr>, DeleteBitmapKeyRanges>>
_unused_delete_bitmap;
std::unordered_map<RowsetId, RowsetSharedPtr> _unused_rowsets;
std::vector<std::pair<std::vector<RowsetId>, DeleteBitmapKeyRanges>> _unused_delete_bitmap;
};

using CloudTabletSPtr = std::shared_ptr<CloudTablet>;
Expand Down
20 changes: 10 additions & 10 deletions be/src/cloud/cloud_tablet_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,19 +266,19 @@ void CloudTabletMgr::vacuum_stale_rowsets(const CountDownLatch& stop_latch) {
.tag("num_tablets", tablets_to_vacuum.size());

{
LOG_INFO("begin to remove pre rowsets delete bitmap");
std::vector<std::shared_ptr<CloudTablet>> tablets_to_remove_delete_bitmap;
tablets_to_remove_delete_bitmap.reserve(_tablet_map->size());
_tablet_map->traverse([&tablets_to_remove_delete_bitmap](auto&& t) {
if (t->need_remove_pre_rowset_delete_bitmap()) {
tablets_to_remove_delete_bitmap.push_back(t);
LOG_INFO("begin to remove unused rowsets");
std::vector<std::shared_ptr<CloudTablet>> tablets_to_remove_unused_rowsets;
tablets_to_remove_unused_rowsets.reserve(_tablet_map->size());
_tablet_map->traverse([&tablets_to_remove_unused_rowsets](auto&& t) {
if (t->need_remove_unused_rowsets()) {
tablets_to_remove_unused_rowsets.push_back(t);
}
});
for (auto& t : tablets_to_remove_delete_bitmap) {
t->remove_pre_rowset_delete_bitmap();
for (auto& t : tablets_to_remove_unused_rowsets) {
t->remove_unused_rowsets();
}
LOG_INFO("finish remove pre rowsets delete bitmap")
.tag("num_tablets", tablets_to_remove_delete_bitmap.size());
LOG_INFO("finish remove unused rowsets")
.tag("num_tablets", tablets_to_remove_unused_rowsets.size());
if (config::enable_check_agg_and_remove_pre_rowsets_delete_bitmap) {
int64_t max_useless_rowset_count = 0;
int64_t tablet_id_with_max_useless_rowset_count = 0;
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1832,6 +1832,8 @@ void BaseTablet::agg_delete_bitmap_for_stale_rowsets(
remove_delete_bitmap_key_ranges.emplace_back(start_key, end_key);
}
}
DBUG_EXECUTE_IF("BaseTablet.agg_delete_bitmap_for_stale_rowsets.merge_delete_bitmap.block",
DBUG_BLOCK);
tablet_meta()->delete_bitmap().merge(*new_delete_bitmap);
}

Expand Down
41 changes: 27 additions & 14 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -950,15 +950,28 @@ void StorageEngine::_clean_unused_rowset_metas() {
for (auto data_dir : data_dirs) {
static_cast<void>(
RowsetMetaManager::traverse_rowset_metas(data_dir->get_meta(), clean_rowset_func));
// 1. delete delete_bitmap
std::set<int64_t> tablets_to_save_meta;
for (auto& rowset_meta : invalid_rowset_metas) {
static_cast<void>(RowsetMetaManager::remove(
data_dir->get_meta(), rowset_meta->tablet_uid(), rowset_meta->rowset_id()));
TabletSharedPtr tablet = _tablet_manager->get_tablet(rowset_meta->tablet_id());
if (tablet && tablet->tablet_meta()->enable_unique_key_merge_on_write()) {
tablet->tablet_meta()->delete_bitmap().remove_rowset_cache_version(
rowset_meta->rowset_id());
tablet->tablet_meta()->remove_rowset_delete_bitmap(rowset_meta->rowset_id(),
rowset_meta->version());
tablets_to_save_meta.emplace(tablet->tablet_id());
}
}
for (const auto& tablet_id : tablets_to_save_meta) {
auto tablet = _tablet_manager->get_tablet(tablet_id);
if (tablet) {
std::shared_lock rlock(tablet->get_header_lock());
tablet->save_meta();
}
}
// 2. delete rowset meta
for (auto& rowset_meta : invalid_rowset_metas) {
static_cast<void>(RowsetMetaManager::remove(
data_dir->get_meta(), rowset_meta->tablet_uid(), rowset_meta->rowset_id()));
}
LOG(INFO) << "remove " << invalid_rowset_metas.size()
<< " invalid rowset meta from dir: " << data_dir->path();
invalid_rowset_metas.clear();
Expand Down Expand Up @@ -1192,6 +1205,7 @@ void StorageEngine::_parse_default_rowset_type() {
}

void StorageEngine::start_delete_unused_rowset() {
DBUG_EXECUTE_IF("StorageEngine::start_delete_unused_rowset.block", DBUG_BLOCK);
LOG(INFO) << "start to delete unused rowset, size: " << _unused_rowsets.size()
<< ", unused delete bitmap size: " << _unused_delete_bitmap.size();
std::vector<RowsetSharedPtr> unused_rowsets_copy;
Expand Down Expand Up @@ -1250,13 +1264,6 @@ void StorageEngine::start_delete_unused_rowset() {
it = _unused_delete_bitmap.erase(it);
}
}
for (const auto& tablet_id : tablets_to_save_meta) {
auto tablet = _tablet_manager->get_tablet(tablet_id);
if (tablet) {
std::shared_lock rlock(tablet->get_header_lock());
tablet->save_meta();
}
}
LOG(INFO) << "collected " << unused_rowsets_copy.size() << " unused rowsets to remove, skipped "
<< due_to_use_count << " rowsets due to use count > 1, skipped "
<< due_to_not_delete_file << " rowsets due to don't need to delete file, skipped "
Expand All @@ -1268,14 +1275,20 @@ void StorageEngine::start_delete_unused_rowset() {
// delete delete_bitmap of unused rowsets
if (auto tablet = _tablet_manager->get_tablet(rs->rowset_meta()->tablet_id());
tablet && tablet->enable_unique_key_merge_on_write()) {
tablet->tablet_meta()->delete_bitmap().remove({rs->rowset_id(), 0, 0},
{rs->rowset_id(), UINT32_MAX, 0});
tablet->tablet_meta()->delete_bitmap().remove_rowset_cache_version(rs->rowset_id());
tablet->tablet_meta()->remove_rowset_delete_bitmap(rs->rowset_id(), rs->version());
tablets_to_save_meta.emplace(tablet->tablet_id());
}
Status status = rs->remove();
unused_rowsets_counter << -1;
VLOG_NOTICE << "remove rowset:" << rs->rowset_id() << " finished. status:" << status;
}
for (const auto& tablet_id : tablets_to_save_meta) {
auto tablet = _tablet_manager->get_tablet(tablet_id);
if (tablet) {
std::shared_lock rlock(tablet->get_header_lock());
tablet->save_meta();
}
}
LOG(INFO) << "removed all collected unused rowsets";
}

Expand Down
26 changes: 12 additions & 14 deletions be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,18 @@ void TabletMeta::init_column_from_tcolumn(uint32_t unique_id, const TColumn& tco
}
}

void TabletMeta::remove_rowset_delete_bitmap(const RowsetId& rowset_id, const Version& version) {
Comment thread
mymeiyi marked this conversation as resolved.
if (_enable_unique_key_merge_on_write) {
delete_bitmap().remove({rowset_id, 0, 0}, {rowset_id, UINT32_MAX, 0});
if (config::enable_mow_verbose_log) {
LOG_INFO("delete rowset delete bitmap. tablet={}, rowset={}, version={}", tablet_id(),
rowset_id.to_string(), version.to_string());
}
size_t rowset_cache_version_size = delete_bitmap().remove_rowset_cache_version(rowset_id);
_check_mow_rowset_cache_version_size(rowset_cache_version_size);
}
}

Status TabletMeta::create_from_file(const string& file_path) {
TabletMetaPB tablet_meta_pb;
RETURN_IF_ERROR(load_from_file(file_path, &tablet_meta_pb));
Expand Down Expand Up @@ -945,28 +957,14 @@ void TabletMeta::revise_delete_bitmap_unlocked(const DeleteBitmap& delete_bitmap
}

void TabletMeta::delete_stale_rs_meta_by_version(const Version& version) {
size_t rowset_cache_version_size = 0;
auto it = _stale_rs_metas.begin();
while (it != _stale_rs_metas.end()) {
if ((*it)->version() == version) {
if (_enable_unique_key_merge_on_write) {
// remove rowset delete bitmap
delete_bitmap().remove({(*it)->rowset_id(), 0, 0},
{(*it)->rowset_id(), UINT32_MAX, 0});
rowset_cache_version_size =
delete_bitmap().remove_rowset_cache_version((*it)->rowset_id());
if (config::enable_mow_verbose_log) {
LOG_INFO(
"delete stale rowset's delete bitmap. tablet={}, version={}, rowset={}",
tablet_id(), version.to_string(), (*it)->rowset_id().to_string());
}
}
it = _stale_rs_metas.erase(it);
} else {
it++;
}
}
_check_mow_rowset_cache_version_size(rowset_cache_version_size);
}

RowsetMetaSharedPtr TabletMeta::acquire_rs_meta_by_version(const Version& version) const {
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/tablet_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ class TabletMeta : public MetadataAdder<TabletMeta> {

DeleteBitmapPtr delete_bitmap_ptr() { return _delete_bitmap; }
DeleteBitmap& delete_bitmap() { return *_delete_bitmap; }
void remove_rowset_delete_bitmap(const RowsetId& rowset_id, const Version& version);

bool enable_unique_key_merge_on_write() const { return _enable_unique_key_merge_on_write; }

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql1 --
1 99
2 99
3 99
4 99
5 99

-- !sql2 --
1 101
2 100
3 100
4 100
5 100

-- !sql3 --
1 101
2 100
3 100
4 100
5 100

Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ suite('test_mow_agg_delete_bitmap', 'multi_cluster,docker') {
waitForCompaction(tablet)
logger.info("after compaction 1")
getTabletStatus(tablet)
GetDebugPoint().enableDebugPointForAllBEs("DeleteBitmapAction._handle_show_local_delete_bitmap_count.vacuum_stale_rowsets") // cloud
def local_dm = getLocalDeleteBitmapStatus(tablet)
logger.info("local_dm 0.2: " + local_dm)
assertEquals(0, local_dm.delete_bitmap_count)
Expand Down
Loading
Loading