Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,49 @@ Status CloudCumulativeCompaction::modify_rowsets() {
_tablet->enable_unique_key_merge_on_write() && _input_rowsets.size() != 1) {
RETURN_IF_ERROR(process_old_version_delete_bitmap());
}
// agg delete bitmap for pre rowsets
if (config::enable_agg_and_remove_pre_rowsets_delete_bitmap &&
_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write() && _input_rowsets.size() != 1) {
OlapStopWatch watch;
std::vector<RowsetSharedPtr> pre_rowsets {};
{
std::shared_lock rlock(_tablet->get_header_lock());
for (const auto& it2 : cloud_tablet()->rowset_map()) {
if (it2.first.second < _output_rowset->start_version()) {
pre_rowsets.emplace_back(it2.second);
}
}
}
std::sort(pre_rowsets.begin(), pre_rowsets.end(), Rowset::comparator);
auto pre_rowsets_delete_bitmap = std::make_shared<DeleteBitmap>(_tablet->tablet_id());
std::map<std::string, int64_t> pre_rowset_to_versions;
cloud_tablet()->agg_delete_bitmap_for_compaction(
_output_rowset->start_version(), _output_rowset->end_version(), pre_rowsets,
pre_rowsets_delete_bitmap, pre_rowset_to_versions);
// update delete bitmap to ms
DBUG_EXECUTE_IF(
"CumulativeCompaction.modify_rowsets.cloud_update_delete_bitmap_without_lock.block",
DBUG_BLOCK);
auto status = _engine.meta_mgr().cloud_update_delete_bitmap_without_lock(
*cloud_tablet(), pre_rowsets_delete_bitmap.get(), pre_rowset_to_versions,
_output_rowset->start_version(), _output_rowset->end_version());
if (!status.ok()) {
LOG(WARNING) << "failed to agg pre rowsets delete bitmap to ms. tablet_id="
<< _tablet->tablet_id() << ", pre rowset num=" << pre_rowsets.size()
<< ", output version=" << _output_rowset->version().to_string()
<< ", status=" << status.to_string();
} else {
LOG(INFO) << "agg pre rowsets delete bitmap to ms. tablet_id=" << _tablet->tablet_id()
<< ", pre rowset num=" << pre_rowsets.size()
<< ", output version=" << _output_rowset->version().to_string()
<< ", cost(us)=" << watch.get_elapse_time_us();
}
}
DBUG_EXECUTE_IF("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowset", {
LOG(INFO) << "delete_expired_stale_rowsets for tablet=" << _tablet->tablet_id();
_engine.tablet_mgr().vacuum_stale_rowsets(CountDownLatch(1));
});
return Status::OK();
}

Expand Down Expand Up @@ -440,8 +483,9 @@ Status CloudCumulativeCompaction::process_old_version_delete_bitmap() {
"test fail to update delete bitmap for tablet_id {}",
cloud_tablet()->tablet_id());
});
std::map<std::string, int64_t> rowset_to_versions;
RETURN_IF_ERROR(_engine.meta_mgr().cloud_update_delete_bitmap_without_lock(
*cloud_tablet(), new_delete_bitmap.get()));
*cloud_tablet(), new_delete_bitmap.get(), rowset_to_versions));

