From 46ee0bf1c0033bcf9d2786a83d8ec11f3a1774e4 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Fri, 29 May 2026 12:11:34 +0530 Subject: [PATCH 1/5] fix: handle date_bin negative subsecond and overflow cases --- datafusion/functions/src/datetime/date_bin.rs | 467 ++++++++++-------- .../test_files/date_bin_errors.slt | 28 +- 2 files changed, 281 insertions(+), 214 deletions(-) diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index 38b491e42bcbd..0d307f27d4bac 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -34,9 +34,7 @@ use arrow::datatypes::{ use arrow::error::ArrowError; use arrow::temporal_conversions::NANOSECONDS_IN_DAY; use datafusion_common::cast::as_primitive_array; -use datafusion_common::{ - DataFusionError, Result, ScalarValue, exec_err, not_impl_err, plan_err, -}; +use datafusion_common::{Result, ScalarValue, exec_err, not_impl_err, plan_err}; use datafusion_expr::TypeSignature::Exact; use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; use datafusion_expr::{ @@ -420,14 +418,46 @@ fn date_bin_months_interval(stride_months: i64, source: i64, origin: i64) -> Res } fn to_utc_date_time(nanos: i64) -> Result> { - let secs = nanos / NANOS_PER_SEC; - let nsec = (nanos % NANOS_PER_SEC) as u32; + // DateTime::from_timestamp requires a non-negative nanosecond part. + let secs = nanos.div_euclid(NANOS_PER_SEC); + let nsec = nanos.rem_euclid(NANOS_PER_SEC) as u32; match DateTime::from_timestamp(secs, nsec) { Some(dt) => Ok(dt), None => exec_err!("Invalid timestamp value"), } } +fn timestamp_scale(unit: TimeUnit) -> i64 { + match unit { + Nanosecond => 1, + Microsecond => NANOS_PER_MICRO, + Millisecond => NANOS_PER_MILLI, + Second => NANOSECONDS, + } +} + +// Scale to nanoseconds and report overflow as a normal error. +fn checked_scale_to_nanos(x: i64, scale: i64) -> Result { + x.checked_mul(scale).ok_or_else(|| { + ArrowError::InvalidArgumentError(format!( + "date_bin timestamp value {x} * scale {scale} overflows i64" + )) + .into() + }) +} + +fn validate_time_stride(stride: &Interval) -> Result<()> { + match stride { + Interval::Months(m) if *m > 0 => { + exec_err!("DATE_BIN stride for TIME input must be less than 1 day") + } + Interval::Nanoseconds(ns) if *ns >= NANOSECONDS_IN_DAY => { + exec_err!("DATE_BIN stride for TIME input must be less than 1 day") + } + _ => Ok(()), + } +} + // Supported intervals: // 1. IntervalDayTime: this means that the stride is in days, hours, minutes, seconds and milliseconds // We will assume month interval won't be converted into this type @@ -498,83 +528,20 @@ fn date_bin_impl( (*v, false) } ColumnarValue::Scalar(ScalarValue::Time32Millisecond(Some(v))) => { - match stride { - Interval::Months(m) => { - if m > 0 { - return exec_err!( - "DATE_BIN stride for TIME input must be less than 1 day" - ); - } - } - Interval::Nanoseconds(ns) => { - if ns >= NANOSECONDS_IN_DAY { - return exec_err!( - "DATE_BIN stride for TIME input must be less than 1 day" - ); - } - } - } - - (*v as i64 * NANOS_PER_MILLI, true) + validate_time_stride(&stride)?; + // TIME origins can come from reinterpret casts, so scale defensively. + (checked_scale_to_nanos(*v as i64, NANOS_PER_MILLI)?, true) } ColumnarValue::Scalar(ScalarValue::Time32Second(Some(v))) => { - match stride { - Interval::Months(m) => { - if m > 0 { - return exec_err!( - "DATE_BIN stride for TIME input must be less than 1 day" - ); - } - } - Interval::Nanoseconds(ns) => { - if ns >= NANOSECONDS_IN_DAY { - return exec_err!( - "DATE_BIN stride for TIME input must be less than 1 day" - ); - } - } - } - - (*v as i64 * NANOS_PER_SEC, true) + validate_time_stride(&stride)?; + (checked_scale_to_nanos(*v as i64, NANOS_PER_SEC)?, true) } ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(v))) => { - match stride { - Interval::Months(m) => { - if m > 0 { - return exec_err!( - "DATE_BIN stride for TIME input must be less than 1 day" - ); - } - } - Interval::Nanoseconds(ns) => { - if ns >= NANOSECONDS_IN_DAY { - return exec_err!( - "DATE_BIN stride for TIME input must be less than 1 day" - ); - } - } - } - - (*v * NANOS_PER_MICRO, true) + validate_time_stride(&stride)?; + (checked_scale_to_nanos(*v, NANOS_PER_MICRO)?, true) } ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(Some(v))) => { - match stride { - Interval::Months(m) => { - if m > 0 { - return exec_err!( - "DATE_BIN stride for TIME input must be less than 1 day" - ); - } - } - Interval::Nanoseconds(ns) => { - if ns >= NANOSECONDS_IN_DAY { - return exec_err!( - "DATE_BIN stride for TIME input must be less than 1 day" - ); - } - } - } - + validate_time_stride(&stride)?; (*v, true) } ColumnarValue::Scalar(v) => { @@ -597,91 +564,47 @@ fn date_bin_impl( return exec_err!("DATE_BIN stride must be non-zero"); } - fn timestamp_scale() -> i64 { - match T::UNIT { - Nanosecond => 1, - Microsecond => NANOS_PER_MICRO, - Millisecond => NANOS_PER_MILLI, - Second => NANOSECONDS, + fn stride_map_fn( + origin: i64, + stride: i64, + stride_fn: BinFunction, + ) -> impl Fn(i64) -> Result { + let scale = timestamp_scale(T::UNIT); + move |x: i64| { + Ok(stride_fn(stride, checked_scale_to_nanos(x, scale)?, origin)? / scale) } } - fn timestamp_scale_overflow_error(x: i64) -> DataFusionError { - DataFusionError::Execution(format!( - "DATE_BIN source timestamp {x} cannot be represented in nanoseconds" - )) - } - Ok(match array { ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => { - let scale = timestamp_scale::(); + let apply_stride_fn = + stride_map_fn::(origin, stride, stride_fn); ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( - match *v { - Some(val) => { - let scaled = val - .checked_mul(scale) - .ok_or_else(|| timestamp_scale_overflow_error(val))?; - match stride_fn(stride, scaled, origin) { - Ok(result) => Some(result / scale), - Err(_) => None, - } - } - None => None, - }, + v.and_then(|val| apply_stride_fn(val).ok()), tz_opt.clone(), )) } ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => { - let scale = timestamp_scale::(); + let apply_stride_fn = + stride_map_fn::(origin, stride, stride_fn); ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( - match *v { - Some(val) => { - let scaled = val - .checked_mul(scale) - .ok_or_else(|| timestamp_scale_overflow_error(val))?; - match stride_fn(stride, scaled, origin) { - Ok(result) => Some(result / scale), - Err(_) => None, - } - } - None => None, - }, + v.and_then(|val| apply_stride_fn(val).ok()), tz_opt.clone(), )) } ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => { - let scale = timestamp_scale::(); + let apply_stride_fn = + stride_map_fn::(origin, stride, stride_fn); ColumnarValue::Scalar(ScalarValue::TimestampMillisecond( - match *v { - Some(val) => { - let scaled = val - .checked_mul(scale) - .ok_or_else(|| timestamp_scale_overflow_error(val))?; - match stride_fn(stride, scaled, origin) { - Ok(result) => Some(result / scale), - Err(_) => None, - } - } - None => None, - }, + v.and_then(|val| apply_stride_fn(val).ok()), tz_opt.clone(), )) } ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => { - let scale = timestamp_scale::(); + let apply_stride_fn = + stride_map_fn::(origin, stride, stride_fn); ColumnarValue::Scalar(ScalarValue::TimestampSecond( - match *v { - Some(val) => { - let scaled = val - .checked_mul(scale) - .ok_or_else(|| timestamp_scale_overflow_error(val))?; - match stride_fn(stride, scaled, origin) { - Ok(result) => Some(result / scale), - Err(_) => None, - } - } - None => None, - }, + v.and_then(|val| apply_stride_fn(val).ok()), tz_opt.clone(), )) } @@ -690,13 +613,12 @@ fn date_bin_impl( return exec_err!("DATE_BIN with Time32 source requires Time32 origin"); } let result = v.and_then(|x| { - match stride_fn(stride, x as i64 * NANOS_PER_MILLI, origin) { - Ok(binned_nanos) => { - let nanos = binned_nanos % (NANOSECONDS_IN_DAY); - Some((nanos / NANOS_PER_MILLI) as i32) - } - Err(_) => None, - } + checked_scale_to_nanos(x as i64, NANOS_PER_MILLI) + .and_then(|scaled| stride_fn(stride, scaled, origin)) + .map(|binned| { + ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_MILLI) as i32 + }) + .ok() }); ColumnarValue::Scalar(ScalarValue::Time32Millisecond(result)) } @@ -705,13 +627,10 @@ fn date_bin_impl( return exec_err!("DATE_BIN with Time32 source requires Time32 origin"); } let result = v.and_then(|x| { - match stride_fn(stride, x as i64 * NANOS_PER_SEC, origin) { - Ok(binned_nanos) => { - let nanos = binned_nanos % (NANOSECONDS_IN_DAY); - Some((nanos / NANOS_PER_SEC) as i32) - } - Err(_) => None, - } + checked_scale_to_nanos(x as i64, NANOS_PER_SEC) + .and_then(|scaled| stride_fn(stride, scaled, origin)) + .map(|binned| ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_SEC) as i32) + .ok() }); ColumnarValue::Scalar(ScalarValue::Time32Second(result)) } @@ -719,9 +638,10 @@ fn date_bin_impl( if !is_time { return exec_err!("DATE_BIN with Time64 source requires Time64 origin"); } - let result = v.and_then(|x| match stride_fn(stride, x, origin) { - Ok(binned_nanos) => Some(binned_nanos % (NANOSECONDS_IN_DAY)), - Err(_) => None, + let result = v.and_then(|x| { + stride_fn(stride, x, origin) + .map(|binned| binned % NANOSECONDS_IN_DAY) + .ok() }); ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(result)) } @@ -729,14 +649,12 @@ fn date_bin_impl( if !is_time { return exec_err!("DATE_BIN with Time64 source requires Time64 origin"); } - let result = - v.and_then(|x| match stride_fn(stride, x * NANOS_PER_MICRO, origin) { - Ok(binned_nanos) => { - let nanos = binned_nanos % (NANOSECONDS_IN_DAY); - Some(nanos / NANOS_PER_MICRO) - } - Err(_) => None, - }); + let result = v.and_then(|x| { + checked_scale_to_nanos(x, NANOS_PER_MICRO) + .and_then(|scaled| stride_fn(stride, scaled, origin)) + .map(|binned| (binned % NANOSECONDS_IN_DAY) / NANOS_PER_MICRO) + .ok() + }); ColumnarValue::Scalar(ScalarValue::Time64Microsecond(result)) } ColumnarValue::Array(array) => { @@ -751,24 +669,15 @@ fn date_bin_impl( T: ArrowTimestampType, { let array = as_primitive_array::(array)?; - let scale = timestamp_scale::(); - - let values = array - .iter() - .map(|val| match val { - Some(val) => { - let scaled = val - .checked_mul(scale) - .ok_or_else(|| timestamp_scale_overflow_error(val))?; - Ok(stride_fn(stride, scaled, origin) - .ok() - .map(|binned| binned / scale)) - } - None => Ok(None), - }) - .collect::>>()?; - - let result = PrimitiveArray::::from_iter(values); + let scale = timestamp_scale(T::UNIT); + + // Per-row errors become NULL, matching scalar behavior. + let result: PrimitiveArray = array.unary_opt(|val| { + checked_scale_to_nanos(val, scale) + .and_then(|scaled| stride_fn(stride, scaled, origin)) + .map(|binned| binned / scale) + .ok() + }); let array = result.with_timezone_opt(tz_opt.clone()); Ok(ColumnarValue::Array(Arc::new(array))) @@ -803,14 +712,15 @@ fn date_bin_impl( } let array = array.as_primitive::(); let result: PrimitiveArray = - array.try_unary(|x| { - stride_fn(stride, x as i64 * NANOS_PER_MILLI, origin) - .map(|binned_nanos| { - let nanos = binned_nanos % (NANOSECONDS_IN_DAY); - (nanos / NANOS_PER_MILLI) as i32 + array.unary_opt(|x| { + checked_scale_to_nanos(x as i64, NANOS_PER_MILLI) + .and_then(|scaled| stride_fn(stride, scaled, origin)) + .map(|binned| { + ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_MILLI) + as i32 }) - .map_err(|e| ArrowError::ComputeError(e.to_string())) - })?; + .ok() + }); ColumnarValue::Array(Arc::new(result)) } Time32(Second) => { @@ -820,15 +730,14 @@ fn date_bin_impl( ); } let array = array.as_primitive::(); - let result: PrimitiveArray = - array.try_unary(|x| { - stride_fn(stride, x as i64 * NANOS_PER_SEC, origin) - .map(|binned_nanos| { - let nanos = binned_nanos % (NANOSECONDS_IN_DAY); - (nanos / NANOS_PER_SEC) as i32 - }) - .map_err(|e| ArrowError::ComputeError(e.to_string())) - })?; + let result: PrimitiveArray = array.unary_opt(|x| { + checked_scale_to_nanos(x as i64, NANOS_PER_SEC) + .and_then(|scaled| stride_fn(stride, scaled, origin)) + .map(|binned| { + ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_SEC) as i32 + }) + .ok() + }); ColumnarValue::Array(Arc::new(result)) } Time64(Microsecond) => { @@ -839,14 +748,14 @@ fn date_bin_impl( } let array = array.as_primitive::(); let result: PrimitiveArray = - array.try_unary(|x| { - stride_fn(stride, x * NANOS_PER_MICRO, origin) - .map(|binned_nanos| { - let nanos = binned_nanos % (NANOSECONDS_IN_DAY); - nanos / NANOS_PER_MICRO + array.unary_opt(|x| { + checked_scale_to_nanos(x, NANOS_PER_MICRO) + .and_then(|scaled| stride_fn(stride, scaled, origin)) + .map(|binned| { + (binned % NANOSECONDS_IN_DAY) / NANOS_PER_MICRO }) - .map_err(|e| ArrowError::ComputeError(e.to_string())) - })?; + .ok() + }); ColumnarValue::Array(Arc::new(result)) } Time64(Nanosecond) => { @@ -857,11 +766,11 @@ fn date_bin_impl( } let array = array.as_primitive::(); let result: PrimitiveArray = - array.try_unary(|x| { + array.unary_opt(|x| { stride_fn(stride, x, origin) .map(|binned_nanos| binned_nanos % (NANOSECONDS_IN_DAY)) - .map_err(|e| ArrowError::ComputeError(e.to_string())) - })?; + .ok() + }); ColumnarValue::Array(Arc::new(result)) } _ => { @@ -1433,6 +1342,146 @@ mod tests { } } + #[test] + fn test_date_bin_scale_overflow_returns_null() { + // Scaling non-nanosecond timestamps to nanoseconds can overflow. + use arrow::array::{ + ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, + TimestampSecondArray, + }; + + let return_field = &Arc::new(Field::new( + "f", + DataType::Timestamp(TimeUnit::Second, None), + true, + )); + + let scalar_cases = [ + ScalarValue::TimestampSecond(Some(i64::MAX), None), + ScalarValue::TimestampMillisecond(Some(i64::MAX), None), + ScalarValue::TimestampMicrosecond(Some(i64::MAX), None), + ]; + for source in scalar_cases { + let args = vec![ + ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)), + ColumnarValue::Scalar(source.clone()), + ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(0), None)), + ]; + let result = invoke_date_bin_with_args(args, 1, return_field) + .unwrap_or_else(|e| panic!("expected Ok for {source:?}, got {e:?}")); + match result { + ColumnarValue::Scalar( + ScalarValue::TimestampSecond(v, _) + | ScalarValue::TimestampMillisecond(v, _) + | ScalarValue::TimestampMicrosecond(v, _), + ) => { + assert!(v.is_none(), "expected NULL for {source:?}, got {v:?}"); + } + other => panic!("unexpected result for {source:?}: {other:?}"), + } + } + + let second_arr = + Arc::new(TimestampSecondArray::from(vec![Some(i64::MAX), Some(0)])); + let milli_arr = Arc::new(TimestampMillisecondArray::from(vec![ + Some(i64::MAX), + Some(0), + ])); + let micro_arr = Arc::new(TimestampMicrosecondArray::from(vec![ + Some(i64::MAX), + Some(0), + ])); + let array_cases: Vec = vec![second_arr, milli_arr, micro_arr]; + for array in array_cases { + let dt = array.data_type().clone(); + let args = vec![ + ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)), + ColumnarValue::Array(array), + ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(0), None)), + ]; + let result = invoke_date_bin_with_args(args, 2, return_field) + .unwrap_or_else(|e| panic!("expected Ok for {dt:?}, got {e:?}")); + let ColumnarValue::Array(out) = result else { + panic!("expected array result for {dt:?}"); + }; + assert!(out.is_null(0), "expected NULL at row 0 for {dt:?}"); + assert!(out.is_valid(1), "expected valid value at row 1 for {dt:?}"); + } + } + + #[test] + fn test_date_bin_time64_micro_scale_overflow_returns_null() { + // Time64(Microsecond) can hold out-of-range values after reinterpret casts. + use arrow::array::Time64MicrosecondArray; + + let return_field = &Arc::new(Field::new( + "f", + DataType::Time64(TimeUnit::Microsecond), + true, + )); + let stride = || ColumnarValue::Scalar(ScalarValue::new_interval_dt(0, 1000)); + let origin = || ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(0))); + + let args = vec![ + stride(), + ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(i64::MAX))), + origin(), + ]; + let result = invoke_date_bin_with_args(args, 1, return_field).unwrap(); + let ColumnarValue::Scalar(ScalarValue::Time64Microsecond(v)) = result else { + panic!("expected Time64Microsecond scalar, got {result:?}"); + }; + assert!( + v.is_none(), + "expected NULL for overflowing scalar, got {v:?}" + ); + + let array = Arc::new(Time64MicrosecondArray::from(vec![Some(i64::MAX), Some(0)])); + let args = vec![stride(), ColumnarValue::Array(array), origin()]; + let result = invoke_date_bin_with_args(args, 2, return_field).unwrap(); + let ColumnarValue::Array(out) = result else { + panic!("expected array result, got {result:?}"); + }; + assert!(out.is_null(0), "expected NULL at row 0"); + assert!(out.is_valid(1), "expected valid value at row 1"); + } + + #[test] + fn test_date_bin_time64_micro_origin_scale_overflow_errors() { + // Out-of-range TIME origins should error before row evaluation. + use arrow::array::Time64MicrosecondArray; + + let return_field = &Arc::new(Field::new( + "f", + DataType::Time64(TimeUnit::Microsecond), + true, + )); + let stride = || ColumnarValue::Scalar(ScalarValue::new_interval_dt(0, 1000)); + let bad_origin = + || ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(i64::MAX))); + + let args = vec![ + stride(), + ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(0))), + bad_origin(), + ]; + let err = invoke_date_bin_with_args(args, 1, return_field) + .expect_err("expected Err for overflowing origin (scalar source)"); + assert!( + err.strip_backtrace().contains("overflows i64"), + "unexpected error: {err}" + ); + + let array = Arc::new(Time64MicrosecondArray::from(vec![Some(0), Some(1)])); + let args = vec![stride(), ColumnarValue::Array(array), bad_origin()]; + let err = invoke_date_bin_with_args(args, 2, return_field) + .expect_err("expected Err for overflowing origin (array source)"); + assert!( + err.strip_backtrace().contains("overflows i64"), + "unexpected error: {err}" + ); + } + #[test] fn test_date_bin_compute_distance_rem_overflow() { // Regression for #22215: `time_diff % stride` panics with "attempt to diff --git a/datafusion/sqllogictest/test_files/date_bin_errors.slt b/datafusion/sqllogictest/test_files/date_bin_errors.slt index 20408c84ef79a..53cba506defd6 100644 --- a/datafusion/sqllogictest/test_files/date_bin_errors.slt +++ b/datafusion/sqllogictest/test_files/date_bin_errors.slt @@ -23,10 +23,24 @@ select date_bin(interval '1637426858 months', to_timestamp_millis(1040292460), t ---- NULL -# Negative timestamp with month interval - should return NULL instead of panicking +# Issue #22528: negative sub-second source with month interval. query P select date_bin(interval '1 month', to_timestamp_millis(-1040292460), timestamp '1984-01-07 00:00:00'); ---- +1969-12-07T00:00:00 + +# Array path should match the scalar path above. +query P +select date_bin(interval '1 month', c, timestamp '1984-01-07 00:00:00') +from values (to_timestamp_millis(-1040292460)) t(c); +---- +1969-12-07T00:00:00 + +# Array path should return NULL for per-row overflow. +query P +select date_bin(interval '1637426858 months', c, timestamp '1984-01-07 00:00:00') +from values (to_timestamp_millis(1040292460)) t(c); +---- NULL # Large stride causing overflow - should return NULL @@ -79,16 +93,18 @@ select date_bin( ---- NULL -# Source timestamp scaling to nanoseconds overflows: should return an error, not panic -query error DataFusion error: Execution error: DATE_BIN source timestamp 9223372036854775807 cannot be represented in nanoseconds +# Source timestamp scaling to nanoseconds overflows: should return NULL, not panic +query P select date_bin( interval '1 nanosecond', arrow_cast(9223372036854775807, 'Timestamp(Second, None)'), timestamp '1970-01-01 00:00:00' ); +---- +NULL -# Source timestamp scaling to nanoseconds overflows in array path: should return an error, not panic -query error DataFusion error: Execution error: DATE_BIN source timestamp 9223372036854775807 cannot be represented in nanoseconds +# Source timestamp scaling to nanoseconds overflows in array path: should return NULL, not panic +query P select date_bin( interval '1 nanosecond', ts, @@ -97,3 +113,5 @@ select date_bin( from ( values (arrow_cast(9223372036854775807, 'Timestamp(Second, None)')) ) as t(ts); +---- +NULL From a20b06185a0c8b7c3c0770d3eec444f68d225ae7 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Fri, 29 May 2026 12:35:04 +0530 Subject: [PATCH 2/5] simplify tests --- datafusion/functions/src/datetime/date_bin.rs | 120 ++++++++---------- 1 file changed, 50 insertions(+), 70 deletions(-) diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index 0d307f27d4bac..13b9a580e50cc 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -826,6 +826,31 @@ mod tests { DateBinFunc::new().invoke_with_args(args) } + fn assert_null_scalar(value: ColumnarValue, expected_type: DataType) { + let ColumnarValue::Scalar(value) = value else { + panic!("expected scalar, got {value:?}"); + }; + assert_eq!(value.data_type(), expected_type); + assert!(value.is_null(), "expected NULL, got {value:?}"); + } + + fn assert_array_null_then_valid(value: ColumnarValue, expected_type: DataType) { + let ColumnarValue::Array(array) = value else { + panic!("expected array, got {value:?}"); + }; + assert_eq!(array.data_type(), &expected_type); + assert!(array.is_null(0), "expected NULL at row 0"); + assert!(array.is_valid(1), "expected valid value at row 1"); + } + + fn assert_overflow_error(result: Result) { + let err = result.expect_err("expected overflow error"); + assert!( + err.strip_backtrace().contains("overflows i64"), + "unexpected error: {err}" + ); + } + #[test] fn test_date_bin() { let return_field = &Arc::new(Field::new( @@ -1362,36 +1387,28 @@ mod tests { ScalarValue::TimestampMicrosecond(Some(i64::MAX), None), ]; for source in scalar_cases { + let expected_type = source.data_type(); let args = vec![ ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)), - ColumnarValue::Scalar(source.clone()), + ColumnarValue::Scalar(source), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(0), None)), ]; let result = invoke_date_bin_with_args(args, 1, return_field) - .unwrap_or_else(|e| panic!("expected Ok for {source:?}, got {e:?}")); - match result { - ColumnarValue::Scalar( - ScalarValue::TimestampSecond(v, _) - | ScalarValue::TimestampMillisecond(v, _) - | ScalarValue::TimestampMicrosecond(v, _), - ) => { - assert!(v.is_none(), "expected NULL for {source:?}, got {v:?}"); - } - other => panic!("unexpected result for {source:?}: {other:?}"), - } + .unwrap_or_else(|e| panic!("expected Ok for {expected_type}, got {e:?}")); + assert_null_scalar(result, expected_type); } - let second_arr = - Arc::new(TimestampSecondArray::from(vec![Some(i64::MAX), Some(0)])); - let milli_arr = Arc::new(TimestampMillisecondArray::from(vec![ - Some(i64::MAX), - Some(0), - ])); - let micro_arr = Arc::new(TimestampMicrosecondArray::from(vec![ - Some(i64::MAX), - Some(0), - ])); - let array_cases: Vec = vec![second_arr, milli_arr, micro_arr]; + let array_cases: Vec = vec![ + Arc::new(TimestampSecondArray::from(vec![Some(i64::MAX), Some(0)])), + Arc::new(TimestampMillisecondArray::from(vec![ + Some(i64::MAX), + Some(0), + ])), + Arc::new(TimestampMicrosecondArray::from(vec![ + Some(i64::MAX), + Some(0), + ])), + ]; for array in array_cases { let dt = array.data_type().clone(); let args = vec![ @@ -1401,85 +1418,48 @@ mod tests { ]; let result = invoke_date_bin_with_args(args, 2, return_field) .unwrap_or_else(|e| panic!("expected Ok for {dt:?}, got {e:?}")); - let ColumnarValue::Array(out) = result else { - panic!("expected array result for {dt:?}"); - }; - assert!(out.is_null(0), "expected NULL at row 0 for {dt:?}"); - assert!(out.is_valid(1), "expected valid value at row 1 for {dt:?}"); + assert_array_null_then_valid(result, dt); } } #[test] - fn test_date_bin_time64_micro_scale_overflow_returns_null() { + fn test_date_bin_time64_micro_overflow_handling() { // Time64(Microsecond) can hold out-of-range values after reinterpret casts. use arrow::array::Time64MicrosecondArray; - let return_field = &Arc::new(Field::new( - "f", - DataType::Time64(TimeUnit::Microsecond), - true, - )); + let data_type = DataType::Time64(TimeUnit::Microsecond); + let return_field = &Arc::new(Field::new("f", data_type.clone(), true)); let stride = || ColumnarValue::Scalar(ScalarValue::new_interval_dt(0, 1000)); let origin = || ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(0))); + // Out-of-range source values are per-row data, so they become NULL. let args = vec![ stride(), ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(i64::MAX))), origin(), ]; let result = invoke_date_bin_with_args(args, 1, return_field).unwrap(); - let ColumnarValue::Scalar(ScalarValue::Time64Microsecond(v)) = result else { - panic!("expected Time64Microsecond scalar, got {result:?}"); - }; - assert!( - v.is_none(), - "expected NULL for overflowing scalar, got {v:?}" - ); + assert_null_scalar(result, data_type.clone()); let array = Arc::new(Time64MicrosecondArray::from(vec![Some(i64::MAX), Some(0)])); let args = vec![stride(), ColumnarValue::Array(array), origin()]; let result = invoke_date_bin_with_args(args, 2, return_field).unwrap(); - let ColumnarValue::Array(out) = result else { - panic!("expected array result, got {result:?}"); - }; - assert!(out.is_null(0), "expected NULL at row 0"); - assert!(out.is_valid(1), "expected valid value at row 1"); - } - - #[test] - fn test_date_bin_time64_micro_origin_scale_overflow_errors() { - // Out-of-range TIME origins should error before row evaluation. - use arrow::array::Time64MicrosecondArray; + assert_array_null_then_valid(result, data_type); - let return_field = &Arc::new(Field::new( - "f", - DataType::Time64(TimeUnit::Microsecond), - true, - )); - let stride = || ColumnarValue::Scalar(ScalarValue::new_interval_dt(0, 1000)); let bad_origin = || ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(i64::MAX))); + // Out-of-range origins are shared inputs, so they return an error. let args = vec![ stride(), ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(0))), bad_origin(), ]; - let err = invoke_date_bin_with_args(args, 1, return_field) - .expect_err("expected Err for overflowing origin (scalar source)"); - assert!( - err.strip_backtrace().contains("overflows i64"), - "unexpected error: {err}" - ); + assert_overflow_error(invoke_date_bin_with_args(args, 1, return_field)); let array = Arc::new(Time64MicrosecondArray::from(vec![Some(0), Some(1)])); let args = vec![stride(), ColumnarValue::Array(array), bad_origin()]; - let err = invoke_date_bin_with_args(args, 2, return_field) - .expect_err("expected Err for overflowing origin (array source)"); - assert!( - err.strip_backtrace().contains("overflows i64"), - "unexpected error: {err}" - ); + assert_overflow_error(invoke_date_bin_with_args(args, 2, return_field)); } #[test] From 72e327d1dfd6b3e1be664af6d94e4a1462b88b69 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Mon, 8 Jun 2026 17:00:22 +0530 Subject: [PATCH 3/5] chore: address date_bin review comments --- datafusion/functions/src/datetime/date_bin.rs | 50 +++++++++---------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index 13b9a580e50cc..79a4e36a991d5 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -564,47 +564,51 @@ fn date_bin_impl( return exec_err!("DATE_BIN stride must be non-zero"); } - fn stride_map_fn( + fn transform_scalar_with_stride( + value: Option, origin: i64, stride: i64, stride_fn: BinFunction, - ) -> impl Fn(i64) -> Result { + ) -> Option { let scale = timestamp_scale(T::UNIT); - move |x: i64| { - Ok(stride_fn(stride, checked_scale_to_nanos(x, scale)?, origin)? / scale) - } + value.and_then(|val| { + checked_scale_to_nanos(val, scale) + .and_then(|scaled| stride_fn(stride, scaled, origin)) + .map(|binned| binned / scale) + .ok() + }) } Ok(match array { ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => { - let apply_stride_fn = - stride_map_fn::(origin, stride, stride_fn); ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( - v.and_then(|val| apply_stride_fn(val).ok()), + transform_scalar_with_stride::( + *v, origin, stride, stride_fn, + ), tz_opt.clone(), )) } ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => { - let apply_stride_fn = - stride_map_fn::(origin, stride, stride_fn); ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( - v.and_then(|val| apply_stride_fn(val).ok()), + transform_scalar_with_stride::( + *v, origin, stride, stride_fn, + ), tz_opt.clone(), )) } ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => { - let apply_stride_fn = - stride_map_fn::(origin, stride, stride_fn); ColumnarValue::Scalar(ScalarValue::TimestampMillisecond( - v.and_then(|val| apply_stride_fn(val).ok()), + transform_scalar_with_stride::( + *v, origin, stride, stride_fn, + ), tz_opt.clone(), )) } ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => { - let apply_stride_fn = - stride_map_fn::(origin, stride, stride_fn); ColumnarValue::Scalar(ScalarValue::TimestampSecond( - v.and_then(|val| apply_stride_fn(val).ok()), + transform_scalar_with_stride::( + *v, origin, stride, stride_fn, + ), tz_opt.clone(), )) } @@ -1375,12 +1379,6 @@ mod tests { TimestampSecondArray, }; - let return_field = &Arc::new(Field::new( - "f", - DataType::Timestamp(TimeUnit::Second, None), - true, - )); - let scalar_cases = [ ScalarValue::TimestampSecond(Some(i64::MAX), None), ScalarValue::TimestampMillisecond(Some(i64::MAX), None), @@ -1388,12 +1386,13 @@ mod tests { ]; for source in scalar_cases { let expected_type = source.data_type(); + let return_field = Arc::new(Field::new("f", expected_type.clone(), true)); let args = vec![ ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)), ColumnarValue::Scalar(source), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(0), None)), ]; - let result = invoke_date_bin_with_args(args, 1, return_field) + let result = invoke_date_bin_with_args(args, 1, &return_field) .unwrap_or_else(|e| panic!("expected Ok for {expected_type}, got {e:?}")); assert_null_scalar(result, expected_type); } @@ -1411,12 +1410,13 @@ mod tests { ]; for array in array_cases { let dt = array.data_type().clone(); + let return_field = Arc::new(Field::new("f", dt.clone(), true)); let args = vec![ ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)), ColumnarValue::Array(array), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(0), None)), ]; - let result = invoke_date_bin_with_args(args, 2, return_field) + let result = invoke_date_bin_with_args(args, 2, &return_field) .unwrap_or_else(|e| panic!("expected Ok for {dt:?}, got {e:?}")); assert_array_null_then_valid(result, dt); } From e3472aeb63f64f933489f3bf5657a70148efc8e9 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Tue, 9 Jun 2026 13:22:25 +0530 Subject: [PATCH 4/5] chore: simplify date_bin source overflow handling --- datafusion/functions/src/datetime/date_bin.rs | 72 ++++++++----------- 1 file changed, 30 insertions(+), 42 deletions(-) diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index 79a4e36a991d5..66ff04fa9ec2a 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -427,8 +427,8 @@ fn to_utc_date_time(nanos: i64) -> Result> { } } -fn timestamp_scale(unit: TimeUnit) -> i64 { - match unit { +fn timestamp_scale() -> i64 { + match T::UNIT { Nanosecond => 1, Microsecond => NANOS_PER_MICRO, Millisecond => NANOS_PER_MILLI, @@ -570,13 +570,11 @@ fn date_bin_impl( stride: i64, stride_fn: BinFunction, ) -> Option { - let scale = timestamp_scale(T::UNIT); - value.and_then(|val| { - checked_scale_to_nanos(val, scale) - .and_then(|scaled| stride_fn(stride, scaled, origin)) - .map(|binned| binned / scale) - .ok() - }) + let scale = timestamp_scale::(); + value + .and_then(|val| val.checked_mul(scale)) + .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) + .map(|binned| binned / scale) } Ok(match array { @@ -616,26 +614,20 @@ fn date_bin_impl( if !is_time { return exec_err!("DATE_BIN with Time32 source requires Time32 origin"); } - let result = v.and_then(|x| { - checked_scale_to_nanos(x as i64, NANOS_PER_MILLI) - .and_then(|scaled| stride_fn(stride, scaled, origin)) - .map(|binned| { - ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_MILLI) as i32 - }) - .ok() - }); + let result = v + .and_then(|x| (x as i64).checked_mul(NANOS_PER_MILLI)) + .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) + .map(|binned| ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_MILLI) as i32); ColumnarValue::Scalar(ScalarValue::Time32Millisecond(result)) } ColumnarValue::Scalar(ScalarValue::Time32Second(v)) => { if !is_time { return exec_err!("DATE_BIN with Time32 source requires Time32 origin"); } - let result = v.and_then(|x| { - checked_scale_to_nanos(x as i64, NANOS_PER_SEC) - .and_then(|scaled| stride_fn(stride, scaled, origin)) - .map(|binned| ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_SEC) as i32) - .ok() - }); + let result = v + .and_then(|x| (x as i64).checked_mul(NANOS_PER_SEC)) + .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) + .map(|binned| ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_SEC) as i32); ColumnarValue::Scalar(ScalarValue::Time32Second(result)) } ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(v)) => { @@ -653,12 +645,10 @@ fn date_bin_impl( if !is_time { return exec_err!("DATE_BIN with Time64 source requires Time64 origin"); } - let result = v.and_then(|x| { - checked_scale_to_nanos(x, NANOS_PER_MICRO) - .and_then(|scaled| stride_fn(stride, scaled, origin)) - .map(|binned| (binned % NANOSECONDS_IN_DAY) / NANOS_PER_MICRO) - .ok() - }); + let result = v + .and_then(|x| x.checked_mul(NANOS_PER_MICRO)) + .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) + .map(|binned| (binned % NANOSECONDS_IN_DAY) / NANOS_PER_MICRO); ColumnarValue::Scalar(ScalarValue::Time64Microsecond(result)) } ColumnarValue::Array(array) => { @@ -673,14 +663,13 @@ fn date_bin_impl( T: ArrowTimestampType, { let array = as_primitive_array::(array)?; - let scale = timestamp_scale(T::UNIT); + let scale = timestamp_scale::(); // Per-row errors become NULL, matching scalar behavior. let result: PrimitiveArray = array.unary_opt(|val| { - checked_scale_to_nanos(val, scale) - .and_then(|scaled| stride_fn(stride, scaled, origin)) + val.checked_mul(scale) + .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) .map(|binned| binned / scale) - .ok() }); let array = result.with_timezone_opt(tz_opt.clone()); @@ -717,13 +706,13 @@ fn date_bin_impl( let array = array.as_primitive::(); let result: PrimitiveArray = array.unary_opt(|x| { - checked_scale_to_nanos(x as i64, NANOS_PER_MILLI) - .and_then(|scaled| stride_fn(stride, scaled, origin)) + (x as i64) + .checked_mul(NANOS_PER_MILLI) + .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) .map(|binned| { ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_MILLI) as i32 }) - .ok() }); ColumnarValue::Array(Arc::new(result)) } @@ -735,12 +724,12 @@ fn date_bin_impl( } let array = array.as_primitive::(); let result: PrimitiveArray = array.unary_opt(|x| { - checked_scale_to_nanos(x as i64, NANOS_PER_SEC) - .and_then(|scaled| stride_fn(stride, scaled, origin)) + (x as i64) + .checked_mul(NANOS_PER_SEC) + .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) .map(|binned| { ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_SEC) as i32 }) - .ok() }); ColumnarValue::Array(Arc::new(result)) } @@ -753,12 +742,11 @@ fn date_bin_impl( let array = array.as_primitive::(); let result: PrimitiveArray = array.unary_opt(|x| { - checked_scale_to_nanos(x, NANOS_PER_MICRO) - .and_then(|scaled| stride_fn(stride, scaled, origin)) + x.checked_mul(NANOS_PER_MICRO) + .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) .map(|binned| { (binned % NANOSECONDS_IN_DAY) / NANOS_PER_MICRO }) - .ok() }); ColumnarValue::Array(Arc::new(result)) } From 38c4c6aab049875e2ba8107b68cfdb46b4fa756a Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Wed, 10 Jun 2026 13:06:10 +0530 Subject: [PATCH 5/5] chore: address date_bin error feedback --- datafusion/functions/src/datetime/date_bin.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index 66ff04fa9ec2a..06ffd8ba5b3c6 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -418,7 +418,7 @@ fn date_bin_months_interval(stride_months: i64, source: i64, origin: i64) -> Res } fn to_utc_date_time(nanos: i64) -> Result> { - // DateTime::from_timestamp requires a non-negative nanosecond part. + // Keep negative sub-second values normalized as seconds + non-negative nanos. let secs = nanos.div_euclid(NANOS_PER_SEC); let nsec = nanos.rem_euclid(NANOS_PER_SEC) as u32; match DateTime::from_timestamp(secs, nsec) { @@ -438,12 +438,10 @@ fn timestamp_scale() -> i64 { // Scale to nanoseconds and report overflow as a normal error. fn checked_scale_to_nanos(x: i64, scale: i64) -> Result { - x.checked_mul(scale).ok_or_else(|| { - ArrowError::InvalidArgumentError(format!( - "date_bin timestamp value {x} * scale {scale} overflows i64" - )) - .into() - }) + match x.checked_mul(scale) { + Some(scaled) => Ok(scaled), + None => exec_err!("date_bin timestamp value {x} * scale {scale} overflows i64"), + } } fn validate_time_stride(stride: &Interval) -> Result<()> {