Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 87 additions & 37 deletions datafusion/functions/src/datetime/date_bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,13 +437,27 @@ fn timestamp_scale<T: ArrowTimestampType>() -> i64 {
}

// Scale to nanoseconds and report overflow as a normal error.
fn checked_scale_to_nanos(x: i64, scale: i64) -> Result<i64> {
match x.checked_mul(scale) {
fn checked_scale_to_nanos(value: i64, scale: i64) -> Result<i64> {
match value.checked_mul(scale) {
Some(scaled) => Ok(scaled),
None => exec_err!("date_bin timestamp value {x} * scale {scale} overflows i64"),
None => {
exec_err!("date_bin timestamp value {value} * scale {scale} overflows i64")
}
}
}

fn checked_scale_and_bin_to_nanos_or_null(
value: i64,
scale: i64,
stride: i64,
stride_fn: BinFunction,
origin: i64,
) -> Option<i64> {
checked_scale_to_nanos(value, scale)
.ok()
Comment on lines +456 to +457

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i dont really see a value add here considering we ignore the error from checked_scale_to_nanos since we convert it to an option, not to mention this new wrapper function takes 5 arguments 🤔

.and_then(|scaled| stride_fn(stride, scaled, origin).ok())
}

fn validate_time_stride(stride: &Interval) -> Result<()> {
match stride {
Interval::Months(m) if *m > 0 => {
Expand Down Expand Up @@ -570,8 +584,11 @@ fn date_bin_impl(
) -> Option<i64> {
let scale = timestamp_scale::<T>();
value
.and_then(|val| val.checked_mul(scale))
.and_then(|scaled| stride_fn(stride, scaled, origin).ok())
.and_then(|value| {
checked_scale_and_bin_to_nanos_or_null(
value, scale, stride, stride_fn, origin,
)
})
.map(|binned| binned / scale)
}

Expand Down Expand Up @@ -613,8 +630,15 @@ fn date_bin_impl(
return exec_err!("DATE_BIN with Time32 source requires Time32 origin");
}
let result = v
.and_then(|x| (x as i64).checked_mul(NANOS_PER_MILLI))
.and_then(|scaled| stride_fn(stride, scaled, origin).ok())
.and_then(|value| {
checked_scale_and_bin_to_nanos_or_null(
value as i64,
NANOS_PER_MILLI,
stride,
stride_fn,
origin,
)
})
.map(|binned| ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_MILLI) as i32);
ColumnarValue::Scalar(ScalarValue::Time32Millisecond(result))
}
Expand All @@ -623,8 +647,15 @@ fn date_bin_impl(
return exec_err!("DATE_BIN with Time32 source requires Time32 origin");
}
let result = v
.and_then(|x| (x as i64).checked_mul(NANOS_PER_SEC))
.and_then(|scaled| stride_fn(stride, scaled, origin).ok())
.and_then(|value| {
checked_scale_and_bin_to_nanos_or_null(
value as i64,
NANOS_PER_SEC,
stride,
stride_fn,
origin,
)
})
.map(|binned| ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_SEC) as i32);
ColumnarValue::Scalar(ScalarValue::Time32Second(result))
}
Expand All @@ -644,8 +675,15 @@ fn date_bin_impl(
return exec_err!("DATE_BIN with Time64 source requires Time64 origin");
}
let result = v
.and_then(|x| x.checked_mul(NANOS_PER_MICRO))
.and_then(|scaled| stride_fn(stride, scaled, origin).ok())
.and_then(|value| {
checked_scale_and_bin_to_nanos_or_null(
value,
NANOS_PER_MICRO,
stride,
stride_fn,
origin,
)
})
.map(|binned| (binned % NANOSECONDS_IN_DAY) / NANOS_PER_MICRO);
ColumnarValue::Scalar(ScalarValue::Time64Microsecond(result))
}
Expand All @@ -664,10 +702,11 @@ fn date_bin_impl(
let scale = timestamp_scale::<T>();

// Per-row errors become NULL, matching scalar behavior.
let result: PrimitiveArray<T> = array.unary_opt(|val| {
val.checked_mul(scale)
.and_then(|scaled| stride_fn(stride, scaled, origin).ok())
.map(|binned| binned / scale)
let result: PrimitiveArray<T> = array.unary_opt(|value| {
checked_scale_and_bin_to_nanos_or_null(
value, scale, stride, stride_fn, origin,
)
.map(|binned| binned / scale)
});

let array = result.with_timezone_opt(tz_opt.clone());
Expand Down Expand Up @@ -703,14 +742,17 @@ fn date_bin_impl(
}
let array = array.as_primitive::<Time32MillisecondType>();
let result: PrimitiveArray<Time32MillisecondType> =
array.unary_opt(|x| {
(x as i64)
.checked_mul(NANOS_PER_MILLI)
.and_then(|scaled| stride_fn(stride, scaled, origin).ok())
.map(|binned| {
((binned % NANOSECONDS_IN_DAY) / NANOS_PER_MILLI)
as i32
})
array.unary_opt(|value| {
checked_scale_and_bin_to_nanos_or_null(
value as i64,
NANOS_PER_MILLI,
stride,
stride_fn,
origin,
)
.map(|binned| {
((binned % NANOSECONDS_IN_DAY) / NANOS_PER_MILLI) as i32
})
});
ColumnarValue::Array(Arc::new(result))
}
Expand All @@ -721,14 +763,19 @@ fn date_bin_impl(
);
}
let array = array.as_primitive::<Time32SecondType>();
let result: PrimitiveArray<Time32SecondType> = array.unary_opt(|x| {
(x as i64)
.checked_mul(NANOS_PER_SEC)
.and_then(|scaled| stride_fn(stride, scaled, origin).ok())
let result: PrimitiveArray<Time32SecondType> =
array.unary_opt(|value| {
checked_scale_and_bin_to_nanos_or_null(
value as i64,
NANOS_PER_SEC,
stride,
stride_fn,
origin,
)
.map(|binned| {
((binned % NANOSECONDS_IN_DAY) / NANOS_PER_SEC) as i32
})
});
});
ColumnarValue::Array(Arc::new(result))
}
Time64(Microsecond) => {
Expand All @@ -739,12 +786,15 @@ fn date_bin_impl(
}
let array = array.as_primitive::<Time64MicrosecondType>();
let result: PrimitiveArray<Time64MicrosecondType> =
array.unary_opt(|x| {
x.checked_mul(NANOS_PER_MICRO)
.and_then(|scaled| stride_fn(stride, scaled, origin).ok())
.map(|binned| {
(binned % NANOSECONDS_IN_DAY) / NANOS_PER_MICRO
})
array.unary_opt(|value| {
checked_scale_and_bin_to_nanos_or_null(
value,
NANOS_PER_MICRO,
stride,
stride_fn,
origin,
)
.map(|binned| (binned % NANOSECONDS_IN_DAY) / NANOS_PER_MICRO)
});
ColumnarValue::Array(Arc::new(result))
}
Expand All @@ -756,11 +806,11 @@ fn date_bin_impl(
}
let array = array.as_primitive::<Time64NanosecondType>();
let result: PrimitiveArray<Time64NanosecondType> =
array.unary_opt(|x| {
array.try_unary(|x| {
stride_fn(stride, x, origin)
.map(|binned_nanos| binned_nanos % (NANOSECONDS_IN_DAY))
.ok()
});
.map_err(|e| ArrowError::ComputeError(e.to_string()))
})?;
ColumnarValue::Array(Arc::new(result))
}
_ => {
Expand Down
Loading