Skip to content
This repository was archived by the owner on Mar 12, 2026. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
483 changes: 265 additions & 218 deletions Cargo.lock

Large diffs are not rendered by default.

20 changes: 10 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ members = [

[workspace.dependencies]
alloc_tracker = { path = "components/alloc_tracker" }
arrow = { version = "43.0.0", features = ["prettyprint"] }
arrow = { version = "47.0.0", features = ["prettyprint"] }
arrow_ipc = { version = "43.0.0" }
arrow_ext = { path = "components/arrow_ext" }
analytic_engine = { path = "analytic_engine" }
arena = { path = "components/arena" }
async-stream = "0.3.4"
async-trait = "0.1.72"
async-trait = "0.1.73"
atomic_enum = "0.2.0"
base64 = "0.13"
bytes = "1"
Expand All @@ -101,8 +101,8 @@ clru = "0.6.1"
cluster = { path = "cluster" }
criterion = "0.5"
common_types = { path = "common_types" }
datafusion = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "9c3a537e25e5ab3299922864034f67fb2f79805d" }
datafusion-proto = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "9c3a537e25e5ab3299922864034f67fb2f79805d" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "219cfb4ccb36045c73409127db51377d66ca0f33" }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "219cfb4ccb36045c73409127db51377d66ca0f33" }
df_operator = { path = "df_operator" }
df_engine_extensions = { path = "df_engine_extensions" }
future_ext = { path = "components/future_ext" }
Expand All @@ -118,10 +118,10 @@ log = "0.4"
logger = { path = "components/logger" }
lru = "0.7.6"
id_allocator = { path = "components/id_allocator" }
influxql-logical-planner = { git = "https://github.com/CeresDB/influxql.git", rev = "acbd3ad7651f2deb74857155bea892f88926da57", package = "iox_query_influxql" }
influxql-parser = { git = "https://github.com/CeresDB/influxql.git", rev = "acbd3ad7651f2deb74857155bea892f88926da57", package = "influxdb_influxql_parser" }
influxql-query = { git = "https://github.com/CeresDB/influxql.git", rev = "acbd3ad7651f2deb74857155bea892f88926da57", package = "iox_query" }
influxql-schema = { git = "https://github.com/CeresDB/influxql.git", rev = "acbd3ad7651f2deb74857155bea892f88926da57", package = "schema" }
influxql-logical-planner = { git = "https://github.com/CeresDB/influxql.git", rev = "2d8060c09d7bc6ceefc7fd41492821f179ae40a8", package = "iox_query_influxql" }
influxql-parser = { git = "https://github.com/CeresDB/influxql.git", rev = "2d8060c09d7bc6ceefc7fd41492821f179ae40a8", package = "influxdb_influxql_parser" }
influxql-query = { git = "https://github.com/CeresDB/influxql.git", rev = "2d8060c09d7bc6ceefc7fd41492821f179ae40a8", package = "iox_query" }
influxql-schema = { git = "https://github.com/CeresDB/influxql.git", rev = "2d8060c09d7bc6ceefc7fd41492821f179ae40a8", package = "schema" }
interpreters = { path = "interpreters" }
itertools = "0.10.5"
macros = { path = "components/macros" }
Expand All @@ -133,7 +133,7 @@ panic_ext = { path = "components/panic_ext" }
partitioned_lock = { path = "components/partitioned_lock" }
partition_table_engine = { path = "partition_table_engine" }
parquet_ext = { path = "components/parquet_ext" }
parquet = { version = "43.0.0" }
parquet = { version = "47.0.0" }
paste = "1.0"
pin-project-lite = "0.2.8"
pprof = "0.12.1"
Expand All @@ -158,7 +158,7 @@ size_ext = { path = "components/size_ext" }
smallvec = "1.6"
slog = "2.7"
spin = "0.9.6"
sqlparser = { version = "0.35", features = ["serde"] }
sqlparser = { version = "0.38", features = ["serde"] }
system_catalog = { path = "system_catalog" }
table_engine = { path = "table_engine" }
table_kv = { path = "components/table_kv" }
Expand Down
3 changes: 3 additions & 0 deletions common_types/src/datum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ impl TryFrom<&SqlDataType> for DatumKind {
SqlDataType::BigInt(_) => Ok(Self::Int64),
SqlDataType::Int(_) => Ok(Self::Int32),
SqlDataType::SmallInt(_) => Ok(Self::Int16),
SqlDataType::Int8(_) => Ok(Self::Int8),
SqlDataType::String => Ok(Self::String),
SqlDataType::Varbinary(_) => Ok(Self::Varbinary),
SqlDataType::Date => Ok(Self::Date),
Expand Down Expand Up @@ -1466,6 +1467,7 @@ impl Datum {
| ScalarValue::DurationSecond(_)
| ScalarValue::DurationMillisecond(_)
| ScalarValue::DurationMicrosecond(_)
| ScalarValue::Decimal256(_, _, _)
| ScalarValue::DurationNanosecond(_) => None,
}
}
Expand Down Expand Up @@ -1515,6 +1517,7 @@ impl<'a> DatumView<'a> {
| ScalarValue::DurationSecond(_)
| ScalarValue::DurationMillisecond(_)
| ScalarValue::DurationMicrosecond(_)
| ScalarValue::Decimal256(_, _, _)
| ScalarValue::DurationNanosecond(_) => None,
}
}
Expand Down
3 changes: 2 additions & 1 deletion components/parquet_ext/src/meta_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ use std::{ops::Range, sync::Arc};

