diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp index 097bdc9bf4ab1b..8546ac04fcf92e 100644 --- a/be/src/cloud/cloud_cumulative_compaction.cpp +++ b/be/src/cloud/cloud_cumulative_compaction.cpp @@ -447,6 +447,10 @@ Status CloudCumulativeCompaction::process_old_version_delete_bitmap() { { static_cast(_tablet.get())->delete_expired_stale_rowsets(); }); } } + DBUG_EXECUTE_IF("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowset", { + LOG(INFO) << "delete_expired_stale_rowsets for tablet=" << _tablet->tablet_id(); + _engine.tablet_mgr().vacuum_stale_rowsets(CountDownLatch(1)); + }); return Status::OK(); } diff --git a/be/src/cloud/cloud_delete_bitmap_action.cpp b/be/src/cloud/cloud_delete_bitmap_action.cpp deleted file mode 100644 index 3d834bfe7b373c..00000000000000 --- a/be/src/cloud/cloud_delete_bitmap_action.cpp +++ /dev/null @@ -1,182 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include "cloud_delete_bitmap_action.h" - -#include -#include -#include -#include -#include - -#include // IWYU pragma: keep -#include -#include -#include -#include -#include -#include -#include -#include - -#include "cloud/cloud_meta_mgr.h" -#include "cloud/cloud_tablet.h" -#include "cloud/cloud_tablet_mgr.h" -#include "common/logging.h" -#include "common/status.h" -#include "gutil/strings/substitute.h" -#include "http/http_channel.h" -#include "http/http_headers.h" -#include "http/http_request.h" -#include "http/http_status.h" -#include "olap/olap_define.h" -#include "olap/storage_engine.h" -#include "olap/tablet_manager.h" -#include "util/doris_metrics.h" -#include "util/stopwatch.hpp" - -namespace doris { -#include "common/compile_check_begin.h" -using namespace ErrorCode; - -namespace { - -constexpr std::string_view HEADER_JSON = "application/json"; - -} // namespace - -CloudDeleteBitmapAction::CloudDeleteBitmapAction(DeleteBitmapActionType ctype, ExecEnv* exec_env, - CloudStorageEngine& engine, - TPrivilegeHier::type hier, - TPrivilegeType::type ptype) - : HttpHandlerWithAuth(exec_env, hier, ptype), - _engine(engine), - _delete_bitmap_action_type(ctype) {} - -static Status _check_param(HttpRequest* req, uint64_t* tablet_id) { - const auto& req_tablet_id = req->param(TABLET_ID_KEY); - if (req_tablet_id.empty()) { - return Status::InternalError("tablet id is empty!"); - } - try { - *tablet_id = std::stoull(req_tablet_id); - } catch (const std::exception& e) { - return Status::InternalError("convert tablet_id failed, {}", e.what()); - } - return Status::OK(); -} - -Status CloudDeleteBitmapAction::_handle_show_local_delete_bitmap_count(HttpRequest* req, - std::string* json_result) { - uint64_t tablet_id = 0; - // check & retrieve tablet_id from req if it contains - RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id), "check param failed"); - if (tablet_id == 0) { - return Status::InternalError("check param failed: missing tablet_id"); - } - - 
CloudTabletSPtr tablet = DORIS_TRY(_engine.tablet_mgr().get_tablet(tablet_id)); - if (tablet == nullptr) { - return Status::NotFound("Tablet not found. tablet_id={}", tablet_id); - } - - auto count = tablet->tablet_meta()->delete_bitmap().get_delete_bitmap_count(); - auto cardinality = tablet->tablet_meta()->delete_bitmap().cardinality(); - auto size = tablet->tablet_meta()->delete_bitmap().get_size(); - LOG(INFO) << "show_local_delete_bitmap_count,tablet_id=" << tablet_id << ",count=" << count - << ",cardinality=" << cardinality << ",size=" << size; - - rapidjson::Document root; - root.SetObject(); - root.AddMember("delete_bitmap_count", count, root.GetAllocator()); - root.AddMember("cardinality", cardinality, root.GetAllocator()); - root.AddMember("size", size, root.GetAllocator()); - - // to json string - rapidjson::StringBuffer strbuf; - rapidjson::PrettyWriter writer(strbuf); - root.Accept(writer); - *json_result = std::string(strbuf.GetString()); - - return Status::OK(); -} - -Status CloudDeleteBitmapAction::_handle_show_ms_delete_bitmap_count(HttpRequest* req, - std::string* json_result) { - uint64_t tablet_id = 0; - // check & retrieve tablet_id from req if it contains - RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id), "check param failed"); - if (tablet_id == 0) { - return Status::InternalError("check param failed: missing tablet_id"); - } - TabletMetaSharedPtr tablet_meta; - auto st = _engine.meta_mgr().get_tablet_meta(tablet_id, &tablet_meta); - if (!st.ok()) { - LOG(WARNING) << "failed to get_tablet_meta tablet=" << tablet_id - << ", st=" << st.to_string(); - return st; - } - auto tablet = std::make_shared(_engine, std::move(tablet_meta)); - st = _engine.meta_mgr().sync_tablet_rowsets(tablet.get(), false, true, true); - if (!st.ok()) { - LOG(WARNING) << "failed to sync tablet=" << tablet_id << ", st=" << st; - return st; - } - auto count = tablet->tablet_meta()->delete_bitmap().get_delete_bitmap_count(); - auto cardinality = 
tablet->tablet_meta()->delete_bitmap().cardinality(); - auto size = tablet->tablet_meta()->delete_bitmap().get_size(); - LOG(INFO) << "show_ms_delete_bitmap_count,tablet_id=" << tablet_id << ",count=" << count - << ",cardinality=" << cardinality << ",size=" << size; - - rapidjson::Document root; - root.SetObject(); - root.AddMember("delete_bitmap_count", count, root.GetAllocator()); - root.AddMember("cardinality", cardinality, root.GetAllocator()); - root.AddMember("size", size, root.GetAllocator()); - - // to json string - rapidjson::StringBuffer strbuf; - rapidjson::PrettyWriter writer(strbuf); - root.Accept(writer); - *json_result = std::string(strbuf.GetString()); - - return Status::OK(); -} - -void CloudDeleteBitmapAction::handle(HttpRequest* req) { - req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.data()); - if (_delete_bitmap_action_type == DeleteBitmapActionType::COUNT_LOCAL) { - std::string json_result; - Status st = _handle_show_local_delete_bitmap_count(req, &json_result); - if (!st.ok()) { - HttpChannel::send_reply(req, HttpStatus::OK, st.to_json()); - } else { - HttpChannel::send_reply(req, HttpStatus::OK, json_result); - } - } else if (_delete_bitmap_action_type == DeleteBitmapActionType::COUNT_MS) { - std::string json_result; - Status st = _handle_show_ms_delete_bitmap_count(req, &json_result); - if (!st.ok()) { - HttpChannel::send_reply(req, HttpStatus::OK, st.to_json()); - } else { - HttpChannel::send_reply(req, HttpStatus::OK, json_result); - } - } -} - -#include "common/compile_check_end.h" -} // namespace doris \ No newline at end of file diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index c044b8361b7af3..b4cf9d2e31c153 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -493,18 +493,16 @@ void CloudTablet::remove_unused_rowsets() { ++it; continue; } + tablet_meta()->remove_rowset_delete_bitmap(rs->rowset_id(), rs->version()); rs->clear_cache(); it = 
_unused_rowsets.erase(it); g_unused_rowsets_count << -1; removed_rowsets_num++; } - if (removed_rowsets_num > 0) { - LOG(INFO) << "tablet_id=" << tablet_id() - << ", unused_rowset size=" << _unused_rowsets.size() - << ", removed_rowsets_num=" << removed_rowsets_num - << ", cost(us)=" << watch.get_elapse_time_us(); - } + LOG(INFO) << "tablet_id=" << tablet_id() << ", unused_rowset size=" << _unused_rowsets.size() + << ", removed_rowsets_num=" << removed_rowsets_num + << ", cost(us)=" << watch.get_elapse_time_us(); } void CloudTablet::update_base_size(const Rowset& rs) { diff --git a/be/src/http/action/delete_bitmap_action.cpp b/be/src/http/action/delete_bitmap_action.cpp new file mode 100644 index 00000000000000..59783d1c055535 --- /dev/null +++ b/be/src/http/action/delete_bitmap_action.cpp @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "delete_bitmap_action.h" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "cloud/cloud_meta_mgr.h" +#include "cloud/cloud_storage_engine.h" +#include "cloud/cloud_tablet.h" +#include "cloud/cloud_tablet_mgr.h" +#include "cloud/config.h" +#include "http/http_channel.h" +#include "http/http_headers.h" +#include "http/http_request.h" +#include "http/http_status.h" +#include "olap/olap_define.h" +#include "olap/tablet_manager.h" + +namespace doris { +#include "common/compile_check_begin.h" +using namespace ErrorCode; + +namespace { + +constexpr std::string_view HEADER_JSON = "application/json"; + +} // namespace + +DeleteBitmapAction::DeleteBitmapAction(DeleteBitmapActionType ctype, ExecEnv* exec_env, + BaseStorageEngine& engine, TPrivilegeHier::type hier, + TPrivilegeType::type ptype) + : HttpHandlerWithAuth(exec_env, hier, ptype), + _engine(engine), + _delete_bitmap_action_type(ctype) {} + +static Status _check_param(HttpRequest* req, uint64_t* tablet_id, bool* verbose) { + const auto& req_tablet_id = req->param(TABLET_ID_KEY); + if (req_tablet_id.empty()) { + return Status::InternalError("tablet id is empty!"); + } + try { + *tablet_id = std::stoull(req_tablet_id); + } catch (const std::exception& e) { + return Status::InternalError("convert tablet_id failed, {}", e.what()); + } + if (*tablet_id == 0) { + return Status::InternalError("check param failed: invalid tablet_id"); + } + *verbose = iequal(req->param("verbose"), "true"); + return Status::OK(); +} + +static void _show_delete_bitmap(DeleteBitmap& dm, bool verbose, std::string* json_result) { + auto count = dm.get_delete_bitmap_count(); + auto cardinality = dm.cardinality(); + auto size = dm.get_size(); + rapidjson::Document root; + root.SetObject(); + root.AddMember("delete_bitmap_count", count, root.GetAllocator()); + root.AddMember("cardinality", cardinality, root.GetAllocator()); + root.AddMember("size", size, 
root.GetAllocator()); + if (verbose) { + std::string pre_rowset_id = ""; + int64_t pre_segment_id = -1; + std::vector version_vector; + rapidjson::Document dm_arr; + dm_arr.SetObject(); + + auto add_rowset_delete_bitmap_info = [&]() { + std::string key = + "rowset: " + pre_rowset_id + ", segment: " + std::to_string(pre_segment_id); + rapidjson::Value key_value; + key_value.SetString(key.data(), static_cast(key.length()), + root.GetAllocator()); + rapidjson::Document version_arr; + version_arr.SetArray(); + for (const auto& str : version_vector) { + rapidjson::Value value; + value.SetString(str.c_str(), static_cast(str.length()), + root.GetAllocator()); + version_arr.PushBack(value, root.GetAllocator()); + } + dm_arr.AddMember(key_value, version_arr, root.GetAllocator()); + version_vector.clear(); + }; + + for (auto& [id, bitmap] : dm.delete_bitmap) { + auto& [rowset_id, segment_id, version] = id; + if (rowset_id.to_string() != pre_rowset_id || segment_id != pre_segment_id) { + // add previous result + if (!pre_rowset_id.empty()) { + add_rowset_delete_bitmap_info(); + } + pre_rowset_id = rowset_id.to_string(); + pre_segment_id = segment_id; + } + std::string str = fmt::format("v: {}, c: {}, s: {}", version, bitmap.cardinality(), + bitmap.getSizeInBytes()); + version_vector.push_back(str); + } + // add last result + if (!version_vector.empty()) { + add_rowset_delete_bitmap_info(); + } + root.AddMember("delete_bitmap", dm_arr, root.GetAllocator()); + } + + // to json string + rapidjson::StringBuffer strbuf; + rapidjson::PrettyWriter writer(strbuf); + root.Accept(writer); + *json_result = std::string(strbuf.GetString()); +} + +Status DeleteBitmapAction::_handle_show_local_delete_bitmap_count(HttpRequest* req, + std::string* json_result) { + uint64_t tablet_id = 0; + bool verbose = false; + RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id, &verbose), "check param failed"); + + BaseTabletSPtr tablet = nullptr; + if (config::is_cloud_mode()) { + tablet = 
DORIS_TRY(_engine.to_cloud().tablet_mgr().get_tablet(tablet_id)); + DBUG_EXECUTE_IF( + "DeleteBitmapAction._handle_show_local_delete_bitmap_count.vacuum_stale_rowsets", + { _engine.to_cloud().tablet_mgr().vacuum_stale_rowsets(CountDownLatch(1)); }); + } else { + tablet = _engine.to_local().tablet_manager()->get_tablet(tablet_id); + DBUG_EXECUTE_IF( + "DeleteBitmapAction._handle_show_local_delete_bitmap_count.start_delete_unused_" + "rowset", + { _engine.to_local().start_delete_unused_rowset(); }); + } + if (tablet == nullptr) { + return Status::NotFound("Tablet not found. tablet_id={}", tablet_id); + } + auto dm = tablet->tablet_meta()->delete_bitmap().snapshot(); + _show_delete_bitmap(dm, verbose, json_result); + return Status::OK(); +} + +Status DeleteBitmapAction::_handle_show_ms_delete_bitmap_count(HttpRequest* req, + std::string* json_result) { + uint64_t tablet_id = 0; + bool verbose = false; + RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id, &verbose), "check param failed"); + + TabletMetaSharedPtr tablet_meta; + auto st = _engine.to_cloud().meta_mgr().get_tablet_meta(tablet_id, &tablet_meta); + if (!st.ok()) { + LOG(WARNING) << "failed to get_tablet_meta for tablet=" << tablet_id + << ", st=" << st.to_string(); + return st; + } + auto tablet = std::make_shared(_engine.to_cloud(), std::move(tablet_meta)); + st = _engine.to_cloud().meta_mgr().sync_tablet_rowsets(tablet.get(), false, true, true); + if (!st.ok()) { + LOG(WARNING) << "failed to sync tablet=" << tablet_id << ", st=" << st; + return st; + } + auto dm = tablet->tablet_meta()->delete_bitmap().snapshot(); + _show_delete_bitmap(dm, verbose, json_result); + return Status::OK(); +} + +void DeleteBitmapAction::handle(HttpRequest* req) { + req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.data()); + if (_delete_bitmap_action_type == DeleteBitmapActionType::COUNT_LOCAL) { + std::string json_result; + Status st = _handle_show_local_delete_bitmap_count(req, &json_result); + if 
(!st.ok()) { + HttpChannel::send_reply(req, HttpStatus::OK, st.to_json()); + } else { + HttpChannel::send_reply(req, HttpStatus::OK, json_result); + } + } else if (_delete_bitmap_action_type == DeleteBitmapActionType::COUNT_MS) { + std::string json_result; + Status st = _handle_show_ms_delete_bitmap_count(req, &json_result); + if (!st.ok()) { + HttpChannel::send_reply(req, HttpStatus::OK, st.to_json()); + } else { + HttpChannel::send_reply(req, HttpStatus::OK, json_result); + } + } +} + +#include "common/compile_check_end.h" +} // namespace doris \ No newline at end of file diff --git a/be/src/cloud/cloud_delete_bitmap_action.h b/be/src/http/action/delete_bitmap_action.h similarity index 79% rename from be/src/cloud/cloud_delete_bitmap_action.h rename to be/src/http/action/delete_bitmap_action.h index ce507ee9991757..284e8dbcf5705b 100644 --- a/be/src/cloud/cloud_delete_bitmap_action.h +++ b/be/src/http/action/delete_bitmap_action.h @@ -21,9 +21,9 @@ #include -#include "cloud/cloud_storage_engine.h" #include "common/status.h" #include "http/http_handler_with_auth.h" +#include "olap/storage_engine.h" #include "olap/tablet.h" namespace doris { @@ -35,13 +35,12 @@ class ExecEnv; enum class DeleteBitmapActionType { COUNT_LOCAL = 1, COUNT_MS = 2 }; /// This action is used for viewing the delete bitmap status -class CloudDeleteBitmapAction : public HttpHandlerWithAuth { +class DeleteBitmapAction : public HttpHandlerWithAuth { public: - CloudDeleteBitmapAction(DeleteBitmapActionType ctype, ExecEnv* exec_env, - CloudStorageEngine& engine, TPrivilegeHier::type hier, - TPrivilegeType::type ptype); + DeleteBitmapAction(DeleteBitmapActionType ctype, ExecEnv* exec_env, BaseStorageEngine& engine, + TPrivilegeHier::type hier, TPrivilegeType::type ptype); - ~CloudDeleteBitmapAction() override = default; + ~DeleteBitmapAction() override = default; void handle(HttpRequest* req) override; @@ -50,7 +49,7 @@ class CloudDeleteBitmapAction : public HttpHandlerWithAuth { Status 
_handle_show_ms_delete_bitmap_count(HttpRequest* req, std::string* json_result); private: - CloudStorageEngine& _engine; + BaseStorageEngine& _engine; DeleteBitmapActionType _delete_bitmap_action_type; }; #include "common/compile_check_end.h" diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 9b173db26fb1e1..6dc6296d2083f5 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -1231,6 +1231,8 @@ Status CompactionMixin::modify_rowsets() { LOG(WARNING) << "failed to remove old version delete bitmap, st: " << st; } } + DBUG_EXECUTE_IF("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowset", + { tablet()->delete_expired_stale_rowset(); }); return Status::OK(); } diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 7188402e95612e..02234b326aad9f 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -944,15 +944,28 @@ void StorageEngine::_clean_unused_rowset_metas() { for (auto data_dir : data_dirs) { static_cast( RowsetMetaManager::traverse_rowset_metas(data_dir->get_meta(), clean_rowset_func)); + // 1. delete delete_bitmap + std::set tablets_to_save_meta; for (auto& rowset_meta : invalid_rowset_metas) { - static_cast(RowsetMetaManager::remove( - data_dir->get_meta(), rowset_meta->tablet_uid(), rowset_meta->rowset_id())); TabletSharedPtr tablet = _tablet_manager->get_tablet(rowset_meta->tablet_id()); if (tablet && tablet->tablet_meta()->enable_unique_key_merge_on_write()) { - tablet->tablet_meta()->delete_bitmap().remove_rowset_cache_version( - rowset_meta->rowset_id()); + tablet->tablet_meta()->remove_rowset_delete_bitmap(rowset_meta->rowset_id(), + rowset_meta->version()); + tablets_to_save_meta.emplace(tablet->tablet_id()); } } + for (const auto& tablet_id : tablets_to_save_meta) { + auto tablet = _tablet_manager->get_tablet(tablet_id); + if (tablet) { + std::shared_lock rlock(tablet->get_header_lock()); + tablet->save_meta(); + } + } + // 2. 
delete rowset meta + for (auto& rowset_meta : invalid_rowset_metas) { + static_cast(RowsetMetaManager::remove( + data_dir->get_meta(), rowset_meta->tablet_uid(), rowset_meta->rowset_id())); + } LOG(INFO) << "remove " << invalid_rowset_metas.size() << " invalid rowset meta from dir: " << data_dir->path(); invalid_rowset_metas.clear(); @@ -1186,6 +1199,7 @@ void StorageEngine::_parse_default_rowset_type() { } void StorageEngine::start_delete_unused_rowset() { + DBUG_EXECUTE_IF("StorageEngine::start_delete_unused_rowset.block", DBUG_BLOCK); LOG(INFO) << "start to delete unused rowset, size: " << _unused_rowsets.size(); std::vector unused_rowsets_copy; unused_rowsets_copy.reserve(_unused_rowsets.size()); @@ -1218,20 +1232,27 @@ void StorageEngine::start_delete_unused_rowset() { << due_to_use_count << " rowsets due to use count > 1, skipped " << due_to_not_delete_file << " rowsets due to don't need to delete file, skipped " << due_to_delayed_expired_ts << " rowsets due to delayed expired timestamp."; + std::set tablets_to_save_meta; for (auto&& rs : unused_rowsets_copy) { VLOG_NOTICE << "start to remove rowset:" << rs->rowset_id() << ", version:" << rs->version(); // delete delete_bitmap of unused rowsets if (auto tablet = _tablet_manager->get_tablet(rs->rowset_meta()->tablet_id()); tablet && tablet->enable_unique_key_merge_on_write()) { - tablet->tablet_meta()->delete_bitmap().remove({rs->rowset_id(), 0, 0}, - {rs->rowset_id(), UINT32_MAX, 0}); - tablet->tablet_meta()->delete_bitmap().remove_rowset_cache_version(rs->rowset_id()); + tablet->tablet_meta()->remove_rowset_delete_bitmap(rs->rowset_id(), rs->version()); + tablets_to_save_meta.emplace(tablet->tablet_id()); } Status status = rs->remove(); unused_rowsets_counter << -1; VLOG_NOTICE << "remove rowset:" << rs->rowset_id() << " finished. 
status:" << status; } + for (const auto& tablet_id : tablets_to_save_meta) { + auto tablet = _tablet_manager->get_tablet(tablet_id); + if (tablet) { + std::shared_lock rlock(tablet->get_header_lock()); + tablet->save_meta(); + } + } LOG(INFO) << "removed all collected unused rowsets"; } diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 9ff7ec74edaf31..5c4770e3a3344f 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -861,6 +861,8 @@ void Tablet::delete_expired_stale_rowset() { if (config::enable_mow_verbose_log) { LOG_INFO("finish delete_expired_stale_rowset for tablet={}", tablet_id()); } + DBUG_EXECUTE_IF("Tablet.delete_expired_stale_rowset.start_delete_unused_rowset", + { _engine.start_delete_unused_rowset(); }); } Status Tablet::capture_consistent_versions_unlocked(const Version& spec_version, diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 5516196641d092..3a0ff3419ee09c 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -453,6 +453,18 @@ void TabletMeta::init_column_from_tcolumn(uint32_t unique_id, const TColumn& tco } } +void TabletMeta::remove_rowset_delete_bitmap(const RowsetId& rowset_id, const Version& version) { + if (_enable_unique_key_merge_on_write) { + delete_bitmap().remove({rowset_id, 0, 0}, {rowset_id, UINT32_MAX, 0}); + if (config::enable_mow_verbose_log) { + LOG_INFO("delete rowset delete bitmap. 
tablet={}, rowset={}, version={}", tablet_id(), + rowset_id.to_string(), version.to_string()); + } + size_t rowset_cache_version_size = delete_bitmap().remove_rowset_cache_version(rowset_id); + _check_mow_rowset_cache_version_size(rowset_cache_version_size); + } +} + Status TabletMeta::create_from_file(const string& file_path) { TabletMetaPB tablet_meta_pb; RETURN_IF_ERROR(load_from_file(file_path, &tablet_meta_pb)); @@ -943,28 +955,14 @@ void TabletMeta::revise_delete_bitmap_unlocked(const DeleteBitmap& delete_bitmap } void TabletMeta::delete_stale_rs_meta_by_version(const Version& version) { - size_t rowset_cache_version_size = 0; auto it = _stale_rs_metas.begin(); while (it != _stale_rs_metas.end()) { if ((*it)->version() == version) { - if (_enable_unique_key_merge_on_write) { - // remove rowset delete bitmap - delete_bitmap().remove({(*it)->rowset_id(), 0, 0}, - {(*it)->rowset_id(), UINT32_MAX, 0}); - rowset_cache_version_size = - delete_bitmap().remove_rowset_cache_version((*it)->rowset_id()); - if (config::enable_mow_verbose_log) { - LOG_INFO( - "delete stale rowset's delete bitmap. 
tablet={}, version={}, rowset={}", - tablet_id(), version.to_string(), (*it)->rowset_id().to_string()); - } - } it = _stale_rs_metas.erase(it); } else { it++; } } - _check_mow_rowset_cache_version_size(rowset_cache_version_size); } RowsetMetaSharedPtr TabletMeta::acquire_rs_meta_by_version(const Version& version) const { diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index a0a2ab3d32144f..388ddc439dc31e 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -242,6 +242,7 @@ class TabletMeta : public MetadataAdder { ColumnPB* column); DeleteBitmap& delete_bitmap() { return *_delete_bitmap; } + void remove_rowset_delete_bitmap(const RowsetId& rowset_id, const Version& version); bool enable_unique_key_merge_on_write() const { return _enable_unique_key_merge_on_write; } diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index 43506800c43c69..32ea86b09b1d92 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -25,7 +25,6 @@ #include #include "cloud/cloud_compaction_action.h" -#include "cloud/cloud_delete_bitmap_action.h" #include "cloud/config.h" #include "cloud/injection_point_action.h" #include "common/config.h" @@ -43,6 +42,7 @@ #include "http/action/compaction_score_action.h" #include "http/action/config_action.h" #include "http/action/debug_point_action.h" +#include "http/action/delete_bitmap_action.h" #include "http/action/download_action.h" #include "http/action/download_binlog_action.h" #include "http/action/file_cache_action.h" @@ -384,6 +384,13 @@ void HttpService::register_local_handler(StorageEngine& engine) { _ev_http_server->register_handler(HttpMethod::GET, "/api/compaction/run_status", run_status_compaction_action); + + DeleteBitmapAction* count_delete_bitmap_action = + _pool.add(new DeleteBitmapAction(DeleteBitmapActionType::COUNT_LOCAL, _env, engine, + TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN)); + _ev_http_server->register_handler(HttpMethod::GET, 
"/api/delete_bitmap/count_local", + count_delete_bitmap_action); + CheckTabletSegmentAction* check_tablet_segment_action = _pool.add(new CheckTabletSegmentAction( _env, engine, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN)); _ev_http_server->register_handler(HttpMethod::POST, "/api/check_tablet_segment_lost", @@ -432,14 +439,14 @@ void HttpService::register_cloud_handler(CloudStorageEngine& engine) { TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN)); _ev_http_server->register_handler(HttpMethod::GET, "/api/compaction/run_status", run_status_compaction_action); - CloudDeleteBitmapAction* count_local_delete_bitmap_action = - _pool.add(new CloudDeleteBitmapAction(DeleteBitmapActionType::COUNT_LOCAL, _env, engine, - TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN)); + DeleteBitmapAction* count_local_delete_bitmap_action = + _pool.add(new DeleteBitmapAction(DeleteBitmapActionType::COUNT_LOCAL, _env, engine, + TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN)); _ev_http_server->register_handler(HttpMethod::GET, "/api/delete_bitmap/count_local", count_local_delete_bitmap_action); - CloudDeleteBitmapAction* count_ms_delete_bitmap_action = - _pool.add(new CloudDeleteBitmapAction(DeleteBitmapActionType::COUNT_MS, _env, engine, - TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN)); + DeleteBitmapAction* count_ms_delete_bitmap_action = + _pool.add(new DeleteBitmapAction(DeleteBitmapActionType::COUNT_MS, _env, engine, + TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN)); _ev_http_server->register_handler(HttpMethod::GET, "/api/delete_bitmap/count_ms", count_ms_delete_bitmap_action); #ifdef ENABLE_INJECTION_POINT diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 18b3571299f304..0970ded03e9850 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -372,6 +372,8 @@ Status NewOlapScanner::_init_tablet_reader_params( _tablet_reader_params.delete_bitmap = 
&tablet->tablet_meta()->delete_bitmap(); } + DBUG_EXECUTE_IF("NewOlapScanner::_init_tablet_reader_params.block", DBUG_BLOCK); + if (!_state->skip_storage_engine_merge()) { TOlapScanNode& olap_scan_node = ((pipeline::OlapScanLocalState*)_local_state)->olap_scan_node(); diff --git a/regression-test/data/compaction/test_mow_delete_unused_rowset_dm_docker.out b/regression-test/data/compaction/test_mow_delete_unused_rowset_dm_docker.out new file mode 100644 index 00000000000000..9896795dfd4231 --- /dev/null +++ b/regression-test/data/compaction/test_mow_delete_unused_rowset_dm_docker.out @@ -0,0 +1,22 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql1 -- +1 99 +2 99 +3 99 +4 99 +5 99 + +-- !sql2 -- +1 101 +2 100 +3 100 +4 100 +5 100 + +-- !sql3 -- +1 101 +2 100 +3 100 +4 100 +5 100 + diff --git a/regression-test/data/compaction/test_schema_change_add_key_column.csv.gz b/regression-test/data/compaction/test_schema_change_add_key_column.csv.gz new file mode 100644 index 00000000000000..bc9d3dd70ea8a5 Binary files /dev/null and b/regression-test/data/compaction/test_schema_change_add_key_column.csv.gz differ diff --git a/regression-test/data/compaction/test_schema_change_add_key_column1.csv.gz b/regression-test/data/compaction/test_schema_change_add_key_column1.csv.gz new file mode 100644 index 00000000000000..83f1ecd41c002e Binary files /dev/null and b/regression-test/data/compaction/test_schema_change_add_key_column1.csv.gz differ diff --git a/regression-test/suites/compaction/test_mow_compact_multi_segments.groovy b/regression-test/suites/compaction/test_mow_compact_multi_segments.groovy new file mode 100644 index 00000000000000..f286a2becaec49 --- /dev/null +++ b/regression-test/suites/compaction/test_mow_compact_multi_segments.groovy @@ -0,0 +1,253 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// When a compaction runs on a single rowset with multiple segments, its delete bitmap can be deleted.
+suite("test_mow_compact_multi_segments", "nonConcurrent") {
+    GetDebugPoint().clearDebugPointsForAllFEs()
+    GetDebugPoint().clearDebugPointsForAllBEs()
+    def tableName = "test_mow_compact_multi_segments"
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    def backendId_to_params = [string: [:]] // per-BE map of original config values, filled by get_be_param
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort)
+
+    def reset_be_param = { paramName ->
+        // for each BE node, set paramName back to the value captured earlier by get_be_param
+        for (String id in backendId_to_backendIP.keySet()) {
+            def beIp = backendId_to_backendIP.get(id)
+            def bePort = backendId_to_backendHttpPort.get(id)
+            def original_value = backendId_to_params.get(id).get(paramName)
+            def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, original_value))
+            assertTrue(out.contains("OK"))
+        }
+    }
+
+    def get_be_param = { paramName ->
+        // for each BE node, read the current value of paramName and remember it in backendId_to_params
+        def paramValue = ""
+        for (String id in backendId_to_backendIP.keySet()) {
+            def beIp = backendId_to_backendIP.get(id)
+            def bePort = backendId_to_backendHttpPort.get(id)
+            // get the config value from be
+            def (code, out, err) = curl("GET", String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort, paramName))
+            assertTrue(code == 0)
+            assertTrue(out.contains(paramName))
+            // the first element of the response is expected to be a 4-item list
+            def resultList = parseJson(out)[0]
+            assertTrue(resultList.size() == 4)
+            // item 2 is stored as the original value
+            paramValue = resultList[2]
+            backendId_to_params.get(id, [:]).put(paramName, paramValue)
+        }
+    }
+
+    def getTabletStatus = { tablet, rowsetIndex, lastRowsetSegmentNum, enableAssert = false -> // compares the segment count of rowset #rowsetIndex with lastRowsetSegmentNum
+        String compactionUrl = tablet["CompactionStatus"]
+        def (code, out, err) = curl("GET", compactionUrl)
+        logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+        assertEquals(code, 0)
+        def tabletJson = parseJson(out.trim())
+        assert tabletJson.rowsets instanceof List
+        assertTrue(tabletJson.rowsets.size() >= rowsetIndex)
+        def rowset = tabletJson.rowsets.get(rowsetIndex - 1) // rowsetIndex is 1-based
+        logger.info("rowset: ${rowset}")
+        int start_index = rowset.indexOf("]")
+        int end_index = rowset.indexOf("DATA")
+        def segmentNumStr = rowset.substring(start_index + 1, end_index).trim() // text between "]" and "DATA" is parsed as the segment count
+        logger.info("segmentNumStr: ${segmentNumStr}")
+        if (enableAssert) {
+            assertEquals(lastRowsetSegmentNum, Integer.parseInt(segmentNumStr))
+        } else {
+            return lastRowsetSegmentNum == Integer.parseInt(segmentNumStr);
+        }
+    }
+
+    def getLocalDeleteBitmapStatus = { tablet -> // GETs /api/delete_bitmap/count_local on the tablet's BE and returns the parsed JSON
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        StringBuilder sb = new StringBuilder();
+        sb.append("curl -X GET http://${be_host}:${be_http_port}")
+        sb.append("/api/delete_bitmap/count_local?verbose=true&tablet_id=")
+        sb.append(tablet_id)
+
+        String command = sb.toString()
+        logger.info(command)
+        def process = command.execute()
+        def out = process.getText() // drain stdout before waitFor() so a full pipe buffer cannot block curl
+        def code = process.waitFor()
+        logger.info("Get local delete bitmap count status: =" + code + ", out=" + out)
+        assertEquals(code, 0)
+        def deleteBitmapStatus = parseJson(out.trim())
+        return deleteBitmapStatus
+    }
+
+    def waitForCompaction = { tablet -> // polls /api/compaction/run_status once per second until run_status is false
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        def running = true
+        do {
+            Thread.sleep(1000)
+            StringBuilder sb = new StringBuilder();
+            sb.append("curl -X GET http://${be_host}:${be_http_port}")
+            sb.append("/api/compaction/run_status?tablet_id=")
+            sb.append(tablet_id)
+
+            String command = sb.toString()
+            logger.info(command)
+            def process = command.execute()
+            def out = process.getText() // drain stdout before waitFor() so a full pipe buffer cannot block curl
+            def code = process.waitFor()
+            logger.info("Get compaction status: code=" + code + ", out=" + out)
+            assertEquals(code, 0)
+            def compactionStatus = parseJson(out.trim())
+            assertEquals("success", compactionStatus.status.toLowerCase())
+            running = compactionStatus.run_status
+        } while (running)
+    }
+
+    // batch_size is 4164 in csv_reader.cpp
+    // _batch_size is 8192 in vtablet_writer.cpp
+    onFinish {
+        GetDebugPoint().clearDebugPointsForAllBEs()
+        reset_be_param("doris_scanner_row_bytes")
+        reset_be_param("tablet_rowset_stale_sweep_time_sec")
+    }
+    GetDebugPoint().enableDebugPointForAllBEs("MemTable.need_flush")
+    GetDebugPoint().enableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowset")
+    GetDebugPoint().enableDebugPointForAllBEs("Tablet.delete_expired_stale_rowset.start_delete_unused_rowset")
+    get_be_param("doris_scanner_row_bytes") // capture the current value first so onFinish can restore it
+    set_be_param("doris_scanner_row_bytes", "1")
+    get_be_param("tablet_rowset_stale_sweep_time_sec")
+    set_be_param("tablet_rowset_stale_sweep_time_sec", "0")
+
+    tableName = "test_compact_multi_segments_"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k1` int(11) NULL,
+                `k2` int(11) NULL,
+                `v3` int(11) NULL,
+                `v4` int(11) NULL
+            ) unique KEY(`k1`, `k2`)
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1",
+                "disable_auto_compaction" = "true"
+            );
+            """
+    def tablets = sql_return_maparray """ show tablets from ${tableName}; """
+    assertEquals(1, tablets.size())
+    def tablet = tablets[0]
+    String tablet_id = tablet.TabletId
+    def backend_id = tablet.BackendId
+
+    // load 1
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', ','
+        set 'compress_type', 'GZ'
+        file 'test_schema_change_add_key_column.csv.gz'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(8192, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+        }
+    }
+    sql "sync"
+    def rowCount1 = sql """ select count() from ${tableName}; """
+    logger.info("rowCount1: ${rowCount1}")
+    // expect 3 segments in rowset 2. NOTE(review): enableAssert defaults to false, so the result is only logged and discarded - confirm intent
+    getTabletStatus(tablet, 2, 3)
+
+    // trigger compaction
+    GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
+            [tablet_id: "${tablet.TabletId}", start_version: 2, end_version: 2])
+    def (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+    logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+    assertEquals(code, 0)
+    def compactJson = parseJson(out.trim())
+    logger.info("compact json: " + compactJson)
+    // poll (up to 20 x 100ms) until rowset 2 reports 1 segment
+    for (int i = 0; i < 20; i++) {
+        if (getTabletStatus(tablet, 2, 1, false)) {
+            break
+        }
+        sleep(100)
+    }
+    getTabletStatus(tablet, 2, 1)
+    sql """ select * from ${tableName} limit 1; """
+
+    // load 2
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', ','
+        set 'compress_type', 'GZ'
+        file 'test_schema_change_add_key_column1.csv.gz'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(20480, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+        }
+    }
+    sql "sync"
+    def rowCount2 = sql """ select count() from ${tableName}; """
+    logger.info("rowCount2: ${rowCount2}")
+    // NOTE(review): the original comment said 3 segments but 6 is passed, and the result is not asserted (enableAssert=false) - confirm
+    getTabletStatus(tablet, 3, 6)
+    def local_dm = getLocalDeleteBitmapStatus(tablet)
+    logger.info("local delete bitmap 1: " + local_dm)
+
+    // trigger compaction for load 2
+    GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
+            [tablet_id: "${tablet.TabletId}", start_version: 3, end_version: 3])
+    (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+    logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+    assertEquals(code, 0)
+    compactJson = parseJson(out.trim())
+    logger.info("compact json: " + compactJson)
+    waitForCompaction(tablet)
+    // poll (up to 20 x 100ms) until rowset 3 reports 1 segment
+    for (int i = 0; i < 20; i++) {
+        if (getTabletStatus(tablet, 3, 1, false)) {
+            break
+        }
+        sleep(100)
+    }
+    getTabletStatus(tablet, 3, 1)
+
+    GetDebugPoint().enableDebugPointForAllBEs("DeleteBitmapAction._handle_show_local_delete_bitmap_count.vacuum_stale_rowsets") // cloud
+    GetDebugPoint().enableDebugPointForAllBEs("DeleteBitmapAction._handle_show_local_delete_bitmap_count.start_delete_unused_rowset") // local
+    local_dm = getLocalDeleteBitmapStatus(tablet)
+    logger.info("local delete bitmap 2: " + local_dm)
+    assertEquals(1, local_dm["delete_bitmap_count"])
+}
diff --git a/regression-test/suites/compaction/test_mow_delete_unused_rowset_dm_docker.groovy b/regression-test/suites/compaction/test_mow_delete_unused_rowset_dm_docker.groovy
new file mode 100644
index 00000000000000..53bc4f10e51e36
---
/dev/null
+++ b/regression-test/suites/compaction/test_mow_delete_unused_rowset_dm_docker.groovy
@@ -0,0 +1,231 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.atomic.AtomicBoolean
+import org.apache.doris.regression.suite.ClusterOptions
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_mow_delete_unused_rowset_dm_docker", "docker") {
+    logger.info("test_mow_delete_unused_rowset_dm_docker")
+    def options = new ClusterOptions()
+    options.cloudMode = false
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.enableDebugPoints()
+    options.feConfigs.add("enable_workload_group=false")
+    // beConfigs
+    options.beConfigs.add('compaction_promotion_version_count=5')
+    options.beConfigs.add('tablet_rowset_stale_sweep_time_sec=0')
+    options.beConfigs.add('enable_mow_verbose_log=true')
+    options.beConfigs.add('enable_java_support=false')
+
+    def testTable = "test_mow_delete_unused_rowset_dm_docker"
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+
+    def triggerCompaction = { tablet -> // starts a compaction on the tablet's BE and returns the HTTP response body
+        def compact_type = "cumulative"
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        if (compact_type == "cumulative") {
+            def (code_1, out_1, err_1) = be_run_cumulative_compaction(be_host, be_http_port, tablet_id)
+            logger.info("Run compaction: code=" + code_1 + ", out=" + out_1 + ", err=" + err_1)
+            assertEquals(code_1, 0)
+            return out_1
+        } else if (compact_type == "full") {
+            def (code_2, out_2, err_2) = be_run_full_compaction(be_host, be_http_port, tablet_id)
+            logger.info("Run compaction: code=" + code_2 + ", out=" + out_2 + ", err=" + err_2)
+            assertEquals(code_2, 0)
+            return out_2
+        } else {
+            assertFalse(true) // fail on an unknown compact_type (was `True`, which is not a defined name in Groovy)
+        }
+    }
+
+    def getTabletStatus = { tablet -> // GETs /api/compaction/show on the tablet's BE and returns the parsed JSON
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        StringBuilder sb = new StringBuilder();
+        sb.append("curl -X GET http://${be_host}:${be_http_port}")
+        sb.append("/api/compaction/show?tablet_id=")
+        sb.append(tablet_id)
+
+        String command = sb.toString()
+        logger.info(command)
+        def process = command.execute()
+        def out = process.getText() // drain stdout before waitFor() so a full pipe buffer cannot block curl
+        def code = process.waitFor()
+        logger.info("Get tablet status: =" + code + ", out=" + out)
+        assertEquals(code, 0)
+        def tabletStatus = parseJson(out.trim())
+        return tabletStatus
+    }
+
+    def waitForCompaction = { tablet -> // polls /api/compaction/run_status once per second until run_status is false
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        def running = true
+        do {
+            Thread.sleep(1000)
+            StringBuilder sb = new StringBuilder();
+            sb.append("curl -X GET http://${be_host}:${be_http_port}")
+            sb.append("/api/compaction/run_status?tablet_id=")
+            sb.append(tablet_id)
+
+            String command = sb.toString()
+            logger.info(command)
+            def process = command.execute()
+            def out = process.getText() // drain stdout before waitFor() so a full pipe buffer cannot block curl
+            def code = process.waitFor()
+            logger.info("Get compaction status: code=" + code + ", out=" + out)
+            assertEquals(code, 0)
+            def compactionStatus = parseJson(out.trim())
+            assertEquals("success", compactionStatus.status.toLowerCase())
+            running = compactionStatus.run_status
+        } while (running)
+    }
+
+    def getLocalDeleteBitmapStatus = { tablet -> // GETs /api/delete_bitmap/count_local on the tablet's BE and returns the parsed JSON
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        boolean running = true
+        StringBuilder sb = new StringBuilder();
+        sb.append("curl -X GET http://${be_host}:${be_http_port}")
+        sb.append("/api/delete_bitmap/count_local?verbose=true&tablet_id=")
+        sb.append(tablet_id)
+
+        String command = sb.toString()
+        logger.info(command)
+        def process = command.execute()
+        def out = process.getText() // drain stdout before waitFor() so a full pipe buffer cannot block curl
+        def code = process.waitFor()
+        logger.info("Get local delete bitmap count status: =" + code + ", out=" + out)
+        assertEquals(code, 0)
+        def deleteBitmapStatus = parseJson(out.trim())
+        return deleteBitmapStatus
+    }
+
+    docker(options) {
+        sql """ DROP TABLE IF EXISTS ${testTable} """
+        sql """
+            create table ${testTable} (`k` int NOT NULL, `v` int NOT NULL)
+            UNIQUE KEY(`k`)
+            DISTRIBUTED BY HASH(`k`) BUCKETS 1
+            PROPERTIES (
+                "enable_unique_key_merge_on_write" = "true",
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true"
+            );
+        """
+        getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+        def tablets = sql_return_maparray """ show tablets from ${testTable}; """
+        logger.info("tablets: " + tablets)
+        assertEquals(1, tablets.size())
+        def tablet = tablets[0]
+
+        GetDebugPoint().enableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowset")
+        GetDebugPoint().enableDebugPointForAllBEs("DeleteBitmapAction._handle_show_local_delete_bitmap_count.start_delete_unused_rowset")
+
+        // 1. write some data
+        sql """ INSERT INTO ${testTable} VALUES (1,98); """
+        sql """ INSERT INTO ${testTable} VALUES (1,99),(2,99); """
+        sql """ INSERT INTO ${testTable} VALUES (3,99); """
+        sql """ INSERT INTO ${testTable} VALUES (4,99); """
+        sql """ INSERT INTO ${testTable} VALUES (5,99); """
+        sql "sync"
+        order_qt_sql1 """ select * from ${testTable}; """
+
+        // 2. trigger compaction to generate base rowset
+        getTabletStatus(tablet)
+        assertTrue(triggerCompaction(tablet).contains("Success"))
+        waitForCompaction(tablet)
+        def tablet_status = getTabletStatus(tablet)
+        assertEquals(2, tablet_status["rowsets"].size())
+
+        // 3. expect no delete bitmap and no stale rowsets
+        def local_dm = getLocalDeleteBitmapStatus(tablet)
+        assertEquals(0, local_dm["delete_bitmap_count"])
+        tablet_status = getTabletStatus(tablet)
+        assertEquals(0, tablet_status["stale_rowsets"].size())
+
+        // 4. write some data
+        sql """ INSERT INTO ${testTable} VALUES (1,100); """
+        sql """ INSERT INTO ${testTable} VALUES (1,101),(2,100); """
+        sql """ INSERT INTO ${testTable} VALUES (3,100); """
+        sql """ INSERT INTO ${testTable} VALUES (4,100); """
+        sql """ INSERT INTO ${testTable} VALUES (5,100); """
+        sql """ sync """
+        order_qt_sql2 "select * from ${testTable}"
+        tablet_status = getTabletStatus(tablet)
+        assertEquals(7, tablet_status["rowsets"].size())
+
+        // 5. trigger compaction
+        GetDebugPoint().enableDebugPointForAllBEs("StorageEngine::start_delete_unused_rowset.block")
+        assertTrue(triggerCompaction(tablet).contains("Success"))
+        waitForCompaction(tablet)
+        tablet_status = getTabletStatus(tablet)
+        assertEquals(3, tablet_status["rowsets"].size())
+
+        // 6. deletion of unused rowsets is blocked, so delete bitmaps remain; expect no stale rowsets
+        GetDebugPoint().disableDebugPointForAllBEs("DeleteBitmapAction._handle_show_local_delete_bitmap_count.start_delete_unused_rowset")
+        local_dm = getLocalDeleteBitmapStatus(tablet)
+        logger.info("local_dm 1: " + local_dm)
+        assertEquals(6, local_dm["delete_bitmap_count"])
+        tablet_status = getTabletStatus(tablet)
+        assertEquals(0, tablet_status["stale_rowsets"].size())
+
+        // 7. restart be, then poll (up to 300 x 20ms) until the delete bitmap count is 5
+        cluster.restartBackends()
+        tablet_status = getTabletStatus(tablet)
+        logger.info("tablet status after restart: " + tablet_status)
+        for (int i = 0; i < 300; i++) {
+            local_dm = getLocalDeleteBitmapStatus(tablet)
+            if (local_dm["delete_bitmap_count"] == 5) {
+                break
+            }
+            sleep(20)
+        }
+        local_dm = getLocalDeleteBitmapStatus(tablet)
+        logger.info("local_dm 2: " + local_dm)
+        assertEquals(5, local_dm["delete_bitmap_count"])
+        order_qt_sql3 """ select * from ${testTable}; """
+
+        // 8. restart be again to check that the removal of delete bitmaps was persisted to local storage
+        cluster.restartBackends()
+        tablet_status = getTabletStatus(tablet)
+        logger.info("tablet status after restart2: " + tablet_status)
+        for (int i = 0; i < 300; i++) {
+            local_dm = getLocalDeleteBitmapStatus(tablet)
+            if (local_dm["delete_bitmap_count"] == 5) {
+                break
+            }
+            sleep(20)
+        }
+        local_dm = getLocalDeleteBitmapStatus(tablet)
+        logger.info("local_dm 3: " + local_dm)
+        assertEquals(5, local_dm["delete_bitmap_count"])
+    }
+}
diff --git a/regression-test/suites/compaction/test_mow_stale_rowset_delete_bitmap.groovy b/regression-test/suites/compaction/test_mow_stale_rowset_delete_bitmap.groovy
new file mode 100644
index 00000000000000..b91a19784e6eb6
--- /dev/null
+++ b/regression-test/suites/compaction/test_mow_stale_rowset_delete_bitmap.groovy
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.
See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+// When rowsets are moved from stale to unused, their delete bitmaps are not deleted;
+// when the unused rowsets are deleted, their delete bitmaps are deleted too.
+suite("test_mow_stale_rowset_delete_bitmap", "nonConcurrent") {
+    def testTable = "test_mow_stale_rowset_delete_bitmap"
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    def backendId_to_params = [string: [:]] // per-BE map of original config values, filled by get_be_param
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+    def set_be_param = { paramName, paramValue ->
+        // for each BE node, set paramName=paramValue
+        for (String id in backendId_to_backendIP.keySet()) {
+            def beIp = backendId_to_backendIP.get(id)
+            def bePort = backendId_to_backendHttpPort.get(id)
+            def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue))
+            assertTrue(out.contains("OK"))
+        }
+    }
+
+    def reset_be_param = { paramName ->
+        // for each BE node, set paramName back to the value captured earlier by get_be_param
+        for (String id in backendId_to_backendIP.keySet()) {
+            def beIp = backendId_to_backendIP.get(id)
+            def bePort = backendId_to_backendHttpPort.get(id)
+            def original_value = backendId_to_params.get(id).get(paramName)
+            def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, original_value))
+            assertTrue(out.contains("OK"))
+        }
+    }
+
+    def get_be_param = { paramName ->
+        // for each BE node, read the current value of paramName and remember it in backendId_to_params
+        def paramValue = ""
+        for (String id in backendId_to_backendIP.keySet()) {
+            def beIp = backendId_to_backendIP.get(id)
+            def bePort = backendId_to_backendHttpPort.get(id)
+            // get the config value from be
+            def (code, out, err) = curl("GET", String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort, paramName))
+            assertTrue(code == 0)
+            assertTrue(out.contains(paramName))
+            // the first element of the response is expected to be a 4-item list
+            def resultList = parseJson(out)[0]
+            assertTrue(resultList.size() == 4)
+            // item 2 is stored as the original value
+            paramValue = resultList[2]
+            backendId_to_params.get(id, [:]).put(paramName, paramValue)
+        }
+    }
+
+    def triggerCompaction = { tablet -> // starts a compaction on the tablet's BE and returns the HTTP response body
+        def compact_type = "cumulative"
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        if (compact_type == "cumulative") {
+            def (code_1, out_1, err_1) = be_run_cumulative_compaction(be_host, be_http_port, tablet_id)
+            logger.info("Run compaction: code=" + code_1 + ", out=" + out_1 + ", err=" + err_1)
+            assertEquals(code_1, 0)
+            return out_1
+        } else if (compact_type == "full") {
+            def (code_2, out_2, err_2) = be_run_full_compaction(be_host, be_http_port, tablet_id)
+            logger.info("Run compaction: code=" + code_2 + ", out=" + out_2 + ", err=" + err_2)
+            assertEquals(code_2, 0)
+            return out_2
+        } else {
+            assertFalse(true) // fail on an unknown compact_type (was `True`, which is not a defined name in Groovy)
+        }
+    }
+
+    def getTabletStatus = { tablet -> // GETs /api/compaction/show on the tablet's BE and returns the parsed JSON
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        StringBuilder sb = new StringBuilder();
+        sb.append("curl -X GET http://${be_host}:${be_http_port}")
+        sb.append("/api/compaction/show?tablet_id=")
+        sb.append(tablet_id)
+
+        String command = sb.toString()
+        logger.info(command)
+        def process = command.execute()
+        def out = process.getText() // drain stdout before waitFor() so a full pipe buffer cannot block curl
+        def code = process.waitFor()
+        logger.info("Get tablet status: =" + code + ", out=" + out)
+        assertEquals(code, 0)
+        def tabletStatus = parseJson(out.trim())
+        return tabletStatus
+    }
+
+    def waitForCompaction = { tablet -> // polls /api/compaction/run_status once per second until run_status is false
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        def running = true
+        do {
+            Thread.sleep(1000)
+            StringBuilder sb = new StringBuilder();
+            sb.append("curl -X GET http://${be_host}:${be_http_port}")
+            sb.append("/api/compaction/run_status?tablet_id=")
+            sb.append(tablet_id)
+
+            String command = sb.toString()
+            logger.info(command)
+            def process = command.execute()
+            def out = process.getText() // drain stdout before waitFor() so a full pipe buffer cannot block curl
+            def code = process.waitFor()
+            logger.info("Get compaction status: code=" + code + ", out=" + out)
+            assertEquals(code, 0)
+            def compactionStatus = parseJson(out.trim())
+            assertEquals("success", compactionStatus.status.toLowerCase())
+            running = compactionStatus.run_status
+        } while (running)
+    }
+
+    def getLocalDeleteBitmapStatus = { tablet -> // GETs /api/delete_bitmap/count_local on the tablet's BE and returns the parsed JSON
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        boolean running = true
+        StringBuilder sb = new StringBuilder();
+        sb.append("curl -X GET http://${be_host}:${be_http_port}")
+        sb.append("/api/delete_bitmap/count_local?verbose=true&tablet_id=")
+        sb.append(tablet_id)
+
+        String command = sb.toString()
+        logger.info(command)
+        def process = command.execute()
+        def out = process.getText() // drain stdout before waitFor() so a full pipe buffer cannot block curl
+        def code = process.waitFor()
+        logger.info("Get local delete bitmap count status: =" + code + ", out=" + out)
+        assertEquals(code, 0)
+        def deleteBitmapStatus = parseJson(out.trim())
+        return deleteBitmapStatus
+    }
+
+    AtomicBoolean query_result = new AtomicBoolean(true) // set to false by query() when a duplicate key is seen
+    def query = {
+        logger.info("query start")
+        def results = sql_return_maparray """ select * from ${testTable}; """
+        logger.info("query result: " + results)
+        Set keys = new HashSet<>()
+        for (final def result in results) {
+            if (keys.contains(result.k)) {
+                logger.info("find duplicate key: " + result.k)
+                query_result.set(false)
+                break
+            }
+            keys.add(result.k)
+        }
+        logger.info("query finish. query_result: " + query_result.get())
+    }
+
+    sql """ DROP TABLE IF EXISTS ${testTable} """
+    sql """
+        create table ${testTable} (`k` int NOT NULL, `v` int NOT NULL)
+        UNIQUE KEY(`k`)
+        DISTRIBUTED BY HASH(`k`) BUCKETS 1
+        PROPERTIES (
+            "enable_unique_key_merge_on_write" = "true",
+            "replication_allocation" = "tag.location.default: 1",
+            "disable_auto_compaction" = "true"
+        );
+    """
+
+    def tablets = sql_return_maparray """ show tablets from ${testTable}; """
+    logger.info("tablets: " + tablets)
+    assertEquals(1, tablets.size())
+    def tablet = tablets[0]
+
+    try {
+        GetDebugPoint().clearDebugPointsForAllBEs()
+        get_be_param("tablet_rowset_stale_sweep_time_sec") // capture the current value first so the finally block can restore it
+        set_be_param("tablet_rowset_stale_sweep_time_sec", "0")
+
+        // write some data
+        sql """ INSERT INTO ${testTable} VALUES (1,99); """
+        sql """ INSERT INTO ${testTable} VALUES (2,99); """
+        sql """ INSERT INTO ${testTable} VALUES (3,99); """
+        sql """ INSERT INTO ${testTable} VALUES (4,99),(5,0); """
+        sql """ INSERT INTO ${testTable} VALUES (5,99); """
+        sql "sync"
+        getTabletStatus(tablet)
+        getLocalDeleteBitmapStatus(tablet)
+
+        // trigger and block one query
+        GetDebugPoint().enableDebugPointForAllBEs("NewOlapScanner::_init_tablet_reader_params.block")
+        Thread query_thread = new Thread(() -> query())
+        query_thread.start()
+        sleep(100)
+
+        // trigger compaction
+        GetDebugPoint().enableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowset")
+        GetDebugPoint().enableDebugPointForAllBEs("Tablet.delete_expired_stale_rowset.start_delete_unused_rowset")
+        assertTrue(triggerCompaction(tablet).contains("Success"))
+        waitForCompaction(tablet)
+        // poll (up to 10 x 200ms) until the stale rowsets are deleted
+        for (int i = 0; i < 10; i++) {
+            def tablet_status = getTabletStatus(tablet)
+            if (tablet_status["rowsets"].size() <= 2 && tablet_status["stale_rowsets"].size() == 0) {
+                break
+            }
+            sleep(200)
+        }
+        getLocalDeleteBitmapStatus(tablet)
+
+        // unblock the query
+        GetDebugPoint().disableDebugPointForAllBEs("NewOlapScanner::_init_tablet_reader_params.block")
+        query_thread.join()
+        assertTrue(query_result.get(), "found duplicated keys")
+
+        // poll until the delete bitmaps of unused rowsets are deleted. NOTE(review): the outcome is never asserted after the loop - confirm intent
+        GetDebugPoint().enableDebugPointForAllBEs("DeleteBitmapAction._handle_show_local_delete_bitmap_count.vacuum_stale_rowsets") // cloud
+        GetDebugPoint().enableDebugPointForAllBEs("DeleteBitmapAction._handle_show_local_delete_bitmap_count.start_delete_unused_rowset") // local
+        for (int i = 0; i < 20; i++) {
+            def local_delete_bitmap_status = getLocalDeleteBitmapStatus(tablet)
+            if (local_delete_bitmap_status["delete_bitmap_count"] == 0) {
+                break
+            }
+            sleep(100)
+        }
+    } finally {
+        reset_be_param("tablet_rowset_stale_sweep_time_sec")
+        GetDebugPoint().clearDebugPointsForAllBEs()
+    }
+}