Skip to content

Commit 293bf3e

Browse files
Kontinuationalamb
andauthored
fix: Preserves field metadata when creating logical plan for VALUES expression (#17525)
* [ISSUE 17425] Initial attempt to fix this problem * Add tests for the fix * Require that the metadata of values in VALUES clause must be identical * fix merge error --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 980c948 commit 293bf3e

File tree

2 files changed

+142
-3
lines changed

2 files changed

+142
-3
lines changed

datafusion/core/tests/user_defined/user_defined_scalar_functions.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1833,6 +1833,87 @@ async fn test_config_options_work_for_scalar_func() -> Result<()> {
18331833
Ok(())
18341834
}
18351835

1836+
/// https://github.com/apache/datafusion/issues/17425
1837+
#[tokio::test]
1838+
async fn test_extension_metadata_preserve_in_sql_values() -> Result<()> {
1839+
#[derive(Debug, Hash, PartialEq, Eq)]
1840+
struct MakeExtension {
1841+
signature: Signature,
1842+
}
1843+
1844+
impl Default for MakeExtension {
1845+
fn default() -> Self {
1846+
Self {
1847+
signature: Signature::user_defined(Volatility::Immutable),
1848+
}
1849+
}
1850+
}
1851+
1852+
impl ScalarUDFImpl for MakeExtension {
1853+
fn as_any(&self) -> &dyn Any {
1854+
self
1855+
}
1856+
1857+
fn name(&self) -> &str {
1858+
"make_extension"
1859+
}
1860+
1861+
fn signature(&self) -> &Signature {
1862+
&self.signature
1863+
}
1864+
1865+
fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
1866+
Ok(arg_types.to_vec())
1867+
}
1868+
1869+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
1870+
unreachable!("This shouldn't have been called")
1871+
}
1872+
1873+
fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
1874+
Ok(args.arg_fields[0]
1875+
.as_ref()
1876+
.clone()
1877+
.with_metadata(HashMap::from([(
1878+
"ARROW:extension:metadata".to_string(),
1879+
"foofy.foofy".to_string(),
1880+
)]))
1881+
.into())
1882+
}
1883+
1884+
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
1885+
Ok(args.args[0].clone())
1886+
}
1887+
}
1888+
1889+
let ctx = SessionContext::new();
1890+
ctx.register_udf(MakeExtension::default().into());
1891+
1892+
let batches = ctx
1893+
.sql(
1894+
"
1895+
SELECT extension FROM (VALUES
1896+
('one', make_extension('foofy one')),
1897+
('two', make_extension('foofy two')),
1898+
('three', make_extension('foofy three')))
1899+
AS t(string, extension)
1900+
",
1901+
)
1902+
.await?
1903+
.collect()
1904+
.await?;
1905+
1906+
assert_eq!(
1907+
batches[0]
1908+
.schema()
1909+
.field(0)
1910+
.metadata()
1911+
.get("ARROW:extension:metadata"),
1912+
Some(&"foofy.foofy".into())
1913+
);
1914+
Ok(())
1915+
}
1916+
18361917
/// https://github.com/apache/datafusion/issues/17422
18371918
#[tokio::test]
18381919
async fn test_extension_metadata_preserve_in_subquery() -> Result<()> {

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use std::iter::once;
2525
use std::sync::Arc;
2626

2727
use crate::dml::CopyTo;
28-
use crate::expr::{Alias, PlannedReplaceSelectItem, Sort as SortExpr};
28+
use crate::expr::{Alias, FieldMetadata, PlannedReplaceSelectItem, Sort as SortExpr};
2929
use crate::expr_rewriter::{
3030
coerce_plan_expr_for_schema, normalize_col,
3131
normalize_col_with_schemas_and_ambiguity_check, normalize_cols, normalize_sorts,
@@ -306,8 +306,17 @@ impl LogicalPlanBuilder {
306306

307307
for j in 0..n_cols {
308308
let mut common_type: Option<DataType> = None;
309+
let mut common_metadata: Option<FieldMetadata> = None;
309310
for (i, row) in values.iter().enumerate() {
310311
let value = &row[j];
312+
let metadata = value.metadata(&schema)?;
313+
if let Some(ref cm) = common_metadata {
314+
if &metadata != cm {
315+
return plan_err!("Inconsistent metadata across values list at row {i} column {j}. Was {:?} but found {:?}", cm, metadata);
316+
}
317+
} else {
318+
common_metadata = Some(metadata.clone());
319+
}
311320
let data_type = value.get_type(&schema)?;
312321
if data_type == DataType::Null {
313322
continue;
@@ -326,7 +335,11 @@ impl LogicalPlanBuilder {
326335
}
327336
// assuming common_type was not set, and no error, therefore the type should be NULL
328337
// since the code loop skips NULL
329-
fields.push(common_type.unwrap_or(DataType::Null), true);
338+
fields.push_with_metadata(
339+
common_type.unwrap_or(DataType::Null),
340+
true,
341+
common_metadata,
342+
);
330343
}
331344

332345
Self::infer_inner(values, fields, &schema)
@@ -1507,10 +1520,23 @@ impl ValuesFields {
15071520
}
15081521

15091522
pub fn push(&mut self, data_type: DataType, nullable: bool) {
1523+
self.push_with_metadata(data_type, nullable, None);
1524+
}
1525+
1526+
pub fn push_with_metadata(
1527+
&mut self,
1528+
data_type: DataType,
1529+
nullable: bool,
1530+
metadata: Option<FieldMetadata>,
1531+
) {
15101532
// Naming follows the convention described here:
15111533
// https://www.postgresql.org/docs/current/queries-values.html
15121534
let name = format!("column{}", self.inner.len() + 1);
1513-
self.inner.push(Field::new(name, data_type, nullable));
1535+
let mut field = Field::new(name, data_type, nullable);
1536+
if let Some(metadata) = metadata {
1537+
field.set_metadata(metadata.to_hashmap());
1538+
}
1539+
self.inner.push(field);
15141540
}
15151541

15161542
pub fn into_fields(self) -> Fields {
@@ -2153,7 +2179,10 @@ pub fn unnest_with_options(
21532179

21542180
#[cfg(test)]
21552181
mod tests {
2182+
use std::vec;
2183+
21562184
use super::*;
2185+
use crate::lit_with_metadata;
21572186
use crate::logical_plan::StringifiedPlan;
21582187
use crate::{col, expr, expr_fn::exists, in_subquery, lit, scalar_subquery};
21592188

@@ -2773,6 +2802,35 @@ mod tests {
27732802
Ok(())
27742803
}
27752804

2805+
#[test]
2806+
fn test_values_metadata() -> Result<()> {
2807+
let metadata: HashMap<String, String> =
2808+
[("ARROW:extension:metadata".to_string(), "test".to_string())]
2809+
.into_iter()
2810+
.collect();
2811+
let metadata = FieldMetadata::from(metadata);
2812+
let values = LogicalPlanBuilder::values(vec![
2813+
vec![lit_with_metadata(1, Some(metadata.clone()))],
2814+
vec![lit_with_metadata(2, Some(metadata.clone()))],
2815+
])?
2816+
.build()?;
2817+
assert_eq!(*values.schema().field(0).metadata(), metadata.to_hashmap());
2818+
2819+
// Do not allow VALUES with different metadata mixed together
2820+
let metadata2: HashMap<String, String> =
2821+
[("ARROW:extension:metadata".to_string(), "test2".to_string())]
2822+
.into_iter()
2823+
.collect();
2824+
let metadata2 = FieldMetadata::from(metadata2);
2825+
assert!(LogicalPlanBuilder::values(vec![
2826+
vec![lit_with_metadata(1, Some(metadata.clone()))],
2827+
vec![lit_with_metadata(2, Some(metadata2.clone()))],
2828+
])
2829+
.is_err());
2830+
2831+
Ok(())
2832+
}
2833+
27762834
#[test]
27772835
fn test_unique_field_aliases() {
27782836
let t1_field_1 = Field::new("a", DataType::Int32, false);

0 commit comments

Comments
 (0)