From 10fd5370c4cd2f820c60a04e6b76a9b308919d9e Mon Sep 17 00:00:00 2001 From: Max Burke Date: Sun, 12 Dec 2021 19:32:16 -0800 Subject: [PATCH 1/9] point to UL repos --- datafusion-cli/Cargo.toml | 2 +- datafusion/Cargo.toml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 3212b67f967b4..dd4def185ca82 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -31,5 +31,5 @@ clap = "2.33" rustyline = "9.0" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] } datafusion = { path = "../datafusion", version = "6.0.0" } -arrow = { version = "6.1.0" } +arrow = { git = "https://github.com/urbanlogiq/arrow-rs.git", branch="ul-6.4.0" } ballista = { path = "../ballista/rust/client", version = "0.6.0" } diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index f0f368a75c6ec..83b80171fe6b3 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -52,8 +52,8 @@ avro = ["avro-rs", "num-traits"] [dependencies] ahash = "0.7" hashbrown = { version = "0.11", features = ["raw"] } -arrow = { version = "6.1.0", features = ["prettyprint"] } -parquet = { version = "6.1.0", features = ["arrow"] } +arrow = { git = "https://github.com/urbanlogiq/arrow-rs.git", branch="ul-6.4.0", features = ["prettyprint"] } +parquet = { git = "https://github.com/urbanlogiq/arrow-rs.git", branch = "ul-6.4.0", features = ["arrow"] } sqlparser = "0.12" paste = "^1.0" num_cpus = "1.13.0" From a1e8e8ae6578ebc8ea63f2ae5b157686c2a44d03 Mon Sep 17 00:00:00 2001 From: Max Burke Date: Mon, 13 Dec 2021 14:04:59 -0800 Subject: [PATCH 2/9] Make ScalarValue::TimestampNanosecond moderately timezone aware --- datafusion/src/logical_plan/expr.rs | 12 +- datafusion/src/optimizer/constant_folding.rs | 4 +- datafusion/src/optimizer/utils.rs | 1 + .../src/physical_plan/datetime_expressions.rs | 8 +- .../src/physical_plan/expressions/binary.rs | 16 +- 
.../src/physical_plan/expressions/coercion.rs | 37 ++ .../src/physical_plan/expressions/min_max.rs | 65 ++- datafusion/src/physical_plan/functions.rs | 5 +- datafusion/src/physical_plan/hash_utils.rs | 2 +- datafusion/src/physical_plan/planner.rs | 1 + datafusion/src/physical_plan/sort.rs | 11 +- datafusion/src/scalar.rs | 392 +++++++++++++----- 12 files changed, 410 insertions(+), 144 deletions(-) diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs index 04e95e73a297e..a6031f1f320d6 100644 --- a/datafusion/src/logical_plan/expr.rs +++ b/datafusion/src/logical_plan/expr.rs @@ -974,7 +974,7 @@ impl std::fmt::Display for Expr { ref left, ref right, ref op, - } => write!(f, "{} {} {}", left, op, right), + } => write!(f, "{:?} {} {:?}", left, op, right), Expr::AggregateFunction { /// Name of the function ref fun, @@ -1469,9 +1469,10 @@ macro_rules! make_timestamp_literal { #[doc = $DOC] impl TimestampLiteral for $TYPE { fn lit_timestamp_nano(&self) -> Expr { - Expr::Literal(ScalarValue::TimestampNanosecond(Some( - (self.clone()).into(), - ))) + Expr::Literal(ScalarValue::TimestampNanosecond( + Some((self.clone()).into()), + None, + )) } } }; @@ -2028,7 +2029,8 @@ mod tests { #[test] fn test_lit_timestamp_nano() { let expr = col("time").eq(lit_timestamp_nano(10)); // 10 is an implicit i32 - let expected = col("time").eq(lit(ScalarValue::TimestampNanosecond(Some(10)))); + let expected = + col("time").eq(lit(ScalarValue::TimestampNanosecond(Some(10), None))); assert_eq!(expr, expected); let i: i64 = 10; diff --git a/datafusion/src/optimizer/constant_folding.rs b/datafusion/src/optimizer/constant_folding.rs index a0bc04a0caf5e..ec52f2dcbe126 100644 --- a/datafusion/src/optimizer/constant_folding.rs +++ b/datafusion/src/optimizer/constant_folding.rs @@ -789,7 +789,7 @@ mod tests { .build() .unwrap(); - let expected = "Projection: TimestampNanosecond(1599566400000000000)\ + let expected = "Projection: 
TimestampNanosecond(1599566400000000000, None)\ \n TableScan: test projection=None" .to_string(); let actual = get_optimized_plan_formatted(&plan, &Utc::now()); @@ -873,7 +873,7 @@ mod tests { // expect the same timestamp appears in both exprs let actual = get_optimized_plan_formatted(&plan, &time); let expected = format!( - "Projection: TimestampNanosecond({}), TimestampNanosecond({}) AS t2\ + "Projection: TimestampNanosecond({}, Some(\"UTC\")), TimestampNanosecond({}, Some(\"UTC\")) AS t2\ \n TableScan: test projection=None", time.timestamp_nanos(), time.timestamp_nanos() diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs index 52beb695529c3..3a86f993ff342 100644 --- a/datafusion/src/optimizer/utils.rs +++ b/datafusion/src/optimizer/utils.rs @@ -681,6 +681,7 @@ impl ConstEvaluator { &self.input_batch.schema(), &self.ctx_state, )?; + let col_val = phys_expr.evaluate(&self.input_batch)?; match col_val { crate::physical_plan::ColumnarValue::Array(a) => { diff --git a/datafusion/src/physical_plan/datetime_expressions.rs b/datafusion/src/physical_plan/datetime_expressions.rs index a776c42f3e9df..09256bd20804d 100644 --- a/datafusion/src/physical_plan/datetime_expressions.rs +++ b/datafusion/src/physical_plan/datetime_expressions.rs @@ -177,6 +177,7 @@ pub fn make_now( move |_arg| { Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( now_ts, + Some("UTC".to_owned()), ))) } } @@ -236,8 +237,11 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result { let f = |x: Option| x.map(|x| date_trunc_single(granularity, x)).transpose(); Ok(match array { - ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v)) => { - ColumnarValue::Scalar(ScalarValue::TimestampNanosecond((f)(*v)?)) + ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => { + ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( + (f)(*v)?, + tz_opt.clone(), + )) } ColumnarValue::Array(array) => { let array = array diff --git 
a/datafusion/src/physical_plan/expressions/binary.rs b/datafusion/src/physical_plan/expressions/binary.rs index 456e8d42a5df3..d6cdfcd6bf69a 100644 --- a/datafusion/src/physical_plan/expressions/binary.rs +++ b/datafusion/src/physical_plan/expressions/binary.rs @@ -258,16 +258,16 @@ macro_rules! binary_array_op_scalar { DataType::Float32 => compute_op_scalar!($LEFT, $RIGHT, $OP, Float32Array), DataType::Float64 => compute_op_scalar!($LEFT, $RIGHT, $OP, Float64Array), DataType::Utf8 => compute_utf8_op_scalar!($LEFT, $RIGHT, $OP, StringArray), - DataType::Timestamp(TimeUnit::Nanosecond, None) => { + DataType::Timestamp(TimeUnit::Nanosecond, _) => { compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampNanosecondArray) } - DataType::Timestamp(TimeUnit::Microsecond, None) => { + DataType::Timestamp(TimeUnit::Microsecond, _) => { compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampMicrosecondArray) } - DataType::Timestamp(TimeUnit::Millisecond, None) => { + DataType::Timestamp(TimeUnit::Millisecond, _) => { compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampMillisecondArray) } - DataType::Timestamp(TimeUnit::Second, None) => { + DataType::Timestamp(TimeUnit::Second, _) => { compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampSecondArray) } DataType::Date32 => { @@ -302,7 +302,7 @@ macro_rules! 
binary_array_op { DataType::Float32 => compute_op!($LEFT, $RIGHT, $OP, Float32Array), DataType::Float64 => compute_op!($LEFT, $RIGHT, $OP, Float64Array), DataType::Utf8 => compute_utf8_op!($LEFT, $RIGHT, $OP, StringArray), - DataType::Timestamp(TimeUnit::Nanosecond, None) => { + DataType::Timestamp(TimeUnit::Nanosecond, _) => { compute_op!($LEFT, $RIGHT, $OP, TimestampNanosecondArray) } DataType::Timestamp(TimeUnit::Microsecond, None) => { @@ -468,12 +468,14 @@ fn common_binary_type( // re-write the error message of failed coercions to include the operator's information match result { - None => Err(DataFusionError::Plan( + None => { + Err(DataFusionError::Plan( format!( "'{:?} {} {:?}' can't be evaluated because there isn't a common type to coerce the types to", lhs_type, op, rhs_type ), - )), + )) + }, Some(t) => Ok(t) } } diff --git a/datafusion/src/physical_plan/expressions/coercion.rs b/datafusion/src/physical_plan/expressions/coercion.rs index 180b16548b32b..a449a8d129b42 100644 --- a/datafusion/src/physical_plan/expressions/coercion.rs +++ b/datafusion/src/physical_plan/expressions/coercion.rs @@ -100,11 +100,48 @@ pub fn like_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option Option { use arrow::datatypes::DataType::*; + use arrow::datatypes::TimeUnit; match (lhs_type, rhs_type) { (Utf8, Date32) => Some(Date32), (Date32, Utf8) => Some(Date32), (Utf8, Date64) => Some(Date64), (Date64, Utf8) => Some(Date64), + (Timestamp(lhs_unit, lhs_tz), Timestamp(rhs_unit, rhs_tz)) => { + let tz = match (lhs_tz, rhs_tz) { + // can't cast across timezones + (Some(lhs_tz), Some(rhs_tz)) => { + if lhs_tz != rhs_tz { + return None; + } else { + Some(lhs_tz.clone()) + } + } + (Some(lhs_tz), None) => Some(lhs_tz.clone()), + (None, Some(rhs_tz)) => Some(rhs_tz.clone()), + (None, None) => None, + }; + + let unit = match (lhs_unit, rhs_unit) { + (TimeUnit::Second, TimeUnit::Millisecond) => TimeUnit::Second, + (TimeUnit::Second, TimeUnit::Microsecond) => TimeUnit::Second, + 
(TimeUnit::Second, TimeUnit::Nanosecond) => TimeUnit::Second, + (TimeUnit::Millisecond, TimeUnit::Second) => TimeUnit::Second, + (TimeUnit::Millisecond, TimeUnit::Microsecond) => TimeUnit::Millisecond, + (TimeUnit::Millisecond, TimeUnit::Nanosecond) => TimeUnit::Millisecond, + (TimeUnit::Microsecond, TimeUnit::Second) => TimeUnit::Second, + (TimeUnit::Microsecond, TimeUnit::Millisecond) => TimeUnit::Millisecond, + (TimeUnit::Microsecond, TimeUnit::Nanosecond) => TimeUnit::Microsecond, + (TimeUnit::Nanosecond, TimeUnit::Second) => TimeUnit::Second, + (TimeUnit::Nanosecond, TimeUnit::Millisecond) => TimeUnit::Millisecond, + (TimeUnit::Nanosecond, TimeUnit::Microsecond) => TimeUnit::Microsecond, + (l, r) => { + assert_eq!(l, r); + l.clone() + } + }; + + Some(Timestamp(unit, tz)) + } _ => None, } } diff --git a/datafusion/src/physical_plan/expressions/min_max.rs b/datafusion/src/physical_plan/expressions/min_max.rs index 9e5b1e095cd6f..8f17ee0d4cf14 100644 --- a/datafusion/src/physical_plan/expressions/min_max.rs +++ b/datafusion/src/physical_plan/expressions/min_max.rs @@ -127,6 +127,12 @@ macro_rules! typed_min_max_batch { let value = compute::$OP(array); ScalarValue::$SCALAR(value) }}; + + ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident, $TZ:expr) => {{ + let array = $VALUES.as_any().downcast_ref::<$ARRAYTYPE>().unwrap(); + let value = compute::$OP(array); + ScalarValue::$SCALAR(value, $TZ.clone()) + }}; } // Statically-typed version of min/max(array) -> ScalarValue for non-string types. @@ -149,26 +155,35 @@ macro_rules! 
min_max_batch { DataType::UInt32 => typed_min_max_batch!($VALUES, UInt32Array, UInt32, $OP), DataType::UInt16 => typed_min_max_batch!($VALUES, UInt16Array, UInt16, $OP), DataType::UInt8 => typed_min_max_batch!($VALUES, UInt8Array, UInt8, $OP), - DataType::Timestamp(TimeUnit::Second, _) => { - typed_min_max_batch!($VALUES, TimestampSecondArray, TimestampSecond, $OP) + DataType::Timestamp(TimeUnit::Second, tz_opt) => { + typed_min_max_batch!( + $VALUES, + TimestampSecondArray, + TimestampSecond, + $OP, + tz_opt + ) } - DataType::Timestamp(TimeUnit::Millisecond, _) => typed_min_max_batch!( + DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => typed_min_max_batch!( $VALUES, TimestampMillisecondArray, TimestampMillisecond, - $OP + $OP, + tz_opt ), - DataType::Timestamp(TimeUnit::Microsecond, _) => typed_min_max_batch!( + DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => typed_min_max_batch!( $VALUES, TimestampMicrosecondArray, TimestampMicrosecond, - $OP + $OP, + tz_opt ), - DataType::Timestamp(TimeUnit::Nanosecond, _) => typed_min_max_batch!( + DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => typed_min_max_batch!( $VALUES, TimestampNanosecondArray, TimestampNanosecond, - $OP + $OP, + tz_opt ), DataType::Date32 => typed_min_max_batch!($VALUES, Date32Array, Date32, $OP), DataType::Date64 => typed_min_max_batch!($VALUES, Date64Array, Date64, $OP), @@ -219,6 +234,18 @@ macro_rules! typed_min_max { (Some(a), Some(b)) => Some((*a).$OP(*b)), }) }}; + + ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident, $TZ:expr) => {{ + ScalarValue::$SCALAR( + match ($VALUE, $DELTA) { + (None, None) => None, + (Some(a), None) => Some(a.clone()), + (None, Some(b)) => Some(b.clone()), + (Some(a), Some(b)) => Some((*a).$OP(*b)), + }, + $TZ.clone(), + ) + }}; } // min/max of two scalar string values. @@ -273,26 +300,26 @@ macro_rules! 
min_max { (ScalarValue::LargeUtf8(lhs), ScalarValue::LargeUtf8(rhs)) => { typed_min_max_string!(lhs, rhs, LargeUtf8, $OP) } - (ScalarValue::TimestampSecond(lhs), ScalarValue::TimestampSecond(rhs)) => { - typed_min_max!(lhs, rhs, TimestampSecond, $OP) + (ScalarValue::TimestampSecond(lhs, l_tz), ScalarValue::TimestampSecond(rhs, _)) => { + typed_min_max!(lhs, rhs, TimestampSecond, $OP, l_tz) } ( - ScalarValue::TimestampMillisecond(lhs), - ScalarValue::TimestampMillisecond(rhs), + ScalarValue::TimestampMillisecond(lhs, l_tz), + ScalarValue::TimestampMillisecond(rhs, _), ) => { - typed_min_max!(lhs, rhs, TimestampMillisecond, $OP) + typed_min_max!(lhs, rhs, TimestampMillisecond, $OP, l_tz) } ( - ScalarValue::TimestampMicrosecond(lhs), - ScalarValue::TimestampMicrosecond(rhs), + ScalarValue::TimestampMicrosecond(lhs, l_tz), + ScalarValue::TimestampMicrosecond(rhs, _), ) => { - typed_min_max!(lhs, rhs, TimestampMicrosecond, $OP) + typed_min_max!(lhs, rhs, TimestampMicrosecond, $OP, l_tz) } ( - ScalarValue::TimestampNanosecond(lhs), - ScalarValue::TimestampNanosecond(rhs), + ScalarValue::TimestampNanosecond(lhs, l_tz), + ScalarValue::TimestampNanosecond(rhs, _), ) => { - typed_min_max!(lhs, rhs, TimestampNanosecond, $OP) + typed_min_max!(lhs, rhs, TimestampNanosecond, $OP, l_tz) } ( ScalarValue::Date32(lhs), diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs index 72b2635be385d..ebea4f72158d3 100644 --- a/datafusion/src/physical_plan/functions.rs +++ b/datafusion/src/physical_plan/functions.rs @@ -592,7 +592,10 @@ pub fn return_type( BuiltinScalarFunction::ToTimestampSeconds => { Ok(DataType::Timestamp(TimeUnit::Second, None)) } - BuiltinScalarFunction::Now => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)), + BuiltinScalarFunction::Now => Ok(DataType::Timestamp( + TimeUnit::Nanosecond, + Some("UTC".to_owned()), + )), BuiltinScalarFunction::Translate => utf8_to_str_type(&arg_types[0], "translate"), 
BuiltinScalarFunction::Trim => utf8_to_str_type(&arg_types[0], "trim"), BuiltinScalarFunction::Upper => utf8_to_str_type(&arg_types[0], "upper"), diff --git a/datafusion/src/physical_plan/hash_utils.rs b/datafusion/src/physical_plan/hash_utils.rs index fbd0c9716e406..25d1f3fdd85c3 100644 --- a/datafusion/src/physical_plan/hash_utils.rs +++ b/datafusion/src/physical_plan/hash_utils.rs @@ -369,7 +369,7 @@ pub fn create_hashes<'a>( multi_col ); } - DataType::Timestamp(TimeUnit::Nanosecond, None) => { + DataType::Timestamp(TimeUnit::Nanosecond, _) => { hash_array_primitive!( TimestampNanosecondArray, col, diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index 402f119e8ea01..df606df7e0a7d 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -634,6 +634,7 @@ impl DefaultPhysicalPlanner { let physical_input = self.create_initial_plan(input, ctx_state).await?; let input_schema = physical_input.as_ref().schema(); let input_dfschema = input.as_ref().schema(); + let runtime_expr = self.create_physical_expr( predicate, input_dfschema, diff --git a/datafusion/src/physical_plan/sort.rs b/datafusion/src/physical_plan/sort.rs index a606906e86806..1adb0870a882f 100644 --- a/datafusion/src/physical_plan/sort.rs +++ b/datafusion/src/physical_plan/sort.rs @@ -29,7 +29,7 @@ use crate::physical_plan::{ }; pub use arrow::compute::SortOptions; use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions}; -use arrow::datatypes::SchemaRef; +use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; use arrow::{array::ArrayRef, error::ArrowError}; @@ -201,6 +201,15 @@ fn sort_batch( None, )?; + let schema = Arc::new(Schema::new( + schema + .fields() + .iter() + .zip(batch.columns().iter().map(|col| col.data_type())) + .map(|(field, ty)| Field::new(field.name(), ty.clone(), field.is_nullable())) + .collect::>(), + )); + 
// reorder all rows based on sorted indices RecordBatch::try_new( schema, diff --git a/datafusion/src/scalar.rs b/datafusion/src/scalar.rs index 33bc9dd10486a..db0b694d35b26 100644 --- a/datafusion/src/scalar.rs +++ b/datafusion/src/scalar.rs @@ -20,6 +20,7 @@ use crate::error::{DataFusionError, Result}; use arrow::{ array::*, + compute::kernels::cast::cast, datatypes::{ ArrowDictionaryKeyType, ArrowNativeType, DataType, Field, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, IntervalUnit, TimeUnit, @@ -75,13 +76,13 @@ pub enum ScalarValue { /// Date stored as a signed 64bit int Date64(Option), /// Timestamp Second - TimestampSecond(Option), + TimestampSecond(Option, Option), /// Timestamp Milliseconds - TimestampMillisecond(Option), + TimestampMillisecond(Option, Option), /// Timestamp Microseconds - TimestampMicrosecond(Option), + TimestampMicrosecond(Option, Option), /// Timestamp Nanoseconds - TimestampNanosecond(Option), + TimestampNanosecond(Option, Option), /// Interval with YearMonth unit IntervalYearMonth(Option), /// Interval with DayTime unit @@ -144,14 +145,14 @@ impl PartialEq for ScalarValue { (Date32(_), _) => false, (Date64(v1), Date64(v2)) => v1.eq(v2), (Date64(_), _) => false, - (TimestampSecond(v1), TimestampSecond(v2)) => v1.eq(v2), - (TimestampSecond(_), _) => false, - (TimestampMillisecond(v1), TimestampMillisecond(v2)) => v1.eq(v2), - (TimestampMillisecond(_), _) => false, - (TimestampMicrosecond(v1), TimestampMicrosecond(v2)) => v1.eq(v2), - (TimestampMicrosecond(_), _) => false, - (TimestampNanosecond(v1), TimestampNanosecond(v2)) => v1.eq(v2), - (TimestampNanosecond(_), _) => false, + (TimestampSecond(v1, _), TimestampSecond(v2, _)) => v1.eq(v2), + (TimestampSecond(_, _), _) => false, + (TimestampMillisecond(v1, _), TimestampMillisecond(v2, _)) => v1.eq(v2), + (TimestampMillisecond(_, _), _) => false, + (TimestampMicrosecond(v1, _), TimestampMicrosecond(v2, _)) => v1.eq(v2), + (TimestampMicrosecond(_, _), _) => false, + 
(TimestampNanosecond(v1, _), TimestampNanosecond(v2, _)) => v1.eq(v2), + (TimestampNanosecond(_, _), _) => false, (IntervalYearMonth(v1), IntervalYearMonth(v2)) => v1.eq(v2), (IntervalYearMonth(_), _) => false, (IntervalDayTime(v1), IntervalDayTime(v2)) => v1.eq(v2), @@ -221,14 +222,20 @@ impl PartialOrd for ScalarValue { (Date32(_), _) => None, (Date64(v1), Date64(v2)) => v1.partial_cmp(v2), (Date64(_), _) => None, - (TimestampSecond(v1), TimestampSecond(v2)) => v1.partial_cmp(v2), - (TimestampSecond(_), _) => None, - (TimestampMillisecond(v1), TimestampMillisecond(v2)) => v1.partial_cmp(v2), - (TimestampMillisecond(_), _) => None, - (TimestampMicrosecond(v1), TimestampMicrosecond(v2)) => v1.partial_cmp(v2), - (TimestampMicrosecond(_), _) => None, - (TimestampNanosecond(v1), TimestampNanosecond(v2)) => v1.partial_cmp(v2), - (TimestampNanosecond(_), _) => None, + (TimestampSecond(v1, _), TimestampSecond(v2, _)) => v1.partial_cmp(v2), + (TimestampSecond(_, _), _) => None, + (TimestampMillisecond(v1, _), TimestampMillisecond(v2, _)) => { + v1.partial_cmp(v2) + } + (TimestampMillisecond(_, _), _) => None, + (TimestampMicrosecond(v1, _), TimestampMicrosecond(v2, _)) => { + v1.partial_cmp(v2) + } + (TimestampMicrosecond(_, _), _) => None, + (TimestampNanosecond(v1, _), TimestampNanosecond(v2, _)) => { + v1.partial_cmp(v2) + } + (TimestampNanosecond(_, _), _) => None, (IntervalYearMonth(v1), IntervalYearMonth(v2)) => v1.partial_cmp(v2), (IntervalYearMonth(_), _) => None, (IntervalDayTime(v1), IntervalDayTime(v2)) => v1.partial_cmp(v2), @@ -280,10 +287,10 @@ impl std::hash::Hash for ScalarValue { } Date32(v) => v.hash(state), Date64(v) => v.hash(state), - TimestampSecond(v) => v.hash(state), - TimestampMillisecond(v) => v.hash(state), - TimestampMicrosecond(v) => v.hash(state), - TimestampNanosecond(v) => v.hash(state), + TimestampSecond(v, _) => v.hash(state), + TimestampMillisecond(v, _) => v.hash(state), + TimestampMicrosecond(v, _) => v.hash(state), + 
TimestampNanosecond(v, _) => v.hash(state), IntervalYearMonth(v) => v.hash(state), IntervalDayTime(v) => v.hash(state), Struct(v, t) => { @@ -319,6 +326,19 @@ fn get_dict_value( Ok((dict_array.values(), Some(values_index))) } +macro_rules! typed_cast_tz { + ($array:expr, $index:expr, $ARRAYTYPE:ident, $SCALAR:ident, $TZ:expr) => {{ + let array = $array.as_any().downcast_ref::<$ARRAYTYPE>().unwrap(); + ScalarValue::$SCALAR( + match array.is_null($index) { + true => None, + false => Some(array.value($index).into()), + }, + $TZ.clone(), + ) + }}; +} + macro_rules! typed_cast { ($array:expr, $index:expr, $ARRAYTYPE:ident, $SCALAR:ident) => {{ let array = $array.as_any().downcast_ref::<$ARRAYTYPE>().unwrap(); @@ -367,25 +387,25 @@ macro_rules! build_timestamp_list { Some(values) => { let values = values.as_ref(); match $TIME_UNIT { - TimeUnit::Second => build_values_list!( + TimeUnit::Second => build_values_list_tz!( TimestampSecondBuilder, TimestampSecond, values, $SIZE ), - TimeUnit::Microsecond => build_values_list!( + TimeUnit::Microsecond => build_values_list_tz!( TimestampMillisecondBuilder, TimestampMillisecond, values, $SIZE ), - TimeUnit::Millisecond => build_values_list!( + TimeUnit::Millisecond => build_values_list_tz!( TimestampMicrosecondBuilder, TimestampMicrosecond, values, $SIZE ), - TimeUnit::Nanosecond => build_values_list!( + TimeUnit::Nanosecond => build_values_list_tz!( TimestampNanosecondBuilder, TimestampNanosecond, values, @@ -420,6 +440,29 @@ macro_rules! build_values_list { }}; } +macro_rules! 
build_values_list_tz { + ($VALUE_BUILDER_TY:ident, $SCALAR_TY:ident, $VALUES:expr, $SIZE:expr) => {{ + let mut builder = ListBuilder::new($VALUE_BUILDER_TY::new($VALUES.len())); + + for _ in 0..$SIZE { + for scalar_value in $VALUES { + match scalar_value { + ScalarValue::$SCALAR_TY(Some(v), _) => { + builder.values().append_value(v.clone()).unwrap() + } + ScalarValue::$SCALAR_TY(None, _) => { + builder.values().append_null().unwrap(); + } + _ => panic!("Incompatible ScalarValue for list"), + }; + } + builder.append(true).unwrap(); + } + + builder.finish() + }}; +} + macro_rules! build_array_from_option { ($DATA_TYPE:ident, $ARRAY_TYPE:ident, $EXPR:expr, $SIZE:expr) => {{ match $EXPR { @@ -435,7 +478,12 @@ macro_rules! build_array_from_option { }}; ($DATA_TYPE:ident, $ENUM:expr, $ENUM2:expr, $ARRAY_TYPE:ident, $EXPR:expr, $SIZE:expr) => {{ match $EXPR { - Some(value) => Arc::new($ARRAY_TYPE::from_value(*value, $SIZE)), + Some(value) => { + let array: ArrayRef = Arc::new($ARRAY_TYPE::from_value(*value, $SIZE)); + // Need to call cast to cast to final data type with timezone/extra param + cast(&array, &DataType::$DATA_TYPE($ENUM, $ENUM2)) + .expect("cannot do temporal cast") + } None => new_null_array(&DataType::$DATA_TYPE($ENUM, $ENUM2), $SIZE), } }}; @@ -465,17 +513,17 @@ impl ScalarValue { ScalarValue::Int16(_) => DataType::Int16, ScalarValue::Int32(_) => DataType::Int32, ScalarValue::Int64(_) => DataType::Int64, - ScalarValue::TimestampSecond(_) => { - DataType::Timestamp(TimeUnit::Second, None) + ScalarValue::TimestampSecond(_, tz_opt) => { + DataType::Timestamp(TimeUnit::Second, tz_opt.clone()) } - ScalarValue::TimestampMillisecond(_) => { - DataType::Timestamp(TimeUnit::Millisecond, None) + ScalarValue::TimestampMillisecond(_, tz_opt) => { + DataType::Timestamp(TimeUnit::Millisecond, tz_opt.clone()) } - ScalarValue::TimestampMicrosecond(_) => { - DataType::Timestamp(TimeUnit::Microsecond, None) + ScalarValue::TimestampMicrosecond(_, tz_opt) => { + 
DataType::Timestamp(TimeUnit::Microsecond, tz_opt.clone()) } - ScalarValue::TimestampNanosecond(_) => { - DataType::Timestamp(TimeUnit::Nanosecond, None) + ScalarValue::TimestampNanosecond(_, tz_opt) => { + DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone()) } ScalarValue::Float32(_) => DataType::Float32, ScalarValue::Float64(_) => DataType::Float64, @@ -537,9 +585,10 @@ impl ScalarValue { | ScalarValue::Utf8(None) | ScalarValue::LargeUtf8(None) | ScalarValue::List(None, _) - | ScalarValue::TimestampMillisecond(None) - | ScalarValue::TimestampMicrosecond(None) - | ScalarValue::TimestampNanosecond(None) + | ScalarValue::TimestampSecond(None, _) + | ScalarValue::TimestampMillisecond(None, _) + | ScalarValue::TimestampMicrosecond(None, _) + | ScalarValue::TimestampNanosecond(None, _) | ScalarValue::Struct(None, _) ) } @@ -619,6 +668,28 @@ impl ScalarValue { }}; } + macro_rules! build_array_primitive_tz { + ($ARRAY_TY:ident, $SCALAR_TY:ident) => {{ + { + let array = scalars + .map(|sv| { + if let ScalarValue::$SCALAR_TY(v, _) = sv { + Ok(v) + } else { + Err(DataFusionError::Internal(format!( + "Inconsistent types in ScalarValue::iter_to_array. \ + Expected {:?}, got {:?}", + data_type, sv + ))) + } + }) + .collect::>()?; + + Arc::new(array) + } + }}; + } + /// Creates an array of $ARRAY_TY by unpacking values of /// SCALAR_TY for "string-like" types. macro_rules! 
build_array_string { @@ -723,17 +794,17 @@ impl ScalarValue { DataType::LargeBinary => build_array_string!(LargeBinaryArray, LargeBinary), DataType::Date32 => build_array_primitive!(Date32Array, Date32), DataType::Date64 => build_array_primitive!(Date64Array, Date64), - DataType::Timestamp(TimeUnit::Second, None) => { - build_array_primitive!(TimestampSecondArray, TimestampSecond) + DataType::Timestamp(TimeUnit::Second, _) => { + build_array_primitive_tz!(TimestampSecondArray, TimestampSecond) } - DataType::Timestamp(TimeUnit::Millisecond, None) => { - build_array_primitive!(TimestampMillisecondArray, TimestampMillisecond) + DataType::Timestamp(TimeUnit::Millisecond, _) => { + build_array_primitive_tz!(TimestampMillisecondArray, TimestampMillisecond) } - DataType::Timestamp(TimeUnit::Microsecond, None) => { - build_array_primitive!(TimestampMicrosecondArray, TimestampMicrosecond) + DataType::Timestamp(TimeUnit::Microsecond, _) => { + build_array_primitive_tz!(TimestampMicrosecondArray, TimestampMicrosecond) } - DataType::Timestamp(TimeUnit::Nanosecond, None) => { - build_array_primitive!(TimestampNanosecondArray, TimestampNanosecond) + DataType::Timestamp(TimeUnit::Nanosecond, _) => { + build_array_primitive_tz!(TimestampNanosecondArray, TimestampNanosecond) } DataType::Interval(IntervalUnit::DayTime) => { build_array_primitive!(IntervalDayTimeArray, IntervalDayTime) @@ -931,35 +1002,35 @@ impl ScalarValue { ScalarValue::UInt64(e) => { build_array_from_option!(UInt64, UInt64Array, e, size) } - ScalarValue::TimestampSecond(e) => build_array_from_option!( + ScalarValue::TimestampSecond(e, tz_opt) => build_array_from_option!( Timestamp, TimeUnit::Second, - None, + tz_opt.clone(), TimestampSecondArray, e, size ), - ScalarValue::TimestampMillisecond(e) => build_array_from_option!( + ScalarValue::TimestampMillisecond(e, tz_opt) => build_array_from_option!( Timestamp, TimeUnit::Millisecond, - None, + tz_opt.clone(), TimestampMillisecondArray, e, size ), - 
ScalarValue::TimestampMicrosecond(e) => build_array_from_option!( + ScalarValue::TimestampMicrosecond(e, tz_opt) => build_array_from_option!( Timestamp, TimeUnit::Microsecond, - None, + tz_opt.clone(), TimestampMicrosecondArray, e, size ), - ScalarValue::TimestampNanosecond(e) => build_array_from_option!( + ScalarValue::TimestampNanosecond(e, tz_opt) => build_array_from_option!( Timestamp, TimeUnit::Nanosecond, - None, + tz_opt.clone(), TimestampNanosecondArray, e, size @@ -1126,27 +1197,41 @@ impl ScalarValue { DataType::Date64 => { typed_cast!(array, index, Date64Array, Date64) } - DataType::Timestamp(TimeUnit::Second, _) => { - typed_cast!(array, index, TimestampSecondArray, TimestampSecond) + DataType::Timestamp(TimeUnit::Second, tz_opt) => { + typed_cast_tz!( + array, + index, + TimestampSecondArray, + TimestampSecond, + tz_opt + ) } - DataType::Timestamp(TimeUnit::Millisecond, _) => { - typed_cast!( + DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => { + typed_cast_tz!( array, index, TimestampMillisecondArray, - TimestampMillisecond + TimestampMillisecond, + tz_opt ) } - DataType::Timestamp(TimeUnit::Microsecond, _) => { - typed_cast!( + DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => { + typed_cast_tz!( array, index, TimestampMicrosecondArray, - TimestampMicrosecond + TimestampMicrosecond, + tz_opt ) } - DataType::Timestamp(TimeUnit::Nanosecond, _) => { - typed_cast!(array, index, TimestampNanosecondArray, TimestampNanosecond) + DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => { + typed_cast_tz!( + array, + index, + TimestampNanosecondArray, + TimestampNanosecond, + tz_opt + ) } DataType::Dictionary(index_type, _) => { let (values, values_index) = match **index_type { @@ -1262,16 +1347,16 @@ impl ScalarValue { ScalarValue::Date64(val) => { eq_array_primitive!(array, index, Date64Array, val) } - ScalarValue::TimestampSecond(val) => { + ScalarValue::TimestampSecond(val, _) => { eq_array_primitive!(array, index, TimestampSecondArray, val) } - 
ScalarValue::TimestampMillisecond(val) => { + ScalarValue::TimestampMillisecond(val, _) => { eq_array_primitive!(array, index, TimestampMillisecondArray, val) } - ScalarValue::TimestampMicrosecond(val) => { + ScalarValue::TimestampMicrosecond(val, _) => { eq_array_primitive!(array, index, TimestampMicrosecondArray, val) } - ScalarValue::TimestampNanosecond(val) => { + ScalarValue::TimestampNanosecond(val, _) => { eq_array_primitive!(array, index, TimestampNanosecondArray, val) } ScalarValue::IntervalYearMonth(val) => { @@ -1420,10 +1505,10 @@ impl TryFrom for i64 { match value { ScalarValue::Int64(Some(inner_value)) | ScalarValue::Date64(Some(inner_value)) - | ScalarValue::TimestampNanosecond(Some(inner_value)) - | ScalarValue::TimestampMicrosecond(Some(inner_value)) - | ScalarValue::TimestampMillisecond(Some(inner_value)) - | ScalarValue::TimestampSecond(Some(inner_value)) => Ok(inner_value), + | ScalarValue::TimestampNanosecond(Some(inner_value), _) + | ScalarValue::TimestampMicrosecond(Some(inner_value), _) + | ScalarValue::TimestampMillisecond(Some(inner_value), _) + | ScalarValue::TimestampSecond(Some(inner_value), _) => Ok(inner_value), _ => Err(DataFusionError::Internal(format!( "Cannot convert {:?} to {}", value, @@ -1462,17 +1547,17 @@ impl TryFrom<&DataType> for ScalarValue { DataType::LargeUtf8 => ScalarValue::LargeUtf8(None), DataType::Date32 => ScalarValue::Date32(None), DataType::Date64 => ScalarValue::Date64(None), - DataType::Timestamp(TimeUnit::Second, _) => { - ScalarValue::TimestampSecond(None) + DataType::Timestamp(TimeUnit::Second, tz_opt) => { + ScalarValue::TimestampSecond(None, tz_opt.clone()) } - DataType::Timestamp(TimeUnit::Millisecond, _) => { - ScalarValue::TimestampMillisecond(None) + DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => { + ScalarValue::TimestampMillisecond(None, tz_opt.clone()) } - DataType::Timestamp(TimeUnit::Microsecond, _) => { - ScalarValue::TimestampMicrosecond(None) + DataType::Timestamp(TimeUnit::Microsecond, 
tz_opt) => { + ScalarValue::TimestampMicrosecond(None, tz_opt.clone()) } - DataType::Timestamp(TimeUnit::Nanosecond, _) => { - ScalarValue::TimestampNanosecond(None) + DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => { + ScalarValue::TimestampNanosecond(None, tz_opt.clone()) } DataType::Dictionary(_index_type, value_type) => { value_type.as_ref().try_into()? @@ -1516,10 +1601,10 @@ impl fmt::Display for ScalarValue { ScalarValue::UInt16(e) => format_option!(f, e)?, ScalarValue::UInt32(e) => format_option!(f, e)?, ScalarValue::UInt64(e) => format_option!(f, e)?, - ScalarValue::TimestampSecond(e) => format_option!(f, e)?, - ScalarValue::TimestampMillisecond(e) => format_option!(f, e)?, - ScalarValue::TimestampMicrosecond(e) => format_option!(f, e)?, - ScalarValue::TimestampNanosecond(e) => format_option!(f, e)?, + ScalarValue::TimestampSecond(e, _) => format_option!(f, e)?, + ScalarValue::TimestampMillisecond(e, _) => format_option!(f, e)?, + ScalarValue::TimestampMicrosecond(e, _) => format_option!(f, e)?, + ScalarValue::TimestampNanosecond(e, _) => format_option!(f, e)?, ScalarValue::Utf8(e) => format_option!(f, e)?, ScalarValue::LargeUtf8(e) => format_option!(f, e)?, ScalarValue::Binary(e) => match e { @@ -1590,15 +1675,17 @@ impl fmt::Debug for ScalarValue { ScalarValue::UInt16(_) => write!(f, "UInt16({})", self), ScalarValue::UInt32(_) => write!(f, "UInt32({})", self), ScalarValue::UInt64(_) => write!(f, "UInt64({})", self), - ScalarValue::TimestampSecond(_) => write!(f, "TimestampSecond({})", self), - ScalarValue::TimestampMillisecond(_) => { - write!(f, "TimestampMillisecond({})", self) + ScalarValue::TimestampSecond(_, tz_opt) => { + write!(f, "TimestampSecond({}, {:?})", self, tz_opt) + } + ScalarValue::TimestampMillisecond(_, tz_opt) => { + write!(f, "TimestampMillisecond({}, {:?})", self, tz_opt) } - ScalarValue::TimestampMicrosecond(_) => { - write!(f, "TimestampMicrosecond({})", self) + ScalarValue::TimestampMicrosecond(_, tz_opt) => { + write!(f, 
"TimestampMicrosecond({}, {:?})", self, tz_opt) } - ScalarValue::TimestampNanosecond(_) => { - write!(f, "TimestampNanosecond({})", self) + ScalarValue::TimestampNanosecond(_, tz_opt) => { + write!(f, "TimestampNanosecond({}, {:?})", self, tz_opt) } ScalarValue::Utf8(None) => write!(f, "Utf8({})", self), ScalarValue::Utf8(Some(_)) => write!(f, "Utf8(\"{}\")", self), @@ -1650,25 +1737,25 @@ impl ScalarType for Float32Type { impl ScalarType for TimestampSecondType { fn scalar(r: Option) -> ScalarValue { - ScalarValue::TimestampSecond(r) + ScalarValue::TimestampSecond(r, None) } } impl ScalarType for TimestampMillisecondType { fn scalar(r: Option) -> ScalarValue { - ScalarValue::TimestampMillisecond(r) + ScalarValue::TimestampMillisecond(r, None) } } impl ScalarType for TimestampMicrosecondType { fn scalar(r: Option) -> ScalarValue { - ScalarValue::TimestampMicrosecond(r) + ScalarValue::TimestampMicrosecond(r, None) } } impl ScalarType for TimestampNanosecondType { fn scalar(r: Option) -> ScalarValue { - ScalarValue::TimestampNanosecond(r) + ScalarValue::TimestampNanosecond(r, None) } } @@ -1760,6 +1847,23 @@ mod tests { }}; } + /// Creates array directly and via ScalarValue and ensures they are the same + /// but for variants that carry a timezone field. + macro_rules! check_scalar_iter_tz { + ($SCALAR_T:ident, $ARRAYTYPE:ident, $INPUT:expr) => {{ + let scalars: Vec<_> = $INPUT + .iter() + .map(|v| ScalarValue::$SCALAR_T(*v, None)) + .collect(); + + let array = ScalarValue::iter_to_array(scalars.into_iter()).unwrap(); + + let expected: ArrayRef = Arc::new($ARRAYTYPE::from($INPUT)); + + assert_eq!(&array, &expected); + }}; + } + /// Creates array directly and via ScalarValue and ensures they /// are the same, for string arrays macro_rules! 
check_scalar_iter_string { @@ -1813,22 +1917,22 @@ mod tests { check_scalar_iter!(UInt32, UInt32Array, vec![Some(1), None, Some(3)]); check_scalar_iter!(UInt64, UInt64Array, vec![Some(1), None, Some(3)]); - check_scalar_iter!( + check_scalar_iter_tz!( TimestampSecond, TimestampSecondArray, vec![Some(1), None, Some(3)] ); - check_scalar_iter!( + check_scalar_iter_tz!( TimestampMillisecond, TimestampMillisecondArray, vec![Some(1), None, Some(3)] ); - check_scalar_iter!( + check_scalar_iter_tz!( TimestampMicrosecond, TimestampMicrosecondArray, vec![Some(1), None, Some(3)] ); - check_scalar_iter!( + check_scalar_iter_tz!( TimestampNanosecond, TimestampNanosecondArray, vec![Some(1), None, Some(3)] @@ -1909,7 +2013,7 @@ mod tests { // Since ScalarValues are used in a non trivial number of places, // making it larger means significant more memory consumption // per distinct value. - assert_eq!(std::mem::size_of::(), 32); + assert_eq!(std::mem::size_of::(), 48); } #[test] @@ -1956,6 +2060,17 @@ mod tests { scalars: $INPUT.iter().map(|v| ScalarValue::$SCALAR_TY(*v)).collect(), } }}; + + ($INPUT:expr, $ARRAY_TY:ident, $SCALAR_TY:ident, $TZ:expr) => {{ + let tz = $TZ; + TestCase { + array: Arc::new($INPUT.iter().collect::<$ARRAY_TY>()), + scalars: $INPUT + .iter() + .map(|v| ScalarValue::$SCALAR_TY(*v, tz.clone())) + .collect(), + } + }}; } macro_rules! 
make_str_test_case { @@ -2020,10 +2135,49 @@ mod tests { make_binary_test_case!(str_vals, LargeBinaryArray, LargeBinary), make_test_case!(i32_vals, Date32Array, Date32), make_test_case!(i64_vals, Date64Array, Date64), - make_test_case!(i64_vals, TimestampSecondArray, TimestampSecond), - make_test_case!(i64_vals, TimestampMillisecondArray, TimestampMillisecond), - make_test_case!(i64_vals, TimestampMicrosecondArray, TimestampMicrosecond), - make_test_case!(i64_vals, TimestampNanosecondArray, TimestampNanosecond), + make_test_case!(i64_vals, TimestampSecondArray, TimestampSecond, None), + make_test_case!( + i64_vals, + TimestampSecondArray, + TimestampSecond, + Some("UTC".to_owned()) + ), + make_test_case!( + i64_vals, + TimestampMillisecondArray, + TimestampMillisecond, + None + ), + make_test_case!( + i64_vals, + TimestampMillisecondArray, + TimestampMillisecond, + Some("UTC".to_owned()) + ), + make_test_case!( + i64_vals, + TimestampMicrosecondArray, + TimestampMicrosecond, + None + ), + make_test_case!( + i64_vals, + TimestampMicrosecondArray, + TimestampMicrosecond, + Some("UTC".to_owned()) + ), + make_test_case!( + i64_vals, + TimestampNanosecondArray, + TimestampNanosecond, + None + ), + make_test_case!( + i64_vals, + TimestampNanosecondArray, + TimestampNanosecond, + Some("UTC".to_owned()) + ), make_test_case!(i32_vals, IntervalYearMonthArray, IntervalYearMonth), make_test_case!(i64_vals, IntervalDayTimeArray, IntervalDayTime), make_str_dict_test_case!(str_vals, Int8Type, Utf8), @@ -2650,4 +2804,30 @@ mod tests { assert_eq!(array, &expected); } + + #[test] + fn scalar_timestamp_ns_utc_timezone() { + let scalar = ScalarValue::TimestampNanosecond( + Some(1599566400000000000), + Some("UTC".to_owned()), + ); + + assert_eq!( + scalar.get_datatype(), + DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".to_owned())) + ); + + let array = scalar.to_array(); + assert_eq!(array.len(), 1); + assert_eq!( + array.data_type(), + &DataType::Timestamp(TimeUnit::Nanosecond, 
Some("UTC".to_owned())) + ); + + let newscalar = ScalarValue::try_from_array(&array, 0).unwrap(); + assert_eq!( + newscalar.get_datatype(), + DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".to_owned())) + ); + } } From 5fd5bf1c7317dd7541ad5b40aa33282f72928efa Mon Sep 17 00:00:00 2001 From: Max Burke Date: Wed, 15 Dec 2021 17:09:24 -0800 Subject: [PATCH 3/9] cargo fmt --- datafusion/src/physical_plan/functions.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs index b80318ff01bca..df073b62c5b78 100644 --- a/datafusion/src/physical_plan/functions.rs +++ b/datafusion/src/physical_plan/functions.rs @@ -616,7 +616,9 @@ pub fn return_type( TimeUnit::Nanosecond, Some("UTC".to_owned()), )), - BuiltinScalarFunction::Translate => utf8_to_str_type(&input_expr_types[0], "translate"), + BuiltinScalarFunction::Translate => { + utf8_to_str_type(&input_expr_types[0], "translate") + } BuiltinScalarFunction::Trim => utf8_to_str_type(&input_expr_types[0], "trim"), BuiltinScalarFunction::Upper => utf8_to_str_type(&input_expr_types[0], "upper"), BuiltinScalarFunction::RegexpMatch => Ok(match input_expr_types[0] { From dc800e17f548aebf6f3e332264787e7dd4853ba1 Mon Sep 17 00:00:00 2001 From: Max Burke Date: Wed, 15 Dec 2021 19:36:10 -0800 Subject: [PATCH 4/9] fix ballista build --- .../core/src/serde/logical_plan/from_proto.rs | 20 +++++++++---------- .../core/src/serde/logical_plan/to_proto.rs | 4 ++-- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs index ba40488f4028c..dfac547d7bb35 100644 --- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs @@ -494,10 +494,10 @@ fn typechecked_scalar_value_conversion( ScalarValue::Date32(Some(*v)) } (Value::TimeMicrosecondValue(v), 
PrimitiveScalarType::TimeMicrosecond) => { - ScalarValue::TimestampMicrosecond(Some(*v)) + ScalarValue::TimestampMicrosecond(Some(*v), None) } (Value::TimeNanosecondValue(v), PrimitiveScalarType::TimeMicrosecond) => { - ScalarValue::TimestampNanosecond(Some(*v)) + ScalarValue::TimestampNanosecond(Some(*v), None) } (Value::Utf8Value(v), PrimitiveScalarType::Utf8) => { ScalarValue::Utf8(Some(v.to_owned())) @@ -530,10 +530,10 @@ fn typechecked_scalar_value_conversion( PrimitiveScalarType::LargeUtf8 => ScalarValue::LargeUtf8(None), PrimitiveScalarType::Date32 => ScalarValue::Date32(None), PrimitiveScalarType::TimeMicrosecond => { - ScalarValue::TimestampMicrosecond(None) + ScalarValue::TimestampMicrosecond(None, None) } PrimitiveScalarType::TimeNanosecond => { - ScalarValue::TimestampNanosecond(None) + ScalarValue::TimestampNanosecond(None, None) } PrimitiveScalarType::Null => { return Err(proto_error( @@ -593,10 +593,10 @@ impl TryInto for &protobuf::scalar_value::Value ScalarValue::Date32(Some(*v)) } protobuf::scalar_value::Value::TimeMicrosecondValue(v) => { - ScalarValue::TimestampMicrosecond(Some(*v)) + ScalarValue::TimestampMicrosecond(Some(*v), None) } protobuf::scalar_value::Value::TimeNanosecondValue(v) => { - ScalarValue::TimestampNanosecond(Some(*v)) + ScalarValue::TimestampNanosecond(Some(*v), None) } protobuf::scalar_value::Value::ListValue(v) => v.try_into()?, protobuf::scalar_value::Value::NullListValue(v) => { @@ -758,10 +758,10 @@ impl TryInto for protobuf::PrimitiveScalarType protobuf::PrimitiveScalarType::LargeUtf8 => ScalarValue::LargeUtf8(None), protobuf::PrimitiveScalarType::Date32 => ScalarValue::Date32(None), protobuf::PrimitiveScalarType::TimeMicrosecond => { - ScalarValue::TimestampMicrosecond(None) + ScalarValue::TimestampMicrosecond(None, None) } protobuf::PrimitiveScalarType::TimeNanosecond => { - ScalarValue::TimestampNanosecond(None) + ScalarValue::TimestampNanosecond(None, None) } }) } @@ -811,10 +811,10 @@ impl TryInto for 
&protobuf::ScalarValue { ScalarValue::Date32(Some(*v)) } protobuf::scalar_value::Value::TimeMicrosecondValue(v) => { - ScalarValue::TimestampMicrosecond(Some(*v)) + ScalarValue::TimestampMicrosecond(Some(*v), None) } protobuf::scalar_value::Value::TimeNanosecondValue(v) => { - ScalarValue::TimestampNanosecond(Some(*v)) + ScalarValue::TimestampNanosecond(Some(*v), None) } protobuf::scalar_value::Value::ListValue(scalar_list) => { let protobuf::ScalarListValue { diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index 68ed7097632f1..47b5df47cd730 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -652,12 +652,12 @@ impl TryFrom<&datafusion::scalar::ScalarValue> for protobuf::ScalarValue { datafusion::scalar::ScalarValue::Date32(val) => { create_proto_scalar(val, PrimitiveScalarType::Date32, |s| Value::Date32Value(*s)) } - datafusion::scalar::ScalarValue::TimestampMicrosecond(val) => { + datafusion::scalar::ScalarValue::TimestampMicrosecond(val, _) => { create_proto_scalar(val, PrimitiveScalarType::TimeMicrosecond, |s| { Value::TimeMicrosecondValue(*s) }) } - datafusion::scalar::ScalarValue::TimestampNanosecond(val) => { + datafusion::scalar::ScalarValue::TimestampNanosecond(val, _) => { create_proto_scalar(val, PrimitiveScalarType::TimeNanosecond, |s| { Value::TimeNanosecondValue(*s) }) From 1f500c7b88c4e016f10ded8f1b7ef5e42436d431 Mon Sep 17 00:00:00 2001 From: Max Burke Date: Thu, 16 Dec 2021 08:22:06 -0800 Subject: [PATCH 5/9] fix ballista tests --- .../rust/core/src/serde/logical_plan/mod.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs index a5e2aa0e98c60..a0f481a803258 100644 --- a/ballista/rust/core/src/serde/logical_plan/mod.rs +++ 
b/ballista/rust/core/src/serde/logical_plan/mod.rs @@ -216,8 +216,8 @@ mod roundtrip_tests { ScalarValue::LargeUtf8(None), ScalarValue::List(None, Box::new(DataType::Boolean)), ScalarValue::Date32(None), - ScalarValue::TimestampMicrosecond(None), - ScalarValue::TimestampNanosecond(None), + ScalarValue::TimestampMicrosecond(None, None), + ScalarValue::TimestampNanosecond(None, None), ScalarValue::Boolean(Some(true)), ScalarValue::Boolean(Some(false)), ScalarValue::Float32(Some(1.0)), @@ -256,11 +256,11 @@ mod roundtrip_tests { ScalarValue::LargeUtf8(Some(String::from("Test Large utf8"))), ScalarValue::Date32(Some(0)), ScalarValue::Date32(Some(i32::MAX)), - ScalarValue::TimestampNanosecond(Some(0)), - ScalarValue::TimestampNanosecond(Some(i64::MAX)), - ScalarValue::TimestampMicrosecond(Some(0)), - ScalarValue::TimestampMicrosecond(Some(i64::MAX)), - ScalarValue::TimestampMicrosecond(None), + ScalarValue::TimestampNanosecond(Some(0), None), + ScalarValue::TimestampNanosecond(Some(i64::MAX), None), + ScalarValue::TimestampMicrosecond(Some(0), None), + ScalarValue::TimestampMicrosecond(Some(i64::MAX), None), + ScalarValue::TimestampMicrosecond(None, None), ScalarValue::List( Some(Box::new(vec![ ScalarValue::Float32(Some(-213.1)), @@ -619,8 +619,8 @@ mod roundtrip_tests { ScalarValue::Utf8(None), ScalarValue::LargeUtf8(None), ScalarValue::Date32(None), - ScalarValue::TimestampMicrosecond(None), - ScalarValue::TimestampNanosecond(None), + ScalarValue::TimestampMicrosecond(None, None), + ScalarValue::TimestampNanosecond(None, None), //ScalarValue::List(None, DataType::Boolean) ]; From 0bd6c26a029c3f3117ebe5bb3a5b3d62ae95a41c Mon Sep 17 00:00:00 2001 From: Max Burke Date: Thu, 16 Dec 2021 12:14:08 -0800 Subject: [PATCH 6/9] ScalarValue is only 64b on aarch64; it is still 48 on amd64 --- datafusion/src/scalar.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/datafusion/src/scalar.rs b/datafusion/src/scalar.rs index 9d6439c7b1093..ca0ffd190a1a7 100644 --- 
a/datafusion/src/scalar.rs +++ b/datafusion/src/scalar.rs @@ -2256,7 +2256,11 @@ mod tests { // Since ScalarValues are used in a non trivial number of places, // making it larger means significant more memory consumption // per distinct value. + #[cfg(target_arch = "aarch64")] assert_eq!(std::mem::size_of::<ScalarValue>(), 64); + + #[cfg(target_arch = "x86_64")] + assert_eq!(std::mem::size_of::<ScalarValue>(), 48); } #[test] From e782fe79678b19818b9a3d0bea1a84bd54ab7477 Mon Sep 17 00:00:00 2001 From: Max Burke Date: Sat, 18 Dec 2021 07:30:34 -0800 Subject: [PATCH 7/9] remove debugging code --- datafusion/src/logical_plan/expr.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs index 0aa7a5deef553..fc862cd9ae376 100644 --- a/datafusion/src/logical_plan/expr.rs +++ b/datafusion/src/logical_plan/expr.rs @@ -974,7 +974,7 @@ impl std::fmt::Display for Expr { ref left, ref right, ref op, - } => write!(f, "{:?} {} {:?}", left, op, right), + } => write!(f, "{} {} {}", left, op, right), Expr::AggregateFunction { /// Name of the function ref fun, From 118c48738461654b1503a0e02f392206221256f3 Mon Sep 17 00:00:00 2001 From: Max Burke Date: Sat, 18 Dec 2021 07:54:48 -0800 Subject: [PATCH 8/9] add tests for timestamp coercion --- datafusion/tests/sql.rs | 329 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 329 insertions(+) diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index b72606f137c5a..89c37bb069e9b 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -6615,3 +6615,332 @@ async fn csv_query_with_decimal_by_sql() -> Result<()> { assert_batches_eq!(expected, &actual); Ok(()) } + +#[tokio::test] +async fn timestamp_coercion() -> Result<()> { + { + let mut ctx = ExecutionContext::new(); + let table_a = make_timestamp_table::()?; + let table_b = make_timestamp_table::()?; + ctx.register_table("table_a", table_a)?; + ctx.register_table("table_b", table_b)?; + + let sql = 
"SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+---------------------+-------------------------+--------------------------+", + "| ts | ts | table_a.ts Eq table_b.ts |", + "+---------------------+-------------------------+--------------------------+", + "| 2020-09-08 13:42:29 | 2020-09-08 13:42:29.190 | true |", + "| 2020-09-08 13:42:29 | 2020-09-08 12:42:29.190 | false |", + "| 2020-09-08 13:42:29 | 2020-09-08 11:42:29.190 | false |", + "| 2020-09-08 12:42:29 | 2020-09-08 13:42:29.190 | false |", + "| 2020-09-08 12:42:29 | 2020-09-08 12:42:29.190 | true |", + "| 2020-09-08 12:42:29 | 2020-09-08 11:42:29.190 | false |", + "| 2020-09-08 11:42:29 | 2020-09-08 13:42:29.190 | false |", + "| 2020-09-08 11:42:29 | 2020-09-08 12:42:29.190 | false |", + "| 2020-09-08 11:42:29 | 2020-09-08 11:42:29.190 | true |", + "+---------------------+-------------------------+--------------------------+", + ]; + assert_batches_eq!(expected, &actual); + } + + { + let mut ctx = ExecutionContext::new(); + let table_a = make_timestamp_table::()?; + let table_b = make_timestamp_table::()?; + ctx.register_table("table_a", table_a)?; + ctx.register_table("table_b", table_b)?; + + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+---------------------+----------------------------+--------------------------+", + "| ts | ts | table_a.ts Eq table_b.ts |", + "+---------------------+----------------------------+--------------------------+", + "| 2020-09-08 13:42:29 | 2020-09-08 13:42:29.190855 | true |", + "| 2020-09-08 13:42:29 | 2020-09-08 12:42:29.190855 | false |", + "| 2020-09-08 13:42:29 | 2020-09-08 11:42:29.190855 | false |", + "| 2020-09-08 12:42:29 | 2020-09-08 13:42:29.190855 | false |", + "| 2020-09-08 12:42:29 | 2020-09-08 12:42:29.190855 | true |", + 
"| 2020-09-08 12:42:29 | 2020-09-08 11:42:29.190855 | false |", + "| 2020-09-08 11:42:29 | 2020-09-08 13:42:29.190855 | false |", + "| 2020-09-08 11:42:29 | 2020-09-08 12:42:29.190855 | false |", + "| 2020-09-08 11:42:29 | 2020-09-08 11:42:29.190855 | true |", + "+---------------------+----------------------------+--------------------------+", + ]; + assert_batches_eq!(expected, &actual); + } + + { + let mut ctx = ExecutionContext::new(); + let table_a = make_timestamp_table::()?; + let table_b = make_timestamp_table::()?; + ctx.register_table("table_a", table_a)?; + ctx.register_table("table_b", table_b)?; + + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+---------------------+----------------------------+--------------------------+", + "| ts | ts | table_a.ts Eq table_b.ts |", + "+---------------------+----------------------------+--------------------------+", + "| 2020-09-08 13:42:29 | 2020-09-08 13:42:29.190855 | true |", + "| 2020-09-08 13:42:29 | 2020-09-08 12:42:29.190855 | false |", + "| 2020-09-08 13:42:29 | 2020-09-08 11:42:29.190855 | false |", + "| 2020-09-08 12:42:29 | 2020-09-08 13:42:29.190855 | false |", + "| 2020-09-08 12:42:29 | 2020-09-08 12:42:29.190855 | true |", + "| 2020-09-08 12:42:29 | 2020-09-08 11:42:29.190855 | false |", + "| 2020-09-08 11:42:29 | 2020-09-08 13:42:29.190855 | false |", + "| 2020-09-08 11:42:29 | 2020-09-08 12:42:29.190855 | false |", + "| 2020-09-08 11:42:29 | 2020-09-08 11:42:29.190855 | true |", + "+---------------------+----------------------------+--------------------------+", + ]; + assert_batches_eq!(expected, &actual); + } + + { + let mut ctx = ExecutionContext::new(); + let table_a = make_timestamp_table::()?; + let table_b = make_timestamp_table::()?; + ctx.register_table("table_a", table_a)?; + ctx.register_table("table_b", table_b)?; + + let sql = "SELECT table_a.ts, table_b.ts, 
table_a.ts = table_b.ts FROM table_a, table_b"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+-------------------------+---------------------+--------------------------+", + "| ts | ts | table_a.ts Eq table_b.ts |", + "+-------------------------+---------------------+--------------------------+", + "| 2020-09-08 13:42:29.190 | 2020-09-08 13:42:29 | true |", + "| 2020-09-08 13:42:29.190 | 2020-09-08 12:42:29 | false |", + "| 2020-09-08 13:42:29.190 | 2020-09-08 11:42:29 | false |", + "| 2020-09-08 12:42:29.190 | 2020-09-08 13:42:29 | false |", + "| 2020-09-08 12:42:29.190 | 2020-09-08 12:42:29 | true |", + "| 2020-09-08 12:42:29.190 | 2020-09-08 11:42:29 | false |", + "| 2020-09-08 11:42:29.190 | 2020-09-08 13:42:29 | false |", + "| 2020-09-08 11:42:29.190 | 2020-09-08 12:42:29 | false |", + "| 2020-09-08 11:42:29.190 | 2020-09-08 11:42:29 | true |", + "+-------------------------+---------------------+--------------------------+", + ]; + assert_batches_eq!(expected, &actual); + } + + { + let mut ctx = ExecutionContext::new(); + let table_a = make_timestamp_table::()?; + let table_b = make_timestamp_table::()?; + ctx.register_table("table_a", table_a)?; + ctx.register_table("table_b", table_b)?; + + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+-------------------------+----------------------------+--------------------------+", + "| ts | ts | table_a.ts Eq table_b.ts |", + "+-------------------------+----------------------------+--------------------------+", + "| 2020-09-08 13:42:29.190 | 2020-09-08 13:42:29.190855 | true |", + "| 2020-09-08 13:42:29.190 | 2020-09-08 12:42:29.190855 | false |", + "| 2020-09-08 13:42:29.190 | 2020-09-08 11:42:29.190855 | false |", + "| 2020-09-08 12:42:29.190 | 2020-09-08 13:42:29.190855 | false |", + "| 2020-09-08 12:42:29.190 | 2020-09-08 12:42:29.190855 | true |", + "| 
2020-09-08 12:42:29.190 | 2020-09-08 11:42:29.190855 | false |", + "| 2020-09-08 11:42:29.190 | 2020-09-08 13:42:29.190855 | false |", + "| 2020-09-08 11:42:29.190 | 2020-09-08 12:42:29.190855 | false |", + "| 2020-09-08 11:42:29.190 | 2020-09-08 11:42:29.190855 | true |", + "+-------------------------+----------------------------+--------------------------+", + ]; + assert_batches_eq!(expected, &actual); + } + + { + let mut ctx = ExecutionContext::new(); + let table_a = make_timestamp_table::()?; + let table_b = make_timestamp_table::()?; + ctx.register_table("table_a", table_a)?; + ctx.register_table("table_b", table_b)?; + + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+-------------------------+----------------------------+--------------------------+", + "| ts | ts | table_a.ts Eq table_b.ts |", + "+-------------------------+----------------------------+--------------------------+", + "| 2020-09-08 13:42:29.190 | 2020-09-08 13:42:29.190855 | true |", + "| 2020-09-08 13:42:29.190 | 2020-09-08 12:42:29.190855 | false |", + "| 2020-09-08 13:42:29.190 | 2020-09-08 11:42:29.190855 | false |", + "| 2020-09-08 12:42:29.190 | 2020-09-08 13:42:29.190855 | false |", + "| 2020-09-08 12:42:29.190 | 2020-09-08 12:42:29.190855 | true |", + "| 2020-09-08 12:42:29.190 | 2020-09-08 11:42:29.190855 | false |", + "| 2020-09-08 11:42:29.190 | 2020-09-08 13:42:29.190855 | false |", + "| 2020-09-08 11:42:29.190 | 2020-09-08 12:42:29.190855 | false |", + "| 2020-09-08 11:42:29.190 | 2020-09-08 11:42:29.190855 | true |", + "+-------------------------+----------------------------+--------------------------+", + ]; + assert_batches_eq!(expected, &actual); + } + + { + let mut ctx = ExecutionContext::new(); + let table_a = make_timestamp_table::()?; + let table_b = make_timestamp_table::()?; + ctx.register_table("table_a", table_a)?; + 
ctx.register_table("table_b", table_b)?; + + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+----------------------------+---------------------+--------------------------+", + "| ts | ts | table_a.ts Eq table_b.ts |", + "+----------------------------+---------------------+--------------------------+", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 13:42:29 | true |", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 12:42:29 | false |", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 11:42:29 | false |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 13:42:29 | false |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 12:42:29 | true |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 11:42:29 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 13:42:29 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 12:42:29 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 11:42:29 | true |", + "+----------------------------+---------------------+--------------------------+", + ]; + assert_batches_eq!(expected, &actual); + } + + { + let mut ctx = ExecutionContext::new(); + let table_a = make_timestamp_table::()?; + let table_b = make_timestamp_table::()?; + ctx.register_table("table_a", table_a)?; + ctx.register_table("table_b", table_b)?; + + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+----------------------------+-------------------------+--------------------------+", + "| ts | ts | table_a.ts Eq table_b.ts |", + "+----------------------------+-------------------------+--------------------------+", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 13:42:29.190 | true |", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 12:42:29.190 | false |", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 11:42:29.190 | false |", + "| 2020-09-08 
12:42:29.190855 | 2020-09-08 13:42:29.190 | false |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 12:42:29.190 | true |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 11:42:29.190 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 13:42:29.190 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 12:42:29.190 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 11:42:29.190 | true |", + "+----------------------------+-------------------------+--------------------------+", + ]; + assert_batches_eq!(expected, &actual); + } + + { + let mut ctx = ExecutionContext::new(); + let table_a = make_timestamp_table::()?; + let table_b = make_timestamp_table::()?; + ctx.register_table("table_a", table_a)?; + ctx.register_table("table_b", table_b)?; + + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+----------------------------+----------------------------+--------------------------+", + "| ts | ts | table_a.ts Eq table_b.ts |", + "+----------------------------+----------------------------+--------------------------+", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 13:42:29.190855 | true |", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 12:42:29.190855 | false |", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 11:42:29.190855 | false |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 13:42:29.190855 | false |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 12:42:29.190855 | true |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 11:42:29.190855 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 13:42:29.190855 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 12:42:29.190855 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 11:42:29.190855 | true |", + "+----------------------------+----------------------------+--------------------------+", + ]; + assert_batches_eq!(expected, &actual); + } + + { + let mut ctx = 
ExecutionContext::new(); + let table_a = make_timestamp_table::()?; + let table_b = make_timestamp_table::()?; + ctx.register_table("table_a", table_a)?; + ctx.register_table("table_b", table_b)?; + + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+----------------------------+---------------------+--------------------------+", + "| ts | ts | table_a.ts Eq table_b.ts |", + "+----------------------------+---------------------+--------------------------+", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 13:42:29 | true |", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 12:42:29 | false |", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 11:42:29 | false |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 13:42:29 | false |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 12:42:29 | true |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 11:42:29 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 13:42:29 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 12:42:29 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 11:42:29 | true |", + "+----------------------------+---------------------+--------------------------+", + ]; + assert_batches_eq!(expected, &actual); + } + + { + let mut ctx = ExecutionContext::new(); + let table_a = make_timestamp_table::()?; + let table_b = make_timestamp_table::()?; + ctx.register_table("table_a", table_a)?; + ctx.register_table("table_b", table_b)?; + + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+----------------------------+-------------------------+--------------------------+", + "| ts | ts | table_a.ts Eq table_b.ts |", + "+----------------------------+-------------------------+--------------------------+", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 13:42:29.190 | true |", + "| 
2020-09-08 13:42:29.190855 | 2020-09-08 12:42:29.190 | false |", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 11:42:29.190 | false |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 13:42:29.190 | false |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 12:42:29.190 | true |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 11:42:29.190 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 13:42:29.190 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 12:42:29.190 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 11:42:29.190 | true |", + "+----------------------------+-------------------------+--------------------------+", + ]; + assert_batches_eq!(expected, &actual); + } + + { + let mut ctx = ExecutionContext::new(); + let table_a = make_timestamp_table::()?; + let table_b = make_timestamp_table::()?; + ctx.register_table("table_a", table_a)?; + ctx.register_table("table_b", table_b)?; + + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+----------------------------+----------------------------+--------------------------+", + "| ts | ts | table_a.ts Eq table_b.ts |", + "+----------------------------+----------------------------+--------------------------+", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 13:42:29.190855 | true |", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 12:42:29.190855 | false |", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 11:42:29.190855 | false |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 13:42:29.190855 | false |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 12:42:29.190855 | true |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 11:42:29.190855 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 13:42:29.190855 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 12:42:29.190855 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 11:42:29.190855 | true |", + 
"+----------------------------+----------------------------+--------------------------+", + ]; + assert_batches_eq!(expected, &actual); + } + + Ok(()) +} From 602c1b232bb95a12499da44e33cf77f44a352ef9 Mon Sep 17 00:00:00 2001 From: Max Burke Date: Sat, 18 Dec 2021 08:24:23 -0800 Subject: [PATCH 9/9] minmax test on mixed ts types, allow creation of timestamp tables with a timezone, fix a missed case in the binary ops applied to timestamp types with timezones --- .../src/physical_plan/expressions/binary.rs | 6 +- datafusion/tests/sql.rs | 64 ++++++++++++++----- 2 files changed, 51 insertions(+), 19 deletions(-) diff --git a/datafusion/src/physical_plan/expressions/binary.rs b/datafusion/src/physical_plan/expressions/binary.rs index 721527b372eb5..bd593fd6ecb5d 100644 --- a/datafusion/src/physical_plan/expressions/binary.rs +++ b/datafusion/src/physical_plan/expressions/binary.rs @@ -377,13 +377,13 @@ macro_rules! binary_array_op { DataType::Timestamp(TimeUnit::Nanosecond, _) => { compute_op!($LEFT, $RIGHT, $OP, TimestampNanosecondArray) } - DataType::Timestamp(TimeUnit::Microsecond, None) => { + DataType::Timestamp(TimeUnit::Microsecond, _) => { compute_op!($LEFT, $RIGHT, $OP, TimestampMicrosecondArray) } - DataType::Timestamp(TimeUnit::Millisecond, None) => { + DataType::Timestamp(TimeUnit::Millisecond, _) => { compute_op!($LEFT, $RIGHT, $OP, TimestampMillisecondArray) } - DataType::Timestamp(TimeUnit::Second, None) => { + DataType::Timestamp(TimeUnit::Second, _) => { compute_op!($LEFT, $RIGHT, $OP, TimestampSecondArray) } DataType::Date32 => { diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index 89c37bb069e9b..7c3210dd7599e 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -4104,37 +4104,44 @@ async fn like() -> Result<()> { } fn make_timestamp_table() -> Result> +where + A: ArrowTimestampType, +{ + make_timestamp_tz_table::(None) +} + +fn make_timestamp_tz_table(tz: Option) -> Result> where A: ArrowTimestampType, { let schema = 
Arc::new(Schema::new(vec![ - Field::new("ts", DataType::Timestamp(A::get_time_unit(), None), false), + Field::new( + "ts", + DataType::Timestamp(A::get_time_unit(), tz.clone()), + false, + ), Field::new("value", DataType::Int32, true), ])); - let mut builder = PrimitiveBuilder::::new(3); - - let nanotimestamps = vec![ - 1599572549190855000i64, // 2020-09-08T13:42:29.190855+00:00 - 1599568949190855000, // 2020-09-08T12:42:29.190855+00:00 - 1599565349190855000, //2020-09-08T11:42:29.190855+00:00 - ]; // 2020-09-08T11:42:29.190855+00:00 let divisor = match A::get_time_unit() { TimeUnit::Nanosecond => 1, TimeUnit::Microsecond => 1000, TimeUnit::Millisecond => 1_000_000, TimeUnit::Second => 1_000_000_000, }; - for ts in nanotimestamps { - builder.append_value( - ::Native::from_i64(ts / divisor).unwrap(), - )?; - } + + let timestamps = vec![ + 1599572549190855000i64 / divisor, // 2020-09-08T13:42:29.190855+00:00 + 1599568949190855000 / divisor, // 2020-09-08T12:42:29.190855+00:00 + 1599565349190855000 / divisor, //2020-09-08T11:42:29.190855+00:00 + ]; // 2020-09-08T11:42:29.190855+00:00 + + let array = PrimitiveArray::::from_vec(timestamps, tz); let data = RecordBatch::try_new( schema.clone(), vec![ - Arc::new(builder.finish()), + Arc::new(array), Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])), ], )?; @@ -6616,12 +6623,37 @@ async fn csv_query_with_decimal_by_sql() -> Result<()> { Ok(()) } +#[tokio::test] +async fn timestamp_minmax() -> Result<()> { + let mut ctx = ExecutionContext::new(); + let table_a = make_timestamp_tz_table::(None)?; + let table_b = + make_timestamp_tz_table::(Some("UTC".to_owned()))?; + ctx.register_table("table_a", table_a)?; + ctx.register_table("table_b", table_b)?; + + let sql = "SELECT MIN(table_a.ts), MAX(table_b.ts) FROM table_a, table_b"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+-------------------------+----------------------------+", + "| MIN(table_a.ts) | MAX(table_b.ts) |", + 
"+-------------------------+----------------------------+", + "| 2020-09-08 11:42:29.190 | 2020-09-08 13:42:29.190855 |", + "+-------------------------+----------------------------+", + ]; + assert_batches_eq!(expected, &actual); + + Ok(()) +} + #[tokio::test] async fn timestamp_coercion() -> Result<()> { { let mut ctx = ExecutionContext::new(); - let table_a = make_timestamp_table::()?; - let table_b = make_timestamp_table::()?; + let table_a = + make_timestamp_tz_table::(Some("UTC".to_owned()))?; + let table_b = + make_timestamp_tz_table::(Some("UTC".to_owned()))?; ctx.register_table("table_a", table_a)?; ctx.register_table("table_b", table_b)?;