From a5990af0f4936be8c852a7b1998abf16a9e38099 Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Tue, 13 Dec 2022 21:39:35 +0000
Subject: [PATCH 1/2] Upstream newline_delimited_stream and ChunkedStore from DataFusion

---
 object_store/src/chunked.rs   | 247 +++++++++++++++++++++++++++++++
 object_store/src/delimited.rs | 270 ++++++++++++++++++++++++++++++++++
 object_store/src/lib.rs       |   3 +
 3 files changed, 520 insertions(+)
 create mode 100644 object_store/src/chunked.rs
 create mode 100644 object_store/src/delimited.rs

diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs
new file mode 100644
index 000000000000..12a4eaf6b90d
--- /dev/null
+++ b/object_store/src/chunked.rs
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A [`ChunkedStore`] that can be used to test streaming behaviour
+
+use std::fmt::{Debug, Display, Formatter};
+use std::io::{BufReader, Read};
+use std::ops::Range;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use bytes::{BufMut, Bytes, BytesMut};
+use futures::stream::BoxStream;
+use futures::StreamExt;
+use tokio::io::AsyncWrite;
+
+use crate::path::Path;
+use crate::util::maybe_spawn_blocking;
+use crate::{GetResult, ListResult, ObjectMeta, ObjectStore};
+use crate::{MultipartId, Result};
+
+/// Wraps an [`ObjectStore`] and makes its get response return chunks
+/// in a controllable manner.
+///
+/// A `ChunkedStore` makes the memory consumption and performance of
+/// the wrapped [`ObjectStore`] worse. It is intended for use within
+/// tests, to control the chunks in the produced output streams. For
+/// example, it is used to verify the delimiting logic in
+/// [`newline_delimited_stream`](crate::delimited::newline_delimited_stream).
+#[derive(Debug)]
+pub struct ChunkedStore {
+    inner: Arc<dyn ObjectStore>,
+    chunk_size: usize,
+}
+
+impl ChunkedStore {
+    /// Creates a [`ChunkedStore`] whose `get` yields chunks of at most `chunk_size` bytes
+    pub fn new(inner: Arc<dyn ObjectStore>, chunk_size: usize) -> Self {
+        Self { inner, chunk_size }
+    }
+}
+
+impl Display for ChunkedStore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "ChunkedStore({})", self.inner)
+    }
+}
+
+#[async_trait]
+impl ObjectStore for ChunkedStore {
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+        self.inner.put(location, bytes).await
+    }
+
+    async fn put_multipart(
+        &self,
+        location: &Path,
+    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+        self.inner.put_multipart(location).await
+    }
+
+    async fn abort_multipart(
+        &self,
+        location: &Path,
+        multipart_id: &MultipartId,
+    ) -> Result<()> {
+        self.inner.abort_multipart(location, multipart_id).await
+    }
+
+    async fn get(&self, location: &Path) -> Result<GetResult> {
+        match self.inner.get(location).await? {
+            GetResult::File(std_file, ..) => {
+                let reader = BufReader::new(std_file);
+                let chunk_size = self.chunk_size;
+                Ok(GetResult::Stream(
+                    futures::stream::try_unfold(reader, move |mut reader| async move {
+                        let (r, out, reader) = maybe_spawn_blocking(move || {
+                            let mut out = Vec::with_capacity(chunk_size);
+                            let r = (&mut reader)
+                                .take(chunk_size as u64)
+                                .read_to_end(&mut out)
+                                .map_err(|err| crate::Error::Generic {
+                                    store: "ChunkedStore",
+                                    source: Box::new(err),
+                                })?;
+                            Ok((r, out, reader))
+                        })
+                        .await?;
+
+                        match r {
+                            0 => Ok(None),
+                            _ => Ok(Some((out.into(), reader))),
+                        }
+                    })
+                    .boxed(),
+                ))
+            }
+            GetResult::Stream(stream) => {
+                let buffer = BytesMut::new();
+                Ok(GetResult::Stream(
+                    futures::stream::unfold(
+                        (stream, buffer, false, self.chunk_size),
+                        |(mut stream, mut buffer, mut exhausted, chunk_size)| async move {
+                            // Keep accumulating bytes until we reach capacity as long as
+                            // the stream can provide them:
+                            if exhausted {
+                                return None;
+                            }
+                            while buffer.len() < chunk_size {
+                                match stream.next().await {
+                                    None => {
+                                        exhausted = true;
+                                        let slice = buffer.split_off(0).freeze();
+                                        return Some((
+                                            Ok(slice),
+                                            (stream, buffer, exhausted, chunk_size),
+                                        ));
+                                    }
+                                    Some(Ok(bytes)) => {
+                                        buffer.put(bytes);
+                                    }
+                                    Some(Err(e)) => {
+                                        return Some((
+                                            Err(crate::Error::Generic {
+                                                store: "ChunkedStore",
+                                                source: Box::new(e),
+                                            }),
+                                            (stream, buffer, exhausted, chunk_size),
+                                        ))
+                                    }
+                                };
+                            }
+                            // Return the chunked values as the next value in the stream
+                            let slice = buffer.split_to(chunk_size).freeze();
+                            Some((Ok(slice), (stream, buffer, exhausted, chunk_size)))
+                        },
+                    )
+                    .boxed(),
+                ))
+            }
+        }
+    }
+
+    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
+        self.inner.get_range(location, range).await
+    }
+
+    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
+        self.inner.head(location).await
+    }
+
+    async fn delete(&self, location: &Path) -> Result<()> {
+        self.inner.delete(location).await
+    }
+
+    async fn list(
+        &self,
+        prefix: Option<&Path>,
+    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+        self.inner.list(prefix).await
+    }
+
+    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
+        self.inner.list_with_delimiter(prefix).await
+    }
+
+    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+        self.inner.copy(from, to).await
+    }
+
+    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+        self.inner.copy_if_not_exists(from, to).await
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use futures::StreamExt;
+
+    use crate::local::LocalFileSystem;
+    use crate::memory::InMemory;
+    use crate::path::Path;
+    use crate::tests::*;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_chunked_basic() {
+        let location = Path::parse("test").unwrap();
+        let store = Arc::new(InMemory::new());
+        store
+            .put(&location, Bytes::from(vec![0; 1001]))
+            .await
+            .unwrap();
+
+        for chunk_size in [10, 20, 31] {
+            let store = ChunkedStore::new(store.clone(), chunk_size);
+            let mut s = match store.get(&location).await.unwrap() {
+                GetResult::Stream(s) => s,
+                _ => unreachable!(),
+            };
+
+            let mut remaining = 1001;
+            while let Some(next) = s.next().await {
+                let size = next.unwrap().len();
+                let expected = remaining.min(chunk_size);
+                assert_eq!(size, expected);
+                remaining -= expected;
+            }
+            assert_eq!(remaining, 0);
+        }
+    }
+
+    #[tokio::test]
+    async fn test_chunked() {
+        let temporary = tempfile::tempdir().unwrap();
+        let integrations: &[Arc<dyn ObjectStore>] = &[
+            Arc::new(InMemory::new()),
+            Arc::new(LocalFileSystem::new_with_prefix(temporary.path()).unwrap()),
+        ];
+
+        for integration in integrations {
+            let integration = ChunkedStore::new(Arc::clone(&integration), 100);
+
+            put_get_delete_list(&integration).await;
+            list_uses_directories_correctly(&integration).await;
+            list_with_delimiter(&integration).await;
+            rename_and_copy(&integration).await;
+            copy_if_not_exists(&integration).await;
+            stream_get(&integration).await;
+        }
+    }
+}
diff --git a/object_store/src/delimited.rs b/object_store/src/delimited.rs
new file mode 100644
index 
000000000000..13214865117a
--- /dev/null
+++ b/object_store/src/delimited.rs
@@ -0,0 +1,270 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utility for streaming newline delimited files from object storage
+
+use std::collections::VecDeque;
+
+use bytes::Bytes;
+use futures::{Stream, StreamExt};
+use snafu::{ensure, Snafu};
+
+use super::Result;
+
+#[derive(Debug, Snafu)]
+enum Error {
+    #[snafu(display("encountered unterminated string"))]
+    UnterminatedString,
+
+    #[snafu(display("encountered trailing escape character"))]
+    TrailingEscape,
+}
+
+impl From<Error> for super::Error {
+    fn from(err: Error) -> Self {
+        Self::Generic {
+            store: "LineDelimiter",
+            source: Box::new(err),
+        }
+    }
+}
+
+/// The ASCII encoding of `"`
+const QUOTE: u8 = b'"';
+
+/// The ASCII encoding of `\n`
+const NEWLINE: u8 = b'\n';
+
+/// The ASCII encoding of `\`
+const ESCAPE: u8 = b'\\';
+
+/// [`LineDelimiter`] is provided with a stream of [`Bytes`] and returns an iterator
+/// of [`Bytes`] containing a whole number of new line delimited records
+#[derive(Debug, Default)]
+struct LineDelimiter {
+    /// Complete chunks of [`Bytes`]
+    complete: VecDeque<Bytes>,
+    /// Remainder bytes that form the next record
+    remainder: Vec<u8>,
+    /// True if the last character was the escape character
+    is_escape: bool,
+    /// True if currently processing a quoted string
+    is_quote: bool,
+}
+
+impl LineDelimiter {
+    /// Creates a new, empty [`LineDelimiter`] that splits records on `\n`
+    fn new() -> Self {
+        Self::default()
+    }
+
+    /// Adds the next set of [`Bytes`], queueing every record it completes
+    fn push(&mut self, val: impl Into<Bytes>) {
+        let val: Bytes = val.into();
+
+        let is_escape = &mut self.is_escape;
+        let is_quote = &mut self.is_quote;
+        let mut record_ends = val.iter().enumerate().filter_map(|(idx, v)| {
+            if *is_escape {
+                *is_escape = false;
+                None
+            } else if *v == ESCAPE {
+                *is_escape = true;
+                None
+            } else if *v == QUOTE {
+                *is_quote = !*is_quote;
+                None
+            } else if *is_quote {
+                None
+            } else {
+                (*v == NEWLINE).then_some(idx + 1)
+            }
+        });
+
+        let start_offset = match self.remainder.is_empty() {
+            true => 0,
+            false => match record_ends.next() {
+                Some(idx) => {
+                    self.remainder.extend_from_slice(&val[0..idx]);
+                    self.complete
+                        .push_back(Bytes::from(std::mem::take(&mut self.remainder)));
+                    idx
+                }
+                None => {
+                    self.remainder.extend_from_slice(&val);
+                    return;
+                }
+            },
+        };
+        let end_offset = record_ends.last().unwrap_or(start_offset);
+        if start_offset != end_offset {
+            self.complete.push_back(val.slice(start_offset..end_offset));
+        }
+
+        if end_offset != val.len() {
+            self.remainder.extend_from_slice(&val[end_offset..])
+        }
+    }
+
+    /// Marks the end of the stream, delimiting any remaining bytes
+    ///
+    /// Returns `true` if nothing remains to be read; errors on a dangling quote or escape
+    fn finish(&mut self) -> Result<bool> {
+        if !self.remainder.is_empty() {
+            ensure!(!self.is_quote, UnterminatedStringSnafu);
+            ensure!(!self.is_escape, TrailingEscapeSnafu);
+
+            self.complete
+                .push_back(Bytes::from(std::mem::take(&mut self.remainder)))
+        }
+        Ok(self.complete.is_empty())
+    }
+}
+
+impl Iterator for LineDelimiter {
+    type Item = Bytes;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.complete.pop_front()
+    }
+}
+
+/// Given a [`Stream`] of [`Bytes`] returns a [`Stream`] where each
+/// yielded [`Bytes`] contains a whole number of new line delimited records
+/// accounting for `\` style escapes and `"` quotes
+pub fn newline_delimited_stream<S>(s: S) -> impl Stream<Item = Result<Bytes>>
+where
+    S: Stream<Item = Result<Bytes>> + Unpin,
+{
+    let delimiter = LineDelimiter::new();
+
+    futures::stream::unfold(
+        (s, delimiter, false),
+        |(mut s, mut delimiter, mut exhausted)| async move {
+            loop {
+                if let Some(next) = delimiter.next() {
+                    return Some((Ok(next), (s, delimiter, exhausted)));
+                } else if exhausted {
+                    return None;
+                }
+
+                match s.next().await {
+                    Some(Ok(bytes)) => delimiter.push(bytes),
+                    Some(Err(e)) => return Some((Err(e), (s, delimiter, exhausted))),
+                    None => {
+                        exhausted = true;
+                        match delimiter.finish() {
+                            Ok(true) => return None,
+                            Ok(false) => continue,
+                            Err(e) => return Some((Err(e), (s, delimiter, exhausted))),
+                        }
+                    }
+                }
+            }
+        },
+    )
+}
+
+#[cfg(test)]
+mod tests {
+    use futures::stream::{BoxStream, TryStreamExt};
+
+    use super::*;
+
+    #[test]
+    fn test_delimiter() {
+        let mut delimiter = LineDelimiter::new();
+        delimiter.push("hello\nworld");
+        delimiter.push("\n\n");
+
+        assert_eq!(delimiter.next().unwrap(), Bytes::from("hello\n"));
+        assert_eq!(delimiter.next().unwrap(), Bytes::from("world\n"));
+        assert_eq!(delimiter.next().unwrap(), Bytes::from("\n"));
+        assert!(delimiter.next().is_none());
+    }
+
+    #[test]
+    fn test_delimiter_escaped() {
+        let mut delimiter = LineDelimiter::new();
+        delimiter.push("");
+        delimiter.push("fo\\\n\"foo");
+        delimiter.push("bo\n\"bar\n");
+        delimiter.push("\"he");
+        delimiter.push("llo\"\n");
+        assert_eq!(
+            delimiter.next().unwrap(),
+            Bytes::from("fo\\\n\"foobo\n\"bar\n")
+        );
+        assert_eq!(delimiter.next().unwrap(), Bytes::from("\"hello\"\n"));
+        assert!(delimiter.next().is_none());
+
+        // Verify can push further data
+        delimiter.push("\"foo\nbar\",\"fiz\\\"inner\\\"\"\nhello");
+        assert!(!delimiter.finish().unwrap());
+
+        assert_eq!(
+            delimiter.next().unwrap(),
+            Bytes::from("\"foo\nbar\",\"fiz\\\"inner\\\"\"\n")
+        );
+        assert_eq!(delimiter.next().unwrap(), Bytes::from("hello"));
+        assert!(delimiter.finish().unwrap());
+        assert!(delimiter.next().is_none());
+    }
+
+    #[tokio::test]
+    async fn test_delimiter_stream() {
+        let input = vec!["hello\nworld\nbin", "go\ncup", "cakes"];
+        let input_stream =
+            futures::stream::iter(input.into_iter().map(|s| Ok(Bytes::from(s))));
+        let stream = newline_delimited_stream(input_stream);
+
+        let results: Vec<_> = stream.try_collect().await.unwrap();
+        assert_eq!(
+            results,
+            vec![
+                Bytes::from("hello\nworld\n"),
+                Bytes::from("bingo\n"),
+                Bytes::from("cupcakes")
+            ]
+        )
+    }
+    #[tokio::test]
+    async fn test_delimiter_unfold_stream() {
+        let input_stream: BoxStream<'static, Result<Bytes>> = futures::stream::unfold(
+            VecDeque::from(["hello\nworld\nbin", "go\ncup", "cakes"]),
+            |mut input| async move {
+                if !input.is_empty() {
+                    Some((Ok(Bytes::from(input.pop_front().unwrap())), input))
+                } else {
+                    None
+                }
+            },
+        )
+        .boxed();
+        let stream = newline_delimited_stream(input_stream);
+
+        let results: Vec<_> = stream.try_collect().await.unwrap();
+        assert_eq!(
+            results,
+            vec![
+                Bytes::from("hello\nworld\n"),
+                Bytes::from("bingo\n"),
+                Bytes::from("cupcakes")
+            ]
+        )
+    }
+}
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index ec41f381228b..bf606b2d08bf 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -163,6 +163,9 @@ compile_error!("Features 'gcp', 'aws', 'azure' are not supported on wasm.");
 pub mod aws;
 #[cfg(feature = "azure")]
 pub mod azure;