use async_trait::async_trait;
use bytes::Bytes;
use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
use generic_error::GenericResult;
use parquet::{
arrow::{arrow_reader::ArrowReaderOptions, ParquetRecordBatchStreamBuilder},
arrow::arrow_reader::ArrowReaderOptions,
errors::{ParquetError, Result},
file::{footer, metadata::ParquetMetaData},
};
Expand Down
4 changes: 2 additions & 2 deletions components/parquet_ext/src/prune/min_max.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ mod test {
}

fn prepare_parquet_schema_descr(schema: &ArrowSchema) -> SchemaDescPtr {
let mut fields = schema
let fields = schema
.fields()
.iter()
.map(|field| {
Expand All @@ -242,7 +242,7 @@ mod test {
})
.collect();
let schema = SchemaType::group_type_builder("schema")
.with_fields(&mut fields)
.with_fields(fields)
.build()
.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion df_engine_extensions/src/dist_sql_query/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ impl ExecutionPlan for MockScan {
}

fn statistics(&self) -> datafusion::physical_plan::Statistics {
unimplemented!()
datafusion::physical_plan::Statistics::default()
}
}

Expand Down
5 changes: 3 additions & 2 deletions integration_tests/cases/common/dml/issue-1087.result
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,16 @@ String("logical_plan after influx_regex_to_datafusion_regex"),String("SAME TEXT
String("logical_plan after handle_gap_fill"),String("SAME TEXT AS ABOVE"),
String("logical_plan"),String("TableScan: issue_1087 projection=[tsid, t, name, value]"),
String("initial_physical_plan"),String("ScanTable: table=issue_1087, parallelism=8\n"),
String("physical_plan after OutputRequirements"),String("OutputRequirementExec\n ScanTable: table=issue_1087, parallelism=8\n"),
String("physical_plan after aggregate_statistics"),String("SAME TEXT AS ABOVE"),
String("physical_plan after join_selection"),String("SAME TEXT AS ABOVE"),
String("physical_plan after PipelineFixer"),String("SAME TEXT AS ABOVE"),
String("physical_plan after repartition"),String("SAME TEXT AS ABOVE"),
String("physical_plan after EnforceDistribution"),String("SAME TEXT AS ABOVE"),
String("physical_plan after CombinePartialFinalAggregate"),String("SAME TEXT AS ABOVE"),
String("physical_plan after EnforceSorting"),String("SAME TEXT AS ABOVE"),
String("physical_plan after coalesce_batches"),String("SAME TEXT AS ABOVE"),
String("physical_plan after OutputRequirements"),String("ScanTable: table=issue_1087, parallelism=8\n"),
String("physical_plan after PipelineChecker"),String("SAME TEXT AS ABOVE"),
String("physical_plan after LimitAggregation"),String("SAME TEXT AS ABOVE"),
String("physical_plan"),String("ScanTable: table=issue_1087, parallelism=8\n"),


Expand Down
8 changes: 4 additions & 4 deletions integration_tests/cases/common/dml/issue-341.result
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ WHERE
tag1 = "t3";

plan_type,plan,
String("logical_plan"),String("Projection: issue341_t1.timestamp, issue341_t1.value\n TableScan: issue341_t1 projection=[timestamp, value, tag1], full_filters=[issue341_t1.tag1 = Utf8(\"t3\")]"),
String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as timestamp, value@1 as value]\n ScanTable: table=issue341_t1, parallelism=8\n"),
String("logical_plan"),String("TableScan: issue341_t1 projection=[timestamp, value], full_filters=[issue341_t1.tag1 = Utf8(\"t3\")]"),
String("physical_plan"),String("ScanTable: table=issue341_t1, parallelism=8\n"),


-- Repeat operations above, but with overwrite table
Expand Down Expand Up @@ -129,8 +129,8 @@ WHERE
tag1 = "t3";

plan_type,plan,
String("logical_plan"),String("Projection: issue341_t2.timestamp, issue341_t2.value\n TableScan: issue341_t2 projection=[timestamp, value, tag1], full_filters=[issue341_t2.tag1 = Utf8(\"t3\")]"),
String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as timestamp, value@1 as value]\n ScanTable: table=issue341_t2, parallelism=8\n"),
String("logical_plan"),String("TableScan: issue341_t2 projection=[timestamp, value], full_filters=[issue341_t2.tag1 = Utf8(\"t3\")]"),
String("physical_plan"),String("ScanTable: table=issue341_t2, parallelism=8\n"),


