Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
c79156f
remove casting for coalesce
jayzhan211 Apr 27, 2024
a36e6b2
add more test
jayzhan211 Apr 27, 2024
bf16c92
add more test
jayzhan211 Apr 27, 2024
407e3c7
crate only visibility
jayzhan211 Apr 27, 2024
03b9162
polish comment
jayzhan211 Apr 27, 2024
4abf29d
improve test
jayzhan211 Apr 27, 2024
4965e8d
backup
jayzhan211 Apr 28, 2024
81f0235
introduce new signautre for coalesce
jayzhan211 Apr 28, 2024
bae996c
cleanup
jayzhan211 Apr 28, 2024
ddf9b1c
cleanup
jayzhan211 Apr 28, 2024
c2799ea
ignore err msg
jayzhan211 Apr 28, 2024
2574896
fmt
jayzhan211 Apr 28, 2024
6a17e57
fix doc
jayzhan211 Apr 28, 2024
4cba8c5
cleanup
jayzhan211 Apr 28, 2024
f1cfb8d
add more test
jayzhan211 Apr 28, 2024
d2e83d3
switch to type_resolution coercion
jayzhan211 Apr 28, 2024
3a88ad7
Merge remote-tracking branch 'upstream/main' into fix-coelesce
jayzhan211 Apr 29, 2024
03880a3
fix i64 and u64 case
jayzhan211 Apr 29, 2024
481f548
add more tests
jayzhan211 Apr 29, 2024
dfc4176
cleanup
jayzhan211 Apr 29, 2024
46a9060
add null case
jayzhan211 Apr 29, 2024
d656645
fmt
jayzhan211 Apr 29, 2024
5683447
fix
jayzhan211 Apr 29, 2024
b949fae
rename to type_union_resolution
jayzhan211 Apr 29, 2024
5aaeb5b
add comment
jayzhan211 Apr 29, 2024
a968c0e
Merge remote-tracking branch 'upstream/main' into fix-coelesce
jayzhan211 May 1, 2024
cf679c5
Merge remote-tracking branch 'upstream/main' into fix-coelesce
jayzhan211 May 2, 2024
15471ab
cleanup
jayzhan211 May 2, 2024
e5cc46b
fix test
jayzhan211 May 2, 2024
a810e85
add comment
jayzhan211 May 2, 2024
cb16cda
rm test
jayzhan211 May 3, 2024
53bedda
Merge remote-tracking branch 'upstream/main' into fix-coelesce
jayzhan211 May 12, 2024
a37da2d
cleanup since rebase
jayzhan211 May 12, 2024
70239e0
add more test
jayzhan211 May 12, 2024
be116f8
add more test
jayzhan211 May 12, 2024
8f4e991
fix msg
jayzhan211 May 12, 2024
6a8fe6f
Merge remote-tracking branch 'upstream/main' into fix-coelesce
jayzhan211 May 14, 2024
20e618e
Merge remote-tracking branch 'upstream/main' into fix-coelesce
jayzhan211 May 14, 2024
4153593
fmt
jayzhan211 May 14, 2024
030a519
rm pure_string_coercion
jayzhan211 May 14, 2024
5b797d5
rm duplicate
jayzhan211 May 14, 2024
b954479
change type in select.slt
jayzhan211 May 25, 2024
829b5a2
fix slt
jayzhan211 May 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
backup
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
  • Loading branch information
jayzhan211 committed Apr 28, 2024
commit 4965e8da146d88fbb9d43c4eb3ef32f353f86360
163 changes: 152 additions & 11 deletions datafusion/expr/src/type_coercion/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ use arrow::datatypes::{

use datafusion_common::{exec_datafusion_err, plan_datafusion_err, plan_err, Result};

use super::functions::coerced_from;

/// The type signature of an instantiation of binary operator expression such as
/// `lhs + rhs`
///
Expand Down Expand Up @@ -289,7 +291,134 @@ fn bitwise_coercion(left_type: &DataType, right_type: &DataType) -> Option<DataT
}
}

/// Coarse grouping of an arrow `DataType`, as assigned by `data_type_category`.
///
/// `type_resolution` compares these categories across its non-null inputs and
/// gives up (returns `None`) as soon as two inputs fall into different ones.
#[derive(Debug, PartialEq, Eq)]
enum TypeCategory {
/// `List`, `FixedSizeList` and `LargeList`.
Array,
/// `DataType::Boolean`.
Boolean,
/// Any type for which `DataType::is_numeric()` returns true.
Numeric,
/// `Utf8` and `LargeUtf8`.
String,
/// `Date32`, `Date64`, `Time32`, `Time64`, `Timestamp`, `Interval` and `Duration`.
DateTime,
/// `Dictionary`, `Struct` and `Union`.
Composite,
/// Fallback for every type not matched by one of the categories above.
Unknown,
}

