diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index 38b491e42bcbd..06ffd8ba5b3c6 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -34,9 +34,7 @@ use arrow::datatypes::{ use arrow::error::ArrowError; use arrow::temporal_conversions::NANOSECONDS_IN_DAY; use datafusion_common::cast::as_primitive_array; -use datafusion_common::{ - DataFusionError, Result, ScalarValue, exec_err, not_impl_err, plan_err, -}; +use datafusion_common::{Result, ScalarValue, exec_err, not_impl_err, plan_err}; use datafusion_expr::TypeSignature::Exact; use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; use datafusion_expr::{ @@ -420,14 +418,44 @@ fn date_bin_months_interval(stride_months: i64, source: i64, origin: i64) -> Res } fn to_utc_date_time(nanos: i64) -> Result> { - let secs = nanos / NANOS_PER_SEC; - let nsec = (nanos % NANOS_PER_SEC) as u32; + // Keep negative sub-second values normalized as seconds + non-negative nanos. + let secs = nanos.div_euclid(NANOS_PER_SEC); + let nsec = nanos.rem_euclid(NANOS_PER_SEC) as u32; match DateTime::from_timestamp(secs, nsec) { Some(dt) => Ok(dt), None => exec_err!("Invalid timestamp value"), } } +fn timestamp_scale() -> i64 { + match T::UNIT { + Nanosecond => 1, + Microsecond => NANOS_PER_MICRO, + Millisecond => NANOS_PER_MILLI, + Second => NANOSECONDS, + } +} + +// Scale to nanoseconds and report overflow as a normal error. +fn checked_scale_to_nanos(x: i64, scale: i64) -> Result { + match x.checked_mul(scale) { + Some(scaled) => Ok(scaled), + None => exec_err!("date_bin timestamp value {x} * scale {scale} overflows i64"), + } +} + +fn validate_time_stride(stride: &Interval) -> Result<()> { + match stride { + Interval::Months(m) if *m > 0 => { + exec_err!("DATE_BIN stride for TIME input must be less than 1 day") + } + Interval::Nanoseconds(ns) if *ns >= NANOSECONDS_IN_DAY => { + exec_err!("DATE_BIN stride for TIME input must be less than 1 day") + } + _ => Ok(()), + } +} + // Supported intervals: // 1. IntervalDayTime: this means that the stride is in days, hours, minutes, seconds and milliseconds // We will assume month interval won't be converted into this type @@ -498,83 +526,20 @@ fn date_bin_impl( (*v, false) } ColumnarValue::Scalar(ScalarValue::Time32Millisecond(Some(v))) => { - match stride { - Interval::Months(m) => { - if m > 0 { - return exec_err!( - "DATE_BIN stride for TIME input must be less than 1 day" - ); - } - } - Interval::Nanoseconds(ns) => { - if ns >= NANOSECONDS_IN_DAY { - return exec_err!( - "DATE_BIN stride for TIME input must be less than 1 day" - ); - } - } - } - - (*v as i64 * NANOS_PER_MILLI, true) + validate_time_stride(&stride)?; + // TIME origins can come from reinterpret casts, so scale defensively. + (checked_scale_to_nanos(*v as i64, NANOS_PER_MILLI)?, true) } ColumnarValue::Scalar(ScalarValue::Time32Second(Some(v))) => { - match stride { - Interval::Months(m) => { - if m > 0 { - return exec_err!( - "DATE_BIN stride for TIME input must be less than 1 day" - ); - } - } - Interval::Nanoseconds(ns) => { - if ns >= NANOSECONDS_IN_DAY { - return exec_err!( - "DATE_BIN stride for TIME input must be less than 1 day" - ); - } - } - } - - (*v as i64 * NANOS_PER_SEC, true) + validate_time_stride(&stride)?; + (checked_scale_to_nanos(*v as i64, NANOS_PER_SEC)?, true) } ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(v))) => { - match stride { - Interval::Months(m) => { - if m > 0 { - return exec_err!( - "DATE_BIN stride for TIME input must be less than 1 day" - ); - } - } - Interval::Nanoseconds(ns) => { - if ns >= NANOSECONDS_IN_DAY { - return exec_err!( - "DATE_BIN stride for TIME input must be less than 1 day" - ); - } - } - } - - (*v * NANOS_PER_MICRO, true) + validate_time_stride(&stride)?; + (checked_scale_to_nanos(*v, NANOS_PER_MICRO)?, true) } ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(Some(v))) => { - match stride { - Interval::Months(m) => { - if m > 0 { - return exec_err!( - "DATE_BIN stride for TIME input must be less than 1 day" - ); - } - } - Interval::Nanoseconds(ns) => { - if ns >= NANOSECONDS_IN_DAY { - return exec_err!( - "DATE_BIN stride for TIME input must be less than 1 day" - ); - } - } - } - + validate_time_stride(&stride)?; (*v, true) } ColumnarValue::Scalar(v) => { @@ -597,91 +562,49 @@ fn date_bin_impl( return exec_err!("DATE_BIN stride must be non-zero"); } - fn timestamp_scale() -> i64 { - match T::UNIT { - Nanosecond => 1, - Microsecond => NANOS_PER_MICRO, - Millisecond => NANOS_PER_MILLI, - Second => NANOSECONDS, - } - } - - fn timestamp_scale_overflow_error(x: i64) -> DataFusionError { - DataFusionError::Execution(format!( - "DATE_BIN source timestamp {x} cannot be represented in nanoseconds" - )) + fn transform_scalar_with_stride( + value: Option, + origin: i64, + stride: i64, + stride_fn: BinFunction, + ) -> Option { + let scale = timestamp_scale::(); + value + .and_then(|val| val.checked_mul(scale)) + .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) + .map(|binned| binned / scale) } Ok(match array { ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => { - let scale = timestamp_scale::(); ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( - match *v { - Some(val) => { - let scaled = val - .checked_mul(scale) - .ok_or_else(|| timestamp_scale_overflow_error(val))?; - match stride_fn(stride, scaled, origin) { - Ok(result) => Some(result / scale), - Err(_) => None, - } - } - None => None, - }, + transform_scalar_with_stride::( + *v, origin, stride, stride_fn, + ), tz_opt.clone(), )) } ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => { - let scale = timestamp_scale::(); ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( - match *v { - Some(val) => { - let scaled = val - .checked_mul(scale) - .ok_or_else(|| timestamp_scale_overflow_error(val))?; - match stride_fn(stride, scaled, origin) { - Ok(result) => Some(result / scale), - Err(_) => None, - } - } - None => None, - }, + transform_scalar_with_stride::( + *v, origin, stride, stride_fn, + ), tz_opt.clone(), )) } ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => { - let scale = timestamp_scale::(); ColumnarValue::Scalar(ScalarValue::TimestampMillisecond( - match *v { - Some(val) => { - let scaled = val - .checked_mul(scale) - .ok_or_else(|| timestamp_scale_overflow_error(val))?; - match stride_fn(stride, scaled, origin) { - Ok(result) => Some(result / scale), - Err(_) => None, - } - } - None => None, - }, + transform_scalar_with_stride::( + *v, origin, stride, stride_fn, + ), tz_opt.clone(), )) } ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => { - let scale = timestamp_scale::(); ColumnarValue::Scalar(ScalarValue::TimestampSecond( - match *v { - Some(val) => { - let scaled = val - .checked_mul(scale) - .ok_or_else(|| timestamp_scale_overflow_error(val))?; - match stride_fn(stride, scaled, origin) { - Ok(result) => Some(result / scale), - Err(_) => None, - } - } - None => None, - }, + transform_scalar_with_stride::( + *v, origin, stride, stride_fn, + ), tz_opt.clone(), )) } @@ -689,39 +612,30 @@ fn date_bin_impl( if !is_time { return exec_err!("DATE_BIN with Time32 source requires Time32 origin"); } - let result = v.and_then(|x| { - match stride_fn(stride, x as i64 * NANOS_PER_MILLI, origin) { - Ok(binned_nanos) => { - let nanos = binned_nanos % (NANOSECONDS_IN_DAY); - Some((nanos / NANOS_PER_MILLI) as i32) - } - Err(_) => None, - } - }); + let result = v + .and_then(|x| (x as i64).checked_mul(NANOS_PER_MILLI)) + .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) + .map(|binned| ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_MILLI) as i32); ColumnarValue::Scalar(ScalarValue::Time32Millisecond(result)) } ColumnarValue::Scalar(ScalarValue::Time32Second(v)) => { if !is_time { return exec_err!("DATE_BIN with Time32 source requires Time32 origin"); } - let result = v.and_then(|x| { - match stride_fn(stride, x as i64 * NANOS_PER_SEC, origin) { - Ok(binned_nanos) => { - let nanos = binned_nanos % (NANOSECONDS_IN_DAY); - Some((nanos / NANOS_PER_SEC) as i32) - } - Err(_) => None, - } - }); + let result = v + .and_then(|x| (x as i64).checked_mul(NANOS_PER_SEC)) + .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) + .map(|binned| ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_SEC) as i32); ColumnarValue::Scalar(ScalarValue::Time32Second(result)) } ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(v)) => { if !is_time { return exec_err!("DATE_BIN with Time64 source requires Time64 origin"); } - let result = v.and_then(|x| match stride_fn(stride, x, origin) { - Ok(binned_nanos) => Some(binned_nanos % (NANOSECONDS_IN_DAY)), - Err(_) => None, + let result = v.and_then(|x| { + stride_fn(stride, x, origin) + .map(|binned| binned % NANOSECONDS_IN_DAY) + .ok() }); ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(result)) } @@ -729,14 +643,10 @@ fn date_bin_impl( if !is_time { return exec_err!("DATE_BIN with Time64 source requires Time64 origin"); } - let result = - v.and_then(|x| match stride_fn(stride, x * NANOS_PER_MICRO, origin) { - Ok(binned_nanos) => { - let nanos = binned_nanos % (NANOSECONDS_IN_DAY); - Some(nanos / NANOS_PER_MICRO) - } - Err(_) => None, - }); + let result = v + .and_then(|x| x.checked_mul(NANOS_PER_MICRO)) + .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) + .map(|binned| (binned % NANOSECONDS_IN_DAY) / NANOS_PER_MICRO); ColumnarValue::Scalar(ScalarValue::Time64Microsecond(result)) } ColumnarValue::Array(array) => { @@ -753,22 +663,12 @@ fn date_bin_impl( let array = as_primitive_array::(array)?; let scale = timestamp_scale::(); - let values = array - .iter() - .map(|val| match val { - Some(val) => { - let scaled = val - .checked_mul(scale) - .ok_or_else(|| timestamp_scale_overflow_error(val))?; - Ok(stride_fn(stride, scaled, origin) - .ok() - .map(|binned| binned / scale)) - } - None => Ok(None), - }) - .collect::>>()?; - - let result = PrimitiveArray::::from_iter(values); + // Per-row errors become NULL, matching scalar behavior. + let result: PrimitiveArray = array.unary_opt(|val| { + val.checked_mul(scale) + .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) + .map(|binned| binned / scale) + }); let array = result.with_timezone_opt(tz_opt.clone()); Ok(ColumnarValue::Array(Arc::new(array))) @@ -803,14 +703,15 @@ fn date_bin_impl( } let array = array.as_primitive::(); let result: PrimitiveArray = - array.try_unary(|x| { - stride_fn(stride, x as i64 * NANOS_PER_MILLI, origin) - .map(|binned_nanos| { - let nanos = binned_nanos % (NANOSECONDS_IN_DAY); - (nanos / NANOS_PER_MILLI) as i32 + array.unary_opt(|x| { + (x as i64) + .checked_mul(NANOS_PER_MILLI) + .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) + .map(|binned| { + ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_MILLI) + as i32 }) - .map_err(|e| ArrowError::ComputeError(e.to_string())) - })?; + }); ColumnarValue::Array(Arc::new(result)) } Time32(Second) => { @@ -820,15 +721,14 @@ fn date_bin_impl( ); } let array = array.as_primitive::(); - let result: PrimitiveArray = - array.try_unary(|x| { - stride_fn(stride, x as i64 * NANOS_PER_SEC, origin) - .map(|binned_nanos| { - let nanos = binned_nanos % (NANOSECONDS_IN_DAY); - (nanos / NANOS_PER_SEC) as i32 - }) - .map_err(|e| ArrowError::ComputeError(e.to_string())) - })?; + let result: PrimitiveArray = array.unary_opt(|x| { + (x as i64) + .checked_mul(NANOS_PER_SEC) + .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) + .map(|binned| { + ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_SEC) as i32 + }) + }); ColumnarValue::Array(Arc::new(result)) } Time64(Microsecond) => { @@ -839,14 +739,13 @@ fn date_bin_impl( } let array = array.as_primitive::(); let result: PrimitiveArray = - array.try_unary(|x| { - stride_fn(stride, x * NANOS_PER_MICRO, origin) - .map(|binned_nanos| { - let nanos = binned_nanos % (NANOSECONDS_IN_DAY); - nanos / NANOS_PER_MICRO + array.unary_opt(|x| { + x.checked_mul(NANOS_PER_MICRO) + .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) + .map(|binned| { + (binned % NANOSECONDS_IN_DAY) / NANOS_PER_MICRO }) - .map_err(|e| ArrowError::ComputeError(e.to_string())) - })?; + }); ColumnarValue::Array(Arc::new(result)) } Time64(Nanosecond) => { @@ -857,11 +756,11 @@ fn date_bin_impl( } let array = array.as_primitive::(); let result: PrimitiveArray = - array.try_unary(|x| { + array.unary_opt(|x| { stride_fn(stride, x, origin) .map(|binned_nanos| binned_nanos % (NANOSECONDS_IN_DAY)) - .map_err(|e| ArrowError::ComputeError(e.to_string())) - })?; + .ok() + }); ColumnarValue::Array(Arc::new(result)) } _ => { @@ -917,6 +816,31 @@ mod tests { DateBinFunc::new().invoke_with_args(args) } + fn assert_null_scalar(value: ColumnarValue, expected_type: DataType) { + let ColumnarValue::Scalar(value) = value else { + panic!("expected scalar, got {value:?}"); + }; + assert_eq!(value.data_type(), expected_type); + assert!(value.is_null(), "expected NULL, got {value:?}"); + } + + fn assert_array_null_then_valid(value: ColumnarValue, expected_type: DataType) { + let ColumnarValue::Array(array) = value else { + panic!("expected array, got {value:?}"); + }; + assert_eq!(array.data_type(), &expected_type); + assert!(array.is_null(0), "expected NULL at row 0"); + assert!(array.is_valid(1), "expected valid value at row 1"); + } + + fn assert_overflow_error(result: Result) { + let err = result.expect_err("expected overflow error"); + assert!( + err.strip_backtrace().contains("overflows i64"), + "unexpected error: {err}" + ); + } + #[test] fn test_date_bin() { let return_field = &Arc::new(Field::new( @@ -1433,6 +1357,97 @@ mod tests { } } + #[test] + fn test_date_bin_scale_overflow_returns_null() { + // Scaling non-nanosecond timestamps to nanoseconds can overflow. + use arrow::array::{ + ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, + TimestampSecondArray, + }; + + let scalar_cases = [ + ScalarValue::TimestampSecond(Some(i64::MAX), None), + ScalarValue::TimestampMillisecond(Some(i64::MAX), None), + ScalarValue::TimestampMicrosecond(Some(i64::MAX), None), + ]; + for source in scalar_cases { + let expected_type = source.data_type(); + let return_field = Arc::new(Field::new("f", expected_type.clone(), true)); + let args = vec![ + ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)), + ColumnarValue::Scalar(source), + ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(0), None)), + ]; + let result = invoke_date_bin_with_args(args, 1, &return_field) + .unwrap_or_else(|e| panic!("expected Ok for {expected_type}, got {e:?}")); + assert_null_scalar(result, expected_type); + } + + let array_cases: Vec = vec![ + Arc::new(TimestampSecondArray::from(vec![Some(i64::MAX), Some(0)])), + Arc::new(TimestampMillisecondArray::from(vec![ + Some(i64::MAX), + Some(0), + ])), + Arc::new(TimestampMicrosecondArray::from(vec![ + Some(i64::MAX), + Some(0), + ])), + ]; + for array in array_cases { + let dt = array.data_type().clone(); + let return_field = Arc::new(Field::new("f", dt.clone(), true)); + let args = vec![ + ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)), + ColumnarValue::Array(array), + ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(0), None)), + ]; + let result = invoke_date_bin_with_args(args, 2, &return_field) + .unwrap_or_else(|e| panic!("expected Ok for {dt:?}, got {e:?}")); + assert_array_null_then_valid(result, dt); + } + } + + #[test] + fn test_date_bin_time64_micro_overflow_handling() { + // Time64(Microsecond) can hold out-of-range values after reinterpret casts. + use arrow::array::Time64MicrosecondArray; + + let data_type = DataType::Time64(TimeUnit::Microsecond); + let return_field = &Arc::new(Field::new("f", data_type.clone(), true)); + let stride = || ColumnarValue::Scalar(ScalarValue::new_interval_dt(0, 1000)); + let origin = || ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(0))); + + // Out-of-range source values are per-row data, so they become NULL. + let args = vec![ + stride(), + ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(i64::MAX))), + origin(), + ]; + let result = invoke_date_bin_with_args(args, 1, return_field).unwrap(); + assert_null_scalar(result, data_type.clone()); + + let array = Arc::new(Time64MicrosecondArray::from(vec![Some(i64::MAX), Some(0)])); + let args = vec![stride(), ColumnarValue::Array(array), origin()]; + let result = invoke_date_bin_with_args(args, 2, return_field).unwrap(); + assert_array_null_then_valid(result, data_type); + + let bad_origin = + || ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(i64::MAX))); + + // Out-of-range origins are shared inputs, so they return an error. + let args = vec![ + stride(), + ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(0))), + bad_origin(), + ]; + assert_overflow_error(invoke_date_bin_with_args(args, 1, return_field)); + + let array = Arc::new(Time64MicrosecondArray::from(vec![Some(0), Some(1)])); + let args = vec![stride(), ColumnarValue::Array(array), bad_origin()]; + assert_overflow_error(invoke_date_bin_with_args(args, 2, return_field)); + } + #[test] fn test_date_bin_compute_distance_rem_overflow() { // Regression for #22215: `time_diff % stride` panics with "attempt to diff --git a/datafusion/sqllogictest/test_files/date_bin_errors.slt b/datafusion/sqllogictest/test_files/date_bin_errors.slt index 20408c84ef79a..53cba506defd6 100644 --- a/datafusion/sqllogictest/test_files/date_bin_errors.slt +++ b/datafusion/sqllogictest/test_files/date_bin_errors.slt @@ -23,10 +23,24 @@ select date_bin(interval '1637426858 months', to_timestamp_millis(1040292460), t ---- NULL -# Negative timestamp with month interval - should return NULL instead of panicking +# Issue #22528: negative sub-second source with month interval. query P select date_bin(interval '1 month', to_timestamp_millis(-1040292460), timestamp '1984-01-07 00:00:00'); ---- +1969-12-07T00:00:00 + +# Array path should match the scalar path above. +query P +select date_bin(interval '1 month', c, timestamp '1984-01-07 00:00:00') +from values (to_timestamp_millis(-1040292460)) t(c); +---- +1969-12-07T00:00:00 + +# Array path should return NULL for per-row overflow. +query P +select date_bin(interval '1637426858 months', c, timestamp '1984-01-07 00:00:00') +from values (to_timestamp_millis(1040292460)) t(c); +---- NULL # Large stride causing overflow - should return NULL @@ -79,16 +93,18 @@ select date_bin( ---- NULL -# Source timestamp scaling to nanoseconds overflows: should return an error, not panic -query error DataFusion error: Execution error: DATE_BIN source timestamp 9223372036854775807 cannot be represented in nanoseconds +# Source timestamp scaling to nanoseconds overflows: should return NULL, not panic +query P select date_bin( interval '1 nanosecond', arrow_cast(9223372036854775807, 'Timestamp(Second, None)'), timestamp '1970-01-01 00:00:00' ); +---- +NULL -# Source timestamp scaling to nanoseconds overflows in array path: should return an error, not panic -query error DataFusion error: Execution error: DATE_BIN source timestamp 9223372036854775807 cannot be represented in nanoseconds +# Source timestamp scaling to nanoseconds overflows in array path: should return NULL, not panic +query P select date_bin( interval '1 nanosecond', ts, @@ -97,3 +113,5 @@ select date_bin( from ( values (arrow_cast(9223372036854775807, 'Timestamp(Second, None)')) ) as t(ts); +---- +NULL