diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index c83be20baf8d8f..857515e6c1d72b 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -342,6 +342,17 @@ void CloudTablet::add_rowsets(std::vector to_add, bool version_ // replace existed rowset with `to_add` rowset. This may occur when: // 1. schema change converts rowsets which have been double written to new tablet // 2. cumu compaction picks single overlapping input rowset to perform compaction + if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) { + // add existed rowset to unused_rowsets to remove delete bitmap + if (auto find_it = _rs_version_map.find(rs->version()); + find_it != _rs_version_map.end()) { + DCHECK(find_it->second->rowset_id() != rs->rowset_id()) + << "tablet_id=" << tablet_id() + << ", rowset_id=" << rs->rowset_id().to_string() + << ", existed rowset=" << find_it->second->rowset_id().to_string(); + _unused_rowsets.emplace(find_it->second->rowset_id(), find_it->second); + } + } _tablet_meta->delete_rs_meta_by_version(rs->version(), nullptr); _rs_version_map[rs->version()] = rs; _tablet_meta->add_rowsets_unchecked({rs}); @@ -459,6 +470,14 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() { if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write() && !deleted_stale_rowsets.empty()) { + // record expired rowsets in unused rowsets + { + std::lock_guard lock(_gc_mutex); + for (const auto& rowset : expired_rowsets) { + _unused_rowsets.emplace(rowset->rowset_id(), rowset); + } + } + // agg delete bitmap for pre rowsets; record unused delete bitmap key ranges OlapStopWatch watch; for (const auto& [version, unused_rowsets] : deleted_stale_rowsets) { @@ -467,9 +486,13 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() { agg_delete_bitmap_for_stale_rowsets(version, remove_delete_bitmap_key_ranges); // add remove delete bitmap if (!remove_delete_bitmap_key_ranges.empty()) { + std::vector rowset_ids; + for (const auto& rs : unused_rowsets) { + rowset_ids.push_back(rs->rowset_id()); + } std::lock_guard lock(_gc_mutex); _unused_delete_bitmap.push_back( - std::make_pair(unused_rowsets, remove_delete_bitmap_key_ranges)); + std::make_pair(rowset_ids, remove_delete_bitmap_key_ranges)); } } LOG(INFO) << "agg pre rowsets delete bitmap. tablet_id=" << tablet_id() @@ -479,23 +502,34 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() { return expired_rowsets.size(); } -bool CloudTablet::need_remove_pre_rowset_delete_bitmap() { +bool CloudTablet::need_remove_unused_rowsets() { std::lock_guard lock(_gc_mutex); - return !_unused_delete_bitmap.empty(); + return !_unused_rowsets.empty() || !_unused_delete_bitmap.empty(); } -void CloudTablet::remove_pre_rowset_delete_bitmap() { +void CloudTablet::remove_unused_rowsets() { std::lock_guard lock(_gc_mutex); + // 1. remove unused rowsets and delete bitmap + for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) { + auto&& rs = it->second; + if (rs.use_count() > 1) { + LOG(WARNING) << "Rowset " << rs->rowset_id() << " has " << rs.use_count() + << " references. Can not remove delete bitmap."; + ++it; + continue; + } + tablet_meta()->remove_rowset_delete_bitmap(rs->rowset_id(), rs->version()); + it = _unused_rowsets.erase(it); + } + + // 2. remove delete bitmap of pre rowsets for (auto it = _unused_delete_bitmap.begin(); it != _unused_delete_bitmap.end();) { - auto& rowsets = std::get<0>(*it); + auto& rowset_ids = std::get<0>(*it); bool find_unused_rowset = false; - for (const auto& rowset : rowsets) { - if (rowset.use_count() > 1) { + for (const auto& rowset_id : rowset_ids) { + if (_unused_rowsets.find(rowset_id) != _unused_rowsets.end()) { LOG(INFO) << "can not remove pre rowset delete bitmap because rowset is in use" - << ", tablet_id=" << tablet_id() - << ", rowset_id=" << rowset->rowset_id().to_string() - << ", version=" << rowset->version().to_string() - << ", use_count=" << rowset.use_count(); + << ", tablet_id=" << tablet_id() << ", rowset_id=" << rowset_id; find_unused_rowset = true; break; } @@ -508,8 +542,10 @@ void CloudTablet::remove_pre_rowset_delete_bitmap() { tablet_meta()->delete_bitmap().remove(key_ranges); it = _unused_delete_bitmap.erase(it); } - if (!_unused_delete_bitmap.empty()) { + + if (!_unused_rowsets.empty() || !_unused_delete_bitmap.empty()) { LOG(INFO) << "tablet_id=" << tablet_id() + << ", unused_rowset size=" << _unused_rowsets.size() << ", unused_delete_bitmap size=" << _unused_delete_bitmap.size(); } } diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h index 22be1d1fabc42b..d63506ddde0e55 100644 --- a/be/src/cloud/cloud_tablet.h +++ b/be/src/cloud/cloud_tablet.h @@ -275,8 +275,8 @@ class CloudTablet final : public BaseTablet { DeleteBitmapPtr& new_delete_bitmap, std::map& pre_rowset_to_versions); - bool need_remove_pre_rowset_delete_bitmap(); - void remove_pre_rowset_delete_bitmap(); + bool need_remove_unused_rowsets(); + void remove_unused_rowsets(); private: // FIXME(plat1ko): No need to record base size if rowsets are ordered by version @@ -340,8 +340,8 @@ class CloudTablet final : public BaseTablet { // unused_rowsets, [start_version, end_version] std::mutex _gc_mutex; - std::vector, DeleteBitmapKeyRanges>> - _unused_delete_bitmap; + std::unordered_map _unused_rowsets; + std::vector, DeleteBitmapKeyRanges>> _unused_delete_bitmap; }; using CloudTabletSPtr = std::shared_ptr; diff --git a/be/src/cloud/cloud_tablet_mgr.cpp b/be/src/cloud/cloud_tablet_mgr.cpp index 6c3a671f18423f..376a935335f5d8 100644 --- a/be/src/cloud/cloud_tablet_mgr.cpp +++ b/be/src/cloud/cloud_tablet_mgr.cpp @@ -266,19 +266,19 @@ void CloudTabletMgr::vacuum_stale_rowsets(const CountDownLatch& stop_latch) { .tag("num_tablets", tablets_to_vacuum.size()); { - LOG_INFO("begin to remove pre rowsets delete bitmap"); - std::vector> tablets_to_remove_delete_bitmap; - tablets_to_remove_delete_bitmap.reserve(_tablet_map->size()); - _tablet_map->traverse([&tablets_to_remove_delete_bitmap](auto&& t) { - if (t->need_remove_pre_rowset_delete_bitmap()) { - tablets_to_remove_delete_bitmap.push_back(t); + LOG_INFO("begin to remove unused rowsets"); + std::vector> tablets_to_remove_unused_rowsets; + tablets_to_remove_unused_rowsets.reserve(_tablet_map->size()); + _tablet_map->traverse([&tablets_to_remove_unused_rowsets](auto&& t) { + if (t->need_remove_unused_rowsets()) { + tablets_to_remove_unused_rowsets.push_back(t); } }); - for (auto& t : tablets_to_remove_delete_bitmap) { - t->remove_pre_rowset_delete_bitmap(); + for (auto& t : tablets_to_remove_unused_rowsets) { + t->remove_unused_rowsets(); } - LOG_INFO("finish remove pre rowsets delete bitmap") - .tag("num_tablets", tablets_to_remove_delete_bitmap.size()); + LOG_INFO("finish remove unused rowsets") + .tag("num_tablets", tablets_to_remove_unused_rowsets.size()); if (config::enable_check_agg_and_remove_pre_rowsets_delete_bitmap) { int64_t max_useless_rowset_count = 0; int64_t tablet_id_with_max_useless_rowset_count = 0; diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index c7eafd7fd46d4d..792edace512e9a 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -1832,6 +1832,8 @@ void BaseTablet::agg_delete_bitmap_for_stale_rowsets( remove_delete_bitmap_key_ranges.emplace_back(start_key, end_key); } } + DBUG_EXECUTE_IF("BaseTablet.agg_delete_bitmap_for_stale_rowsets.merge_delete_bitmap.block", + DBUG_BLOCK); tablet_meta()->delete_bitmap().merge(*new_delete_bitmap); } diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 55cf21c4e17df0..5229965cd5e13d 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -950,15 +950,28 @@ void StorageEngine::_clean_unused_rowset_metas() { for (auto data_dir : data_dirs) { static_cast( RowsetMetaManager::traverse_rowset_metas(data_dir->get_meta(), clean_rowset_func)); + // 1. delete delete_bitmap + std::set tablets_to_save_meta; for (auto& rowset_meta : invalid_rowset_metas) { - static_cast(RowsetMetaManager::remove( - data_dir->get_meta(), rowset_meta->tablet_uid(), rowset_meta->rowset_id())); TabletSharedPtr tablet = _tablet_manager->get_tablet(rowset_meta->tablet_id()); if (tablet && tablet->tablet_meta()->enable_unique_key_merge_on_write()) { - tablet->tablet_meta()->delete_bitmap().remove_rowset_cache_version( - rowset_meta->rowset_id()); + tablet->tablet_meta()->remove_rowset_delete_bitmap(rowset_meta->rowset_id(), + rowset_meta->version()); + tablets_to_save_meta.emplace(tablet->tablet_id()); + } + } + for (const auto& tablet_id : tablets_to_save_meta) { + auto tablet = _tablet_manager->get_tablet(tablet_id); + if (tablet) { + std::shared_lock rlock(tablet->get_header_lock()); + tablet->save_meta(); } } + // 2. delete rowset meta + for (auto& rowset_meta : invalid_rowset_metas) { + static_cast(RowsetMetaManager::remove( + data_dir->get_meta(), rowset_meta->tablet_uid(), rowset_meta->rowset_id())); + } LOG(INFO) << "remove " << invalid_rowset_metas.size() << " invalid rowset meta from dir: " << data_dir->path(); invalid_rowset_metas.clear(); @@ -1192,6 +1205,7 @@ void StorageEngine::_parse_default_rowset_type() { } void StorageEngine::start_delete_unused_rowset() { + DBUG_EXECUTE_IF("StorageEngine::start_delete_unused_rowset.block", DBUG_BLOCK); LOG(INFO) << "start to delete unused rowset, size: " << _unused_rowsets.size() << ", unused delete bitmap size: " << _unused_delete_bitmap.size(); std::vector unused_rowsets_copy; @@ -1250,13 +1264,6 @@ void StorageEngine::start_delete_unused_rowset() { it = _unused_delete_bitmap.erase(it); } } - for (const auto& tablet_id : tablets_to_save_meta) { - auto tablet = _tablet_manager->get_tablet(tablet_id); - if (tablet) { - std::shared_lock rlock(tablet->get_header_lock()); - tablet->save_meta(); - } - } LOG(INFO) << "collected " << unused_rowsets_copy.size() << " unused rowsets to remove, skipped " << due_to_use_count << " rowsets due to use count > 1, skipped " << due_to_not_delete_file << " rowsets due to don't need to delete file, skipped " @@ -1268,14 +1275,20 @@ void StorageEngine::start_delete_unused_rowset() { // delete delete_bitmap of unused rowsets if (auto tablet = _tablet_manager->get_tablet(rs->rowset_meta()->tablet_id()); tablet && tablet->enable_unique_key_merge_on_write()) { - tablet->tablet_meta()->delete_bitmap().remove({rs->rowset_id(), 0, 0}, - {rs->rowset_id(), UINT32_MAX, 0}); - tablet->tablet_meta()->delete_bitmap().remove_rowset_cache_version(rs->rowset_id()); + tablet->tablet_meta()->remove_rowset_delete_bitmap(rs->rowset_id(), rs->version()); + tablets_to_save_meta.emplace(tablet->tablet_id()); } Status status = rs->remove(); unused_rowsets_counter << -1; VLOG_NOTICE << "remove rowset:" << rs->rowset_id() << " finished. status:" << status; } + for (const auto& tablet_id : tablets_to_save_meta) { + auto tablet = _tablet_manager->get_tablet(tablet_id); + if (tablet) { + std::shared_lock rlock(tablet->get_header_lock()); + tablet->save_meta(); + } + } LOG(INFO) << "removed all collected unused rowsets"; } diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 354cfa85ed3f6c..13486fa581f311 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -455,6 +455,18 @@ void TabletMeta::init_column_from_tcolumn(uint32_t unique_id, const TColumn& tco } } +void TabletMeta::remove_rowset_delete_bitmap(const RowsetId& rowset_id, const Version& version) { + if (_enable_unique_key_merge_on_write) { + delete_bitmap().remove({rowset_id, 0, 0}, {rowset_id, UINT32_MAX, 0}); + if (config::enable_mow_verbose_log) { + LOG_INFO("delete rowset delete bitmap. tablet={}, rowset={}, version={}", tablet_id(), + rowset_id.to_string(), version.to_string()); + } + size_t rowset_cache_version_size = delete_bitmap().remove_rowset_cache_version(rowset_id); + _check_mow_rowset_cache_version_size(rowset_cache_version_size); + } +} + Status TabletMeta::create_from_file(const string& file_path) { TabletMetaPB tablet_meta_pb; RETURN_IF_ERROR(load_from_file(file_path, &tablet_meta_pb)); @@ -945,28 +957,14 @@ void TabletMeta::revise_delete_bitmap_unlocked(const DeleteBitmap& delete_bitmap } void TabletMeta::delete_stale_rs_meta_by_version(const Version& version) { - size_t rowset_cache_version_size = 0; auto it = _stale_rs_metas.begin(); while (it != _stale_rs_metas.end()) { if ((*it)->version() == version) { - if (_enable_unique_key_merge_on_write) { - // remove rowset delete bitmap - delete_bitmap().remove({(*it)->rowset_id(), 0, 0}, - {(*it)->rowset_id(), UINT32_MAX, 0}); - rowset_cache_version_size = - delete_bitmap().remove_rowset_cache_version((*it)->rowset_id()); - if (config::enable_mow_verbose_log) { - LOG_INFO( - "delete stale rowset's delete bitmap. tablet={}, version={}, rowset={}", - tablet_id(), version.to_string(), (*it)->rowset_id().to_string()); - } - } it = _stale_rs_metas.erase(it); } else { it++; } } - _check_mow_rowset_cache_version_size(rowset_cache_version_size); } RowsetMetaSharedPtr TabletMeta::acquire_rs_meta_by_version(const Version& version) const { diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index ca9bc11224b98d..9098d2e8ef7910 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -242,6 +242,7 @@ class TabletMeta : public MetadataAdder { DeleteBitmapPtr delete_bitmap_ptr() { return _delete_bitmap; } DeleteBitmap& delete_bitmap() { return *_delete_bitmap; } + void remove_rowset_delete_bitmap(const RowsetId& rowset_id, const Version& version); bool enable_unique_key_merge_on_write() const { return _enable_unique_key_merge_on_write; } diff --git a/regression-test/data/compaction/test_mow_delete_unused_rowset_dm_docker.out b/regression-test/data/compaction/test_mow_delete_unused_rowset_dm_docker.out new file mode 100644 index 00000000000000..9896795dfd4231 --- /dev/null +++ b/regression-test/data/compaction/test_mow_delete_unused_rowset_dm_docker.out @@ -0,0 +1,22 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql1 -- +1 99 +2 99 +3 99 +4 99 +5 99 + +-- !sql2 -- +1 101 +2 100 +3 100 +4 100 +5 100 + +-- !sql3 -- +1 101 +2 100 +3 100 +4 100 +5 100 + diff --git a/regression-test/data/compaction/test_schema_change_add_key_column.csv.gz b/regression-test/data/compaction/test_schema_change_add_key_column.csv.gz new file mode 100644 index 00000000000000..bc9d3dd70ea8a5 Binary files /dev/null and b/regression-test/data/compaction/test_schema_change_add_key_column.csv.gz differ diff --git a/regression-test/data/compaction/test_schema_change_add_key_column1.csv.gz b/regression-test/data/compaction/test_schema_change_add_key_column1.csv.gz new file mode 100644 index 00000000000000..83f1ecd41c002e Binary files /dev/null and b/regression-test/data/compaction/test_schema_change_add_key_column1.csv.gz differ diff --git a/regression-test/suites/cloud_p0/multi_cluster/test_mow_agg_delete_bitmap.groovy b/regression-test/suites/cloud_p0/multi_cluster/test_mow_agg_delete_bitmap.groovy index 676086996d495d..9c2063d6ec15bd 100644 --- a/regression-test/suites/cloud_p0/multi_cluster/test_mow_agg_delete_bitmap.groovy +++ b/regression-test/suites/cloud_p0/multi_cluster/test_mow_agg_delete_bitmap.groovy @@ -243,6 +243,7 @@ suite('test_mow_agg_delete_bitmap', 'multi_cluster,docker') { waitForCompaction(tablet) logger.info("after compaction 1") getTabletStatus(tablet) + GetDebugPoint().enableDebugPointForAllBEs("DeleteBitmapAction._handle_show_local_delete_bitmap_count.vacuum_stale_rowsets") // cloud def local_dm = getLocalDeleteBitmapStatus(tablet) logger.info("local_dm 0.2: " + local_dm) assertEquals(0, local_dm.delete_bitmap_count) diff --git a/regression-test/suites/compaction/test_mow_compact_multi_segments.groovy b/regression-test/suites/compaction/test_mow_compact_multi_segments.groovy new file mode 100644 index 00000000000000..f286a2becaec49 --- /dev/null +++ b/regression-test/suites/compaction/test_mow_compact_multi_segments.groovy @@ -0,0 +1,253 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// when compaction for one rowsets with multiple segments, the delete bitmap can be deleted +suite("test_mow_compact_multi_segments", "nonConcurrent") { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + def tableName = "test_mow_compact_multi_segments" + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + def backendId_to_params = [string: [:]] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort) + + def reset_be_param = { paramName -> + // for eache be node, reset paramName to default + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + def original_value = backendId_to_params.get(id).get(paramName) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, original_value)) + assertTrue(out.contains("OK")) + } + } + + def get_be_param = { paramName -> + // for eache be node, get param value by default + def paramValue = "" + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + // get the config value from be + def (code, out, err) = curl("GET", String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort, paramName)) + assertTrue(code == 0) + assertTrue(out.contains(paramName)) + // parsing + def resultList = parseJson(out)[0] + assertTrue(resultList.size() == 4) + // get original value + paramValue = resultList[2] + backendId_to_params.get(id, [:]).put(paramName, paramValue) + } + } + + def getTabletStatus = { tablet, rowsetIndex, lastRowsetSegmentNum, enableAssert = false -> + String compactionUrl = tablet["CompactionStatus"] + def (code, out, err) = curl("GET", compactionUrl) + logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def tabletJson = parseJson(out.trim()) + assert tabletJson.rowsets instanceof List + assertTrue(tabletJson.rowsets.size() >= rowsetIndex) + def rowset = tabletJson.rowsets.get(rowsetIndex - 1) + logger.info("rowset: ${rowset}") + int start_index = rowset.indexOf("]") + int end_index = rowset.indexOf("DATA") + def segmentNumStr = rowset.substring(start_index + 1, end_index).trim() + logger.info("segmentNumStr: ${segmentNumStr}") + if (enableAssert) { + assertEquals(lastRowsetSegmentNum, Integer.parseInt(segmentNumStr)) + } else { + return lastRowsetSegmentNum == Integer.parseInt(segmentNumStr); + } + } + + def getLocalDeleteBitmapStatus = { tablet -> + String tablet_id = tablet.TabletId + String trigger_backend_id = tablet.BackendId + def be_host = backendId_to_backendIP[trigger_backend_id] + def be_http_port = backendId_to_backendHttpPort[trigger_backend_id] + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET http://${be_host}:${be_http_port}") + sb.append("/api/delete_bitmap/count_local?verbose=true&tablet_id=") + sb.append(tablet_id) + + String command = sb.toString() + logger.info(command) + def process = command.execute() + def code = process.waitFor() + def out = process.getText() + logger.info("Get local delete bitmap count status: =" + code + ", out=" + out) + assertEquals(code, 0) + def deleteBitmapStatus = parseJson(out.trim()) + return deleteBitmapStatus + } + + def waitForCompaction = { tablet -> + String tablet_id = tablet.TabletId + String trigger_backend_id = tablet.BackendId + def be_host = backendId_to_backendIP[trigger_backend_id] + def be_http_port = backendId_to_backendHttpPort[trigger_backend_id] + def running = true + do { + Thread.sleep(1000) + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET http://${be_host}:${be_http_port}") + sb.append("/api/compaction/run_status?tablet_id=") + sb.append(tablet_id) + + String command = sb.toString() + logger.info(command) + def process = command.execute() + def code = process.waitFor() + def out = process.getText() + logger.info("Get compaction status: code=" + code + ", out=" + out) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + } + + // batch_size is 4164 in csv_reader.cpp + // _batch_size is 8192 in vtablet_writer.cpp + onFinish { + GetDebugPoint().clearDebugPointsForAllBEs() + reset_be_param("doris_scanner_row_bytes") + reset_be_param("tablet_rowset_stale_sweep_time_sec") + } + GetDebugPoint().enableDebugPointForAllBEs("MemTable.need_flush") + GetDebugPoint().enableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowset") + GetDebugPoint().enableDebugPointForAllBEs("Tablet.delete_expired_stale_rowset.start_delete_unused_rowset") + get_be_param("doris_scanner_row_bytes") + set_be_param("doris_scanner_row_bytes", "1") + get_be_param("tablet_rowset_stale_sweep_time_sec") + set_be_param("tablet_rowset_stale_sweep_time_sec", "0") + + tableName = "test_compact_multi_segments_" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` int(11) NULL, + `k2` int(11) NULL, + `v3` int(11) NULL, + `v4` int(11) NULL + ) unique KEY(`k1`, `k2`) + DISTRIBUTED BY HASH(`k1`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "disable_auto_compaction" = "true" + ); + """ + def tablets = sql_return_maparray """ show tablets from ${tableName}; """ + assertEquals(1, tablets.size()) + def tablet = tablets[0] + String tablet_id = tablet.TabletId + def backend_id = tablet.BackendId + + // load 1 + streamLoad { + table "${tableName}" + set 'column_separator', ',' + set 'compress_type', 'GZ' + file 'test_schema_change_add_key_column.csv.gz' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(8192, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + } + } + sql "sync" + def rowCount1 = sql """ select count() from ${tableName}; """ + logger.info("rowCount1: ${rowCount1}") + // check generate 3 segments + getTabletStatus(tablet, 2, 3) + + // trigger compaction + GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets", + [tablet_id: "${tablet.TabletId}", start_version: 2, end_version: 2]) + def (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactJson = parseJson(out.trim()) + logger.info("compact json: " + compactJson) + // check generate 1 segments + for (int i = 0; i < 20; i++) { + if (getTabletStatus(tablet, 2, 1, false)) { + break + } + sleep(100) + } + getTabletStatus(tablet, 2, 1) + sql """ select * from ${tableName} limit 1; """ + + // load 2 + streamLoad { + table "${tableName}" + set 'column_separator', ',' + set 'compress_type', 'GZ' + file 'test_schema_change_add_key_column1.csv.gz' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(20480, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + } + } + sql "sync" + def rowCount2 = sql """ select count() from ${tableName}; """ + logger.info("rowCount2: ${rowCount2}") + // check generate 3 segments + getTabletStatus(tablet, 3, 6) + def local_dm = getLocalDeleteBitmapStatus(tablet) + logger.info("local delete bitmap 1: " + local_dm) + + // trigger compaction for load 2 + GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets", + [tablet_id: "${tablet.TabletId}", start_version: 3, end_version: 3]) + (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + compactJson = parseJson(out.trim()) + logger.info("compact json: " + compactJson) + waitForCompaction(tablet) + // check generate 1 segments + for (int i = 0; i < 20; i++) { + if (getTabletStatus(tablet, 3, 1, false)) { + break + } + sleep(100) + } + getTabletStatus(tablet, 3, 1) + + GetDebugPoint().enableDebugPointForAllBEs("DeleteBitmapAction._handle_show_local_delete_bitmap_count.vacuum_stale_rowsets") // cloud + GetDebugPoint().enableDebugPointForAllBEs("DeleteBitmapAction._handle_show_local_delete_bitmap_count.start_delete_unused_rowset") // local + local_dm = getLocalDeleteBitmapStatus(tablet) + logger.info("local delete bitmap 2: " + local_dm) + assertEquals(1, local_dm["delete_bitmap_count"]) +} diff --git a/regression-test/suites/compaction/test_mow_compaction_agg_and_remove_pre_delete_bitmap.groovy b/regression-test/suites/compaction/test_mow_compaction_agg_and_remove_pre_delete_bitmap.groovy index 6f336865a65f3a..73942a55f80f4f 100644 --- a/regression-test/suites/compaction/test_mow_compaction_agg_and_remove_pre_delete_bitmap.groovy +++ b/regression-test/suites/compaction/test_mow_compaction_agg_and_remove_pre_delete_bitmap.groovy @@ -238,7 +238,7 @@ suite("test_mow_compaction_agg_and_remove_pre_delete_bitmap", "nonConcurrent") { // unused rowsets are not deleted (compaction input rowsets reference to them) local_dm = getLocalDeleteBitmapStatus(tablet) logger.info("local_dm 2: " + local_dm) - assertEquals(9, local_dm["cardinality"]) // the last one is agged + // assertEquals(9, local_dm["cardinality"]) // the last one is agged // wait for no unused rowsets GetDebugPoint().enableDebugPointForAllBEs("DeleteBitmapAction._handle_show_local_delete_bitmap_count.vacuum_stale_rowsets") // cloud diff --git a/regression-test/suites/compaction/test_mow_compaction_and_schema_change.groovy b/regression-test/suites/compaction/test_mow_compaction_and_schema_change.groovy index 88b6dcb7d2c0ad..c0c75a3ee94349 100644 --- a/regression-test/suites/compaction/test_mow_compaction_and_schema_change.groovy +++ b/regression-test/suites/compaction/test_mow_compaction_and_schema_change.groovy @@ -297,23 +297,23 @@ suite("test_mow_compaction_and_schema_change", "nonConcurrent") { assertEquals(5, local_dm["delete_bitmap_count"]) assertEquals(6, local_dm["cardinality"]) } else { - assertEquals(4, local_dm["delete_bitmap_count"]) - assertEquals(5, local_dm["cardinality"]) + assertEquals(5, local_dm["delete_bitmap_count"]) + assertEquals(6, local_dm["cardinality"]) } } else if (method == 1) { if (isCloudMode()) { assertEquals(3, local_dm["delete_bitmap_count"]) assertEquals(6, local_dm["cardinality"]) // the last one is agged } else { - assertEquals(9, local_dm["cardinality"]) // the last one is agged + assertEquals(10, local_dm["cardinality"]) // the last one is agged } } else if (method == 2) { if (isCloudMode()) { // compaction select [8-11] assertEquals(2, local_dm["delete_bitmap_count"]) assertEquals(6, local_dm["cardinality"]) } else { - assertEquals(1, local_dm["delete_bitmap_count"]) - assertEquals(5, local_dm["cardinality"]) + assertEquals(2, local_dm["delete_bitmap_count"]) + assertEquals(6, local_dm["cardinality"]) } } @@ -328,10 +328,10 @@ suite("test_mow_compaction_and_schema_change", "nonConcurrent") { if (method == 0 || method == 1) { logger.info("no duplicated keys: " + result) assertEquals(0, result.size()) - } else if (method == 2) { + } /*else if (method == 2) { logger.info("find duplicated keys: " + result) assertEquals(2, result.size()) - } + }*/ GetDebugPoint().clearDebugPointsForAllBEs() } } finally { diff --git a/regression-test/suites/compaction/test_mow_delete_unused_rowset_dm_docker.groovy b/regression-test/suites/compaction/test_mow_delete_unused_rowset_dm_docker.groovy new file mode 100644 index 00000000000000..53bc4f10e51e36 --- /dev/null +++ b/regression-test/suites/compaction/test_mow_delete_unused_rowset_dm_docker.groovy @@ -0,0 +1,231 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.util.concurrent.atomic.AtomicBoolean +import org.apache.doris.regression.suite.ClusterOptions +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_mow_delete_unused_rowset_dm_docker", "docker") { + logger.info("test_mow_delete_unused_rowset_dm_docker") + def options = new ClusterOptions() + options.cloudMode = false + options.setFeNum(1) + options.setBeNum(1) + options.enableDebugPoints() + options.feConfigs.add("enable_workload_group=false") + // beConfigs + options.beConfigs.add('compaction_promotion_version_count=5') + options.beConfigs.add('tablet_rowset_stale_sweep_time_sec=0') + options.beConfigs.add('enable_mow_verbose_log=true') + options.beConfigs.add('enable_java_support=false') + + def testTable = "test_mow_delete_unused_rowset_dm_docker" + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + + def triggerCompaction = { tablet -> + def compact_type = "cumulative" + String tablet_id = tablet.TabletId + String trigger_backend_id = tablet.BackendId + def be_host = backendId_to_backendIP[trigger_backend_id] + def be_http_port = backendId_to_backendHttpPort[trigger_backend_id] + if (compact_type == "cumulative") { + def (code_1, out_1, err_1) = be_run_cumulative_compaction(be_host, be_http_port, tablet_id) + logger.info("Run compaction: code=" + code_1 + ", out=" + out_1 + ", err=" + err_1) + assertEquals(code_1, 0) + return out_1 + } else if (compact_type == "full") { + def (code_2, out_2, err_2) = be_run_full_compaction(be_host, be_http_port, tablet_id) + logger.info("Run compaction: code=" + code_2 + ", out=" + out_2 + ", err=" + err_2) + assertEquals(code_2, 0) + return out_2 + } else { + assertFalse(True) + } + } + + def getTabletStatus = { tablet -> + String tablet_id = tablet.TabletId + String trigger_backend_id = tablet.BackendId + def be_host = backendId_to_backendIP[trigger_backend_id] + def be_http_port = backendId_to_backendHttpPort[trigger_backend_id] + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET http://${be_host}:${be_http_port}") + sb.append("/api/compaction/show?tablet_id=") + sb.append(tablet_id) + + String command = sb.toString() + logger.info(command) + def process = command.execute() + def code = process.waitFor() + def out = process.getText() + logger.info("Get tablet status: =" + code + ", out=" + out) + assertEquals(code, 0) + def tabletStatus = parseJson(out.trim()) + return tabletStatus + } + + def waitForCompaction = { tablet -> + String tablet_id = tablet.TabletId + String trigger_backend_id = tablet.BackendId + def be_host = backendId_to_backendIP[trigger_backend_id] + def be_http_port = backendId_to_backendHttpPort[trigger_backend_id] + def running = true + do { + Thread.sleep(1000) + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET http://${be_host}:${be_http_port}") + sb.append("/api/compaction/run_status?tablet_id=") + sb.append(tablet_id) + + String command = sb.toString() + logger.info(command) + def process = command.execute() + def code = process.waitFor() + def out = process.getText() + logger.info("Get compaction status: code=" + code + ", out=" + out) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + } + + def getLocalDeleteBitmapStatus = { tablet -> + String tablet_id = tablet.TabletId + String trigger_backend_id = tablet.BackendId + def be_host = backendId_to_backendIP[trigger_backend_id] + def be_http_port = backendId_to_backendHttpPort[trigger_backend_id] + boolean running = true + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET http://${be_host}:${be_http_port}") + sb.append("/api/delete_bitmap/count_local?verbose=true&tablet_id=") + sb.append(tablet_id) + + String command = sb.toString() + logger.info(command) + def process = command.execute() + def code = process.waitFor() + def out = process.getText() + logger.info("Get local delete bitmap count status: =" + code + ", out=" + out) + assertEquals(code, 0) + def deleteBitmapStatus = parseJson(out.trim()) + return deleteBitmapStatus + } + + docker(options) { + sql """ DROP TABLE IF EXISTS ${testTable} """ + sql """ + create table ${testTable} (`k` int NOT NULL, `v` int NOT NULL) + UNIQUE KEY(`k`) + DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "true" + ); + """ + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + def tablets = sql_return_maparray """ show tablets from ${testTable}; """ + logger.info("tablets: " + tablets) + assertEquals(1, tablets.size()) + def tablet = tablets[0] + + GetDebugPoint().enableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowset") + GetDebugPoint().enableDebugPointForAllBEs("DeleteBitmapAction._handle_show_local_delete_bitmap_count.start_delete_unused_rowset") + + // 1. write some data + sql """ INSERT INTO ${testTable} VALUES (1,98); """ + sql """ INSERT INTO ${testTable} VALUES (1,99),(2,99); """ + sql """ INSERT INTO ${testTable} VALUES (3,99); """ + sql """ INSERT INTO ${testTable} VALUES (4,99); """ + sql """ INSERT INTO ${testTable} VALUES (5,99); """ + sql "sync" + order_qt_sql1 """ select * from ${testTable}; """ + + // 2. trigger compaction to generate base rowset + getTabletStatus(tablet) + assertTrue(triggerCompaction(tablet).contains("Success")) + waitForCompaction(tablet) + def tablet_status = getTabletStatus(tablet) + assertEquals(2, tablet_status["rowsets"].size()) + + // 3. wait for no delete bitmap and no stale rowsets + def local_dm = getLocalDeleteBitmapStatus(tablet) + assertEquals(0, local_dm["delete_bitmap_count"]) + tablet_status = getTabletStatus(tablet) + assertEquals(0, tablet_status["stale_rowsets"].size()) + + // 3. write some data + sql """ INSERT INTO ${testTable} VALUES (1,100); """ + sql """ INSERT INTO ${testTable} VALUES (1,101),(2,100); """ + sql """ INSERT INTO ${testTable} VALUES (3,100); """ + sql """ INSERT INTO ${testTable} VALUES (4,100); """ + sql """ INSERT INTO ${testTable} VALUES (5,100); """ + sql """ sync """ + order_qt_sql2 "select * from ${testTable}" + tablet_status = getTabletStatus(tablet) + assertEquals(7, tablet_status["rowsets"].size()) + + // 4. trigger compaction + GetDebugPoint().enableDebugPointForAllBEs("StorageEngine::start_delete_unused_rowset.block") + assertTrue(triggerCompaction(tablet).contains("Success")) + waitForCompaction(tablet) + tablet_status = getTabletStatus(tablet) + assertEquals(3, tablet_status["rowsets"].size()) + + // 5. block delete unused rowset, there are delete bitmaps; wait for no stale rowsets + GetDebugPoint().disableDebugPointForAllBEs("DeleteBitmapAction._handle_show_local_delete_bitmap_count.start_delete_unused_rowset") + local_dm = getLocalDeleteBitmapStatus(tablet) + logger.info("local_dm 1: " + local_dm) + assertEquals(6, local_dm["delete_bitmap_count"]) + tablet_status = getTabletStatus(tablet) + assertEquals(0, tablet_status["stale_rowsets"].size()) + + // 6. restart be. check delete bitmap count + cluster.restartBackends() + tablet_status = getTabletStatus(tablet) + logger.info("tablet status after restart: " + tablet_status) + for (int i = 0; i < 300; i++) { + local_dm = getLocalDeleteBitmapStatus(tablet) + if (local_dm["delete_bitmap_count"] == 5) { + break + } + sleep(20) + } + local_dm = getLocalDeleteBitmapStatus(tablet) + logger.info("local_dm 2: " + local_dm) + assertEquals(5, local_dm["delete_bitmap_count"]) + order_qt_sql3 """ select * from ${testTable}; """ + + // 7. restart be to check to the deleted delete bitmap is stored to local storage + cluster.restartBackends() + tablet_status = getTabletStatus(tablet) + logger.info("tablet status after restart2: " + tablet_status) + for (int i = 0; i < 300; i++) { + local_dm = getLocalDeleteBitmapStatus(tablet) + if (local_dm["delete_bitmap_count"] == 5) { + break + } + sleep(20) + } + local_dm = getLocalDeleteBitmapStatus(tablet) + logger.info("local_dm 3: " + local_dm) + assertEquals(5, local_dm["delete_bitmap_count"]) + } +} diff --git a/regression-test/suites/compaction/test_mow_stale_rowset_delete_bitmap.groovy b/regression-test/suites/compaction/test_mow_stale_rowset_delete_bitmap.groovy new file mode 100644 index 00000000000000..b91a19784e6eb6 --- /dev/null +++ b/regression-test/suites/compaction/test_mow_stale_rowset_delete_bitmap.groovy @@ -0,0 +1,249 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.util.concurrent.atomic.AtomicBoolean; +import org.codehaus.groovy.runtime.IOGroovyMethods + +// when move rowsets from stale to unused, the delete bitmap are not deleted +// when delete unused rowsets, the delete bitmap are deleted +suite("test_mow_stale_rowset_delete_bitmap", "nonConcurrent") { + def testTable = "test_mow_stale_rowset_delete_bitmap" + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + def backendId_to_params = [string: [:]] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + def set_be_param = { paramName, paramValue -> + // for eache be node, set paramName=paramValue + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue)) + assertTrue(out.contains("OK")) + } + } + + def reset_be_param = { paramName -> + // for eache be node, reset paramName to default + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + def original_value = backendId_to_params.get(id).get(paramName) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, original_value)) + assertTrue(out.contains("OK")) + } + } + + def get_be_param = { paramName -> + // for eache be node, get param value by default + def paramValue = "" + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + // get the config value from be + def (code, out, err) = curl("GET", String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort, paramName)) + assertTrue(code == 0) + assertTrue(out.contains(paramName)) + // parsing + def resultList = parseJson(out)[0] + assertTrue(resultList.size() == 4) + // get original value + paramValue = resultList[2] + backendId_to_params.get(id, [:]).put(paramName, paramValue) + } + } + + def triggerCompaction = { tablet -> + def compact_type = "cumulative" + String tablet_id = tablet.TabletId + String trigger_backend_id = tablet.BackendId + def be_host = backendId_to_backendIP[trigger_backend_id] + def be_http_port = backendId_to_backendHttpPort[trigger_backend_id] + if (compact_type == "cumulative") { + def (code_1, out_1, err_1) = be_run_cumulative_compaction(be_host, be_http_port, tablet_id) + logger.info("Run compaction: code=" + code_1 + ", out=" + out_1 + ", err=" + err_1) + assertEquals(code_1, 0) + return out_1 + } else if (compact_type == "full") { + def (code_2, out_2, err_2) = be_run_full_compaction(be_host, be_http_port, tablet_id) + logger.info("Run compaction: code=" + code_2 + ", out=" + out_2 + ", err=" + err_2) + assertEquals(code_2, 0) + return out_2 + } else { + assertFalse(True) + } + } + + def getTabletStatus = { tablet -> + String tablet_id = tablet.TabletId + String trigger_backend_id = tablet.BackendId + def be_host = backendId_to_backendIP[trigger_backend_id] + def be_http_port = backendId_to_backendHttpPort[trigger_backend_id] + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET http://${be_host}:${be_http_port}") + sb.append("/api/compaction/show?tablet_id=") + sb.append(tablet_id) + + String command = sb.toString() + logger.info(command) + def process = command.execute() + def code = process.waitFor() + def out = process.getText() + logger.info("Get tablet status: =" + code + ", out=" + out) + assertEquals(code, 0) + def tabletStatus = parseJson(out.trim()) + return tabletStatus + } + + def waitForCompaction = { tablet -> + String tablet_id = tablet.TabletId + String trigger_backend_id = tablet.BackendId + def be_host = backendId_to_backendIP[trigger_backend_id] + def be_http_port = backendId_to_backendHttpPort[trigger_backend_id] + def running = true + do { + Thread.sleep(1000) + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET http://${be_host}:${be_http_port}") + sb.append("/api/compaction/run_status?tablet_id=") + sb.append(tablet_id) + + String command = sb.toString() + logger.info(command) + def process = command.execute() + def code = process.waitFor() + def out = process.getText() + logger.info("Get compaction status: code=" + code + ", out=" + out) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + } + + def getLocalDeleteBitmapStatus = { tablet -> + String tablet_id = tablet.TabletId + String trigger_backend_id = tablet.BackendId + def be_host = backendId_to_backendIP[trigger_backend_id] + def be_http_port = backendId_to_backendHttpPort[trigger_backend_id] + boolean running = true + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET http://${be_host}:${be_http_port}") + sb.append("/api/delete_bitmap/count_local?verbose=true&tablet_id=") + sb.append(tablet_id) + + String command = sb.toString() + logger.info(command) + def process = command.execute() + def code = process.waitFor() + def out = process.getText() + logger.info("Get local delete bitmap count status: =" + code + ", out=" + out) + assertEquals(code, 0) + def deleteBitmapStatus = parseJson(out.trim()) + return deleteBitmapStatus + } + + AtomicBoolean query_result = new AtomicBoolean(true) + def query = { + logger.info("query start") + def results = sql_return_maparray """ select * from ${testTable}; """ + logger.info("query result: " + results) + Set keys = new HashSet<>() + for (final def result in results) { + if (keys.contains(result.k)) { + logger.info("find duplicate key: " + result.k) + query_result.set(false) + break + } + keys.add(result.k) + } + logger.info("query finish. query_result: " + query_result.get()) + } + + sql """ DROP TABLE IF EXISTS ${testTable} """ + sql """ + create table ${testTable} (`k` int NOT NULL, `v` int NOT NULL) + UNIQUE KEY(`k`) + DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "true" + ); + """ + + def tablets = sql_return_maparray """ show tablets from ${testTable}; """ + logger.info("tablets: " + tablets) + assertEquals(1, tablets.size()) + def tablet = tablets[0] + + try { + GetDebugPoint().clearDebugPointsForAllBEs() + get_be_param("tablet_rowset_stale_sweep_time_sec") + set_be_param("tablet_rowset_stale_sweep_time_sec", "0") + + // write some data + sql """ INSERT INTO ${testTable} VALUES (1,99); """ + sql """ INSERT INTO ${testTable} VALUES (2,99); """ + sql """ INSERT INTO ${testTable} VALUES (3,99); """ + sql """ INSERT INTO ${testTable} VALUES (4,99),(5,0); """ + sql """ INSERT INTO ${testTable} VALUES (5,99); """ + sql "sync" + getTabletStatus(tablet) + getLocalDeleteBitmapStatus(tablet) + + // trigger and block one query + GetDebugPoint().enableDebugPointForAllBEs("NewOlapScanner::_init_tablet_reader_params.block") + Thread query_thread = new Thread(() -> query()) + query_thread.start() + sleep(100) + + // trigger compaction + GetDebugPoint().enableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowset") + GetDebugPoint().enableDebugPointForAllBEs("Tablet.delete_expired_stale_rowset.start_delete_unused_rowset") + assertTrue(triggerCompaction(tablet).contains("Success")) + waitForCompaction(tablet) + // wait for stale rowsets are deleted + for (int i = 0; i < 10; i++) { + def tablet_status = getTabletStatus(tablet) + if (tablet_status["rowsets"].size() <= 2 && tablet_status["stale_rowsets"].size() == 0) { + break + } + sleep(200) + } + getLocalDeleteBitmapStatus(tablet) + + // unblock the query + GetDebugPoint().disableDebugPointForAllBEs("NewOlapScanner::_init_tablet_reader_params.block") + query_thread.join() + assertTrue(query_result.get(), "found duplicated keys") + + // wait for delete bitmap of unused rowsets are deleted + GetDebugPoint().enableDebugPointForAllBEs("DeleteBitmapAction._handle_show_local_delete_bitmap_count.vacuum_stale_rowsets") // cloud + GetDebugPoint().enableDebugPointForAllBEs("DeleteBitmapAction._handle_show_local_delete_bitmap_count.start_delete_unused_rowset") // local + for (int i = 0; i < 20; i++) { + def local_delete_bitmap_status = getLocalDeleteBitmapStatus(tablet) + if (local_delete_bitmap_status["delete_bitmap_count"] == 0) { + break + } + sleep(100) + } + } finally { + reset_be_param("tablet_rowset_stale_sweep_time_sec") + GetDebugPoint().clearDebugPointsForAllBEs() + } +}