fn data_type_category(data_type: &DataType) -> TypeCategory {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could also be something like

impl From<&DataType> for TypeCategory { 
...

And then you create a TypeCategory like

let category = TypeCategory::from(&type)

if data_type.is_numeric() {
return TypeCategory::Numeric;
}

if matches!(data_type, DataType::Boolean) {
return TypeCategory::Boolean;
}

if matches!(
data_type,
DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_)
) {
return TypeCategory::Array;
}

if matches!(data_type, DataType::Utf8 | DataType::LargeUtf8) {
return TypeCategory::String;
}

if matches!(
data_type,
DataType::Date32
| DataType::Date64
| DataType::Time32(_)
| DataType::Time64(_)
| DataType::Timestamp(_, _)
| DataType::Interval(_)
| DataType::Duration(_)
) {
return TypeCategory::DateTime;
}

if matches!(
data_type,
DataType::Dictionary(_, _) | DataType::Struct(_) | DataType::Union(_, _)
) {
return TypeCategory::Composite;
}

return TypeCategory::Unknown;
}

/// Coerce all of `data_types` to a single common type for the purposes of constructs including
/// CASE, ARRAY, VALUES, and the GREATEST and LEAST functions.
/// See <https://www.postgresql.org/docs/current/typeconv-union-case.html> for more information.
pub fn type_resolution(data_types: &[DataType]) -> Option<DataType> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the biggest difference from comparison coercion is that we categorize types.

Copy link
Contributor

@erratic-pattern erratic-pattern Apr 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The doc string is a bit confusing here because there are no lhs_type and rhs_type. I assume that case would be type_resolution(&[lhs_type, rhs_type])?

Also, maybe this function name could reflect that it's finding a union type to satisfy a set of input types, for example type_union, type_union_resolution, resolve_type_union. I think type_resolution is too generic.

if data_types.is_empty() {
return None;
}

// if all the data_types is the same return first one
if data_types.iter().all(|t| t == &data_types[0]) {
return Some(data_types[0].clone());
}

// if all the data_types are null, return string
if data_types.iter().all(|t| t == &DataType::Null) {
return Some(DataType::Utf8);
}

// Ignore Nulls, if any data_type category is not the same, return None
let data_types_category: Vec<TypeCategory> = data_types
.iter()
.filter(|&t| t != &DataType::Null)
.map(data_type_category)
.collect();
if data_types_category
.iter()
.any(|t| t != &data_types_category[0])
{
return None;
}

// Ignore Nulls
let mut candidate_type: Option<DataType> = None;
for data_type in data_types.iter() {
if data_type == &DataType::Null {
continue;
}
if let Some(ref candidate_t) = candidate_type {
println!("data_Type: {:?}", data_type);
println!("candidate_type: {:?}", candidate_t);
if let Some(t) = coerced_from(data_type, candidate_t) {
// if let Some(t) = type_resolution_coercion(data_type, &candidate_type) {
candidate_type = Some(t);
} else if coerced_from(candidate_t, data_type).is_some() {
// keep the candidate type
} else {
// Not coercible, return None
return None;
}
} else {
candidate_type = Some(data_type.clone());
}
}

println!("candidate_type: {:?}", candidate_type);

candidate_type
}

/// Coerce `lhs_type` and `rhs_type` to a common type for the purposes of type resolution.
/// Unlike [comparison_coercion], the coerced type is usually wider.
// fn type_resolution_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType> {
// if lhs_type == rhs_type {
// // same type => all good
// return Some(lhs_type.clone());
// }

// // binary numeric is able to reuse
// comparison_binary_numeric_coercion(lhs_type, rhs_type)
// }

