From fbab6c8b13d7ee8ec805be8aafc77cf33b9cccaa Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Mon, 18 Jan 2021 19:14:09 -0800 Subject: [PATCH 1/9] ARROW-11310: [Rust] implement JSON writer --- rust/arrow/src/json/mod.rs | 2 + rust/arrow/src/json/writer.rs | 301 ++++++++++++++++++++++++++++++++++ 2 files changed, 303 insertions(+) create mode 100644 rust/arrow/src/json/writer.rs diff --git a/rust/arrow/src/json/mod.rs b/rust/arrow/src/json/mod.rs index 6a4dfc1fd4ac..85ab6aec01b3 100644 --- a/rust/arrow/src/json/mod.rs +++ b/rust/arrow/src/json/mod.rs @@ -18,6 +18,8 @@ //! Transfer data between the Arrow memory format and JSON line-delimited records. pub mod reader; +pub mod writer; pub use self::reader::Reader; pub use self::reader::ReaderBuilder; +pub use self::writer::Writer; diff --git a/rust/arrow/src/json/writer.rs b/rust/arrow/src/json/writer.rs new file mode 100644 index 000000000000..89dc364c63e9 --- /dev/null +++ b/rust/arrow/src/json/writer.rs @@ -0,0 +1,301 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! JSON Writer +//! +//! This JSON writer allows converting Arrow record batches into array of JSON objects. It also +//! provides a Writer struct to help serialize record batches directly into line-delimited JSON +//! objects as bytes. +//! +//! Serialize record batches into array of JSON objects: +//! +//! ``` +//! use std::sync::Arc; +//! +//! use arrow::array::Int32Array; +//! use arrow::datatypes::{DataType, Field, Schema}; +//! use arrow::json; +//! use arrow::record_batch::RecordBatch; +//! +//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); +//! let a = Int32Array::from(vec![1, 2, 3]); +//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap(); +//! +//! let json_rows = json::writer::record_batches_to_json_rows(&[batch]); +//! assert_eq!( +//! serde_json::Value::Object(json_rows[1].clone()), +//! serde_json::json!({"a": 2}), +//! ); +//! ``` +//! +//! Serialize record batches into line-delimited JSON bytes: +//! +//! ``` +//! use std::sync::Arc; +//! +//! use arrow::array::Int32Array; +//! use arrow::datatypes::{DataType, Field, Schema}; +//! use arrow::json; +//! use arrow::record_batch::RecordBatch; +//! +//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); +//! let a = Int32Array::from(vec![1, 2, 3]); +//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap(); +//! +//! let buf = Vec::new(); +//! let mut writer = json::Writer::new(buf); +//! writer.write_batches(&vec![batch]).unwrap(); +//! ``` + +use std::io::{BufWriter, Write}; +use std::iter; + +use serde_json::map::Map as JsonMap; +use serde_json::Value; + +use crate::array::*; +use crate::datatypes::*; +use crate::error::Result; +use crate::record_batch::RecordBatch; + +fn set_column_by_primitive_type( + rows: &mut [JsonMap], + row_count: usize, + array: &ArrayRef, + col_name: &str, +) { + let primitive_arr = as_primitive_array::(array); + for (i, row) in rows.iter_mut().enumerate().take(row_count) { + row.insert( + col_name.to_string(), + primitive_arr + .value(i) + .into_json_value() + .unwrap_or(Value::Null), + ); + } +} + +fn set_column_for_json_rows( + rows: &mut [JsonMap], + row_count: usize, + array: &ArrayRef, + col_name: &str, +) { + match array.data_type() { + DataType::Null => { + for row in rows.iter_mut().take(row_count) { + row.insert(col_name.to_string(), Value::Null); + } + } + DataType::Boolean => { + let arr = as_boolean_array(array); + for (i, row) in rows.iter_mut().take(row_count).enumerate() { + row.insert(col_name.to_string(), arr.value(i).into()); + } + } + DataType::Int8 => { + set_column_by_primitive_type::(rows, row_count, array, col_name) + } + DataType::Int16 => { + set_column_by_primitive_type::(rows, row_count, array, col_name) + } + DataType::Int32 => { + set_column_by_primitive_type::(rows, row_count, array, col_name) + } + DataType::Int64 => { + set_column_by_primitive_type::(rows, row_count, array, col_name) + } + DataType::UInt8 => { + set_column_by_primitive_type::(rows, row_count, array, col_name) + } + DataType::UInt16 => { + set_column_by_primitive_type::(rows, row_count, array, col_name) + } + DataType::UInt32 => { + set_column_by_primitive_type::(rows, row_count, array, col_name) + } + DataType::UInt64 => { + set_column_by_primitive_type::(rows, row_count, array, col_name) + } + DataType::Float32 => { + set_column_by_primitive_type::(rows, row_count, array, col_name) + } + DataType::Float64 => { + set_column_by_primitive_type::(rows, row_count, array, col_name) + } + DataType::Utf8 => { + let strarr = as_string_array(array); + for (i, row) in rows.iter_mut().take(row_count).enumerate() { + row.insert(col_name.to_string(), strarr.value(i).into()); + } + } + DataType::Struct(_) => { + let arr = as_struct_array(array); + let inner_col_names = arr.column_names(); + + let mut inner_objs = iter::repeat(JsonMap::new()) + .take(row_count) + .collect::>>(); + + arr.columns() + .iter() + .enumerate() + .for_each(|(j, struct_col)| { + set_column_for_json_rows( + &mut inner_objs, + row_count, + struct_col, + inner_col_names[j], + ); + }); + + rows.iter_mut() + .take(row_count) + .zip(inner_objs.into_iter()) + .for_each(|(row, obj)| { + row.insert(col_name.to_string(), Value::Object(obj)); + }); + } + _ => { + panic!(format!("Unsupported datatype: {:#?}", array.data_type())); + } + } +} + +pub fn record_batches_to_json_rows( + batches: &[RecordBatch], +) -> Vec> { + let mut rows: Vec> = iter::repeat(JsonMap::new()) + .take(batches.iter().map(|b| b.num_rows()).sum()) + .collect(); + + if !rows.is_empty() { + let schema = batches[0].schema(); + let mut base = 0; + batches.iter().for_each(|batch| { + let row_count = batch.num_rows(); + batch.columns().iter().enumerate().for_each(|(j, col)| { + let col_name = schema.field(j).name(); + set_column_for_json_rows(&mut rows[base..], row_count, col, col_name); + }); + base += row_count; + }); + } + + rows +} + +/// A JSON writer +#[derive(Debug)] +pub struct Writer { + writer: BufWriter, +} + +impl Writer { + pub fn new(writer: W) -> Self { + Self::from_buf_writer(BufWriter::new(writer)) + } + + pub fn from_buf_writer(writer: BufWriter) -> Self { + Self { writer } + } + + pub fn write_row(&mut self, row: &Value) -> Result<()> { + self.writer.write_all(&serde_json::to_vec(row)?)?; + self.writer.write_all(b"\n")?; + Ok(()) + } + + pub fn write_batches(&mut self, batches: &[RecordBatch]) -> Result<()> { + for row in record_batches_to_json_rows(batches) { + self.write_row(&Value::Object(row))?; + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::fs::{read_to_string, File}; + use std::sync::Arc; + + use crate::json::reader::*; + + use super::*; + + #[test] + fn write_simple_rows() { + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, false), + ]); + + let a = Int32Array::from(vec![1, 2, 3, 4, 5]); + let b = StringArray::from(vec!["a", "b", "c", "d", "e"]); + + let batch = + RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]) + .unwrap(); + + let mut buf = Vec::new(); + { + let mut writer = Writer::new(&mut buf); + writer.write_batches(&vec![batch]).unwrap(); + } + + assert_eq!( + String::from_utf8(buf).unwrap(), + r#"{"a":1,"b":"a"} +{"a":2,"b":"b"} +{"a":3,"b":"c"} +{"a":4,"b":"d"} +{"a":5,"b":"e"} +"# + ); + } + + fn test_write_for_file(test_file: &str) { + let builder = ReaderBuilder::new() + .infer_schema(None) + .with_batch_size(1024); + let mut reader: Reader = builder + .build::(File::open(test_file).unwrap()) + .unwrap(); + let batch = reader.next().unwrap().unwrap(); + + let mut buf = Vec::new(); + { + let mut writer = Writer::new(&mut buf); + writer.write_batches(&vec![batch]).unwrap(); + } + + let result = String::from_utf8(buf).unwrap(); + let expected = read_to_string(test_file).unwrap(); + for (r, e) in result.lines().zip(expected.lines()) { + assert_eq!( + serde_json::from_str::(r).unwrap(), + serde_json::from_str::(e).unwrap() + ); + } + } + + #[test] + fn write_basic_rows() { + test_write_for_file("test/data/basic.json"); + } +} From f20095948e13bbf126e8c0a46857e4dc0320d609 Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Tue, 26 Jan 2021 20:55:56 -0800 Subject: [PATCH 2/9] address review feedback --- rust/arrow/src/json/writer.rs | 68 ++++++++++++++++++++++------------- 1 file changed, 44 insertions(+), 24 deletions(-) diff --git a/rust/arrow/src/json/writer.rs b/rust/arrow/src/json/writer.rs index 89dc364c63e9..32facff62de7 100644 --- a/rust/arrow/src/json/writer.rs +++ b/rust/arrow/src/json/writer.rs @@ -79,15 +79,19 @@ fn set_column_by_primitive_type( col_name: &str, ) { let primitive_arr = as_primitive_array::(array); - for (i, row) in rows.iter_mut().enumerate().take(row_count) { - row.insert( - col_name.to_string(), - primitive_arr - .value(i) - .into_json_value() - .unwrap_or(Value::Null), - ); - } + + rows.iter_mut() + .zip(primitive_arr.iter()) + .take(row_count) + .for_each(|(row, maybe_value)| { + row.insert( + col_name.to_string(), + match maybe_value { + Some(v) => v.into_json_value().unwrap_or(Value::Null), + None => Value::Null, + }, + ); + }); } fn set_column_for_json_rows( @@ -104,9 +108,17 @@ fn set_column_for_json_rows( } DataType::Boolean => { let arr = as_boolean_array(array); - for (i, row) in rows.iter_mut().take(row_count).enumerate() { - row.insert(col_name.to_string(), arr.value(i).into()); - } + rows.iter_mut().zip(arr.iter()).take(row_count).for_each( + |(row, maybe_value)| { + row.insert( + col_name.to_string(), + match maybe_value { + Some(v) => v.into(), + None => Value::Null, + }, + ); + }, + ); } DataType::Int8 => { set_column_by_primitive_type::(rows, row_count, array, col_name) @@ -140,9 +152,17 @@ fn set_column_for_json_rows( } DataType::Utf8 => { let strarr = as_string_array(array); - for (i, row) in rows.iter_mut().take(row_count).enumerate() { - row.insert(col_name.to_string(), strarr.value(i).into()); - } + rows.iter_mut().zip(strarr.iter()).take(row_count).for_each( + |(row, maybe_value)| { + row.insert( + col_name.to_string(), + match maybe_value { + Some(v) => v.into(), + None => Value::Null, + }, + ); + }, + ); } DataType::Struct(_) => { let arr = as_struct_array(array); @@ -241,12 +261,12 @@ mod tests { #[test] fn write_simple_rows() { let schema = Schema::new(vec![ - Field::new("a", DataType::Int32, false), - Field::new("b", DataType::Utf8, false), + Field::new("c1", DataType::Int32, false), + Field::new("c2", DataType::Utf8, false), ]); - let a = Int32Array::from(vec![1, 2, 3, 4, 5]); - let b = StringArray::from(vec!["a", "b", "c", "d", "e"]); + let a = Int32Array::from(vec![Some(1), Some(2), Some(3), None, Some(5)]); + let b = StringArray::from(vec![Some("a"), Some("b"), Some("c"), Some("d"), None]); let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]) @@ -260,11 +280,11 @@ mod tests { assert_eq!( String::from_utf8(buf).unwrap(), - r#"{"a":1,"b":"a"} -{"a":2,"b":"b"} -{"a":3,"b":"c"} -{"a":4,"b":"d"} -{"a":5,"b":"e"} + r#"{"c1":1,"c2":"a"} +{"c1":2,"c2":"b"} +{"c1":3,"c2":"c"} +{"c1":null,"c2":"d"} +{"c1":5,"c2":null} "# ); } From 55ddadd933bfbfa033be25ff5f86d07170eacdbe Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Sat, 30 Jan 2021 23:20:51 -0800 Subject: [PATCH 3/9] add list type support --- rust/arrow/src/array/array_list.rs | 16 ++- rust/arrow/src/array/iterator.rs | 46 ++++++++- rust/arrow/src/json/writer.rs | 157 ++++++++++++++++++++--------- 3 files changed, 171 insertions(+), 48 deletions(-) diff --git a/rust/arrow/src/array/array_list.rs b/rust/arrow/src/array/array_list.rs index 0f881a86f23a..541c28f34ebe 100644 --- a/rust/arrow/src/array/array_list.rs +++ b/rust/arrow/src/array/array_list.rs @@ -24,7 +24,7 @@ use num::Num; use super::{ array::print_long_array, make_array, raw_pointer::RawPtrBox, Array, ArrayDataRef, - ArrayRef, + ArrayRef, GenericListArrayIter, }; use crate::datatypes::ArrowNativeType; use crate::datatypes::*; @@ -102,6 +102,20 @@ impl GenericListArray { fn value_offset_at(&self, i: usize) -> OffsetSize { unsafe { *self.value_offsets.as_ptr().add(i) } } + + /// constructs a new iterator + pub fn iter<'a>(&'a self) -> GenericListArrayIter<'a, OffsetSize> { + GenericListArrayIter::<'a, OffsetSize>::new(&self) + } +} + +impl<'a, S: OffsetSizeTrait> IntoIterator for &'a GenericListArray { + type Item = Option; + type IntoIter = GenericListArrayIter<'a, S>; + + fn into_iter(self) -> Self::IntoIter { + GenericListArrayIter::<'a, S>::new(self) + } } impl From for GenericListArray { diff --git a/rust/arrow/src/array/iterator.rs b/rust/arrow/src/array/iterator.rs index 463eb031ce62..ff1a8306126e 100644 --- a/rust/arrow/src/array/iterator.rs +++ b/rust/arrow/src/array/iterator.rs @@ -18,8 +18,9 @@ use crate::datatypes::ArrowPrimitiveType; use super::{ - Array, BinaryOffsetSizeTrait, BooleanArray, GenericBinaryArray, GenericStringArray, - PrimitiveArray, StringOffsetSizeTrait, + Array, ArrayRef, BinaryOffsetSizeTrait, BooleanArray, GenericBinaryArray, + GenericListArray, GenericStringArray, OffsetSizeTrait, PrimitiveArray, + StringOffsetSizeTrait, }; /// an iterator that returns Some(T) or None, that can be used on any PrimitiveArray @@ -238,6 +239,47 @@ impl<'a, T: BinaryOffsetSizeTrait> std::iter::Iterator for GenericBinaryIter<'a, } } +#[derive(Debug)] +pub struct GenericListArrayIter<'a, S> +where + S: OffsetSizeTrait, +{ + array: &'a GenericListArray, + i: usize, + len: usize, +} + +impl<'a, S: OffsetSizeTrait> GenericListArrayIter<'a, S> { + pub fn new(array: &'a GenericListArray) -> Self { + GenericListArrayIter:: { + array, + i: 0, + len: array.len(), + } + } +} + +impl<'a, S: OffsetSizeTrait> std::iter::Iterator for GenericListArrayIter<'a, S> { + type Item = Option; + + fn next(&mut self) -> Option { + let i = self.i; + if i >= self.len { + None + } else if self.array.is_null(i) { + self.i += 1; + Some(None) + } else { + self.i += 1; + Some(Some(self.array.value(i))) + } + } + + fn size_hint(&self) -> (usize, Option) { + (self.len - self.i, Some(self.len - self.i)) + } +} + /// all arrays have known size. impl<'a, T: BinaryOffsetSizeTrait> std::iter::ExactSizeIterator for GenericBinaryIter<'a, T> diff --git a/rust/arrow/src/json/writer.rs b/rust/arrow/src/json/writer.rs index 32facff62de7..08716cd19730 100644 --- a/rust/arrow/src/json/writer.rs +++ b/rust/arrow/src/json/writer.rs @@ -72,6 +72,66 @@ use crate::datatypes::*; use crate::error::Result; use crate::record_batch::RecordBatch; +fn primitive_array_to_json(array: &ArrayRef) -> Vec { + as_primitive_array::(array) + .iter() + .map(|maybe_value| match maybe_value { + Some(v) => v.into_json_value().unwrap_or(Value::Null), + None => Value::Null, + }) + .collect() +} + +pub fn array_to_json_array(array: &ArrayRef) -> Vec { + match array.data_type() { + DataType::Null => iter::repeat(Value::Null).take(array.len()).collect(), + DataType::Boolean => as_boolean_array(array) + .iter() + .map(|maybe_value| match maybe_value { + Some(v) => v.into(), + None => Value::Null, + }) + .collect(), + + DataType::Utf8 => as_string_array(array) + .iter() + .map(|maybe_value| match maybe_value { + Some(v) => v.into(), + None => Value::Null, + }) + .collect(), + DataType::Int8 => primitive_array_to_json::(array), + DataType::Int16 => primitive_array_to_json::(array), + DataType::Int32 => primitive_array_to_json::(array), + DataType::Int64 => primitive_array_to_json::(array), + DataType::UInt8 => primitive_array_to_json::(array), + DataType::UInt16 => primitive_array_to_json::(array), + DataType::UInt32 => primitive_array_to_json::(array), + DataType::UInt64 => primitive_array_to_json::(array), + DataType::Float32 => primitive_array_to_json::(array), + DataType::Float64 => primitive_array_to_json::(array), + _ => { + panic!(format!( + "Unsupported datatype for array conversion: {:#?}", + array.data_type() + )); + } + } +} + +macro_rules! set_column_by_array_type { + ($cast_fn:ident, $col_name:ident, $rows:ident, $array:ident, $row_count:ident) => { + let arr = $cast_fn($array); + $rows.iter_mut().zip(arr.iter()).take($row_count).for_each( + |(row, maybe_value)| { + if let Some(v) = maybe_value { + row.insert($col_name.to_string(), v.into()); + } + }, + ); + }; +} + fn set_column_by_primitive_type( rows: &mut [JsonMap], row_count: usize, @@ -84,13 +144,10 @@ fn set_column_by_primitive_type( .zip(primitive_arr.iter()) .take(row_count) .for_each(|(row, maybe_value)| { - row.insert( - col_name.to_string(), - match maybe_value { - Some(v) => v.into_json_value().unwrap_or(Value::Null), - None => Value::Null, - }, - ); + // when value is null, we simply skip setting the key + if let Some(j) = maybe_value.and_then(|v| v.into_json_value()) { + row.insert(col_name.to_string(), j); + } }); } @@ -101,25 +158,6 @@ fn set_column_for_json_rows( col_name: &str, ) { match array.data_type() { - DataType::Null => { - for row in rows.iter_mut().take(row_count) { - row.insert(col_name.to_string(), Value::Null); - } - } - DataType::Boolean => { - let arr = as_boolean_array(array); - rows.iter_mut().zip(arr.iter()).take(row_count).for_each( - |(row, maybe_value)| { - row.insert( - col_name.to_string(), - match maybe_value { - Some(v) => v.into(), - None => Value::Null, - }, - ); - }, - ); - } DataType::Int8 => { set_column_by_primitive_type::(rows, row_count, array, col_name) } @@ -150,29 +188,25 @@ fn set_column_for_json_rows( DataType::Float64 => { set_column_by_primitive_type::(rows, row_count, array, col_name) } + DataType::Null => { + // when value is null, we simply skip setting the key + } + DataType::Boolean => { + set_column_by_array_type!(as_boolean_array, col_name, rows, array, row_count); + } DataType::Utf8 => { - let strarr = as_string_array(array); - rows.iter_mut().zip(strarr.iter()).take(row_count).for_each( - |(row, maybe_value)| { - row.insert( - col_name.to_string(), - match maybe_value { - Some(v) => v.into(), - None => Value::Null, - }, - ); - }, - ); + set_column_by_array_type!(as_string_array, col_name, rows, array, row_count); } DataType::Struct(_) => { - let arr = as_struct_array(array); - let inner_col_names = arr.column_names(); + let structarr = as_struct_array(array); + let inner_col_names = structarr.column_names(); let mut inner_objs = iter::repeat(JsonMap::new()) .take(row_count) .collect::>>(); - arr.columns() + structarr + .columns() .iter() .enumerate() .for_each(|(j, struct_col)| { @@ -191,6 +225,20 @@ fn set_column_for_json_rows( row.insert(col_name.to_string(), Value::Object(obj)); }); } + DataType::List(_) => { + let listarr = as_list_array::(array); + rows.iter_mut() + .zip(listarr.iter()) + .take(row_count) + .for_each(|(row, maybe_value)| { + if let Some(v) = maybe_value { + row.insert( + col_name.to_string(), + Value::Array(array_to_json_array(&v)), + ); + } + }); + } _ => { panic!(format!("Unsupported datatype: {:#?}", array.data_type())); } @@ -307,10 +355,14 @@ mod tests { let result = String::from_utf8(buf).unwrap(); let expected = read_to_string(test_file).unwrap(); for (r, e) in result.lines().zip(expected.lines()) { - assert_eq!( - serde_json::from_str::(r).unwrap(), - serde_json::from_str::(e).unwrap() - ); + let mut expected_json = serde_json::from_str::(e).unwrap(); + // remove null value from object to make comparision consistent: + if let Value::Object(obj) = expected_json { + expected_json = Value::Object( + obj.into_iter().filter(|(_, v)| *v != Value::Null).collect(), + ); + } + assert_eq!(serde_json::from_str::(r).unwrap(), expected_json,); } } @@ -318,4 +370,19 @@ mod tests { fn write_basic_rows() { test_write_for_file("test/data/basic.json"); } + + #[test] + fn write_arrays() { + test_write_for_file("test/data/arrays.json"); + } + + #[test] + fn write_basic_nulls() { + test_write_for_file("test/data/basic_nulls.json"); + } + + #[test] + fn write_mixed() { + test_write_for_file("test/data/mixed_arrays.json"); + } } From 255b0ee98f92928b72d7d4146c8ec95356823e21 Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Sun, 31 Jan 2021 00:10:53 -0800 Subject: [PATCH 4/9] add test for nested struct --- rust/arrow/src/json/writer.rs | 66 +++++++++++++++++++++++++++++++---- 1 file changed, 59 insertions(+), 7 deletions(-) diff --git a/rust/arrow/src/json/writer.rs b/rust/arrow/src/json/writer.rs index 08716cd19730..829fd55c70fe 100644 --- a/rust/arrow/src/json/writer.rs +++ b/rust/arrow/src/json/writer.rs @@ -331,8 +331,65 @@ mod tests { r#"{"c1":1,"c2":"a"} {"c1":2,"c2":"b"} {"c1":3,"c2":"c"} -{"c1":null,"c2":"d"} -{"c1":5,"c2":null} +{"c2":"d"} +{"c1":5} +"# + ); + } + + #[test] + fn write_nested_structs() { + let schema = Schema::new(vec![ + Field::new( + "c1", + DataType::Struct(vec![ + Field::new("c11", DataType::Int32, false), + Field::new( + "c12", + DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)]), + false, + ), + ]), + false, + ), + Field::new("c2", DataType::Utf8, false), + ]); + + let a = StructArray::from(vec![ + ( + Field::new("c11", DataType::Int32, false), + Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef, + ), + ( + Field::new( + "c12", + DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)]), + false, + ), + Arc::new(StructArray::from(vec![( + Field::new("c121", DataType::Utf8, false), + Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) + as ArrayRef, + )])) as ArrayRef, + ), + ]); + let b = StringArray::from(vec![Some("a"), Some("b"), Some("c")]); + + let batch = + RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]) + .unwrap(); + + let mut buf = Vec::new(); + { + let mut writer = Writer::new(&mut buf); + writer.write_batches(&vec![batch]).unwrap(); + } + + assert_eq!( + String::from_utf8(buf).unwrap(), + r#"{"c1":{"c11":1,"c12":{"c121":"e"}},"c2":"a"} +{"c1":{"c12":{"c121":"f"}},"c2":"b"} +{"c1":{"c11":5,"c12":{"c121":"g"}},"c2":"c"} "# ); } @@ -380,9 +437,4 @@ mod tests { fn write_basic_nulls() { test_write_for_file("test/data/basic_nulls.json"); } - - #[test] - fn write_mixed() { - test_write_for_file("test/data/mixed_arrays.json"); - } } From 3b4f72b2b9cdf5b1424b3ee7fa6444239e6b6b71 Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Sun, 31 Jan 2021 13:48:06 -0800 Subject: [PATCH 5/9] add test for struct with list field --- rust/arrow/src/array/cast.rs | 10 ++++- rust/arrow/src/array/mod.rs | 5 ++- rust/arrow/src/compute/kernels/sort.rs | 2 +- rust/arrow/src/json/writer.rs | 52 +++++++++++++++++++++++++- 4 files changed, 64 insertions(+), 5 deletions(-) diff --git a/rust/arrow/src/array/cast.rs b/rust/arrow/src/array/cast.rs index 7ec0a03a9839..0a67c1495f85 100644 --- a/rust/arrow/src/array/cast.rs +++ b/rust/arrow/src/array/cast.rs @@ -40,12 +40,20 @@ where .expect("Unable to downcast to dictionary array") } -pub fn as_list_array(arr: &ArrayRef) -> &GenericListArray { +pub fn as_generic_list_array(arr: &ArrayRef) -> &GenericListArray { arr.as_any() .downcast_ref::>() .expect("Unable to downcast to list array") } +pub fn as_list_array(arr: &ArrayRef) -> &ListArray { + as_generic_list_array::(arr) +} + +pub fn as_large_list_array(arr: &ArrayRef) -> &LargeListArray { + as_generic_list_array::(arr) +} + macro_rules! array_downcast_fn { ($name: ident, $arrty: ty, $arrty_str:expr) => { #[doc = "Force downcast ArrayRef to "] diff --git a/rust/arrow/src/array/mod.rs b/rust/arrow/src/array/mod.rs index a3d619fe5720..1fae9a7f3e4a 100644 --- a/rust/arrow/src/array/mod.rs +++ b/rust/arrow/src/array/mod.rs @@ -271,8 +271,9 @@ pub use self::ord::{build_compare, DynComparator}; // --------------------- Array downcast helper functions --------------------- pub use self::cast::{ - as_boolean_array, as_dictionary_array, as_largestring_array, as_list_array, - as_null_array, as_primitive_array, as_string_array, as_struct_array, + as_boolean_array, as_dictionary_array, as_generic_list_array, as_large_list_array, + as_largestring_array, as_list_array, as_null_array, as_primitive_array, + as_string_array, as_struct_array, }; // ------------------------------ C Data Interface --------------------------- diff --git a/rust/arrow/src/compute/kernels/sort.rs b/rust/arrow/src/compute/kernels/sort.rs index e86372dc0cde..7a07a54c2faf 100644 --- a/rust/arrow/src/compute/kernels/sort.rs +++ b/rust/arrow/src/compute/kernels/sort.rs @@ -501,7 +501,7 @@ where .downcast_ref::() .map_or_else( || { - let values = as_list_array::(values); + let values = as_generic_list_array::(values); value_indices .iter() .copied() diff --git a/rust/arrow/src/json/writer.rs b/rust/arrow/src/json/writer.rs index 829fd55c70fe..ed1a3ef132c3 100644 --- a/rust/arrow/src/json/writer.rs +++ b/rust/arrow/src/json/writer.rs @@ -226,7 +226,7 @@ fn set_column_for_json_rows( }); } DataType::List(_) => { - let listarr = as_list_array::(array); + let listarr = as_list_array(array); rows.iter_mut() .zip(listarr.iter()) .take(row_count) @@ -302,6 +302,7 @@ mod tests { use std::fs::{read_to_string, File}; use std::sync::Arc; + use crate::buffer::*; use crate::json::reader::*; use super::*; @@ -394,6 +395,55 @@ mod tests { ); } + #[test] + fn write_struct_with_list_field() { + let schema = Schema::new(vec![ + Field::new( + "c_struct", + DataType::List(Box::new(Field::new("c_list", DataType::Utf8, false))), + false, + ), + Field::new("c2", DataType::Int32, false), + ]); + + let a_values = StringArray::from(vec!["a", "a1", "b", "c", "d", "e"]); + // [["a", "a1"], ["b"], ["c"], ["d"], ["e"]] + let a_value_offsets = Buffer::from(&[0, 2, 3, 4, 5, 6].to_byte_slice()); + let a_list_data = ArrayData::builder(DataType::List(Box::new(Field::new( + "c_list", + DataType::Utf8, + false, + )))) + .len(5) + .add_buffer(a_value_offsets) + .add_child_data(a_values.data()) + .null_bit_buffer(Buffer::from(vec![0b00011111])) + .build(); + let a = ListArray::from(a_list_data); + + let b = Int32Array::from(vec![1, 2, 3, 4, 5]); + + let batch = + RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]) + .unwrap(); + + let mut buf = Vec::new(); + { + let mut writer = Writer::new(&mut buf); + writer.write_batches(&vec![batch]).unwrap(); + } + + assert_eq!( + String::from_utf8(buf).unwrap(), + r#"{"c_struct":["a","a1"],"c2":1} +{"c_struct":["b"],"c2":2} +{"c_struct":["c"],"c2":3} +{"c_struct":["d"],"c2":4} +{"c_struct":["e"],"c2":5} +"# + ); + } + fn test_write_for_file(test_file: &str) { let builder = ReaderBuilder::new() .infer_schema(None) From e454d2bb406a8e1fed4b5d9f42c77e04a551d925 Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Sun, 31 Jan 2021 18:25:11 -0800 Subject: [PATCH 6/9] suport nested list --- rust/arrow/src/json/writer.rs | 73 ++++++++++++++++++++++++++++++++++- 1 file changed, 72 insertions(+), 1 deletion(-) diff --git a/rust/arrow/src/json/writer.rs b/rust/arrow/src/json/writer.rs index ed1a3ef132c3..a166cfc93ecd 100644 --- a/rust/arrow/src/json/writer.rs +++ b/rust/arrow/src/json/writer.rs @@ -110,6 +110,13 @@ pub fn array_to_json_array(array: &ArrayRef) -> Vec { DataType::UInt64 => primitive_array_to_json::(array), DataType::Float32 => primitive_array_to_json::(array), DataType::Float64 => primitive_array_to_json::(array), + DataType::List(_) => as_list_array(array) + .iter() + .map(|maybe_value| match maybe_value { + Some(v) => Value::Array(array_to_json_array(&v)), + None => Value::Null, + }) + .collect(), _ => { panic!(format!( "Unsupported datatype for array conversion: {:#?}", @@ -407,7 +414,7 @@ mod tests { ]); let a_values = StringArray::from(vec!["a", "a1", "b", "c", "d", "e"]); - // [["a", "a1"], ["b"], ["c"], ["d"], ["e"]] + // list column rows: ["a", "a1"], ["b"], ["c"], ["d"], ["e"] let a_value_offsets = Buffer::from(&[0, 2, 3, 4, 5, 6].to_byte_slice()); let a_list_data = ArrayData::builder(DataType::List(Box::new(Field::new( "c_list", @@ -444,6 +451,70 @@ mod tests { ); } + #[test] + fn write_nested_list() { + let schema = Schema::new(vec![ + Field::new( + "c1", + DataType::List(Box::new(Field::new( + "a", + DataType::List(Box::new(Field::new("b", DataType::Int32, false))), + false, + ))), + false, + ), + Field::new("c2", DataType::Utf8, false), + ]); + + // list column rows: [[1, 2], [3]], [], [[4, 5, 6]] + let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6]); + + let a_value_offsets = Buffer::from(&[0, 2, 3, 6].to_byte_slice()); + // Construct a list array from the above two + let a_list_data = ArrayData::builder(DataType::List(Box::new(Field::new( + "b", + DataType::Int32, + false, + )))) + .len(3) + .add_buffer(a_value_offsets) + .null_bit_buffer(Buffer::from(vec![0b00000111])) + .add_child_data(a_values.data()) + .build(); + + let c1_value_offsets = Buffer::from(&[0, 2, 2, 3].to_byte_slice()); + let c1_list_data = ArrayData::builder(DataType::List(Box::new(Field::new( + "a", + DataType::List(Box::new(Field::new("b", DataType::Int32, false))), + false, + )))) + .len(3) + .add_buffer(c1_value_offsets) + .add_child_data(a_list_data) + .build(); + + let c1 = ListArray::from(c1_list_data); + let c2 = StringArray::from(vec![Some("foo"), Some("bar"), None]); + + let batch = + RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]) + .unwrap(); + + let mut buf = Vec::new(); + { + let mut writer = Writer::new(&mut buf); + writer.write_batches(&vec![batch]).unwrap(); + } + + assert_eq!( + String::from_utf8(buf).unwrap(), + r#"{"c1":[[1,2],[3]],"c2":"foo"} +{"c1":[],"c2":"bar"} +{"c1":[[4,5,6]]} +"# + ); + } + fn test_write_for_file(test_file: &str) { let builder = ReaderBuilder::new() .infer_schema(None) From 6bf7b6ecdfbf8847abff1858f6d5b4c5316a4716 Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Sun, 31 Jan 2021 21:47:14 -0800 Subject: [PATCH 7/9] support list of struct --- rust/arrow/src/json/writer.rs | 228 ++++++++++++++++++++++------------ 1 file changed, 151 insertions(+), 77 deletions(-) diff --git a/rust/arrow/src/json/writer.rs b/rust/arrow/src/json/writer.rs index a166cfc93ecd..566b0cf41311 100644 --- a/rust/arrow/src/json/writer.rs +++ b/rust/arrow/src/json/writer.rs @@ -82,6 +82,32 @@ fn primitive_array_to_json(array: &ArrayRef) -> Vec Vec> { + let inner_col_names = array.column_names(); + + let mut inner_objs = iter::repeat(JsonMap::new()) + .take(row_count) + .collect::>>(); + + array + .columns() + .iter() + .enumerate() + .for_each(|(j, struct_col)| { + set_column_for_json_rows( + &mut inner_objs, + row_count, + struct_col, + inner_col_names[j], + ); + }); + + inner_objs +} + pub fn array_to_json_array(array: &ArrayRef) -> Vec { match array.data_type() { DataType::Null => iter::repeat(Value::Null).take(array.len()).collect(), @@ -117,6 +143,11 @@ pub fn array_to_json_array(array: &ArrayRef) -> Vec { None => Value::Null, }) .collect(), + DataType::Struct(_) => { + let jsonmaps = + struct_array_to_jsonmap_array(as_struct_array(array), array.len()); + jsonmaps.into_iter().map(|m| Value::Object(m)).collect() + } _ => { panic!(format!( "Unsupported datatype for array conversion: {:#?}", @@ -205,26 +236,8 @@ fn set_column_for_json_rows( set_column_by_array_type!(as_string_array, col_name, rows, array, row_count); } DataType::Struct(_) => { - let structarr = as_struct_array(array); - let inner_col_names = structarr.column_names(); - - let mut inner_objs = iter::repeat(JsonMap::new()) - .take(row_count) - .collect::>>(); - - structarr - .columns() - .iter() - .enumerate() - .for_each(|(j, struct_col)| { - set_column_for_json_rows( - &mut inner_objs, - row_count, - struct_col, - inner_col_names[j], - ); - }); - + let inner_objs = + struct_array_to_jsonmap_array(as_struct_array(array), row_count); rows.iter_mut() .take(row_count) .zip(inner_objs.into_iter()) @@ -363,7 +376,7 @@ mod tests { Field::new("c2", DataType::Utf8, false), ]); - let a = StructArray::from(vec![ + let c1 = StructArray::from(vec![ ( Field::new("c11", DataType::Int32, false), Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef, @@ -381,10 +394,10 @@ mod tests { )])) as ArrayRef, ), ]); - let b = StringArray::from(vec![Some("a"), Some("b"), Some("c")]); + let c2 = StringArray::from(vec![Some("a"), Some("b"), Some("c")]); let batch = - RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]) + RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]) .unwrap(); let mut buf = Vec::new(); @@ -404,28 +417,23 @@ mod tests { #[test] fn write_struct_with_list_field() { - let schema = Schema::new(vec![ - Field::new( - "c_struct", - DataType::List(Box::new(Field::new("c_list", DataType::Utf8, false))), - false, - ), - Field::new("c2", DataType::Int32, false), - ]); + let field_c1 = Field::new( + "c1", + DataType::List(Box::new(Field::new("c_list", DataType::Utf8, false))), + false, + ); + let field_c2 = Field::new("c2", DataType::Int32, false); + let schema = Schema::new(vec![field_c1.clone(), field_c2]); let a_values = StringArray::from(vec!["a", "a1", "b", "c", "d", "e"]); // list column rows: ["a", "a1"], ["b"], ["c"], ["d"], ["e"] let a_value_offsets = Buffer::from(&[0, 2, 3, 4, 5, 6].to_byte_slice()); - let a_list_data = ArrayData::builder(DataType::List(Box::new(Field::new( - "c_list", - DataType::Utf8, - false, - )))) - .len(5) - .add_buffer(a_value_offsets) - .add_child_data(a_values.data()) - .null_bit_buffer(Buffer::from(vec![0b00011111])) - .build(); + let a_list_data = ArrayData::builder(field_c1.data_type().clone()) + .len(5) + .add_buffer(a_value_offsets) + .add_child_data(a_values.data()) + .null_bit_buffer(Buffer::from(vec![0b00011111])) + .build(); let a = ListArray::from(a_list_data); let b = Int32Array::from(vec![1, 2, 3, 4, 5]); @@ -442,56 +450,48 @@ mod tests { assert_eq!( String::from_utf8(buf).unwrap(), - r#"{"c_struct":["a","a1"],"c2":1} -{"c_struct":["b"],"c2":2} -{"c_struct":["c"],"c2":3} -{"c_struct":["d"],"c2":4} -{"c_struct":["e"],"c2":5} + r#"{"c1":["a","a1"],"c2":1} +{"c1":["b"],"c2":2} +{"c1":["c"],"c2":3} +{"c1":["d"],"c2":4} +{"c1":["e"],"c2":5} "# ); } #[test] fn write_nested_list() { - let schema = Schema::new(vec![ - Field::new( - "c1", - DataType::List(Box::new(Field::new( - "a", - DataType::List(Box::new(Field::new("b", DataType::Int32, false))), - false, - ))), - false, - ), - Field::new("c2", DataType::Utf8, false), - ]); + let list_inner_type = Field::new( + "a", + DataType::List(Box::new(Field::new("b", DataType::Int32, false))), + false, + ); + let field_c1 = Field::new( + "c1", + DataType::List(Box::new(list_inner_type.clone())), + false, + ); + let field_c2 = Field::new("c2", DataType::Utf8, false); + let schema = Schema::new(vec![field_c1.clone(), field_c2]); // list column rows: [[1, 2], [3]], [], [[4, 5, 6]] let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6]); let a_value_offsets = Buffer::from(&[0, 2, 3, 6].to_byte_slice()); // Construct a list array from the above two - let a_list_data = ArrayData::builder(DataType::List(Box::new(Field::new( - "b", - DataType::Int32, - false, - )))) - .len(3) - .add_buffer(a_value_offsets) - .null_bit_buffer(Buffer::from(vec![0b00000111])) - .add_child_data(a_values.data()) - .build(); + let a_list_data = ArrayData::builder(list_inner_type.data_type().clone()) + .len(3) + .add_buffer(a_value_offsets) + .null_bit_buffer(Buffer::from(vec![0b00000111])) + .add_child_data(a_values.data()) + .build(); let c1_value_offsets = Buffer::from(&[0, 2, 2, 3].to_byte_slice()); - let c1_list_data = ArrayData::builder(DataType::List(Box::new(Field::new( - "a", - DataType::List(Box::new(Field::new("b", DataType::Int32, false))), - false, - )))) - .len(3) - .add_buffer(c1_value_offsets) - .add_child_data(a_list_data) - .build(); + let c1_list_data = ArrayData::builder(field_c1.data_type().clone()) + .len(3) + .add_buffer(c1_value_offsets) + .add_child_data(a_list_data) + .build(); let c1 = ListArray::from(c1_list_data); let c2 = StringArray::from(vec![Some("foo"), Some("bar"), None]); @@ -515,6 +515,80 @@ mod tests { ); } + #[test] + fn write_list_of_struct() { + let field_c1 = Field::new( + "c1", + DataType::List(Box::new(Field::new( + "s", + DataType::Struct(vec![ + Field::new("c11", DataType::Int32, false), + Field::new( + "c12", + DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)]), + false, + ), + ]), + false, + ))), + true, + ); + let field_c2 = Field::new("c2", DataType::Int32, false); + let schema = Schema::new(vec![field_c1.clone(), field_c2]); + + let struct_values = StructArray::from(vec![ + ( + Field::new("c11", DataType::Int32, false), + Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef, + ), + ( + Field::new( + "c12", + DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)]), + false, + ), + Arc::new(StructArray::from(vec![( + Field::new("c121", DataType::Utf8, false), + Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) + as ArrayRef, + )])) as ArrayRef, + ), + ]); + + // list column rows (c1): + // [{"c11": 1, "c12": {"c121": "e"}}, {"c12": {"c121": "f"}}], + // null, + // [{"c11": 5, "c12": {"c121": "g"}}] + let c1_value_offsets = Buffer::from(&[0, 2, 2, 3].to_byte_slice()); + let c1_list_data = ArrayData::builder(field_c1.data_type().clone()) + .len(3) + .add_buffer(c1_value_offsets) + .add_child_data(struct_values.data()) + .null_bit_buffer(Buffer::from(vec![0b00000101])) + .build(); + let c1 = ListArray::from(c1_list_data); + + let c2 = Int32Array::from(vec![1, 2, 3]); + + let batch = + RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]) + .unwrap(); + + let mut buf = Vec::new(); + { + let mut writer = Writer::new(&mut buf); + writer.write_batches(&vec![batch]).unwrap(); + } + + assert_eq!( + String::from_utf8(buf).unwrap(), + r#"{"c1":[{"c11":1,"c12":{"c121":"e"}},{"c12":{"c121":"f"}}],"c2":1} +{"c2":2} +{"c1":[{"c11":5,"c12":{"c121":"g"}}],"c2":3} +"# + ); + } + fn test_write_for_file(test_file: &str) { let builder = ReaderBuilder::new() .infer_schema(None) From db98870d03a8c2a469f96d0ae43772acdd036a1f Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Sun, 31 Jan 2021 21:50:47 -0800 Subject: [PATCH 8/9] fix clippy warnings --- rust/arrow/src/json/writer.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/rust/arrow/src/json/writer.rs b/rust/arrow/src/json/writer.rs index 566b0cf41311..2921fa919a4d 100644 --- a/rust/arrow/src/json/writer.rs +++ b/rust/arrow/src/json/writer.rs @@ -146,7 +146,7 @@ pub fn array_to_json_array(array: &ArrayRef) -> Vec { DataType::Struct(_) => { let jsonmaps = struct_array_to_jsonmap_array(as_struct_array(array), array.len()); - jsonmaps.into_iter().map(|m| Value::Object(m)).collect() + jsonmaps.into_iter().map(Value::Object).collect() } _ => { panic!(format!( @@ -344,7 +344,7 @@ mod tests { let mut buf = Vec::new(); { let mut writer = Writer::new(&mut buf); - writer.write_batches(&vec![batch]).unwrap(); + writer.write_batches(&[batch]).unwrap(); } assert_eq!( @@ -403,7 +403,7 @@ mod tests { let mut buf = Vec::new(); { let mut writer = Writer::new(&mut buf); - writer.write_batches(&vec![batch]).unwrap(); + writer.write_batches(&[batch]).unwrap(); } assert_eq!( @@ -445,7 +445,7 @@ mod tests { let mut buf = Vec::new(); { let mut writer = Writer::new(&mut buf); - writer.write_batches(&vec![batch]).unwrap(); + writer.write_batches(&[batch]).unwrap(); } assert_eq!( @@ -503,7 +503,7 @@ mod tests { let mut buf = Vec::new(); { let mut writer = Writer::new(&mut buf); - writer.write_batches(&vec![batch]).unwrap(); + writer.write_batches(&[batch]).unwrap(); } assert_eq!( @@ -577,7 +577,7 @@ mod tests { let mut buf = Vec::new(); { let mut writer = Writer::new(&mut buf); - writer.write_batches(&vec![batch]).unwrap(); + writer.write_batches(&[batch]).unwrap(); } assert_eq!( @@ -601,7 +601,7 @@ mod tests { let mut buf = Vec::new(); { let mut writer = Writer::new(&mut buf); - writer.write_batches(&vec![batch]).unwrap(); + writer.write_batches(&[batch]).unwrap(); } let result = String::from_utf8(buf).unwrap(); From 1cf556d61efbaf4c12c5be52a790920a89dd7cf4 Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Mon, 1 Feb 2021 21:34:49 -0800 Subject: [PATCH 9/9] address feedback, add support for LargeListArray and inline cast function --- rust/arrow/src/array/cast.rs | 5 +++++ rust/arrow/src/json/writer.rs | 21 +++++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/rust/arrow/src/array/cast.rs b/rust/arrow/src/array/cast.rs index 0a67c1495f85..0477f2831f9a 100644 --- a/rust/arrow/src/array/cast.rs +++ b/rust/arrow/src/array/cast.rs @@ -40,16 +40,21 @@ where .expect("Unable to downcast to dictionary array") } +#[doc = "Force downcast ArrayRef to GenericListArray"] pub fn as_generic_list_array(arr: &ArrayRef) -> &GenericListArray { arr.as_any() .downcast_ref::>() .expect("Unable to downcast to list array") } +#[doc = "Force downcast ArrayRef to ListArray"] +#[inline] pub fn as_list_array(arr: &ArrayRef) -> &ListArray { as_generic_list_array::(arr) } +#[doc = "Force downcast ArrayRef to LargeListArray"] +#[inline] pub fn as_large_list_array(arr: &ArrayRef) -> &LargeListArray { as_generic_list_array::(arr) } diff --git a/rust/arrow/src/json/writer.rs b/rust/arrow/src/json/writer.rs index 2921fa919a4d..547e26acff08 100644 --- a/rust/arrow/src/json/writer.rs +++ b/rust/arrow/src/json/writer.rs @@ -143,6 +143,13 @@ pub fn array_to_json_array(array: &ArrayRef) -> Vec { None => Value::Null, }) .collect(), + DataType::LargeList(_) => as_large_list_array(array) + .iter() + .map(|maybe_value| match maybe_value { + Some(v) => Value::Array(array_to_json_array(&v)), + None => Value::Null, + }) + .collect(), DataType::Struct(_) => { let jsonmaps = struct_array_to_jsonmap_array(as_struct_array(array), array.len()); @@ -259,6 +266,20 @@ fn set_column_for_json_rows( } }); } + DataType::LargeList(_) => { + let listarr = as_large_list_array(array); + rows.iter_mut() + .zip(listarr.iter()) + .take(row_count) + .for_each(|(row, maybe_value)| { + if let Some(v) = maybe_value { + row.insert( + col_name.to_string(), + Value::Array(array_to_json_array(&v)), + ); + } + }); + } _ => { panic!(format!("Unsupported datatype: {:#?}", array.data_type())); }