diff --git a/be/src/pipeline/exec/cache_source_operator.cpp b/be/src/pipeline/exec/cache_source_operator.cpp index 1387de351d11ef..e2bee373f9d17d 100644 --- a/be/src/pipeline/exec/cache_source_operator.cpp +++ b/be/src/pipeline/exec/cache_source_operator.cpp @@ -70,14 +70,7 @@ Status CacheSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { _hit_cache_results = _query_cache_handle.get_cache_result(); auto hit_cache_slot_orders = _query_cache_handle.get_cache_slot_orders(); - bool need_reorder = _slot_orders.size() != hit_cache_slot_orders->size(); - if (!need_reorder) { - for (int i = 0; i < _slot_orders.size(); ++i) { - need_reorder = _slot_orders[i] != (*hit_cache_slot_orders)[i]; - } - } - - if (need_reorder) { + if (_slot_orders != *hit_cache_slot_orders) { for (auto slot_id : _slot_orders) { auto find_res = std::find(hit_cache_slot_orders->begin(), hit_cache_slot_orders->end(), slot_id); diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 220474c91e4a90..81a84451e89c0a 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -1235,7 +1235,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) + ": group by and output is empty"); } - + bool need_create_cache_op = + enable_query_cache && tnode.node_id == request.fragment.query_cache_param.node_id; auto create_query_cache_operator = [&](PipelinePtr& new_pipe) { auto cache_node_id = request.local_params[0].per_node_scan_ranges.begin()->first; auto cache_source_id = next_operator_id(); @@ -1269,7 +1270,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo request.query_options.__isset.enable_distinct_streaming_aggregation && request.query_options.enable_distinct_streaming_aggregation && !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt) { - if (enable_query_cache) { + if (need_create_cache_op) { PipelinePtr new_pipe; RETURN_IF_ERROR(create_query_cache_operator(new_pipe)); @@ -1293,7 +1294,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo } else if (tnode.agg_node.__isset.use_streaming_preaggregation && tnode.agg_node.use_streaming_preaggregation && !tnode.agg_node.grouping_exprs.empty()) { - if (enable_query_cache) { + if (need_create_cache_op) { PipelinePtr new_pipe; RETURN_IF_ERROR(create_query_cache_operator(new_pipe)); @@ -1310,7 +1311,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo } else { // create new pipeline to add query cache operator PipelinePtr new_pipe; - if (enable_query_cache) { + if (need_create_cache_op) { RETURN_IF_ERROR(create_query_cache_operator(new_pipe)); } @@ -1319,7 +1320,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo } else { op.reset(new AggSourceOperatorX(pool, tnode, next_operator_id(), descs)); } - if (enable_query_cache) { + if (need_create_cache_op) { RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op)); RETURN_IF_ERROR(new_pipe->add_operator( op, request.__isset.parallel_instances ? request.parallel_instances : 0)); diff --git a/regression-test/data/query_p0/cache/query_cache.out b/regression-test/data/query_p0/cache/query_cache.out new file mode 100644 index 00000000000000..90a8086bb69dc4 --- /dev/null +++ b/regression-test/data/query_p0/cache/query_cache.out @@ -0,0 +1,30 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !query_cache1 -- + +-- !query_cache2 -- +0 0 0 +1 1 1 +2 2 2 + +-- !query_cache3 -- + +-- !query_cache4 -- +0 0 0 +1 1 1 +2 2 2 + +-- !query_cache5 -- + +-- !query_cache6 -- +0 0 0 +1 1 1 +2 2 2 + +-- !query_cache7 -- +\N \N \N 0 \N +6 6 6 1 6 + +-- !query_cache8 -- +0 \N \N \N \N +1 6 6 6 6 + diff --git a/regression-test/suites/query_p0/cache/query_cache.groovy b/regression-test/suites/query_p0/cache/query_cache.groovy new file mode 100644 index 00000000000000..da467bcc3a285b --- /dev/null +++ b/regression-test/suites/query_p0/cache/query_cache.groovy @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.util.stream.Collectors + +suite("query_cache") { + def tableName = "table_3_undef_partitions2_keys3_properties4_distributed_by53" + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `pk` int NULL, + `col_varchar_10__undef_signed` varchar(10) NULL, + `col_int_undef_signed` int NULL, + `col_varchar_1024__undef_signed` varchar(1024) NULL + ) ENGINE=OLAP + DUPLICATE KEY(`pk`, `col_varchar_10__undef_signed`) + DISTRIBUTED BY HASH(`pk`) BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ) + """ + + sql """ + INSERT INTO ${tableName}(pk, col_varchar_10__undef_signed, col_int_undef_signed, col_varchar_1024__undef_signed) + VALUES + (0, "mean", null, "p"), + (1, "is", 6, "what"), + (2, "one", null, "e") + """ + + // First complex query - Run without cache + order_qt_query_cache1 """ + SELECT + MIN(`pk`) AS field1, + MAX(`pk`) AS field2, + `pk` AS field3 + FROM ${tableName} AS alias1 + WHERE ( + alias1.col_varchar_1024__undef_signed LIKE CONCAT('aEIovabVCD', '%') + AND ( + (alias1.`pk` = 154 OR ( + alias1.col_varchar_1024__undef_signed LIKE CONCAT('lWpWJPFqXM', '%') + AND alias1.`pk` = 111 + )) + AND ( + alias1.col_varchar_10__undef_signed != 'IfGTFZuqZr' + AND alias1.col_varchar_1024__undef_signed > 'with' + ) + AND alias1.`pk` IS NULL + ) + AND alias1.col_int_undef_signed < 7 + ) + GROUP BY field3 + """ + + // Simple query - Run without cache + order_qt_query_cache2 """ + SELECT + MIN(`pk`) AS field1, + MAX(`pk`) AS field2, + `pk` AS field3 + FROM ${tableName} + GROUP BY field3 + """ + + // Enable query cache + sql "set enable_query_cache=true" + + // Run the same complex query with cache enabled + order_qt_query_cache3 """ + SELECT + MIN(`pk`) AS field1, + MAX(`pk`) AS field2, + `pk` AS field3 + FROM ${tableName} AS alias1 + WHERE ( + alias1.col_varchar_1024__undef_signed LIKE CONCAT('aEIovabVCD', '%') + AND ( + (alias1.`pk` = 154 OR ( + alias1.col_varchar_1024__undef_signed LIKE CONCAT('lWpWJPFqXM', '%') + AND alias1.`pk` = 111 + )) + AND ( + alias1.col_varchar_10__undef_signed != 'IfGTFZuqZr' + AND alias1.col_varchar_1024__undef_signed > 'with' + ) + AND alias1.`pk` IS NULL + ) + AND alias1.col_int_undef_signed < 7 + ) + GROUP BY field3 + """ + + // Run the same simple query with cache enabled + order_qt_query_cache4 """ + SELECT + MIN(`pk`) AS field1, + MAX(`pk`) AS field2, + `pk` AS field3 + FROM ${tableName} + GROUP BY field3 + """ + + // Run both queries again to test cache hit + order_qt_query_cache5 """ + SELECT + MIN(`pk`) AS field1, + MAX(`pk`) AS field2, + `pk` AS field3 + FROM ${tableName} AS alias1 + WHERE ( + alias1.col_varchar_1024__undef_signed LIKE CONCAT('aEIovabVCD', '%') + AND ( + (alias1.`pk` = 154 OR ( + alias1.col_varchar_1024__undef_signed LIKE CONCAT('lWpWJPFqXM', '%') + AND alias1.`pk` = 111 + )) + AND ( + alias1.col_varchar_10__undef_signed != 'IfGTFZuqZr' + AND alias1.col_varchar_1024__undef_signed > 'with' + ) + AND alias1.`pk` IS NULL + ) + AND alias1.col_int_undef_signed < 7 + ) + GROUP BY field3 + """ + + order_qt_query_cache6 """ + SELECT + MIN(`pk`) AS field1, + MAX(`pk`) AS field2, + `pk` AS field3 + FROM ${tableName} + GROUP BY field3 + """ + + order_qt_query_cache7 """ + SELECT + col_int_undef_signed, + MIN(`col_int_undef_signed`) AS field1, + MAX(`col_int_undef_signed`) AS field2, + COUNT(`col_int_undef_signed`) AS field3, + SUM(`col_int_undef_signed`) AS field4 + FROM ${tableName} + GROUP BY col_int_undef_signed + """ + + // reorder the order_qt_query_cache7 select list to test the cache hit + order_qt_query_cache8 """ + SELECT + COUNT(`col_int_undef_signed`) AS field3, -- Count of col_int_undef_signed (Original field3) + col_int_undef_signed, -- The original unsigned integer column (Original col_int_undef_signed) + SUM(`col_int_undef_signed`) AS field4, -- Sum of col_int_undef_signed (Original field4) + MIN(`col_int_undef_signed`) AS field1, -- Minimum value of col_int_undef_signed (Original field1) + MAX(`col_int_undef_signed`) AS field2 -- Maximum value of col_int_undef_signed (Original field2). Note: Trailing comma removed to avoid syntax error. +FROM ${tableName} +GROUP BY col_int_undef_signed; + """ +}