/// Coerce `lhs_type` and `rhs_type` to a common type for the purposes of a comparison operation
/// Unlike [type_resolution_coercion], the coerced type is usually meant for comparison only.
/// For example, when comparing a Dictionary with a Dictionary, only the value type matters.
pub fn comparison_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType> {
if lhs_type == rhs_type {
// same type => equality is possible
Expand Down Expand Up @@ -375,20 +504,13 @@ pub(crate) fn comparison_binary_numeric_coercion(
return Some(lhs_type.clone());
}

if let Some(t) = decimal_coercion(lhs_type, rhs_type) {
return Some(t);
}

// these are ordered from most informative to least informative so
// that the coercion does not lose information via truncation
match (lhs_type, rhs_type) {
// Prefer decimal data type over floating point for comparison operation
(Decimal128(_, _), Decimal128(_, _)) => {
get_wider_decimal_type(lhs_type, rhs_type)
}
(Decimal128(_, _), _) => get_comparison_common_decimal_type(lhs_type, rhs_type),
(_, Decimal128(_, _)) => get_comparison_common_decimal_type(rhs_type, lhs_type),
(Decimal256(_, _), Decimal256(_, _)) => {
get_wider_decimal_type(lhs_type, rhs_type)
}
(Decimal256(_, _), _) => get_comparison_common_decimal_type(lhs_type, rhs_type),
(_, Decimal256(_, _)) => get_comparison_common_decimal_type(rhs_type, lhs_type),
(Float64, _) | (_, Float64) => Some(Float64),
(_, Float32) | (Float32, _) => Some(Float32),
// The following match arms encode the following logic: Given the two
Expand Down Expand Up @@ -426,6 +548,25 @@ pub(crate) fn comparison_binary_numeric_coercion(
}
}

/// Finds a common type for `lhs_type` and `rhs_type` when a decimal is involved.
///
/// * Both `Decimal128`, or both `Decimal256`: delegates to `get_wider_decimal_type`.
/// * Exactly one side is the decimal of interest: delegates to
///   `get_comparison_common_decimal_type`, always passing the decimal side first.
///   When one side is `Decimal128` and the other `Decimal256`, the `Decimal128`
///   side is the one treated as "the decimal".
/// * Neither side is a decimal: returns `None`.
pub fn decimal_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType> {
    use arrow::datatypes::DataType::*;

    match (lhs_type, rhs_type) {
        // Same decimal family on both sides: pick the wider of the two.
        (Decimal128(..), Decimal128(..)) | (Decimal256(..), Decimal256(..)) => {
            get_wider_decimal_type(lhs_type, rhs_type)
        }
        // Prefer decimal data type over floating point for comparison operation.
        // The left alternative is tried first, so a left-hand decimal wins.
        (decimal @ Decimal128(..), other) | (other, decimal @ Decimal128(..)) => {
            get_comparison_common_decimal_type(decimal, other)
        }
        (decimal @ Decimal256(..), other) | (other, decimal @ Decimal256(..)) => {
            get_comparison_common_decimal_type(decimal, other)
        }
        // No decimal on either side: nothing to coerce here.
        _ => None,
    }
}

/// Coerce `lhs_type` and `rhs_type` to a common type for the purposes of
/// a comparison operation where one is a decimal
fn get_comparison_common_decimal_type(
Expand Down
74 changes: 53 additions & 21 deletions datafusion/expr/src/type_coercion/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::Arc;
use crate::signature::{
ArrayFunctionSignature, FIXED_SIZE_LIST_WILDCARD, TIMEZONE_WILDCARD,
};
use crate::type_coercion::binary::{decimal_coercion, type_resolution};
use crate::{Signature, TypeSignature};
use arrow::{
compute::can_cast_types,
Expand Down Expand Up @@ -53,16 +54,22 @@ pub fn data_types(
}
}

println!("current_types: {:?}", current_types);
println!("type_signature: {:?}", &signature.type_signature);
let valid_types = get_valid_types(&signature.type_signature, current_types)?;

println!("valid_types: {:?}", valid_types);

if valid_types
.iter()
.any(|data_type| data_type == current_types)
{
return Ok(current_types.to_vec());
}

if !signature.type_signature.skip_coercion() {
println!("valid_types: {:?}", valid_types);

if true {
// Try and coerce the argument types to match the signature, returning the
// coerced types from the first matching signature.
for valid_types in valid_types {
Expand Down Expand Up @@ -187,25 +194,31 @@ fn get_valid_types(
.map(|valid_type| (0..*number).map(|_| valid_type.clone()).collect())
.collect(),
TypeSignature::VariadicEqualOrNull => {
current_types
.iter()
.find(|&t| t != &DataType::Null)
.map_or_else(
|| vec![vec![DataType::Null; current_types.len()]],
|t| {
let valid_types = current_types
.iter()
.map(|d| {
if d != &DataType::Null {
t.clone()
} else {
DataType::Null
}
})
.collect::<Vec<_>>();
vec![valid_types]
},
)
if let Some(common_type) = type_resolution(current_types) {
vec![vec![common_type; current_types.len()]]
} else {
vec![]
}

// current_types
// .iter()
// .find(|&t| t != &DataType::Null)
// .map_or_else(
// || vec![vec![DataType::Null; current_types.len()]],
// |t| {
// let valid_types = current_types
// .iter()
// .map(|d| {
// if d != &DataType::Null {
// t.clone()
// } else {
// DataType::Null
// }
// })
// .collect::<Vec<_>>();
// vec![valid_types]
// },
// )
}
TypeSignature::VariadicEqual => {
let new_type = current_types.iter().skip(1).try_fold(
Expand Down Expand Up @@ -330,11 +343,12 @@ pub fn can_coerce_from(type_into: &DataType, type_from: &DataType) -> bool {
false
}

fn coerced_from<'a>(
pub(crate) fn coerced_from<'a>(
type_into: &'a DataType,
type_from: &'a DataType,
) -> Option<DataType> {
use self::DataType::*;

// match Dictionary first
match (type_into, type_from) {
// coerced dictionary first
Expand All @@ -348,6 +362,18 @@ fn coerced_from<'a>(
{
Some(type_into.clone())
}
// coerce decimal
// (Decimal128(_, _) | Decimal256(_, _), Null) => {
// Some(type_into.clone())
// }
(Decimal128(_, _), Decimal128(_, _)) | (Decimal256(_, _), Decimal256(_, _)) => {
decimal_coercion(type_into, type_from)
}
(Decimal128(_, _) | Decimal256(_, _), _)
if matches!(type_from, Int8 | Int16 | Int32 | Int64) =>
{
decimal_coercion(type_into, type_from)
}
// coerced into type_into
(Int8, _) if matches!(type_from, Null | Int8) => Some(type_into.clone()),
(Int16, _) if matches!(type_from, Null | Int8 | Int16 | UInt8) => {
Expand Down Expand Up @@ -473,6 +499,12 @@ fn coerced_from<'a>(
{
Some(type_into.clone())
}
// (Decimal128(_, _), _) if matches!(type_from, Null | Int8 | Int16 | Int32 | Int64) => {
// Some(type_into.clone())
// }
// (Decimal256(_, _), _) if matches!(type_from, Null | Int8 | Int16 | Int32 | Int64) => {
// Some(type_into.clone())
// }
_ => None,
}
}
Expand Down
21 changes: 15 additions & 6 deletions datafusion/functions/src/core/coalesce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use arrow::compute::kernels::zip::zip;
use arrow::compute::{and, is_not_null, is_null};
use arrow::datatypes::DataType;

use datafusion_common::{exec_err, Result};
use datafusion_common::{exec_err, internal_err, Result};
use datafusion_expr::type_coercion::binary::type_resolution;
use datafusion_expr::ColumnarValue;
use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};

Expand Down Expand Up @@ -59,11 +60,19 @@ impl ScalarUDFImpl for CoalesceFunc {
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
Ok(arg_types
.iter()
.find(|&t| t != &DataType::Null)
.unwrap_or(&DataType::Null)
.clone())
if let Some(common_type) = type_resolution(arg_types) {
println!("args: {:?}", arg_types);
println!("common_type: {:?}", common_type);
return Ok(common_type);
} else {
return internal_err!("Error should be thrown via signature validation");
}

// Ok(arg_types
// .iter()
// .find(|&t| t != &DataType::Null)
// .unwrap_or(&DataType::Null)
// .clone())
}

/// coalesce evaluates to the first value which is not NULL
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-expr/src/scalar_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ impl PhysicalExpr for ScalarFunctionExpr {
}

/// Returns a clone of the return type stored on this expression.
///
/// The input schema is not consulted. The value comes from the
/// `return_type` field alone.
fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
    // No debug printing here: a leftover `println!` would write to stdout
    // on every call.
    Ok(self.return_type.clone())
}

Expand Down
2 changes: 2 additions & 0 deletions datafusion/physical-expr/src/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ pub fn create_physical_expr(
let return_type =
fun.return_type_from_exprs(args, input_dfschema, &input_expr_types)?;

println!("return_type: {:?}", return_type);
println!("input_expr_types: {:?}", input_expr_types);
let fun_def = ScalarFunctionDefinition::UDF(Arc::new(fun.clone()));
Ok(Arc::new(ScalarFunctionExpr::new(
fun.name(),
Expand Down
Loading