Skip to content

Commit b379fe1

Browse files
Keep track of consecutive Date32 failures
1 parent d99c637 commit b379fe1

File tree

1 file changed

+65
-70
lines changed

1 file changed

+65
-70
lines changed

datafusion/functions/src/datetime/to_char.rs

Lines changed: 65 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::any::Any;
1919
use std::sync::Arc;
2020

2121
use arrow::array::cast::AsArray;
22-
use arrow::array::{new_null_array, Array, ArrayRef, StringArray};
22+
use arrow::array::{new_null_array, Array, ArrayRef, GenericStringArray, StringArray};
2323
use arrow::compute::cast;
2424
use arrow::datatypes::DataType;
2525
use arrow::datatypes::DataType::{
@@ -28,7 +28,9 @@ use arrow::datatypes::DataType::{
2828
use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
2929
use arrow::error::ArrowError;
3030
use arrow::util::display::{ArrayFormatter, DurationFormat, FormatOptions};
31-
use datafusion_common::{exec_err, utils::take_function_args, Result, ScalarValue};
31+
use datafusion_common::{
32+
exec_datafusion_err, exec_err, utils::take_function_args, Result, ScalarValue,
33+
};
3234
use datafusion_expr::TypeSignature::Exact;
3335
use datafusion_expr::{
3436
ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, TIMEZONE_WILDCARD,
@@ -259,13 +261,31 @@ fn to_char_scalar(
259261
}
260262
}
261263

262-
fn to_char_array(args: &[ColumnarValue]) -> Result<ColumnarValue> {
263-
let arrays = ColumnarValue::values_to_arrays(args)?;
264+
fn to_char_array_inner(
265+
values: ArrayRef,
266+
format_array: &GenericStringArray<i32>,
267+
) -> Result<Vec<Option<String>>, Result<ColumnarValue>> {
264268
let mut results: Vec<Option<String>> = vec![];
265-
let format_array = arrays[1].as_string::<i32>();
266-
let data_type = arrays[0].data_type();
267269

268-
for idx in 0..arrays[0].len() {
270+
let data_type = values.data_type();
271+
272+
// Track consecutive Date32 values whose formatting fails (datetime specifiers in the format string) as a single (start, length) range.
273+
// this allows us to perform just one batch cast to Date64 instead of individual casts.
274+
let mut date_retry_range = None;
275+
276+
for idx in 0..values.len() {
277+
if let Some((start_offset, length)) = date_retry_range {
278+
let value_slice = cast(values.slice(start_offset, length).as_ref(), &Date64)
279+
.map_err(|e| Err(exec_datafusion_err!("{}", e)))?;
280+
281+
results.extend(to_char_array_inner(
282+
value_slice,
283+
&format_array.slice(start_offset, length),
284+
)?);
285+
286+
date_retry_range = None;
287+
}
288+
269289
let format = if format_array.is_null(idx) {
270290
None
271291
} else {
@@ -275,38 +295,57 @@ fn to_char_array(args: &[ColumnarValue]) -> Result<ColumnarValue> {
275295
results.push(None);
276296
continue;
277297
}
278-
let format_options = match build_format_options(data_type, format) {
279-
Ok(value) => value,
280-
Err(value) => return value,
281-
};
298+
let format_options = build_format_options(data_type, format)?;
299+
282300
// this isn't ideal but this can't use ValueFormatter as it isn't independent
283301
// from ArrayFormatter
284-
let formatter = ArrayFormatter::try_new(arrays[0].as_ref(), &format_options)?;
302+
let formatter = ArrayFormatter::try_new(values.as_ref(), &format_options)
303+
.map_err(|e| Err(exec_datafusion_err!("{}", e)))?;
304+
285305
let result = formatter.value(idx).try_to_string();
286306
match result {
287307
Ok(value) => results.push(Some(value)),
288308
Err(e) => {
289-
// if the data type was a Date32, formatting could have failed because the format string
290-
// contained datetime specifiers, so we'll treat this specific date element as a timestamp
309+
// when a Date32 fails formatting due to datetime specifiers in the format string:
310+
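// - if we already have a retry range, extend it by one more element (NOTE(review): the pending range is flushed and reset to None at the top of every loop iteration, so this case looks unreachable as written and the range never grows past one element; confirm)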
311+
// - if this is the first failure, start a new retry range at this index
312+
// we'll later convert this entire range to Date64 timestamps in a single batch operation
291313
if data_type == &Date32 {
292-
let failed_date_value = arrays[0].slice(idx, 1);
293-
294-
match retry_date_as_timestamp(failed_date_value, &format_options) {
295-
Ok(value) => {
296-
results.push(Some(value));
297-
continue;
298-
}
299-
Err(e) => {
300-
return exec_err!("{}", e);
301-
}
302-
}
314+
date_retry_range = match date_retry_range {
315+
Some((offset, length)) => Some((offset, length + 1)),
316+
None => Some((idx, 1)),
317+
};
318+
319+
continue;
303320
}
304321

305-
return exec_err!("{}", e);
322+
return Err(exec_err!("{}", e));
306323
}
307324
}
308325
}
309326

327+
if let Some((start_offset, length)) = date_retry_range {
328+
let value_slice = cast(values.slice(start_offset, length).as_ref(), &Date64)
329+
.map_err(|e| exec_err!("{}", e))?;
330+
331+
results.extend(to_char_array_inner(
332+
value_slice,
333+
&format_array.slice(start_offset, length),
334+
)?);
335+
}
336+
337+
Ok(results)
338+
}
339+
340+
fn to_char_array(args: &[ColumnarValue]) -> Result<ColumnarValue> {
341+
let arrays = ColumnarValue::values_to_arrays(args)?;
342+
let format_array = arrays[1].as_string::<i32>();
343+
let results = match to_char_array_inner(Arc::clone(&arrays[0]), format_array) {
344+
Ok(results) => results,
345+
Err(Ok(columnar_value)) => return Ok(columnar_value),
346+
Err(Err(e)) => return exec_err!("{}", e),
347+
};
348+
310349
match args[0] {
311350
ColumnarValue::Array(_) => Ok(ColumnarValue::Array(Arc::new(StringArray::from(
312351
results,
@@ -320,19 +359,6 @@ fn to_char_array(args: &[ColumnarValue]) -> Result<ColumnarValue> {
320359
}
321360
}
322361

323-
fn retry_date_as_timestamp(
324-
array_ref: ArrayRef,
325-
format_options: &FormatOptions,
326-
) -> Result<String> {
327-
let target_data_type = Date64;
328-
329-
let date_value = cast(&array_ref, &target_data_type)?;
330-
let formatter = ArrayFormatter::try_new(date_value.as_ref(), format_options)?;
331-
let result = formatter.value(0).try_to_string()?;
332-
333-
Ok(result)
334-
}
335-
336362
#[cfg(test)]
337363
mod tests {
338364
use crate::datetime::to_char::ToCharFunc;
@@ -348,37 +374,6 @@ mod tests {
348374
use datafusion_expr::{ColumnarValue, ScalarUDFImpl};
349375
use std::sync::Arc;
350376

351-
#[test]
352-
fn test_array_array() {
353-
let array_array_data = vec![(
354-
Arc::new(Date32Array::from(vec![18506, 18507])) as ArrayRef,
355-
StringArray::from(vec!["%Y::%m::%d", "%Y::%m::%d %S::%M::%H %f"]),
356-
StringArray::from(vec!["2020::09::01", "2020::09::02 00::00::00 000000000"]),
357-
)];
358-
359-
for (value, format, expected) in array_array_data {
360-
let batch_len = value.len();
361-
let args = datafusion_expr::ScalarFunctionArgs {
362-
args: vec![
363-
ColumnarValue::Array(value),
364-
ColumnarValue::Array(Arc::new(format) as ArrayRef),
365-
],
366-
number_rows: batch_len,
367-
return_type: &DataType::Utf8,
368-
};
369-
let result = ToCharFunc::new()
370-
.invoke_with_args(args)
371-
.expect("that to_char parsed values without error");
372-
373-
if let ColumnarValue::Array(result) = result {
374-
assert_eq!(result.len(), 2);
375-
assert_eq!(&expected as &dyn Array, result.as_ref());
376-
} else {
377-
panic!("Expected an array value")
378-
}
379-
}
380-
}
381-
382377
#[test]
383378
fn test_to_char() {
384379
let date = "2020-01-02T03:04:05"

0 commit comments

Comments
 (0)