diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index 06ffd8ba5b3c6..68f969702b239 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -437,10 +437,12 @@ fn timestamp_scale() -> i64 { } // Scale to nanoseconds and report overflow as a normal error. -fn checked_scale_to_nanos(x: i64, scale: i64) -> Result { - match x.checked_mul(scale) { +fn checked_scale_to_nanos(value: i64, scale: i64) -> Result { + match value.checked_mul(scale) { Some(scaled) => Ok(scaled), - None => exec_err!("date_bin timestamp value {x} * scale {scale} overflows i64"), + None => { + exec_err!("date_bin timestamp value {value} * scale {scale} overflows i64") + } } } @@ -570,7 +572,7 @@ fn date_bin_impl( ) -> Option { let scale = timestamp_scale::(); value - .and_then(|val| val.checked_mul(scale)) + .and_then(|value| checked_scale_to_nanos(value, scale).ok()) .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) .map(|binned| binned / scale) } @@ -613,7 +615,9 @@ fn date_bin_impl( return exec_err!("DATE_BIN with Time32 source requires Time32 origin"); } let result = v - .and_then(|x| (x as i64).checked_mul(NANOS_PER_MILLI)) + .and_then(|value| { + checked_scale_to_nanos(value as i64, NANOS_PER_MILLI).ok() + }) .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) .map(|binned| ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_MILLI) as i32); ColumnarValue::Scalar(ScalarValue::Time32Millisecond(result)) @@ -623,7 +627,9 @@ fn date_bin_impl( return exec_err!("DATE_BIN with Time32 source requires Time32 origin"); } let result = v - .and_then(|x| (x as i64).checked_mul(NANOS_PER_SEC)) + .and_then(|value| { + checked_scale_to_nanos(value as i64, NANOS_PER_SEC).ok() + }) .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) .map(|binned| ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_SEC) as i32); ColumnarValue::Scalar(ScalarValue::Time32Second(result)) @@ -644,7 +650,7 @@ fn date_bin_impl( return exec_err!("DATE_BIN with Time64 source requires Time64 origin"); } let result = v - .and_then(|x| x.checked_mul(NANOS_PER_MICRO)) + .and_then(|value| checked_scale_to_nanos(value, NANOS_PER_MICRO).ok()) .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) .map(|binned| (binned % NANOSECONDS_IN_DAY) / NANOS_PER_MICRO); ColumnarValue::Scalar(ScalarValue::Time64Microsecond(result)) @@ -664,8 +670,9 @@ fn date_bin_impl( let scale = timestamp_scale::(); // Per-row errors become NULL, matching scalar behavior. - let result: PrimitiveArray = array.unary_opt(|val| { - val.checked_mul(scale) + let result: PrimitiveArray = array.unary_opt(|value| { + checked_scale_to_nanos(value, scale) + .ok() .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) .map(|binned| binned / scale) }); @@ -703,9 +710,9 @@ fn date_bin_impl( } let array = array.as_primitive::(); let result: PrimitiveArray = - array.unary_opt(|x| { - (x as i64) - .checked_mul(NANOS_PER_MILLI) + array.unary_opt(|value| { + checked_scale_to_nanos(value as i64, NANOS_PER_MILLI) + .ok() .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) .map(|binned| { ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_MILLI) @@ -721,14 +728,15 @@ fn date_bin_impl( ); } let array = array.as_primitive::(); - let result: PrimitiveArray = array.unary_opt(|x| { - (x as i64) - .checked_mul(NANOS_PER_SEC) - .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) - .map(|binned| { - ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_SEC) as i32 - }) - }); + let result: PrimitiveArray = + array.unary_opt(|value| { + checked_scale_to_nanos(value as i64, NANOS_PER_SEC) + .ok() + .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) + .map(|binned| { + ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_SEC) as i32 + }) + }); ColumnarValue::Array(Arc::new(result)) } Time64(Microsecond) => { @@ -739,8 +747,9 @@ fn date_bin_impl( } let array = array.as_primitive::(); let result: PrimitiveArray = - array.unary_opt(|x| { - x.checked_mul(NANOS_PER_MICRO) + array.unary_opt(|value| { + checked_scale_to_nanos(value, NANOS_PER_MICRO) + .ok() .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) .map(|binned| { (binned % NANOSECONDS_IN_DAY) / NANOS_PER_MICRO @@ -756,11 +765,11 @@ fn date_bin_impl( } let array = array.as_primitive::(); let result: PrimitiveArray = - array.unary_opt(|x| { + array.try_unary(|x| { stride_fn(stride, x, origin) .map(|binned_nanos| binned_nanos % (NANOSECONDS_IN_DAY)) - .ok() - }); + .map_err(|e| ArrowError::ComputeError(e.to_string())) + })?; ColumnarValue::Array(Arc::new(result)) } _ => {