diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index 06ffd8ba5b3c6..32a8aae7d101c 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -34,7 +34,9 @@ use arrow::datatypes::{ use arrow::error::ArrowError; use arrow::temporal_conversions::NANOSECONDS_IN_DAY; use datafusion_common::cast::as_primitive_array; -use datafusion_common::{Result, ScalarValue, exec_err, not_impl_err, plan_err}; +use datafusion_common::{ + Result, ScalarValue, exec_datafusion_err, exec_err, not_impl_err, plan_err, +}; use datafusion_expr::TypeSignature::Exact; use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; use datafusion_expr::{ @@ -365,6 +367,22 @@ fn compute_distance(time_diff: i64, stride: i64) -> Result { } } +// Shift `origin_date` by `month_delta` months, mapping an out-of-range result to +// the same error the binning paths reported when this was written inline. +fn shift_months(origin_date: DateTime, month_delta: i64) -> Result> { + if month_delta < 0 { + origin_date + .checked_sub_months(Months::new(month_delta.unsigned_abs() as u32)) + .ok_or_else(|| { + exec_datafusion_err!("DATE_BIN month subtraction out of range") + }) + } else { + origin_date + .checked_add_months(Months::new(month_delta as u32)) + .ok_or_else(|| exec_datafusion_err!("DATE_BIN month addition out of range")) + } +} + // return time in nanoseconds that the source timestamp falls into based on the stride and origin fn date_bin_months_interval(stride_months: i64, source: i64, origin: i64) -> Result { // convert source and origin to DateTime @@ -379,37 +397,13 @@ fn date_bin_months_interval(stride_months: i64, source: i64, origin: i64) -> Res // distance from origin to bin let month_delta = compute_distance(month_diff as i64, stride_months)?; - let mut bin_time = if month_delta < 0 { - match origin_date - .checked_sub_months(Months::new(month_delta.unsigned_abs() as u32)) - { - Some(dt) => dt, - None => return exec_err!("DATE_BIN month subtraction out of range"), - } - } else { - match origin_date.checked_add_months(Months::new(month_delta as u32)) { - Some(dt) => dt, - None => return exec_err!("DATE_BIN month addition out of range"), - } - }; + let mut bin_time = shift_months(origin_date, month_delta)?; // If origin is not midnight of first date of the month, the bin_time may be larger than the source // In this case, we need to move back to previous bin if bin_time > source_date { let month_delta = month_delta - stride_months; - bin_time = if month_delta < 0 { - match origin_date - .checked_sub_months(Months::new(month_delta.unsigned_abs() as u32)) - { - Some(dt) => dt, - None => return exec_err!("DATE_BIN month subtraction out of range"), - } - } else { - match origin_date.checked_add_months(Months::new(month_delta as u32)) { - Some(dt) => dt, - None => return exec_err!("DATE_BIN month addition out of range"), - } - }; + bin_time = shift_months(origin_date, month_delta)?; } match bin_time.timestamp_nanos_opt() { Some(nanos) => Ok(nanos), @@ -444,6 +438,36 @@ fn checked_scale_to_nanos(x: i64, scale: i64) -> Result { } } +// Per-row timestamp binning shared by scalar and array paths. +// Source-value failures become None, which callers map to NULL. +fn date_bin_timestamp_value( + value: i64, + origin: i64, + stride: i64, + stride_fn: BinFunction, +) -> Option { + let scale = timestamp_scale::(); + checked_scale_to_nanos(value, scale) + .and_then(|scaled| stride_fn(stride, scaled, origin)) + .map(|binned| binned / scale) + .ok() +} + +// Per-row TIME binning shared by scalar and array paths. +// The modulo keeps the result within a single day before unscaling. +fn date_bin_time_value( + value: i64, + scale: i64, + origin: i64, + stride: i64, + stride_fn: BinFunction, +) -> Option { + checked_scale_to_nanos(value, scale) + .and_then(|scaled| stride_fn(stride, scaled, origin)) + .map(|binned| (binned % NANOSECONDS_IN_DAY) / scale) + .ok() +} + fn validate_time_stride(stride: &Interval) -> Result<()> { match stride { Interval::Months(m) if *m > 0 => { @@ -562,91 +586,85 @@ fn date_bin_impl( return exec_err!("DATE_BIN stride must be non-zero"); } - fn transform_scalar_with_stride( - value: Option, - origin: i64, - stride: i64, - stride_fn: BinFunction, - ) -> Option { - let scale = timestamp_scale::(); - value - .and_then(|val| val.checked_mul(scale)) - .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) - .map(|binned| binned / scale) + // A TIME source requires a TIME origin. This shared-input check is ordered + // after stride/origin parsing and the zero-stride check so error ordering is + // unchanged, and replaces the per-arm guards in the TIME branches below. + if !is_time { + match array.data_type() { + Time32(_) => { + return exec_err!("DATE_BIN with Time32 source requires Time32 origin"); + } + Time64(_) => { + return exec_err!("DATE_BIN with Time64 source requires Time64 origin"); + } + _ => {} + } } Ok(match array { ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => { ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( - transform_scalar_with_stride::( - *v, origin, stride, stride_fn, - ), + v.and_then(|x| { + date_bin_timestamp_value::( + x, origin, stride, stride_fn, + ) + }), tz_opt.clone(), )) } ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => { ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( - transform_scalar_with_stride::( - *v, origin, stride, stride_fn, - ), + v.and_then(|x| { + date_bin_timestamp_value::( + x, origin, stride, stride_fn, + ) + }), tz_opt.clone(), )) } ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => { ColumnarValue::Scalar(ScalarValue::TimestampMillisecond( - transform_scalar_with_stride::( - *v, origin, stride, stride_fn, - ), + v.and_then(|x| { + date_bin_timestamp_value::( + x, origin, stride, stride_fn, + ) + }), tz_opt.clone(), )) } ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => { ColumnarValue::Scalar(ScalarValue::TimestampSecond( - transform_scalar_with_stride::( - *v, origin, stride, stride_fn, - ), + v.and_then(|x| { + date_bin_timestamp_value::( + x, origin, stride, stride_fn, + ) + }), tz_opt.clone(), )) } ColumnarValue::Scalar(ScalarValue::Time32Millisecond(v)) => { - if !is_time { - return exec_err!("DATE_BIN with Time32 source requires Time32 origin"); - } - let result = v - .and_then(|x| (x as i64).checked_mul(NANOS_PER_MILLI)) - .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) - .map(|binned| ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_MILLI) as i32); + let result = v.and_then(|x| { + date_bin_time_value(x as i64, NANOS_PER_MILLI, origin, stride, stride_fn) + .map(|binned| binned as i32) + }); ColumnarValue::Scalar(ScalarValue::Time32Millisecond(result)) } ColumnarValue::Scalar(ScalarValue::Time32Second(v)) => { - if !is_time { - return exec_err!("DATE_BIN with Time32 source requires Time32 origin"); - } - let result = v - .and_then(|x| (x as i64).checked_mul(NANOS_PER_SEC)) - .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) - .map(|binned| ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_SEC) as i32); + let result = v.and_then(|x| { + date_bin_time_value(x as i64, NANOS_PER_SEC, origin, stride, stride_fn) + .map(|binned| binned as i32) + }); ColumnarValue::Scalar(ScalarValue::Time32Second(result)) } ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(v)) => { - if !is_time { - return exec_err!("DATE_BIN with Time64 source requires Time64 origin"); - } - let result = v.and_then(|x| { - stride_fn(stride, x, origin) - .map(|binned| binned % NANOSECONDS_IN_DAY) - .ok() - }); + let result = + v.and_then(|x| date_bin_time_value(x, 1, origin, stride, stride_fn)); ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(result)) } ColumnarValue::Scalar(ScalarValue::Time64Microsecond(v)) => { - if !is_time { - return exec_err!("DATE_BIN with Time64 source requires Time64 origin"); - } - let result = v - .and_then(|x| x.checked_mul(NANOS_PER_MICRO)) - .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) - .map(|binned| (binned % NANOSECONDS_IN_DAY) / NANOS_PER_MICRO); + let result = v.and_then(|x| { + date_bin_time_value(x, NANOS_PER_MICRO, origin, stride, stride_fn) + }); ColumnarValue::Scalar(ScalarValue::Time64Microsecond(result)) } ColumnarValue::Array(array) => { @@ -661,13 +679,10 @@ fn date_bin_impl( T: ArrowTimestampType, { let array = as_primitive_array::(array)?; - let scale = timestamp_scale::(); // Per-row errors become NULL, matching scalar behavior. let result: PrimitiveArray = array.unary_opt(|val| { - val.checked_mul(scale) - .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) - .map(|binned| binned / scale) + date_bin_timestamp_value::(val, origin, stride, stride_fn) }); let array = result.with_timezone_opt(tz_opt.clone()); @@ -696,70 +711,53 @@ fn date_bin_impl( )? } Time32(Millisecond) => { - if !is_time { - return exec_err!( - "DATE_BIN with Time32 source requires Time32 origin" - ); - } let array = array.as_primitive::(); let result: PrimitiveArray = array.unary_opt(|x| { - (x as i64) - .checked_mul(NANOS_PER_MILLI) - .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) - .map(|binned| { - ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_MILLI) - as i32 - }) + date_bin_time_value( + x as i64, + NANOS_PER_MILLI, + origin, + stride, + stride_fn, + ) + .map(|binned| binned as i32) }); ColumnarValue::Array(Arc::new(result)) } Time32(Second) => { - if !is_time { - return exec_err!( - "DATE_BIN with Time32 source requires Time32 origin" - ); - } let array = array.as_primitive::(); let result: PrimitiveArray = array.unary_opt(|x| { - (x as i64) - .checked_mul(NANOS_PER_SEC) - .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) - .map(|binned| { - ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_SEC) as i32 - }) + date_bin_time_value( + x as i64, + NANOS_PER_SEC, + origin, + stride, + stride_fn, + ) + .map(|binned| binned as i32) }); ColumnarValue::Array(Arc::new(result)) } Time64(Microsecond) => { - if !is_time { - return exec_err!( - "DATE_BIN with Time64 source requires Time64 origin" - ); - } let array = array.as_primitive::(); let result: PrimitiveArray = array.unary_opt(|x| { - x.checked_mul(NANOS_PER_MICRO) - .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) - .map(|binned| { - (binned % NANOSECONDS_IN_DAY) / NANOS_PER_MICRO - }) + date_bin_time_value( + x, + NANOS_PER_MICRO, + origin, + stride, + stride_fn, + ) }); ColumnarValue::Array(Arc::new(result)) } Time64(Nanosecond) => { - if !is_time { - return exec_err!( - "DATE_BIN with Time64 source requires Time64 origin" - ); - } let array = array.as_primitive::(); let result: PrimitiveArray = array.unary_opt(|x| { - stride_fn(stride, x, origin) - .map(|binned_nanos| binned_nanos % (NANOSECONDS_IN_DAY)) - .ok() + date_bin_time_value(x, 1, origin, stride, stride_fn) }); ColumnarValue::Array(Arc::new(result)) } @@ -1448,6 +1446,144 @@ mod tests { assert_overflow_error(invoke_date_bin_with_args(args, 2, return_field)); } + // Compare scalar execution with a one-row array for the same input. + fn assert_scalar_array_parity( + stride: ScalarValue, + source: ScalarValue, + origin: ScalarValue, + ) { + let return_field = Arc::new(Field::new("f", source.data_type().clone(), true)); + + let scalar_args = vec![ + ColumnarValue::Scalar(stride.clone()), + ColumnarValue::Scalar(source.clone()), + ColumnarValue::Scalar(origin.clone()), + ]; + let scalar_result = invoke_date_bin_with_args(scalar_args, 1, &return_field) + .expect("scalar path should not error"); + let ColumnarValue::Scalar(scalar_value) = scalar_result else { + panic!("expected scalar result, got {scalar_result:?}"); + }; + + let source_array = source.to_array().expect("source value to array"); + let array_args = vec![ + ColumnarValue::Scalar(stride), + ColumnarValue::Array(source_array), + ColumnarValue::Scalar(origin), + ]; + let array_result = invoke_date_bin_with_args(array_args, 1, &return_field) + .expect("array path should not error"); + let ColumnarValue::Array(array) = array_result else { + panic!("expected array result, got {array_result:?}"); + }; + let array_value = + ScalarValue::try_from_array(&array, 0).expect("array row to scalar"); + + assert_eq!( + scalar_value, array_value, + "scalar and array results diverged for source {source:?}" + ); + } + + #[test] + fn test_date_bin_scalar_array_parity() { + // Negative sub-second timestamp with a month interval. This is the case + // that previously diverged (scalar value vs array execution error) + // before #22610; both paths must now agree on the same non-NULL value. + assert_scalar_array_parity( + ScalarValue::new_interval_mdn(1, 0, 0), + ScalarValue::TimestampNanosecond(Some(-1), None), + ScalarValue::TimestampNanosecond(Some(0), None), + ); + + // Source scaling overflow -> NULL in both paths. + assert_scalar_array_parity( + ScalarValue::new_interval_dt(1, 0), + ScalarValue::TimestampSecond(Some(i64::MAX), None), + ScalarValue::TimestampNanosecond(Some(0), None), + ); + + // Month interval out-of-range binning -> NULL in both paths. + assert_scalar_array_parity( + ScalarValue::new_interval_mdn(1637426858, 0, 0), + ScalarValue::TimestampMillisecond(Some(1040292460), None), + ScalarValue::TimestampNanosecond( + Some(string_to_timestamp_nanos("1984-01-07 00:00:00").unwrap()), + None, + ), + ); + } + + #[test] + fn test_date_bin_time_source_requires_time_origin() { + // A TIME source combined with a non-TIME (timestamp) origin is rejected + // with a unit-specific message. This is the shared-input guard that was + // hoisted out of the per-type match arms; cover scalar and array for + // both Time32 and Time64 so the error text stays put. + use arrow::array::{Time32MillisecondArray, Time64NanosecondArray}; + + let stride = || ColumnarValue::Scalar(ScalarValue::new_interval_dt(0, 1000)); + let ts_origin = + || ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(0), None)); + + let assert_msg = |args: Vec, dt: DataType, msg: &str| { + let return_field = Arc::new(Field::new("f", dt, true)); + assert_eq!( + invoke_date_bin_with_args(args, 1, &return_field) + .err() + .unwrap() + .strip_backtrace(), + msg + ); + }; + + let time32_msg = + "Execution error: DATE_BIN with Time32 source requires Time32 origin"; + assert_msg( + vec![ + stride(), + ColumnarValue::Scalar(ScalarValue::Time32Millisecond(Some(0))), + ts_origin(), + ], + DataType::Time32(TimeUnit::Millisecond), + time32_msg, + ); + assert_msg( + vec![ + stride(), + ColumnarValue::Array(Arc::new(Time32MillisecondArray::from(vec![Some( + 0, + )]))), + ts_origin(), + ], + DataType::Time32(TimeUnit::Millisecond), + time32_msg, + ); + + let time64_msg = + "Execution error: DATE_BIN with Time64 source requires Time64 origin"; + assert_msg( + vec![ + stride(), + ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(Some(0))), + ts_origin(), + ], + DataType::Time64(TimeUnit::Nanosecond), + time64_msg, + ); + assert_msg( + vec![ + stride(), + ColumnarValue::Array(Arc::new(Time64NanosecondArray::from(vec![Some( + 0, + )]))), + ts_origin(), + ], + DataType::Time64(TimeUnit::Nanosecond), + time64_msg, + ); + } + #[test] fn test_date_bin_compute_distance_rem_overflow() { // Regression for #22215: `time_diff % stride` panics with "attempt to