+#[cfg(not(target_arch = "wasm32"))]
+pub mod chunked;
+pub mod delimited;
 #[cfg(feature = "gcp")]
 pub mod gcp;
 pub mod limit;

From 558229c4b942245a681a970e12e5a9fbe727e242 Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Tue, 13 Dec 2022 21:57:48 +0000
Subject: [PATCH 2/2] Clippy

---
 object_store/src/chunked.rs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/object_store/src/chunked.rs b/object_store/src/chunked.rs
index 12a4eaf6b90d..76865ef96701 100644
--- a/object_store/src/chunked.rs
+++ b/object_store/src/chunked.rs
@@ -201,14 +201,14 @@ mod tests {
     #[tokio::test]
     async fn test_chunked_basic() {
         let location = Path::parse("test").unwrap();
-        let store = Arc::new(InMemory::new());
+        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
         store
             .put(&location, Bytes::from(vec![0; 1001]))
             .await
             .unwrap();
 
         for chunk_size in [10, 20, 31] {
-            let store = ChunkedStore::new(store.clone(), chunk_size);
+            let store = ChunkedStore::new(Arc::clone(&store), chunk_size);
             let mut s = match store.get(&location).await.unwrap() {
                 GetResult::Stream(s) => s,
                 _ => unreachable!(),
@@ -234,7 +234,7 @@ mod tests {
         ];
 
         for integration in integrations {
-            let integration = ChunkedStore::new(Arc::clone(&integration), 100);
+            let integration = ChunkedStore::new(Arc::clone(integration), 100);
 
             put_get_delete_list(&integration).await;
             list_uses_directories_correctly(&integration).await;