From f271637f73000f401e570c6ac01043d8e6a5b668 Mon Sep 17 00:00:00 2001 From: HappenLee Date: Fri, 14 Feb 2025 11:36:38 +0800 Subject: [PATCH 1/3] [Bug](cache) Fix query cache report error message (#47883) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The plan: ``` QUERY_CACHE: | | CACHE_NODE_ID: 2 | | DIGEST: 557413bec3209f50fd640eb3b0534c12ccd9df35f9d4c86620416936985e2679 | | | | STREAM DATA SINK | | EXCHANGE ID: 03 | | UNPARTITIONED | | | | 2:VAGGREGATE (merge finalize)(273) | | | output: min(partial_min(pk)[#6])[#9], max(partial_max(pk)[#7])[#10] | | | group by: pk[#5] | | | sortByGroupKey:false | | | cardinality=0 | | | final projections: field1[#9], field2[#10], pk[#8] | | | final project output tuple id: 4 | | | distribute expr lists: pk[#5] | | | | | 1:VAGGREGATE (update serialize)(268) | | | output: partial_min(pk[#4])[#6], partial_max(pk[#4])[#7] | | | group by: pk[#4] | | | sortByGroupKey:false | | | cardinality=0 | | | distribute expr lists: pk[#4] | | | ``` The FE chooses aggregation node 2 as the cache node (CACHE_NODE_ID: 2), but the BE still uses aggregation node 1. 
BE should support the case --- be/src/pipeline/pipeline_fragment_context.cpp | 11 +- .../data/query_p0/cache/query_cache.out | 22 +++ .../suites/query_p0/cache/query_cache.groovy | 163 ++++++++++++++++++ 3 files changed, 191 insertions(+), 5 deletions(-) create mode 100644 regression-test/data/query_p0/cache/query_cache.out create mode 100644 regression-test/suites/query_p0/cache/query_cache.groovy diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 220474c91e4a90..81a84451e89c0a 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -1235,7 +1235,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) + ": group by and output is empty"); } - + bool need_create_cache_op = + enable_query_cache && tnode.node_id == request.fragment.query_cache_param.node_id; auto create_query_cache_operator = [&](PipelinePtr& new_pipe) { auto cache_node_id = request.local_params[0].per_node_scan_ranges.begin()->first; auto cache_source_id = next_operator_id(); @@ -1269,7 +1270,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo request.query_options.__isset.enable_distinct_streaming_aggregation && request.query_options.enable_distinct_streaming_aggregation && !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt) { - if (enable_query_cache) { + if (need_create_cache_op) { PipelinePtr new_pipe; RETURN_IF_ERROR(create_query_cache_operator(new_pipe)); @@ -1293,7 +1294,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo } else if (tnode.agg_node.__isset.use_streaming_preaggregation && tnode.agg_node.use_streaming_preaggregation && !tnode.agg_node.grouping_exprs.empty()) { - if (enable_query_cache) { + if (need_create_cache_op) { PipelinePtr new_pipe; 
RETURN_IF_ERROR(create_query_cache_operator(new_pipe)); @@ -1310,7 +1311,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo } else { // create new pipeline to add query cache operator PipelinePtr new_pipe; - if (enable_query_cache) { + if (need_create_cache_op) { RETURN_IF_ERROR(create_query_cache_operator(new_pipe)); } @@ -1319,7 +1320,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo } else { op.reset(new AggSourceOperatorX(pool, tnode, next_operator_id(), descs)); } - if (enable_query_cache) { + if (need_create_cache_op) { RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op)); RETURN_IF_ERROR(new_pipe->add_operator( op, request.__isset.parallel_instances ? request.parallel_instances : 0)); diff --git a/regression-test/data/query_p0/cache/query_cache.out b/regression-test/data/query_p0/cache/query_cache.out new file mode 100644 index 00000000000000..5cfa581b3bffbf --- /dev/null +++ b/regression-test/data/query_p0/cache/query_cache.out @@ -0,0 +1,22 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !query_cache1 -- + +-- !query_cache2 -- +0 0 0 +1 1 1 +2 2 2 + +-- !query_cache3 -- + +-- !query_cache4 -- +0 0 0 +1 1 1 +2 2 2 + +-- !query_cache5 -- + +-- !query_cache6 -- +0 0 0 +1 1 1 +2 2 2 + diff --git a/regression-test/suites/query_p0/cache/query_cache.groovy b/regression-test/suites/query_p0/cache/query_cache.groovy new file mode 100644 index 00000000000000..997453f18198e1 --- /dev/null +++ b/regression-test/suites/query_p0/cache/query_cache.groovy @@ -0,0 +1,163 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.util.stream.Collectors + +suite("query_cache") { + def tableName = "table_3_undef_partitions2_keys3_properties4_distributed_by53" + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `pk` int NULL, + `col_varchar_10__undef_signed` varchar(10) NULL, + `col_int_undef_signed` int NULL, + `col_varchar_1024__undef_signed` varchar(1024) NULL + ) ENGINE=OLAP + DUPLICATE KEY(`pk`, `col_varchar_10__undef_signed`) + DISTRIBUTED BY HASH(`pk`) BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "min_load_replica_num" = "-1", + "is_being_synced" = "false", + "storage_medium" = "hdd", + "storage_format" = "V2", + "inverted_index_storage_format" = "V3", + "light_schema_change" = "true", + "disable_auto_compaction" = "false", + "enable_single_replica_compaction" = "false", + "group_commit_interval_ms" = "10000", + "group_commit_data_bytes" = "134217728" + ) + """ + + sql """ + INSERT INTO ${tableName}(pk, col_varchar_10__undef_signed, col_int_undef_signed, col_varchar_1024__undef_signed) + VALUES + (0, "mean", null, "p"), + (1, "is", 6, "what"), + (2, "one", null, "e") + """ + + // First complex query - Run without cache + order_qt_query_cache1 """ + SELECT + MIN(`pk`) AS field1, + MAX(`pk`) AS field2, + `pk` AS field3 + FROM ${tableName} AS alias1 + WHERE ( + alias1.col_varchar_1024__undef_signed LIKE 
CONCAT('aEIovabVCD', '%') + AND ( + (alias1.`pk` = 154 OR ( + alias1.col_varchar_1024__undef_signed LIKE CONCAT('lWpWJPFqXM', '%') + AND alias1.`pk` = 111 + )) + AND ( + alias1.col_varchar_10__undef_signed != 'IfGTFZuqZr' + AND alias1.col_varchar_1024__undef_signed > 'with' + ) + AND alias1.`pk` IS NULL + ) + AND alias1.col_int_undef_signed < 7 + ) + GROUP BY field3 + """ + + // Simple query - Run without cache + order_qt_query_cache2 """ + SELECT + MIN(`pk`) AS field1, + MAX(`pk`) AS field2, + `pk` AS field3 + FROM ${tableName} + GROUP BY field3 + """ + + // Enable query cache + sql "set enable_query_cache=true" + + // Run the same complex query with cache enabled + order_qt_query_cache3 """ + SELECT + MIN(`pk`) AS field1, + MAX(`pk`) AS field2, + `pk` AS field3 + FROM ${tableName} AS alias1 + WHERE ( + alias1.col_varchar_1024__undef_signed LIKE CONCAT('aEIovabVCD', '%') + AND ( + (alias1.`pk` = 154 OR ( + alias1.col_varchar_1024__undef_signed LIKE CONCAT('lWpWJPFqXM', '%') + AND alias1.`pk` = 111 + )) + AND ( + alias1.col_varchar_10__undef_signed != 'IfGTFZuqZr' + AND alias1.col_varchar_1024__undef_signed > 'with' + ) + AND alias1.`pk` IS NULL + ) + AND alias1.col_int_undef_signed < 7 + ) + GROUP BY field3 + """ + + // Run the same simple query with cache enabled + order_qt_query_cache4 """ + SELECT + MIN(`pk`) AS field1, + MAX(`pk`) AS field2, + `pk` AS field3 + FROM ${tableName} + GROUP BY field3 + """ + + // Run both queries again to test cache hit + order_qt_query_cache5 """ + SELECT + MIN(`pk`) AS field1, + MAX(`pk`) AS field2, + `pk` AS field3 + FROM ${tableName} AS alias1 + WHERE ( + alias1.col_varchar_1024__undef_signed LIKE CONCAT('aEIovabVCD', '%') + AND ( + (alias1.`pk` = 154 OR ( + alias1.col_varchar_1024__undef_signed LIKE CONCAT('lWpWJPFqXM', '%') + AND alias1.`pk` = 111 + )) + AND ( + alias1.col_varchar_10__undef_signed != 'IfGTFZuqZr' + AND alias1.col_varchar_1024__undef_signed > 'with' + ) + AND alias1.`pk` IS NULL + ) + AND 
alias1.col_int_undef_signed < 7 + ) + GROUP BY field3 + """ + + order_qt_query_cache6 """ + SELECT + MIN(`pk`) AS field1, + MAX(`pk`) AS field2, + `pk` AS field3 + FROM ${tableName} + GROUP BY field3 + """ + +} \ No newline at end of file From 2aba6fc761bdae98a2517188c5e5780c79bc6502 Mon Sep 17 00:00:00 2001 From: HappenLee Date: Tue, 18 Feb 2025 12:00:04 +0800 Subject: [PATCH 2/3] [Fix](QueryCache) Fix cache order different may cause error (#47961) The original check could compute need_reorder incorrectly: the loop overwrites need_reorder on every iteration, so only the comparison of the last slot takes effect: ``` bool need_reorder = _slot_orders.size() != hit_cache_slot_orders->size(); if (!need_reorder) { for (int i = 0; i < _slot_orders.size(); ++i) { need_reorder = _slot_orders[i] != (*hit_cache_slot_orders)[i]; } } ``` --- .../pipeline/exec/cache_source_operator.cpp | 9 +------ .../data/query_p0/cache/query_cache.out | 8 +++++++ .../suites/query_p0/cache/query_cache.groovy | 24 ++++++++++++++++++- 3 files changed, 32 insertions(+), 9 deletions(-) diff --git a/be/src/pipeline/exec/cache_source_operator.cpp b/be/src/pipeline/exec/cache_source_operator.cpp index 1387de351d11ef..e2bee373f9d17d 100644 --- a/be/src/pipeline/exec/cache_source_operator.cpp +++ b/be/src/pipeline/exec/cache_source_operator.cpp @@ -70,14 +70,7 @@ Status CacheSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { _hit_cache_results = _query_cache_handle.get_cache_result(); auto hit_cache_slot_orders = _query_cache_handle.get_cache_slot_orders(); - bool need_reorder = _slot_orders.size() != hit_cache_slot_orders->size(); - if (!need_reorder) { - for (int i = 0; i < _slot_orders.size(); ++i) { - need_reorder = _slot_orders[i] != (*hit_cache_slot_orders)[i]; - } - } - - if (need_reorder) { + if (_slot_orders != *hit_cache_slot_orders) { for (auto slot_id : _slot_orders) { auto find_res = std::find(hit_cache_slot_orders->begin(), hit_cache_slot_orders->end(), slot_id); diff --git a/regression-test/data/query_p0/cache/query_cache.out 
b/regression-test/data/query_p0/cache/query_cache.out index 5cfa581b3bffbf..90a8086bb69dc4 100644 --- a/regression-test/data/query_p0/cache/query_cache.out +++ b/regression-test/data/query_p0/cache/query_cache.out @@ -20,3 +20,11 @@ 1 1 1 2 2 2 +-- !query_cache7 -- +\N \N \N 0 \N +6 6 6 1 6 + +-- !query_cache8 -- +0 \N \N \N \N +1 6 6 6 6 + diff --git a/regression-test/suites/query_p0/cache/query_cache.groovy b/regression-test/suites/query_p0/cache/query_cache.groovy index 997453f18198e1..e448a8978a0565 100644 --- a/regression-test/suites/query_p0/cache/query_cache.groovy +++ b/regression-test/suites/query_p0/cache/query_cache.groovy @@ -160,4 +160,26 @@ suite("query_cache") { GROUP BY field3 """ -} \ No newline at end of file + order_qt_query_cache7 """ + SELECT + col_int_undef_signed, + MIN(`col_int_undef_signed`) AS field1, + MAX(`col_int_undef_signed`) AS field2, + COUNT(`col_int_undef_signed`) AS field3, + SUM(`col_int_undef_signed`) AS field4 + FROM ${tableName} + GROUP BY col_int_undef_signed + """ + + // reorder the order_qt_query_cache7 select list to test the cache hit + order_qt_query_cache8 """ + SELECT + COUNT(`col_int_undef_signed`) AS field3, -- Count of col_int_undef_signed (Original field3) + col_int_undef_signed, -- The original unsigned integer column (Original col_int_undef_signed) + SUM(`col_int_undef_signed`) AS field4, -- Sum of col_int_undef_signed (Original field4) + MIN(`col_int_undef_signed`) AS field1, -- Minimum value of col_int_undef_signed (Original field1) + MAX(`col_int_undef_signed`) AS field2 -- Maximum value of col_int_undef_signed (Original field2). Note: Trailing comma removed to avoid syntax error. 
+FROM ${tableName} +GROUP BY col_int_undef_signed; + """ +} From 0aa3dbf9fa57a43f782f25024f7b30b2b19aab12 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Fri, 23 May 2025 10:08:26 +0800 Subject: [PATCH 3/3] update --- .../suites/query_p0/cache/query_cache.groovy | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/regression-test/suites/query_p0/cache/query_cache.groovy b/regression-test/suites/query_p0/cache/query_cache.groovy index e448a8978a0565..da467bcc3a285b 100644 --- a/regression-test/suites/query_p0/cache/query_cache.groovy +++ b/regression-test/suites/query_p0/cache/query_cache.groovy @@ -31,17 +31,7 @@ suite("query_cache") { DUPLICATE KEY(`pk`, `col_varchar_10__undef_signed`) DISTRIBUTED BY HASH(`pk`) BUCKETS 10 PROPERTIES ( - "replication_allocation" = "tag.location.default: 1", - "min_load_replica_num" = "-1", - "is_being_synced" = "false", - "storage_medium" = "hdd", - "storage_format" = "V2", - "inverted_index_storage_format" = "V3", - "light_schema_change" = "true", - "disable_auto_compaction" = "false", - "enable_single_replica_compaction" = "false", - "group_commit_interval_ms" = "10000", - "group_commit_data_bytes" = "134217728" + "replication_allocation" = "tag.location.default: 1" ) """