Skip to content
This repository was archived by the owner on Mar 12, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
549 changes: 316 additions & 233 deletions Cargo.lock

Large diffs are not rendered by default.

22 changes: 11 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ members = [

[workspace.dependencies]
alloc_tracker = { path = "src/components/alloc_tracker" }
arrow = { version = "43.0.0", features = ["prettyprint"] }
arrow_ipc = { version = "43.0.0" }
arrow = { version = "49.0.0", features = ["prettyprint"] }
arrow_ipc = { version = "49.0.0" }
arrow_ext = { path = "src/components/arrow_ext" }
analytic_engine = { path = "src/analytic_engine" }
arena = { path = "src/components/arena" }
Expand All @@ -107,8 +107,8 @@ cluster = { path = "src/cluster" }
criterion = "0.5"
horaedb-client = "1.0.2"
common_types = { path = "src/common_types" }
datafusion = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "9c3a537e25e5ab3299922864034f67fb2f79805d" }
datafusion-proto = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "9c3a537e25e5ab3299922864034f67fb2f79805d" }
datafusion = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "e21b03154" }
datafusion-proto = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "e21b03154" }
derive_builder = "0.12"
df_operator = { path = "src/df_operator" }
df_engine_extensions = { path = "src/df_engine_extensions" }
Expand All @@ -121,10 +121,10 @@ hash_ext = { path = "src/components/hash_ext" }
hex = "0.4.3"
hyperloglog = { git = "https://github.com/jedisct1/rust-hyperloglog.git", rev = "425487ce910f26636fbde8c4d640b538431aad50" }
id_allocator = { path = "src/components/id_allocator" }
influxql-logical-planner = { git = "https://github.com/CeresDB/influxql.git", rev = "a905863", package = "iox_query_influxql" }
influxql-parser = { git = "https://github.com/CeresDB/influxql.git", rev = "a905863", package = "influxdb_influxql_parser" }
influxql-query = { git = "https://github.com/CeresDB/influxql.git", rev = "a905863", package = "iox_query" }
influxql-schema = { git = "https://github.com/CeresDB/influxql.git", rev = "a905863", package = "schema" }
influxql-logical-planner = { git = "https://github.com/CeresDB/influxql.git", rev = "b9fb3ca", package = "iox_query_influxql" }
influxql-parser = { git = "https://github.com/CeresDB/influxql.git", rev = "b9fb3ca", package = "influxdb_influxql_parser" }
influxql-query = { git = "https://github.com/CeresDB/influxql.git", rev = "b9fb3ca", package = "iox_query" }
influxql-schema = { git = "https://github.com/CeresDB/influxql.git", rev = "b9fb3ca", package = "schema" }
interpreters = { path = "src/interpreters" }
itertools = "0.10.5"
lz4_flex = { version = "0.11", default-features = false, features = ["frame"] }
Expand All @@ -142,7 +142,7 @@ panic_ext = { path = "src/components/panic_ext" }
partitioned_lock = { path = "src/components/partitioned_lock" }
partition_table_engine = { path = "src/partition_table_engine" }
parquet_ext = { path = "src/components/parquet_ext" }
parquet = { version = "43.0.0" }
parquet = { version = "49.0.0" }
paste = "1.0"
pin-project-lite = "0.2.8"
pprof = "0.12.1"
Expand Down Expand Up @@ -172,9 +172,9 @@ size_ext = { path = "src/components/size_ext" }
smallvec = "1.6"
slog = "2.7"
spin = "0.9.6"
sqlparser = { version = "0.35", features = ["serde"] }
system_catalog = { path = "src/system_catalog" }
system_statis = { path = "src/components/system_stats" }
sqlparser = { version = "0.39.0", features = ["serde"] }
system_catalog = { path = "src/system_catalog" }
table_engine = { path = "src/table_engine" }
table_kv = { path = "src/components/table_kv" }
tempfile = "3.1.0"
Expand Down
17 changes: 13 additions & 4 deletions integration_tests/cases/common/dml/issue-1087.result
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ String("logical_plan after inline_table_scan"),String("SAME TEXT AS ABOVE"),
String("logical_plan after type_coercion"),String("SAME TEXT AS ABOVE"),
String("logical_plan after count_wildcard_rule"),String("SAME TEXT AS ABOVE"),
String("analyzed_logical_plan"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_nested_union"),String("SAME TEXT AS ABOVE"),
String("logical_plan after simplify_expressions"),String("SAME TEXT AS ABOVE"),
String("logical_plan after unwrap_cast_in_comparison"),String("SAME TEXT AS ABOVE"),
String("logical_plan after replace_distinct_aggregate"),String("SAME TEXT AS ABOVE"),
Expand All @@ -33,6 +34,7 @@ String("logical_plan after eliminate_cross_join"),String("SAME TEXT AS ABOVE"),
String("logical_plan after common_sub_expression_eliminate"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_limit"),String("SAME TEXT AS ABOVE"),
String("logical_plan after propagate_empty_relation"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_one_union"),String("SAME TEXT AS ABOVE"),
String("logical_plan after filter_null_join_keys"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_outer_join"),String("SAME TEXT AS ABOVE"),
String("logical_plan after push_down_limit"),String("SAME TEXT AS ABOVE"),
Expand All @@ -46,6 +48,7 @@ String("logical_plan after eliminate_projection"),String("TableScan: issue_1087
String("logical_plan after push_down_limit"),String("SAME TEXT AS ABOVE"),
String("logical_plan after influx_regex_to_datafusion_regex"),String("SAME TEXT AS ABOVE"),
String("logical_plan after handle_gap_fill"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_nested_union"),String("SAME TEXT AS ABOVE"),
String("logical_plan after simplify_expressions"),String("SAME TEXT AS ABOVE"),
String("logical_plan after unwrap_cast_in_comparison"),String("SAME TEXT AS ABOVE"),
String("logical_plan after replace_distinct_aggregate"),String("SAME TEXT AS ABOVE"),
Expand All @@ -62,6 +65,7 @@ String("logical_plan after eliminate_cross_join"),String("SAME TEXT AS ABOVE"),
String("logical_plan after common_sub_expression_eliminate"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_limit"),String("SAME TEXT AS ABOVE"),
String("logical_plan after propagate_empty_relation"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_one_union"),String("SAME TEXT AS ABOVE"),
String("logical_plan after filter_null_join_keys"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_outer_join"),String("SAME TEXT AS ABOVE"),
String("logical_plan after push_down_limit"),String("SAME TEXT AS ABOVE"),
Expand All @@ -76,17 +80,22 @@ String("logical_plan after push_down_limit"),String("SAME TEXT AS ABOVE"),
String("logical_plan after influx_regex_to_datafusion_regex"),String("SAME TEXT AS ABOVE"),
String("logical_plan after handle_gap_fill"),String("SAME TEXT AS ABOVE"),
String("logical_plan"),String("TableScan: issue_1087 projection=[tsid, t, name, value]"),
String("initial_physical_plan"),String("ScanTable: table=issue_1087, parallelism=8, priority=Low\n"),
String("initial_physical_plan"),String("ScanTable: table=issue_1087, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"),
String("initial_physical_plan_with_stats"),String("ScanTable: table=issue_1087, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:)]]\n"),
String("physical_plan after OutputRequirements"),String("OutputRequirementExec\n ScanTable: table=issue_1087, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"),
String("physical_plan after aggregate_statistics"),String("SAME TEXT AS ABOVE"),
String("physical_plan after join_selection"),String("SAME TEXT AS ABOVE"),
String("physical_plan after PipelineFixer"),String("SAME TEXT AS ABOVE"),
String("physical_plan after repartition"),String("SAME TEXT AS ABOVE"),
String("physical_plan after LimitedDistinctAggregation"),String("SAME TEXT AS ABOVE"),
String("physical_plan after EnforceDistribution"),String("SAME TEXT AS ABOVE"),
String("physical_plan after CombinePartialFinalAggregate"),String("SAME TEXT AS ABOVE"),
String("physical_plan after EnforceSorting"),String("SAME TEXT AS ABOVE"),
String("physical_plan after coalesce_batches"),String("SAME TEXT AS ABOVE"),
String("physical_plan after OutputRequirements"),String("ScanTable: table=issue_1087, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"),
String("physical_plan after PipelineChecker"),String("SAME TEXT AS ABOVE"),
String("physical_plan"),String("ScanTable: table=issue_1087, parallelism=8, priority=Low\n"),
String("physical_plan after LimitAggregation"),String("SAME TEXT AS ABOVE"),
String("physical_plan after ProjectionPushdown"),String("SAME TEXT AS ABOVE"),
String("physical_plan"),String("ScanTable: table=issue_1087, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"),
String("physical_plan_with_stats"),String("ScanTable: table=issue_1087, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:)]]\n"),


DROP TABLE `issue_1087`;
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/cases/common/dml/issue-302.result
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ affected_rows: 1

select `t`, count(distinct name) from issue302 group by `t`;

issue302.t,COUNT(DISTINCT issue302.name),
t,COUNT(DISTINCT issue302.name),
Timestamp(1651737067000),Int64(0),


Expand Down
12 changes: 6 additions & 6 deletions integration_tests/cases/common/dml/issue-341.result
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ WHERE

plan_type,plan,
String("logical_plan"),String("TableScan: issue341_t1 projection=[timestamp, value], full_filters=[issue341_t1.value = Int32(3)]"),
String("physical_plan"),String("ScanTable: table=issue341_t1, parallelism=8, priority=Low\n"),
String("physical_plan"),String("ScanTable: table=issue341_t1, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"),


-- FilterExec node should not be in plan.
Expand All @@ -71,8 +71,8 @@ WHERE
tag1 = "t3";

plan_type,plan,
String("logical_plan"),String("Projection: issue341_t1.timestamp, issue341_t1.value\n TableScan: issue341_t1 projection=[timestamp, value, tag1], full_filters=[issue341_t1.tag1 = Utf8(\"t3\")]"),
String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as timestamp, value@1 as value]\n ScanTable: table=issue341_t1, parallelism=8, priority=Low\n"),
String("logical_plan"),String("TableScan: issue341_t1 projection=[timestamp, value], full_filters=[issue341_t1.tag1 = Utf8(\"t3\")]"),
String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as timestamp, value@1 as value]\n ScanTable: table=issue341_t1, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"),


-- Repeat operations above, but with overwrite table
Expand Down Expand Up @@ -116,7 +116,7 @@ WHERE

plan_type,plan,
String("logical_plan"),String("Filter: issue341_t2.value = Float64(3)\n TableScan: issue341_t2 projection=[timestamp, value], partial_filters=[issue341_t2.value = Float64(3)]"),
String("physical_plan"),String("CoalesceBatchesExec: target_batch_size=8192\n FilterExec: value@1 = 3\n ScanTable: table=issue341_t2, parallelism=8, priority=Low\n"),
String("physical_plan"),String("CoalesceBatchesExec: target_batch_size=8192\n FilterExec: value@1 = 3\n ScanTable: table=issue341_t2, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"),


-- When using tag as filter, FilterExec node should not be in plan.
Expand All @@ -129,8 +129,8 @@ WHERE
tag1 = "t3";

plan_type,plan,
String("logical_plan"),String("Projection: issue341_t2.timestamp, issue341_t2.value\n TableScan: issue341_t2 projection=[timestamp, value, tag1], full_filters=[issue341_t2.tag1 = Utf8(\"t3\")]"),
String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as timestamp, value@1 as value]\n ScanTable: table=issue341_t2, parallelism=8, priority=Low\n"),
String("logical_plan"),String("TableScan: issue341_t2 projection=[timestamp, value], full_filters=[issue341_t2.tag1 = Utf8(\"t3\")]"),
String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as timestamp, value@1 as value]\n ScanTable: table=issue341_t2, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"),


DROP TABLE IF EXISTS `issue341_t1`;
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/cases/common/dml/issue-59.result
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ FROM issue59
GROUP BY id+1;

plan_type,plan,
String("logical_plan"),String("Projection: group_alias_0 AS issue59.id + Int64(1), COUNT(alias1) AS COUNT(DISTINCT issue59.account)\n Aggregate: groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]]\n Projection: group_alias_0, alias1\n Aggregate: groupBy=[[CAST(issue59.id AS Int64) + Int64(1) AS group_alias_0, issue59.account AS alias1]], aggr=[[]]\n TableScan: issue59 projection=[id, account]"),
String("physical_plan"),String("ProjectionExec: expr=[group_alias_0@0 as issue59.id + Int64(1), COUNT(alias1)@1 as COUNT(DISTINCT issue59.account)]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([group_alias_0@0], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n ProjectionExec: expr=[group_alias_0@0 as group_alias_0, alias1@1 as alias1]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0, alias1@1 as alias1], aggr=[]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([group_alias_0@0, alias1@1], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[CAST(id@0 AS Int64) + 1 as group_alias_0, account@1 as alias1], aggr=[]\n ScanTable: table=issue59, parallelism=8, priority=Low\n"),
String("logical_plan"),String("Projection: group_alias_0 AS issue59.id + Int64(1), COUNT(alias1) AS COUNT(DISTINCT issue59.account)\n Aggregate: groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]]\n Aggregate: groupBy=[[CAST(issue59.id AS Int64) + Int64(1) AS group_alias_0, issue59.account AS alias1]], aggr=[[]]\n TableScan: issue59 projection=[id, account]"),
String("physical_plan"),String("ProjectionExec: expr=[group_alias_0@0 as issue59.id + Int64(1), COUNT(alias1)@1 as COUNT(DISTINCT issue59.account)]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([group_alias_0@0], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0, alias1@1 as alias1], aggr=[]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([group_alias_0@0, alias1@1], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[CAST(id@0 AS Int64) + 1 as group_alias_0, account@1 as alias1], aggr=[]\n ScanTable: table=issue59, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"),


DROP TABLE IF EXISTS issue59;
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/cases/common/explain/explain.result
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ EXPLAIN SELECT t FROM `04_explain_t`;

plan_type,plan,
String("logical_plan"),String("TableScan: 04_explain_t projection=[t]"),
String("physical_plan"),String("ScanTable: table=04_explain_t, parallelism=8, priority=Low\n"),
String("physical_plan"),String("ScanTable: table=04_explain_t, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"),


DROP TABLE `04_explain_t`;
Expand Down
43 changes: 43 additions & 0 deletions integration_tests/cases/common/function/aggregate.result
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,50 @@ COUNT(DISTINCT 02_function_aggregate_table1.arch),
Int64(2),


CREATE TABLE `02_function_aggregate_table2` (
`timestamp` timestamp NOT NULL,
`arch` string TAG,
`datacenter` string TAG,
`value` int,
`uvalue` uint64,
timestamp KEY (timestamp)) ENGINE=Analytic
WITH(
enable_ttl='false',
update_mode = 'append'
);

affected_rows: 0

INSERT INTO `02_function_aggregate_table2`
(`timestamp`, `arch`, `datacenter`, `value`, `uvalue`)
VALUES
(1658304762, 'x86-64', 'china', 100, 10),
(1658304763, 'x86-64', 'china', 200, 10),
(1658304762, 'arm64', 'china', 110, 0),
(1658304763, 'arm64', 'china', 210, 0);

affected_rows: 4

-- The should select empty column
SELECT count(*) FROM `02_function_aggregate_table1`;

COUNT(*),
Int64(4),


-- Same with before, but query from sst
-- SQLNESS ARG pre_cmd=flush
SELECT count(*) FROM `02_function_aggregate_table1`;

COUNT(*),
Int64(4),


DROP TABLE `02_function_aggregate_table1`;

affected_rows: 0

DROP TABLE `02_function_aggregate_table2`;

affected_rows: 0

28 changes: 28 additions & 0 deletions integration_tests/cases/common/function/aggregate.sql
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,32 @@ SELECT distinct(`arch`) FROM `02_function_aggregate_table1` ORDER BY `arch` DESC

SELECT count(distinct(`arch`)) FROM `02_function_aggregate_table1`;

CREATE TABLE `02_function_aggregate_table2` (
`timestamp` timestamp NOT NULL,
`arch` string TAG,
`datacenter` string TAG,
`value` int,
`uvalue` uint64,
timestamp KEY (timestamp)) ENGINE=Analytic
WITH(
enable_ttl='false',
update_mode = 'append'
);

INSERT INTO `02_function_aggregate_table2`
(`timestamp`, `arch`, `datacenter`, `value`, `uvalue`)
VALUES
(1658304762, 'x86-64', 'china', 100, 10),
(1658304763, 'x86-64', 'china', 200, 10),
(1658304762, 'arm64', 'china', 110, 0),
(1658304763, 'arm64', 'china', 210, 0);

-- The should select empty column
SELECT count(*) FROM `02_function_aggregate_table1`;

-- Same with before, but query from sst
-- SQLNESS ARG pre_cmd=flush
SELECT count(*) FROM `02_function_aggregate_table1`;

DROP TABLE `02_function_aggregate_table1`;
DROP TABLE `02_function_aggregate_table2`;
2 changes: 1 addition & 1 deletion integration_tests/cases/common/optimizer/optimizer.result
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ EXPLAIN SELECT max(value) AS c1, avg(value) AS c2 FROM `07_optimizer_t` GROUP BY

plan_type,plan,
String("logical_plan"),String("Projection: MAX(07_optimizer_t.value) AS c1, AVG(07_optimizer_t.value) AS c2\n Aggregate: groupBy=[[07_optimizer_t.name]], aggr=[[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]]\n TableScan: 07_optimizer_t projection=[name, value]"),
String("physical_plan"),String("ProjectionExec: expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]\n AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([name@0], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n ScanTable: table=07_optimizer_t, parallelism=8, priority=Low\n"),
String("physical_plan"),String("ProjectionExec: expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]\n AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([name@0], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n ScanTable: table=07_optimizer_t, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"),


DROP TABLE `07_optimizer_t`;
Expand Down
Loading