Skip to content

Commit 30c14ab

Browse files
authored
Optionally coerce names of maps and lists to match Parquet specification (#6828)
* optionally coerce names of maps and lists to match Parquet spec
* less verbose
* add ArrowWriter round trip test
* move documentation to builder
* use create_random_array for map and list arrays
1 parent 93ce75c commit 30c14ab

File tree

4 files changed

+179
-17
lines changed

4 files changed

+179
-17
lines changed

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 51 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1088,6 +1088,7 @@ mod tests {
10881088
use arrow::datatypes::ToByteSlice;
10891089
use arrow::datatypes::{DataType, Schema};
10901090
use arrow::error::Result as ArrowResult;
1091+
use arrow::util::data_gen::create_random_array;
10911092
use arrow::util::pretty::pretty_format_batches;
10921093
use arrow::{array::*, buffer::Buffer};
10931094
use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer};
@@ -2491,6 +2492,56 @@ mod tests {
24912492
one_column_roundtrip(values, false);
24922493
}
24932494

2495+
#[test]
2496+
fn list_and_map_coerced_names() {
2497+
// Create map and list with non-Parquet naming
2498+
let list_field =
2499+
Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
2500+
let map_field = Field::new_map(
2501+
"my_map",
2502+
"entries",
2503+
Field::new("keys", DataType::Int32, false),
2504+
Field::new("values", DataType::Int32, true),
2505+
false,
2506+
true,
2507+
);
2508+
2509+
let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
2510+
let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();
2511+
2512+
let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
2513+
2514+
// Write data to Parquet but coerce names to match spec
2515+
let props = Some(WriterProperties::builder().set_coerce_types(true).build());
2516+
let file = tempfile::tempfile().unwrap();
2517+
let mut writer =
2518+
ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();
2519+
2520+
let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
2521+
writer.write(&batch).unwrap();
2522+
let file_metadata = writer.close().unwrap();
2523+
2524+
// Coerced name of "item" should be "element"
2525+
assert_eq!(file_metadata.schema[3].name, "element");
2526+
// Coerced name of "entries" should be "key_value"
2527+
assert_eq!(file_metadata.schema[5].name, "key_value");
2528+
// Coerced name of "keys" should be "key"
2529+
assert_eq!(file_metadata.schema[6].name, "key");
2530+
// Coerced name of "values" should be "value"
2531+
assert_eq!(file_metadata.schema[7].name, "value");
2532+
2533+
// Double check schema after reading from the file
2534+
let reader = SerializedFileReader::new(file).unwrap();
2535+
let file_schema = reader.metadata().file_metadata().schema();
2536+
let fields = file_schema.get_fields();
2537+
let list_field = &fields[0].get_fields()[0];
2538+
assert_eq!(list_field.get_fields()[0].name(), "element");
2539+
let map_field = &fields[1].get_fields()[0];
2540+
assert_eq!(map_field.name(), "key_value");
2541+
assert_eq!(map_field.get_fields()[0].name(), "key");
2542+
assert_eq!(map_field.get_fields()[1].name(), "value");
2543+
}
2544+
24942545
#[test]
24952546
fn fallback_flush_data_page() {
24962547
//tests if the Fallback::flush_data_page clears all buffers correctly

parquet/src/arrow/schema/mod.rs

Lines changed: 104 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -303,6 +303,11 @@ pub fn decimal_length_from_precision(precision: u8) -> usize {
303303

304304
/// Convert an arrow field to a parquet `Type`
305305
fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
306+
const PARQUET_LIST_ELEMENT_NAME: &str = "element";
307+
const PARQUET_MAP_STRUCT_NAME: &str = "key_value";
308+
const PARQUET_KEY_FIELD_NAME: &str = "key";
309+
const PARQUET_VALUE_FIELD_NAME: &str = "value";
310+
306311
let name = field.name().as_str();
307312
let repetition = if field.is_nullable() {
308313
Repetition::OPTIONAL
@@ -527,10 +532,18 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
527532
.with_id(id)
528533
.build(),
529534
DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => {
535+
let field_ref = if coerce_types && f.name() != PARQUET_LIST_ELEMENT_NAME {
536+
// Ensure proper naming per the Parquet specification
537+
let ff = f.as_ref().clone().with_name(PARQUET_LIST_ELEMENT_NAME);
538+
Arc::new(arrow_to_parquet_type(&ff, coerce_types)?)
539+
} else {
540+
Arc::new(arrow_to_parquet_type(f, coerce_types)?)
541+
};
542+
530543
Type::group_type_builder(name)
531544
.with_fields(vec![Arc::new(
532545
Type::group_type_builder("list")
533-
.with_fields(vec![Arc::new(arrow_to_parquet_type(f, coerce_types)?)])
546+
.with_fields(vec![field_ref])
534547
.with_repetition(Repetition::REPEATED)
535548
.build()?,
536549
)])
@@ -559,13 +572,29 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
559572
}
560573
DataType::Map(field, _) => {
561574
if let DataType::Struct(struct_fields) = field.data_type() {
575+
// If coercing then set inner struct name to "key_value"
576+
let map_struct_name = if coerce_types {
577+
PARQUET_MAP_STRUCT_NAME
578+
} else {
579+
field.name()
580+
};
581+
582+
// If coercing then ensure struct fields are named "key" and "value"
583+
let fix_map_field = |name: &str, fld: &Arc<Field>| -> Result<Arc<Type>> {
584+
if coerce_types && fld.name() != name {
585+
let f = fld.as_ref().clone().with_name(name);
586+
Ok(Arc::new(arrow_to_parquet_type(&f, coerce_types)?))
587+
} else {
588+
Ok(Arc::new(arrow_to_parquet_type(fld, coerce_types)?))
589+
}
590+
};
591+
let key_field = fix_map_field(PARQUET_KEY_FIELD_NAME, &struct_fields[0])?;
592+
let val_field = fix_map_field(PARQUET_VALUE_FIELD_NAME, &struct_fields[1])?;
593+
562594
Type::group_type_builder(name)
563595
.with_fields(vec![Arc::new(
564-
Type::group_type_builder(field.name())
565-
.with_fields(vec![
566-
Arc::new(arrow_to_parquet_type(&struct_fields[0], coerce_types)?),
567-
Arc::new(arrow_to_parquet_type(&struct_fields[1], coerce_types)?),
568-
])
596+
Type::group_type_builder(map_struct_name)
597+
.with_fields(vec![key_field, val_field])
569598
.with_repetition(Repetition::REPEATED)
570599
.build()?,
571600
)])
@@ -1420,6 +1449,75 @@ mod tests {
14201449
assert_eq!(arrow_fields, converted_arrow_fields);
14211450
}
14221451

1452+
#[test]
1453+
fn test_coerced_map_list() {
1454+
// Create Arrow schema with non-Parquet naming
1455+
let arrow_fields = vec![
1456+
Field::new_list(
1457+
"my_list",
1458+
Field::new("item", DataType::Boolean, true),
1459+
false,
1460+
),
1461+
Field::new_map(
1462+
"my_map",
1463+
"entries",
1464+
Field::new("keys", DataType::Utf8, false),
1465+
Field::new("values", DataType::Int32, true),
1466+
false,
1467+
true,
1468+
),
1469+
];
1470+
let arrow_schema = Schema::new(arrow_fields);
1471+
1472+
// Create Parquet schema with coerced names
1473+
let message_type = "
1474+
message parquet_schema {
1475+
REQUIRED GROUP my_list (LIST) {
1476+
REPEATED GROUP list {
1477+
OPTIONAL BOOLEAN element;
1478+
}
1479+
}
1480+
OPTIONAL GROUP my_map (MAP) {
1481+
REPEATED GROUP key_value {
1482+
REQUIRED BINARY key (STRING);
1483+
OPTIONAL INT32 value;
1484+
}
1485+
}
1486+
}
1487+
";
1488+
let parquet_group_type = parse_message_type(message_type).unwrap();
1489+
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1490+
let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, true).unwrap();
1491+
assert_eq!(
1492+
parquet_schema.columns().len(),
1493+
converted_arrow_schema.columns().len()
1494+
);
1495+
1496+
// Create Parquet schema without coerced names
1497+
let message_type = "
1498+
message parquet_schema {
1499+
REQUIRED GROUP my_list (LIST) {
1500+
REPEATED GROUP list {
1501+
OPTIONAL BOOLEAN item;
1502+
}
1503+
}
1504+
OPTIONAL GROUP my_map (MAP) {
1505+
REPEATED GROUP entries {
1506+
REQUIRED BINARY keys (STRING);
1507+
OPTIONAL INT32 values;
1508+
}
1509+
}
1510+
}
1511+
";
1512+
let parquet_group_type = parse_message_type(message_type).unwrap();
1513+
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1514+
let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, false).unwrap();
1515+
assert_eq!(
1516+
parquet_schema.columns().len(),
1517+
converted_arrow_schema.columns().len()
1518+
);
1519+
}
1520+
14231521
#[test]
14241522
fn test_field_to_column_desc() {
14251523
let message_type = "

parquet/src/bin/parquet-rewrite.rs

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -199,6 +199,10 @@ struct Args {
199199
/// Sets writer version.
200200
#[clap(long)]
201201
writer_version: Option<WriterVersionArgs>,
202+
203+
/// Sets whether to coerce Arrow types to match Parquet specification
204+
#[clap(long)]
205+
coerce_types: Option<bool>,
202206
}
203207

204208
fn main() {
@@ -262,6 +266,9 @@ fn main() {
262266
if let Some(value) = args.writer_version {
263267
writer_properties_builder = writer_properties_builder.set_writer_version(value.into());
264268
}
269+
if let Some(value) = args.coerce_types {
270+
writer_properties_builder = writer_properties_builder.set_coerce_types(value);
271+
}
265272
let writer_properties = writer_properties_builder.build();
266273
let mut parquet_writer = ArrowWriter::try_new(
267274
File::create(&args.output).expect("Unable to open output file"),

parquet/src/file/properties.rs

Lines changed: 17 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -287,15 +287,7 @@ impl WriterProperties {
287287
self.statistics_truncate_length
288288
}
289289

290-
/// Returns `coerce_types` boolean
291-
///
292-
/// Some Arrow types do not have a corresponding Parquet logical type.
293-
/// Affected Arrow data types include `Date64`, `Timestamp` and `Interval`.
294-
/// Writers have the option to coerce these into native Parquet types. Type
295-
/// coercion allows for meaningful representations that do not require
296-
/// downstream readers to consider the embedded Arrow schema. However, type
297-
/// coercion also prevents the data from being losslessly round-tripped. This method
298-
/// returns `true` if type coercion enabled.
290+
/// Returns `true` if type coercion is enabled.
299291
pub fn coerce_types(&self) -> bool {
300292
self.coerce_types
301293
}
@@ -788,8 +780,22 @@ impl WriterPropertiesBuilder {
788780
self
789781
}
790782

791-
/// Sets flag to enable/disable type coercion.
792-
/// Takes precedence over globally defined settings.
783+
/// Sets flag to control if type coercion is enabled (defaults to `false`).
784+
///
785+
/// # Notes
786+
/// Some Arrow types do not have a corresponding Parquet logical type.
787+
/// Affected Arrow data types include `Date64`, `Timestamp` and `Interval`.
788+
/// Also, for [`List`] and [`Map`] types, Parquet expects certain schema elements
789+
/// to have specific names to be considered fully compliant.
790+
/// Writers have the option to coerce these types and names to match those required
791+
/// by the Parquet specification.
792+
/// This type coercion allows for meaningful representations that do not require
793+
/// downstream readers to consider the embedded Arrow schema, and can allow for greater
794+
/// compatibility with other Parquet implementations. However, type
795+
/// coercion also prevents the data from being losslessly round-tripped.
796+
///
797+
/// [`List`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
798+
/// [`Map`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps
793799
pub fn set_coerce_types(mut self, coerce_types: bool) -> Self {
794800
self.coerce_types = coerce_types;
795801
self

0 commit comments

Comments (0)