From dd1110b91988345cb1d22ddf4300a613ddf06572 Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Thu, 29 Aug 2024 23:52:33 +0800 Subject: [PATCH 1/7] add terminator config to CsvConfig --- datafusion-examples/examples/csv_opener.rs | 1 + datafusion/common/src/config.rs | 13 ++++ .../src/datasource/file_format/options.rs | 9 +++ .../core/src/datasource/physical_plan/csv.rs | 67 +++++++++++++++++++ .../proto/datafusion_common.proto | 1 + datafusion/proto-common/src/from_proto/mod.rs | 1 + .../proto-common/src/generated/pbjson.rs | 20 ++++++ .../proto-common/src/generated/prost.rs | 3 + datafusion/proto-common/src/to_proto/mod.rs | 1 + .../src/generated/datafusion_proto_common.rs | 3 + .../proto/src/logical_plan/file_formats.rs | 6 ++ 11 files changed, 125 insertions(+) diff --git a/datafusion-examples/examples/csv_opener.rs b/datafusion-examples/examples/csv_opener.rs index 1f45026a214d7..e7b7ead109bc0 100644 --- a/datafusion-examples/examples/csv_opener.rs +++ b/datafusion-examples/examples/csv_opener.rs @@ -47,6 +47,7 @@ async fn main() -> Result<()> { true, b',', b'"', + None, object_store, Some(b'#'), ); diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 37d26c6f00c4a..eb404a0085139 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1628,6 +1628,7 @@ config_namespace! { pub has_header: Option, default = None pub delimiter: u8, default = b',' pub quote: u8, default = b'"' + pub terminator: Option, default = None pub escape: Option, default = None pub double_quote: Option, default = None /// Specifies whether newlines in (quoted) values are supported. @@ -1696,6 +1697,13 @@ impl CsvOptions { self } + /// The character that terminates a row. + /// - default to '\n' + pub fn with_terminator(mut self, terminator: Option) -> Self { + self.terminator = terminator; + self + } + /// The escape character in a row. /// - default is None pub fn with_escape(mut self, escape: Option) -> Self { @@ -1742,6 +1750,11 @@ impl CsvOptions { self.quote } + /// The terminator character. + pub fn terminator(&self) -> Option { + self.terminator + } + /// The escape character. pub fn escape(&self) -> Option { self.escape diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index 552977baba17b..101e103d4f34e 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -59,6 +59,8 @@ pub struct CsvReadOptions<'a> { pub delimiter: u8, /// An optional quote character. Defaults to `b'"'`. pub quote: u8, + /// An optional terminator character. Defaults to None (CRLF). + pub terminator: Option, /// An optional escape character. Defaults to None. pub escape: Option, /// If enabled, lines beginning with this byte are ignored. @@ -102,6 +104,7 @@ impl<'a> CsvReadOptions<'a> { schema_infer_max_records: DEFAULT_SCHEMA_INFER_MAX_RECORD, delimiter: b',', quote: b'"', + terminator: None, escape: None, newlines_in_values: false, file_extension: DEFAULT_CSV_EXTENSION, @@ -136,6 +139,12 @@ impl<'a> CsvReadOptions<'a> { self } + /// Specify terminator to use for CSV read + pub fn terminator(mut self, terminator: Option) -> Self { + self.terminator = terminator; + self + } + /// Specify delimiter to use for CSV read pub fn escape(mut self, escape: u8) -> Self { self.escape = Some(escape); diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index 5ab32ed36e539..fcf798df88b79 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -77,6 +77,7 @@ pub struct CsvExec { has_header: bool, delimiter: u8, quote: u8, + terminator: Option, escape: Option, comment: Option, newlines_in_values: bool, @@ -98,6 +99,7 @@ pub struct CsvExecBuilder { has_header: bool, delimiter: u8, quote: u8, + terminator: Option, escape: Option, comment: Option, newlines_in_values: bool, @@ -112,6 +114,7 @@ impl CsvExecBuilder { has_header: false, delimiter: b',', quote: b'"', + terminator: None, escape: None, comment: None, newlines_in_values: false, @@ -143,6 +146,14 @@ impl CsvExecBuilder { self } + /// Set the line terminator. If not set, the default is CRLF. + /// + /// The default is None. + pub fn with_terminator(mut self, terminator: Option) -> Self { + self.terminator = terminator; + self + } + /// Set the escape character. /// /// The default is `None` (i.e. quotes cannot be escaped). @@ -191,6 +202,7 @@ impl CsvExecBuilder { has_header, delimiter, quote, + terminator, escape, comment, newlines_in_values, @@ -210,6 +222,7 @@ impl CsvExecBuilder { has_header, delimiter, quote, + terminator, escape, newlines_in_values, metrics: ExecutionPlanMetricsSet::new(), @@ -229,6 +242,7 @@ impl CsvExec { has_header: bool, delimiter: u8, quote: u8, + terminator: Option, escape: Option, comment: Option, newlines_in_values: bool, @@ -238,6 +252,7 @@ impl CsvExec { .with_has_header(has_header) .with_delimeter(delimiter) .with_quote(quote) + .with_terminator(terminator) .with_escape(escape) .with_comment(comment) .with_newlines_in_values(newlines_in_values) @@ -270,6 +285,11 @@ impl CsvExec { self.quote } + /// The line terminator + pub fn terminator(&self) -> Option { + self.terminator + } + /// Lines beginning with this byte are ignored. pub fn comment(&self) -> Option { self.comment @@ -406,6 +426,7 @@ impl ExecutionPlan for CsvExec { delimiter: self.delimiter, quote: self.quote, escape: self.escape, + terminator: self.terminator, object_store, comment: self.comment, }); @@ -441,6 +462,7 @@ impl ExecutionPlan for CsvExec { delimiter: self.delimiter, quote: self.quote, escape: self.escape, + terminator: self.terminator, comment: self.comment, newlines_in_values: self.newlines_in_values, metrics: self.metrics.clone(), @@ -459,6 +481,7 @@ pub struct CsvConfig { has_header: bool, delimiter: u8, quote: u8, + terminator: Option, escape: Option, object_store: Arc, comment: Option, @@ -474,6 +497,7 @@ impl CsvConfig { has_header: bool, delimiter: u8, quote: u8, + terminator: Option, object_store: Arc, comment: Option, ) -> Self { @@ -484,6 +508,7 @@ impl CsvConfig { has_header, delimiter, quote, + terminator, escape: None, object_store, comment, @@ -503,6 +528,9 @@ impl CsvConfig { .with_header(self.has_header) .with_quote(self.quote); + if let Some(terminator) = self.terminator { + builder = builder.with_terminator(terminator); + } if let Some(proj) = &self.file_projection { builder = builder.with_projection(proj.clone()); } @@ -775,6 +803,7 @@ mod tests { .with_has_header(true) .with_delimeter(b',') .with_quote(b'"') + .with_terminator(None) .with_escape(None) .with_comment(None) .with_newlines_in_values(false) @@ -844,6 +873,7 @@ mod tests { .with_has_header(true) .with_delimeter(b',') .with_quote(b'"') + .with_terminator(None) .with_escape(None) .with_comment(None) .with_newlines_in_values(false) @@ -913,6 +943,7 @@ mod tests { .with_has_header(true) .with_delimeter(b',') .with_quote(b'"') + .with_terminator(None) .with_escape(None) .with_comment(None) .with_newlines_in_values(false) @@ -979,6 +1010,7 @@ mod tests { .with_has_header(true) .with_delimeter(b',') .with_quote(b'"') + .with_terminator(None) .with_escape(None) .with_comment(None) .with_newlines_in_values(false) @@ -1044,6 +1076,7 @@ mod tests { .with_has_header(true) .with_delimeter(b',') .with_quote(b'"') + .with_terminator(None) .with_escape(None) .with_comment(None) .with_newlines_in_values(false) @@ -1139,6 +1172,7 @@ mod tests { .with_has_header(true) .with_delimeter(b',') .with_quote(b'"') + .with_terminator(None) .with_escape(None) .with_comment(None) .with_newlines_in_values(false) @@ -1210,6 +1244,37 @@ mod tests { crate::assert_batches_eq!(expected, &result); } + #[tokio::test] + async fn test_terminator() { + let session_ctx = SessionContext::new(); + let store = object_store::memory::InMemory::new(); + + let data = bytes::Bytes::from("a,b 1,2 3,4"); + let path = object_store::path::Path::from("a.csv"); + store.put(&path, data.into()).await.unwrap(); + + let url = Url::parse("memory://").unwrap(); + session_ctx.register_object_store(&url, Arc::new(store)); + + let df = session_ctx + .read_csv("memory:///", CsvReadOptions::new().terminator(Some(b' '))) + .await + .unwrap(); + + let result = df.collect().await.unwrap(); + + let expected = [ + "+---+---+", + "| a | b |", + "+---+---+", + "| 1 | 2 |", + "| 3 | 4 |", + "+---+---+", + ]; + + crate::assert_batches_eq!(expected, &result); + } + #[tokio::test] async fn write_csv_results_error_handling() -> Result<()> { let ctx = SessionContext::new(); @@ -1365,6 +1430,7 @@ mod tests { has_header, delimiter, quote, + terminator, escape, comment, newlines_in_values, @@ -1374,6 +1440,7 @@ mod tests { assert_eq!(has_header, default_options.has_header.unwrap_or(false)); assert_eq!(delimiter, default_options.delimiter); assert_eq!(quote, default_options.quote); + assert_eq!(terminator, default_options.terminator); assert_eq!(escape, default_options.escape); assert_eq!(comment, default_options.comment); assert_eq!( diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 9268ccca0b70e..51e94d2caaf4c 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -423,6 +423,7 @@ message CsvOptions { bytes comment = 13; // Optional comment character as a byte bytes double_quote = 14; // Indicates if quotes are doubled bytes newlines_in_values = 15; // Indicates if newlines are supported in values + bytes terminator = 16; // Optional terminator character as a byte } // Options controlling CSV format diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index feb4c11aa8091..45d275fb488e5 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -863,6 +863,7 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions { has_header: proto_opts.has_header.first().map(|h| *h != 0), delimiter: proto_opts.delimiter[0], quote: proto_opts.quote[0], + terminator: proto_opts.terminator.first().copied(), escape: proto_opts.escape.first().copied(), double_quote: proto_opts.has_header.first().map(|h| *h != 0), newlines_in_values: proto_opts.newlines_in_values.first().map(|h| *h != 0), diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index 05e57f5585a6a..78ba829f8c50e 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -1542,6 +1542,9 @@ impl serde::Serialize for CsvOptions { if !self.newlines_in_values.is_empty() { len += 1; } + if !self.terminator.is_empty() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion_common.CsvOptions", len)?; if !self.has_header.is_empty() { #[allow(clippy::needless_borrow)] @@ -1598,6 +1601,10 @@ impl serde::Serialize for CsvOptions { #[allow(clippy::needless_borrow)] struct_ser.serialize_field("newlinesInValues", pbjson::private::base64::encode(&self.newlines_in_values).as_str())?; } + if !self.terminator.is_empty() { + #[allow(clippy::needless_borrow)] + struct_ser.serialize_field("terminator", pbjson::private::base64::encode(&self.terminator).as_str())?; + } struct_ser.end() } } @@ -1633,6 +1640,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { "doubleQuote", "newlines_in_values", "newlinesInValues", + "terminator", ]; #[allow(clippy::enum_variant_names)] @@ -1652,6 +1660,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { Comment, DoubleQuote, NewlinesInValues, + Terminator, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -1688,6 +1697,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { "comment" => Ok(GeneratedField::Comment), "doubleQuote" | "double_quote" => Ok(GeneratedField::DoubleQuote), "newlinesInValues" | "newlines_in_values" => Ok(GeneratedField::NewlinesInValues), + "terminator" => Ok(GeneratedField::Terminator), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -1722,6 +1732,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { let mut comment__ = None; let mut double_quote__ = None; let mut newlines_in_values__ = None; + let mut terminator__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::HasHeader => { @@ -1830,6 +1841,14 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0) ; } + GeneratedField::Terminator => { + if terminator__.is_some() { + return Err(serde::de::Error::duplicate_field("terminator")); + } + terminator__ = + Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0) + ; + } } } Ok(CsvOptions { @@ -1848,6 +1867,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { comment: comment__.unwrap_or_default(), double_quote: double_quote__.unwrap_or_default(), newlines_in_values: newlines_in_values__.unwrap_or_default(), + terminator: terminator__.unwrap_or_default(), }) } } diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index ebc05718a458a..cb8f86a022a6a 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -652,6 +652,9 @@ pub struct CsvOptions { /// Indicates if newlines are supported in values #[prost(bytes = "vec", tag = "15")] pub newlines_in_values: ::prost::alloc::vec::Vec, + /// Optional terminator character as a byte + #[prost(bytes = "vec", tag = "16")] + pub terminator: ::prost::alloc::vec::Vec, } /// Options controlling CSV format #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index 4cf7e73ac9121..3718ccbb0f856 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -910,6 +910,7 @@ impl TryFrom<&CsvOptions> for protobuf::CsvOptions { has_header: opts.has_header.map_or_else(Vec::new, |h| vec![h as u8]), delimiter: vec![opts.delimiter], quote: vec![opts.quote], + terminator: opts.terminator.map_or_else(Vec::new, |e| vec![e]), escape: opts.escape.map_or_else(Vec::new, |e| vec![e]), double_quote: opts.double_quote.map_or_else(Vec::new, |h| vec![h as u8]), newlines_in_values: opts diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index ebc05718a458a..cb8f86a022a6a 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -652,6 +652,9 @@ pub struct CsvOptions { /// Indicates if newlines are supported in values #[prost(bytes = "vec", tag = "15")] pub newlines_in_values: ::prost::alloc::vec::Vec, + /// Optional terminator character as a byte + #[prost(bytes = "vec", tag = "16")] + pub terminator: ::prost::alloc::vec::Vec, } /// Options controlling CSV format #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 607a3d8642fde..2e3476da6ac05 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -53,6 +53,7 @@ impl CsvOptionsProto { has_header: options.has_header.map_or(vec![], |v| vec![v as u8]), delimiter: vec![options.delimiter], quote: vec![options.quote], + terminator: options.terminator.map_or(vec![], |v| vec![v]), escape: options.escape.map_or(vec![], |v| vec![v]), double_quote: options.double_quote.map_or(vec![], |v| vec![v as u8]), compression: options.compression as i32, @@ -87,6 +88,11 @@ impl From<&CsvOptionsProto> for CsvOptions { }, delimiter: proto.delimiter.first().copied().unwrap_or(b','), quote: proto.quote.first().copied().unwrap_or(b'"'), + terminator: if !proto.terminator.is_empty() { + Some(proto.terminator[0]) + } else { + None + }, escape: if !proto.escape.is_empty() { Some(proto.escape[0]) } else { From b98980ba1e6235ad175a7b5bdcd4ab71219f0e57 Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Fri, 30 Aug 2024 22:30:47 +0800 Subject: [PATCH 2/7] add test and fix missing builder --- datafusion/core/src/datasource/file_format/csv.rs | 8 ++++++++ .../core/src/datasource/file_format/options.rs | 1 + .../core/src/datasource/physical_plan/csv.rs | 15 +++++++++++---- 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index d1ce2afcccf36..52ce5790d1ada 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -247,6 +247,13 @@ impl CsvFormat { self } + /// The character used to indicate the end of a row. + /// - default to None (CRLF) + pub fn with_terminator(mut self, terminator: Option) -> Self { + self.options.terminator = terminator; + self + } + /// Specifies whether newlines in (quoted) values are supported. /// /// Parsing newlines in quoted values may be affected by execution behaviour such as @@ -359,6 +366,7 @@ impl FileFormat for CsvFormat { .with_has_header(has_header) .with_delimeter(self.options.delimiter) .with_quote(self.options.quote) + .with_terminator(self.options.terminator) .with_escape(self.options.escape) .with_comment(self.options.comment) .with_newlines_in_values(newlines_in_values) diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index 101e103d4f34e..edeea908abc1e 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -520,6 +520,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> { .with_delimiter(self.delimiter) .with_quote(self.quote) .with_escape(self.escape) + .with_terminator(self.terminator) .with_newlines_in_values(self.newlines_in_values) .with_schema_infer_max_rec(self.schema_infer_max_records) .with_file_compression_type(self.file_compression_type.to_owned()); diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index fcf798df88b79..b9a2398000662 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -430,7 +430,6 @@ impl ExecutionPlan for CsvExec { object_store, comment: self.comment, }); - let opener = CsvOpener { config, file_compression_type: self.file_compression_type.to_owned(), @@ -518,6 +517,7 @@ impl CsvConfig { impl CsvConfig { fn open(&self, reader: R) -> Result> { + dbg!(&self.terminator); Ok(self.builder().build(reader)?) } @@ -527,7 +527,6 @@ impl CsvConfig { .with_batch_size(self.batch_size) .with_header(self.has_header) .with_quote(self.quote); - if let Some(terminator) = self.terminator { builder = builder.with_terminator(terminator); } @@ -557,6 +556,7 @@ impl CsvOpener { config: Arc, file_compression_type: FileCompressionType, ) -> Self { + dbg!(&config); Self { config, file_compression_type, @@ -1249,7 +1249,7 @@ mod tests { let session_ctx = SessionContext::new(); let store = object_store::memory::InMemory::new(); - let data = bytes::Bytes::from("a,b 1,2 3,4"); + let data = bytes::Bytes::from("a,b\r1,2\r3,4"); let path = object_store::path::Path::from("a.csv"); store.put(&path, data.into()).await.unwrap(); @@ -1257,7 +1257,7 @@ mod tests { session_ctx.register_object_store(&url, Arc::new(store)); let df = session_ctx - .read_csv("memory:///", CsvReadOptions::new().terminator(Some(b' '))) + .read_csv("memory:///", CsvReadOptions::new().terminator(Some(b'\r'))) .await .unwrap(); @@ -1273,6 +1273,13 @@ mod tests { ]; crate::assert_batches_eq!(expected, &result); + + match session_ctx + .read_csv("memory:///", CsvReadOptions::new().terminator(Some(b'\n'))) + .await.unwrap().collect().await { + Ok(_) => panic!("Expected error"), + Err(e) => assert_eq!(e.strip_backtrace(), "Arrow error: Csv error: incorrect number of fields for line 1, expected 2 got more than 2"), + } } #[tokio::test] From f2ff11fad713e9dcf11e1b12c265547fb01a51e8 Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Sun, 1 Sep 2024 23:38:42 +0800 Subject: [PATCH 3/7] remove the debug message and fix the doc --- datafusion/common/src/config.rs | 2 +- datafusion/core/src/datasource/physical_plan/csv.rs | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index eb404a0085139..b901c389108fd 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1698,7 +1698,7 @@ impl CsvOptions { } /// The character that terminates a row. - /// - default to '\n' + /// - default to None (CRLF) pub fn with_terminator(mut self, terminator: Option) -> Self { self.terminator = terminator; self diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index b9a2398000662..f8a21b1a305db 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -517,7 +517,6 @@ impl CsvConfig { impl CsvConfig { fn open(&self, reader: R) -> Result> { - dbg!(&self.terminator); Ok(self.builder().build(reader)?) } @@ -556,7 +555,6 @@ impl CsvOpener { config: Arc, file_compression_type: FileCompressionType, ) -> Self { - dbg!(&config); Self { config, file_compression_type, From 8a1c9c7e56e57169abe1942476a5746a4548d6fc Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Wed, 4 Sep 2024 01:07:01 +0800 Subject: [PATCH 4/7] support EscapedStringLiteral --- datafusion/sql/src/utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs index c32acecaae5fd..2531795a16305 100644 --- a/datafusion/sql/src/utils.rs +++ b/datafusion/sql/src/utils.rs @@ -268,8 +268,8 @@ pub(crate) fn value_to_string(value: &Value) -> Option { Value::DollarQuotedString(s) => Some(s.to_string()), Value::Number(_, _) | Value::Boolean(_) => Some(value.to_string()), Value::UnicodeStringLiteral(s) => Some(s.to_string()), + Value::EscapedStringLiteral(s) => Some(s.to_string()), Value::DoubleQuotedString(_) - | Value::EscapedStringLiteral(_) | Value::NationalStringLiteral(_) | Value::SingleQuotedByteStringLiteral(_) | Value::DoubleQuotedByteStringLiteral(_) From 6116f9e2843d06274519734eb6ab23db899fe1fc Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Wed, 4 Sep 2024 01:07:37 +0800 Subject: [PATCH 5/7] add create external table tests --- .../core/src/datasource/physical_plan/csv.rs | 61 +++++++++++++++++++ datafusion/core/tests/data/cr_terminator.csv | 1 + .../data/newlines_in_values_cr_terminator.csv | 1 + .../sqllogictest/test_files/csv_files.slt | 20 ++++++ 4 files changed, 83 insertions(+) create mode 100644 datafusion/core/tests/data/cr_terminator.csv create mode 100644 datafusion/core/tests/data/newlines_in_values_cr_terminator.csv diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index f8a21b1a305db..98476b4402da8 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -1280,6 +1280,67 @@ mod tests { } } + #[tokio::test] + async fn test_create_external_table_with_terminator() -> Result<()> { + let ctx = SessionContext::new(); + ctx.sql( + r#" + CREATE EXTERNAL TABLE t1 ( + col1 TEXT, + col2 TEXT + ) STORED AS CSV + LOCATION 'tests/data/cr_terminator.csv' + OPTIONS ('format.terminator' E'\r', 'format.has_header' 'true'); + "#, + ) + .await? + .collect() + .await?; + + let df = ctx.sql(r#"select * from t1"#).await?.collect().await?; + let expected = [ + "+------+--------+", + "| col1 | col2 |", + "+------+--------+", + "| id0 | value0 |", + "| id1 | value1 |", + "| id2 | value2 |", + "| id3 | value3 |", + "+------+--------+", + ]; + crate::assert_batches_eq!(expected, &df); + Ok(()) + } + + #[tokio::test] + async fn test_create_external_table_with_terminator_with_newlines_in_values( + ) -> Result<()> { + let ctx = SessionContext::new(); + ctx.sql(r#" + CREATE EXTERNAL TABLE t1 ( + col1 TEXT, + col2 TEXT + ) STORED AS CSV + LOCATION 'tests/data/newlines_in_values_cr_terminator.csv' + OPTIONS ('format.terminator' E'\r', 'format.has_header' 'true', 'format.newlines_in_values' 'true'); + "#).await?.collect().await?; + + let df = ctx.sql(r#"select * from t1"#).await?.collect().await?; + let expected = [ + "+-------+-----------------------------+", + "| col1 | col2 |", + "+-------+-----------------------------+", + "| 1 | hello\rworld |", + "| 2 | something\relse |", + "| 3 | \rmany\rlines\rmake\rgood test\r |", + "| 4 | unquoted |", + "| value | end |", + "+-------+-----------------------------+", + ]; + crate::assert_batches_eq!(expected, &df); + Ok(()) + } + #[tokio::test] async fn write_csv_results_error_handling() -> Result<()> { let ctx = SessionContext::new(); diff --git a/datafusion/core/tests/data/cr_terminator.csv b/datafusion/core/tests/data/cr_terminator.csv new file mode 100644 index 0000000000000..f2a5d09a4c19e --- /dev/null +++ b/datafusion/core/tests/data/cr_terminator.csv @@ -0,0 +1 @@ +c1,c2 id0,value0 id1,value1 id2,value2 id3,value3 \ No newline at end of file diff --git a/datafusion/core/tests/data/newlines_in_values_cr_terminator.csv b/datafusion/core/tests/data/newlines_in_values_cr_terminator.csv new file mode 100644 index 0000000000000..2f6557d60ec51 --- /dev/null +++ b/datafusion/core/tests/data/newlines_in_values_cr_terminator.csv @@ -0,0 +1 @@ +id,message 1,"hello world" 2,"something else" 3," many lines make good test " 4,unquoted value,end \ No newline at end of file diff --git a/datafusion/sqllogictest/test_files/csv_files.slt b/datafusion/sqllogictest/test_files/csv_files.slt index 7cb21abdba10e..c140711f58418 100644 --- a/datafusion/sqllogictest/test_files/csv_files.slt +++ b/datafusion/sqllogictest/test_files/csv_files.slt @@ -336,3 +336,23 @@ id message 05)good test 4 unquoted value end + +statement ok +CREATE EXTERNAL TABLE stored_table_with_cr_terminator ( +col1 TEXT, +col2 TEXT +) STORED AS CSV +LOCATION '../core/tests/data/cr_terminator.csv' +OPTIONS ('format.terminator' E'\r', 'format.has_header' 'true'); + +# TODO: It should be passed but got the error: External error: query failed: DataFusion error: Object Store error: Generic LocalFileSystem error: Requested range was invalid +# query TT +# select * from stored_table_with_cr_terminator; +# ---- +# id0 value0 +# id1 value1 +# id2 value2 +# id3 value3 + +statement ok +drop table stored_table_with_cr_terminator; From 7b60e540905d54a8def9bc3244dcca7abf191a16 Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Wed, 4 Sep 2024 21:59:52 +0800 Subject: [PATCH 6/7] refactor the error assertion --- datafusion/core/src/datasource/physical_plan/csv.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index 98476b4402da8..6cd1864deb1d4 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -1272,12 +1272,14 @@ mod tests { crate::assert_batches_eq!(expected, &result); - match session_ctx + let e = session_ctx .read_csv("memory:///", CsvReadOptions::new().terminator(Some(b'\n'))) - .await.unwrap().collect().await { - Ok(_) => panic!("Expected error"), - Err(e) => assert_eq!(e.strip_backtrace(), "Arrow error: Csv error: incorrect number of fields for line 1, expected 2 got more than 2"), - } + .await + .unwrap() + .collect() + .await + .unwrap_err(); + assert_eq!(e.strip_backtrace(), "Arrow error: Csv error: incorrect number of fields for line 1, expected 2 got more than 2") } #[tokio::test] From 4597f04e9ebcf1d0fae81f11bdc169ed4bee9182 Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Thu, 5 Sep 2024 00:45:14 +0800 Subject: [PATCH 7/7] add issue reference --- datafusion/sqllogictest/test_files/csv_files.slt | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/sqllogictest/test_files/csv_files.slt b/datafusion/sqllogictest/test_files/csv_files.slt index c140711f58418..d6600e06dc1ca 100644 --- a/datafusion/sqllogictest/test_files/csv_files.slt +++ b/datafusion/sqllogictest/test_files/csv_files.slt @@ -346,6 +346,7 @@ LOCATION '../core/tests/data/cr_terminator.csv' OPTIONS ('format.terminator' E'\r', 'format.has_header' 'true'); # TODO: It should be passed but got the error: External error: query failed: DataFusion error: Object Store error: Generic LocalFileSystem error: Requested range was invalid +# See the issue: https://github.com/apache/datafusion/issues/12328 # query TT # select * from stored_table_with_cr_terminator; # ----