DROP TABLE IF EXISTS `issue341_t1`;
Expand Down
1 change: 0 additions & 1 deletion integration_tests/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,6 @@ impl<T: Send + Sync> Database for CeresDB<T> {
let cmd = pre_cmd.expect("parse command");
match cmd {
Command::Flush => {
println!("Flush memtable...");
if let Err(e) = self.execute_flush().await {
panic!("Execute flush command failed, err:{e}");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use datafusion::{
logical_expr::{
expr::{Expr, InList},
logical_plan::{Filter, LogicalPlan, TableScan},
utils, Between, BinaryExpr, ExprSchemable, Operator,
Between, BinaryExpr, ExprSchemable, Operator,
},
optimizer::analyzer::AnalyzerRule,
scalar::ScalarValue,
Expand Down Expand Up @@ -116,12 +116,13 @@ impl AnalyzerRule for TypeConversion {
.map(|e| e.rewrite(&mut rewriter))
.collect::<Result<Vec<_>>>()?;

Ok(utils::from_plan(&plan, &expr, &new_inputs)?)
Ok(plan.with_new_exprs(expr, &new_inputs)?)
}
LogicalPlan::Subquery(_)
| LogicalPlan::Statement { .. }
| LogicalPlan::SubqueryAlias(_)
| LogicalPlan::Unnest(_)
| LogicalPlan::Copy(_)
| LogicalPlan::EmptyRelation { .. } => Ok(plan.clone()),
}
}
Expand Down Expand Up @@ -331,13 +332,17 @@ fn naive_datetime_to_timestamp(s: &str, datetime: NaiveDateTime) -> Result<i64,
LocalResult::None => Err(ArrowError::CastError(format!(
"Error parsing '{s}' as timestamp: local time representation is invalid"
))),
LocalResult::Single(local_datetime) => {
Ok(local_datetime.with_timezone(&Utc).timestamp_nanos() / 1_000_000)
}

LocalResult::Ambiguous(local_datetime, _) => {
Ok(local_datetime.with_timezone(&Utc).timestamp_nanos() / 1_000_000)
}
LocalResult::Single(local_datetime) => Ok(local_datetime
.with_timezone(&Utc)
.timestamp_nanos_opt()
.unwrap()
/ 1_000_000),

LocalResult::Ambiguous(local_datetime, _) => Ok(local_datetime
.with_timezone(&Utc)
.timestamp_nanos_opt()
.unwrap()
/ 1_000_000),
}
}

Expand Down
4 changes: 2 additions & 2 deletions query_engine/src/datafusion_impl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ impl DfContextBuilder {

// Use the default logical optimizer; to add more custom rules, call
// `add_optimizer_rule`.
let state = SessionState::with_config_rt(df_session_config, self.runtime_env.clone())
let state = SessionState::new_with_config_rt(df_session_config, self.runtime_env.clone())
.with_query_planner(self.physical_planner.clone());

// Register analyzer rules
Expand All @@ -168,7 +168,7 @@ impl DfContextBuilder {
// Register iox optimizers, used by influxql.
let state = influxql_query::logical_optimizer::register_iox_logical_optimizers(state);

SessionContext::with_state(state)
SessionContext::new_with_state(state)
}

// TODO: this is not used now, bug of RepartitionAdapter is already fixed in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ use std::sync::Arc;

use datafusion::{
config::ConfigOptions,
physical_optimizer::{optimizer::PhysicalOptimizerRule, repartition::Repartition},
physical_optimizer::{
enforce_distribution::EnforceDistribution, optimizer::PhysicalOptimizerRule,
},
physical_plan::ExecutionPlan,
};
use log::debug;
Expand All @@ -31,7 +33,7 @@ pub struct RepartitionAdapter {

impl Adapter for RepartitionAdapter {
fn may_adapt(original_rule: OptimizeRuleRef) -> OptimizeRuleRef {
if original_rule.name() == Repartition::new().name() {
if original_rule.name() == EnforceDistribution::new().name() {
Arc::new(Self { original_rule })
} else {
original_rule
Expand Down
4 changes: 2 additions & 2 deletions query_engine/src/datafusion_impl/task_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use df_engine_extensions::dist_sql_query::{
};
use futures::future::BoxFuture;
use generic_error::BoxError;
use prost::Message;
use snafu::ResultExt;
use table_engine::{
provider::{CeresdbOptions, ScanTable},
Expand Down Expand Up @@ -101,7 +100,8 @@ impl Preprocessor {

async fn preprocess_remote_plan(&self, encoded_plan: &[u8]) -> Result<Arc<dyn ExecutionPlan>> {
// Decode to datafusion physical plan.
let protobuf = protobuf::PhysicalPlanNode::decode(encoded_plan)

let protobuf = protobuf::PhysicalPlanNode::try_decode(encoded_plan)
.box_err()
.with_context(|| ExecutorWithCause {
msg: Some("failed to decode plan".to_string()),
Expand Down
2 changes: 2 additions & 0 deletions table_engine/src/predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,8 @@ impl<'a> TimeRangeExtractor<'a> {
| Operator::BitwiseXor
| Operator::BitwiseShiftRight
| Operator::BitwiseShiftLeft
| Operator::AtArrow
| Operator::ArrowAt
| Operator::StringConcat => TimeRange::min_to_max(),
}
}
Expand Down