Version version(_input_rowsets.front()->start_version(),
_input_rowsets.back()->end_version());
Expand Down
27 changes: 22 additions & 5 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,8 @@ Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
// So we don't need to sync it.
if (sync_delete_bitmap && tablet->enable_unique_key_merge_on_write() &&
tablet->tablet_state() == TABLET_RUNNING) {
DBUG_EXECUTE_IF("CloudMetaMgr::sync_tablet_rowsets.sync_tablet_delete_bitmap.block",
DBUG_BLOCK);
DeleteBitmap delete_bitmap(tablet_id);
int64_t old_max_version = req.start_version() - 1;
auto st = sync_tablet_delete_bitmap(tablet, old_max_version, resp.rowset_meta(),
Expand Down Expand Up @@ -1329,10 +1331,12 @@ Status CloudMetaMgr::update_delete_bitmap(const CloudTablet& tablet, int64_t loc
return st;
}

Status CloudMetaMgr::cloud_update_delete_bitmap_without_lock(const CloudTablet& tablet,
DeleteBitmap* delete_bitmap) {
LOG(INFO) << "cloud_update_delete_bitmap_without_lock , tablet_id: " << tablet.tablet_id()
<< ",delete_bitmap size:" << delete_bitmap->get_delete_bitmap_count();
Status CloudMetaMgr::cloud_update_delete_bitmap_without_lock(
const CloudTablet& tablet, DeleteBitmap* delete_bitmap,
std::map<std::string, int64_t>& rowset_to_versions, int64_t pre_rowset_agg_start_version,
int64_t pre_rowset_agg_end_version) {
LOG(INFO) << "cloud_update_delete_bitmap_without_lock, tablet_id: " << tablet.tablet_id()
<< ", delete_bitmap size: " << delete_bitmap->delete_bitmap.size();
UpdateDeleteBitmapRequest req;
UpdateDeleteBitmapResponse res;
req.set_cloud_unique_id(config::cloud_unique_id);
Expand All @@ -1341,17 +1345,30 @@ Status CloudMetaMgr::cloud_update_delete_bitmap_without_lock(const CloudTablet&
req.set_tablet_id(tablet.tablet_id());
// use a fake lock id to resolve compatibility issues
req.set_lock_id(-3);
req.set_unlock(true);
req.set_without_lock(true);
for (auto& [key, bitmap] : delete_bitmap->delete_bitmap) {
req.add_rowset_ids(std::get<0>(key).to_string());
req.add_segment_ids(std::get<1>(key));
req.add_versions(std::get<2>(key));
if (pre_rowset_agg_end_version > 0) {
DCHECK(rowset_to_versions.find(std::get<0>(key).to_string()) !=
rowset_to_versions.end())
<< "rowset_to_versions not found for key=" << std::get<0>(key).to_string();
req.add_pre_rowset_versions(rowset_to_versions[std::get<0>(key).to_string()]);
}
DCHECK(pre_rowset_agg_end_version <= 0 || pre_rowset_agg_end_version == std::get<2>(key))
<< "pre_rowset_agg_end_version=" << pre_rowset_agg_end_version
<< " not equal to version=" << std::get<2>(key);
// To save space, convert array and bitmap containers to run containers
bitmap.runOptimize();
std::string bitmap_data(bitmap.getSizeInBytes(), '\0');
bitmap.write(bitmap_data.data());
*(req.add_segment_delete_bitmaps()) = std::move(bitmap_data);
}
if (pre_rowset_agg_start_version > 0 && pre_rowset_agg_end_version > 0) {
req.set_pre_rowset_agg_start_version(pre_rowset_agg_start_version);
req.set_pre_rowset_agg_end_version(pre_rowset_agg_end_version);
}
return retry_rpc("update delete bitmap", req, &res, &MetaService_Stub::update_delete_bitmap);
}

Expand Down
6 changes: 4 additions & 2 deletions be/src/cloud/cloud_meta_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,10 @@ class CloudMetaMgr {
DeleteBitmap* delete_bitmap, int64_t txn_id = -1,
bool is_explicit_txn = false, int64_t next_visible_version = -1);

Status cloud_update_delete_bitmap_without_lock(const CloudTablet& tablet,
DeleteBitmap* delete_bitmap);
// Sends every entry of `delete_bitmap` to the meta service through the
// update_delete_bitmap RPC, using a fake lock id instead of a real delete
// bitmap update lock.
// When `pre_rowset_agg_end_version` > 0, each bitmap key's rowset id is looked
// up in `rowset_to_versions` and the mapped version is attached to the request
// as that entry's pre-rowset version; every bitmap key is then expected to
// carry `pre_rowset_agg_end_version` as its version (DCHECK only).
// The pre-rowset agg start/end versions are set on the request only when both
// are > 0, so callers that do not aggregate pre rowsets can rely on the
// defaults and pass an empty map.
// NOTE(review): `rowset_to_versions` is a non-const reference because the
// implementation reads it with operator[]; it looks like it is not meant to be
// modified — confirm.
Status cloud_update_delete_bitmap_without_lock(
const CloudTablet& tablet, DeleteBitmap* delete_bitmap,
std::map<std::string, int64_t>& rowset_to_versions,
int64_t pre_rowset_agg_start_version = 0, int64_t pre_rowset_agg_end_version = 0);

Status get_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id,
int64_t initiator);
Expand Down
1 change: 0 additions & 1 deletion be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,6 @@ void CloudStorageEngine::_check_tablet_delete_bitmap_score_callback() {
}
uint64_t max_delete_bitmap_score = 0;
uint64_t max_base_rowset_delete_bitmap_score = 0;
std::vector<CloudTabletSPtr> tablets;
tablet_mgr().get_topn_tablet_delete_bitmap_score(&max_delete_bitmap_score,
&max_base_rowset_delete_bitmap_score);
if (max_delete_bitmap_score > 0) {
Expand Down
79 changes: 78 additions & 1 deletion be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() {
}
std::vector<RowsetSharedPtr> expired_rowsets;
// ATTN: trick, Use stale_rowsets to temporarily increase the reference count of the rowset shared pointer in _stale_rs_version_map so that in the recycle_cached_data function, it checks if the reference count is 2.
std::vector<RowsetSharedPtr> stale_rowsets;
std::vector<std::pair<Version, std::vector<RowsetSharedPtr>>> deleted_stale_rowsets;
int64_t expired_stale_sweep_endtime =
::time(nullptr) - config::tablet_rowset_stale_sweep_time_sec;
std::vector<std::string> version_to_delete;
Expand All @@ -428,6 +428,7 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() {
for (int64_t path_id : path_ids) {
int64_t start_version = -1;
int64_t end_version = -1;
std::vector<RowsetSharedPtr> stale_rowsets;
// delete stale versions in version graph
auto version_path = _timestamped_version_tracker.fetch_and_delete_path_by_id(path_id);
for (auto& v_ts : version_path->timestamped_versions()) {
Expand All @@ -454,6 +455,9 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() {
}
Version version(start_version, end_version);
version_to_delete.emplace_back(version.to_string());
if (!stale_rowsets.empty()) {
deleted_stale_rowsets.emplace_back(version, std::move(stale_rowsets));
}
}
_reconstruct_version_tracker_if_necessary();
}
Expand All @@ -462,7 +466,31 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() {
if (config::enable_mow_verbose_log) {
LOG_INFO("finish delete_expired_stale_rowset for tablet={}", tablet_id());
}

add_unused_rowsets(expired_rowsets);
if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write() &&
!deleted_stale_rowsets.empty()) {
// agg delete bitmap for pre rowsets; record unused delete bitmap key ranges
OlapStopWatch watch;
for (const auto& [version, unused_rowsets] : deleted_stale_rowsets) {
// agg delete bitmap for pre rowset
DeleteBitmapKeyRanges remove_delete_bitmap_key_ranges;
agg_delete_bitmap_for_stale_rowsets(version, remove_delete_bitmap_key_ranges);
// add remove delete bitmap
if (!remove_delete_bitmap_key_ranges.empty()) {
std::vector<RowsetId> rowset_ids;
for (const auto& rs : unused_rowsets) {
rowset_ids.push_back(rs->rowset_id());
}
std::lock_guard<std::mutex> lock(_gc_mutex);
_unused_delete_bitmap.push_back(
std::make_pair(rowset_ids, remove_delete_bitmap_key_ranges));
}
}
LOG(INFO) << "agg pre rowsets delete bitmap. tablet_id=" << tablet_id()
<< ", size=" << deleted_stale_rowsets.size()
<< ", cost(us)=" << watch.get_elapse_time_us();
}
return expired_rowsets.size();
}

Expand All @@ -481,6 +509,7 @@ void CloudTablet::add_unused_rowsets(const std::vector<RowsetSharedPtr>& rowsets

void CloudTablet::remove_unused_rowsets() {
int64_t removed_rowsets_num = 0;
int64_t removed_delete_bitmap_num = 0;
OlapStopWatch watch;
std::lock_guard<std::mutex> lock(_gc_mutex);
// 1. remove unused rowsets' cache data and delete bitmap
Expand All @@ -500,8 +529,32 @@ void CloudTablet::remove_unused_rowsets() {
removed_rowsets_num++;
}

// 2. remove delete bitmap of pre rowsets
for (auto it = _unused_delete_bitmap.begin(); it != _unused_delete_bitmap.end();) {
auto& rowset_ids = std::get<0>(*it);
bool find_unused_rowset = false;
for (const auto& rowset_id : rowset_ids) {
if (_unused_rowsets.find(rowset_id) != _unused_rowsets.end()) {
LOG(INFO) << "can not remove pre rowset delete bitmap because rowset is in use"
<< ", tablet_id=" << tablet_id() << ", rowset_id=" << rowset_id;
find_unused_rowset = true;
break;
}
}
if (find_unused_rowset) {
++it;
continue;
}
auto& key_ranges = std::get<1>(*it);
tablet_meta()->delete_bitmap().remove(key_ranges);
it = _unused_delete_bitmap.erase(it);
removed_delete_bitmap_num++;
}

LOG(INFO) << "tablet_id=" << tablet_id() << ", unused_rowset size=" << _unused_rowsets.size()
<< ", unused_delete_bitmap size=" << _unused_delete_bitmap.size()
<< ", removed_rowsets_num=" << removed_rowsets_num
<< ", removed_delete_bitmap_num=" << removed_delete_bitmap_num
<< ", cost(us)=" << watch.get_elapse_time_us();
}

Expand Down Expand Up @@ -1010,6 +1063,30 @@ Status CloudTablet::calc_delete_bitmap_for_compaction(
return st;
}

void CloudTablet::agg_delete_bitmap_for_compaction(
int64_t start_version, int64_t end_version, const std::vector<RowsetSharedPtr>& pre_rowsets,
DeleteBitmapPtr& new_delete_bitmap,
std::map<std::string, int64_t>& pre_rowset_to_versions) {
for (auto& rowset : pre_rowsets) {
for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) {
auto d = tablet_meta()->delete_bitmap().get_agg_without_cache(
{rowset->rowset_id(), seg_id, end_version}, start_version);
if (d->isEmpty()) {
continue;
}
VLOG_DEBUG << "agg delete bitmap for tablet_id=" << tablet_id()
<< ", rowset_id=" << rowset->rowset_id() << ", seg_id=" << seg_id
<< ", rowset_version=" << rowset->version().to_string()
<< ". compaction start_version=" << start_version
<< ", end_version=" << end_version
<< ". delete_bitmap cardinality=" << d->cardinality();
DeleteBitmap::BitmapKey end_key {rowset->rowset_id(), seg_id, end_version};
new_delete_bitmap->set(end_key, *d);
pre_rowset_to_versions[rowset->rowset_id().to_string()] = rowset->version().second;
}
}
}

Status CloudTablet::sync_meta() {
if (!config::enable_file_cache) {
return Status::OK();
Expand Down
6 changes: 6 additions & 0 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,11 @@ class CloudTablet final : public BaseTablet {
// check that if the delete bitmap in delete bitmap cache has the same cardinality with the expected_delete_bitmap's
Status check_delete_bitmap_cache(int64_t txn_id, DeleteBitmap* expected_delete_bitmap) override;

// Builds the aggregated delete bitmap of `pre_rowsets` for a compaction whose
// output covers [start_version, end_version].
// For each segment of each rowset in `pre_rowsets`, the aggregate obtained from
// the tablet meta's delete bitmap is written into `new_delete_bitmap` under the
// key (rowset_id, segment_id, end_version); empty aggregates are skipped.
// `pre_rowset_to_versions` is filled with rowset_id string -> rowset end version
// for every rowset that contributed at least one non-empty segment bitmap.
void agg_delete_bitmap_for_compaction(int64_t start_version, int64_t end_version,
const std::vector<RowsetSharedPtr>& pre_rowsets,
DeleteBitmapPtr& new_delete_bitmap,
std::map<std::string, int64_t>& pre_rowset_to_versions);

bool need_remove_unused_rowsets();

void add_unused_rowsets(const std::vector<RowsetSharedPtr>& rowsets);
Expand Down Expand Up @@ -327,6 +332,7 @@ class CloudTablet final : public BaseTablet {
// unused_rowsets, [start_version, end_version]
std::mutex _gc_mutex;
std::unordered_map<RowsetId, RowsetSharedPtr> _unused_rowsets;
std::vector<std::pair<std::vector<RowsetId>, DeleteBitmapKeyRanges>> _unused_delete_bitmap;
};

using CloudTabletSPtr = std::shared_ptr<CloudTablet>;
Expand Down
48 changes: 39 additions & 9 deletions be/src/cloud/cloud_tablet_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,36 @@ void CloudTabletMgr::vacuum_stale_rowsets(const CountDownLatch& stop_latch) {
}
LOG_INFO("finish remove unused rowsets")
.tag("num_tablets", tablets_to_remove_unused_rowsets.size());
if (config::enable_check_agg_and_remove_pre_rowsets_delete_bitmap) {
int64_t max_useless_rowset_count = 0;
int64_t tablet_id_with_max_useless_rowset_count = 0;
int64_t max_useless_rowset_version_count = 0;
int64_t tablet_id_with_max_useless_rowset_version_count = 0;
OlapStopWatch watch;
_tablet_map->traverse([&](auto&& tablet) {
int64_t useless_rowset_count = 0;
int64_t useless_rowset_version_count = 0;
tablet->check_agg_delete_bitmap_for_stale_rowsets(useless_rowset_count,
useless_rowset_version_count);
if (useless_rowset_count > max_useless_rowset_count) {
max_useless_rowset_count = useless_rowset_count;
tablet_id_with_max_useless_rowset_count = tablet->tablet_id();
}
if (useless_rowset_version_count > max_useless_rowset_version_count) {
max_useless_rowset_version_count = useless_rowset_version_count;
tablet_id_with_max_useless_rowset_version_count = tablet->tablet_id();
}
});
g_max_rowsets_with_useless_delete_bitmap.set_value(max_useless_rowset_count);
g_max_rowsets_with_useless_delete_bitmap_version.set_value(
max_useless_rowset_version_count);
LOG(INFO) << "finish check_agg_delete_bitmap_for_stale_rowsets, cost(us)="
<< watch.get_elapse_time_us()
<< ". max useless rowset count=" << max_useless_rowset_count
<< ", tablet_id=" << tablet_id_with_max_useless_rowset_count
<< ", max useless rowset version count=" << max_useless_rowset_version_count
<< ", tablet_id=" << tablet_id_with_max_useless_rowset_version_count;
}
}
}

Expand Down Expand Up @@ -495,17 +525,17 @@ void CloudTabletMgr::get_topn_tablet_delete_bitmap_score(
}
std::stringstream ss;
for (auto& i : buf) {
ss << i.first->tablet_id() << ":" << i.second << ",";
ss << i.first->tablet_id() << ": " << i.second << ", ";
}
LOG(INFO) << "get_topn_tablet_delete_bitmap_score, n=" << n
<< ",tablet size=" << weak_tablets.size()
<< ",total_delete_map_count=" << total_delete_map_count
<< ",cost(us)=" << watch.get_elapse_time_us()
<< ",max_delete_bitmap_score=" << *max_delete_bitmap_score
<< ",max_delete_bitmap_score_tablet_id=" << max_delete_bitmap_score_tablet_id
<< ",max_base_rowset_delete_bitmap_score=" << *max_base_rowset_delete_bitmap_score
<< ",max_base_rowset_delete_bitmap_score_tablet_id="
<< max_base_rowset_delete_bitmap_score_tablet_id << ",tablets=[" << ss.str() << "]";
<< ", tablet size=" << weak_tablets.size()
<< ", total_delete_map_count=" << total_delete_map_count
<< ", cost(us)=" << watch.get_elapse_time_us()
<< ", max_delete_bitmap_score=" << *max_delete_bitmap_score
<< ", max_delete_bitmap_score_tablet_id=" << max_delete_bitmap_score_tablet_id
<< ", max_base_rowset_delete_bitmap_score=" << *max_base_rowset_delete_bitmap_score
<< ", max_base_rowset_delete_bitmap_score_tablet_id="
<< max_base_rowset_delete_bitmap_score_tablet_id << ", tablets=[" << ss.str() << "]";
}

void CloudTabletMgr::put_tablet_for_UT(std::shared_ptr<CloudTablet> tablet) {
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1247,6 +1247,8 @@ DEFINE_mInt32(publish_version_gap_logging_threshold, "200");
DEFINE_mBool(enable_mow_get_agg_by_cache, "true");
// get agg correctness check for mow table
DEFINE_mBool(enable_mow_get_agg_correctness_check_core, "false");
// When true, cloud cumulative compaction on merge-on-write unique-key tablets
// (with more than one input rowset) aggregates the delete bitmaps of the
// rowsets that precede the output rowset and uploads the result to the meta
// service.
DEFINE_mBool(enable_agg_and_remove_pre_rowsets_delete_bitmap, "true");
// When true, the stale-rowset vacuum pass additionally traverses all tablets,
// runs check_agg_delete_bitmap_for_stale_rowsets on each, and reports the
// maximum useless rowset / rowset-version counts it finds.
DEFINE_mBool(enable_check_agg_and_remove_pre_rowsets_delete_bitmap, "false");

// The secure path with user files, used in the `local` table function.
DEFINE_mString(user_files_secure_path, "${DORIS_HOME}");
Expand Down
Loading
Loading