diff --git a/Cargo.lock b/Cargo.lock index 016f8b5..b0000d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -44,6 +44,7 @@ dependencies = [ "dummy-waker", "futures", "futures-io", + "once_cell", "proptest", "test-strategy", "tokio", @@ -281,6 +282,12 @@ dependencies = [ "libm", ] +[[package]] +name = "once_cell" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d" + [[package]] name = "pin-project-lite" version = "0.2.9" diff --git a/Cargo.toml b/Cargo.toml index f31c896..b279493 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ rustdoc-args = ["--cfg=doc_cfg"] [dependencies] bytes = "1.3.0" futures-io-03 = { package = "futures-io", version = "0.3.25", optional = true } +once_cell = "1.4.0" tokio = { version = "1.0.0", features = ["io-std"], optional = true } [dev-dependencies] diff --git a/src/cursor/mod.rs b/src/cursor/mod.rs index d1424bb..5ef4b21 100644 --- a/src/cursor/mod.rs +++ b/src/cursor/mod.rs @@ -53,7 +53,7 @@ impl> Cursor { /// let cursor = Cursor::new(BufList::new()); /// ``` pub fn new(inner: T) -> Cursor { - let data = CursorData::new(inner.as_ref()); + let data = CursorData::new(); Cursor { inner, data } } @@ -170,7 +170,7 @@ impl> Cursor { /// assert_eq!(cursor.position(), 4); /// ``` pub fn set_position(&mut self, pos: u64) { - self.data.set_pos(pos); + self.data.set_pos(self.inner.as_ref(), pos); } // --- @@ -178,7 +178,7 @@ impl> Cursor { // --- #[cfg(test)] fn assert_invariants(&self) -> anyhow::Result<()> { - self.data.assert_invariants() + self.data.assert_invariants(self.inner.as_ref()) } } @@ -203,7 +203,7 @@ where impl> io::Seek for Cursor { fn seek(&mut self, style: SeekFrom) -> io::Result { - self.data.seek_impl(style) + self.data.seek_impl(self.inner.as_ref(), style) } #[cfg(seek_convenience)] @@ -234,16 +234,12 @@ impl> io::BufRead for Cursor { } fn consume(&mut self, amt: usize) { - self.data.consume_impl(amt); + self.data.consume_impl(self.inner.as_ref(), amt); } } #[derive(Clone, Debug)] struct CursorData { - /// An index of chunks and their start positions. There's an additional index at the end, which - /// is the length of the list (list.num_bytes()). - start_pos: Box<[u64]>, - /// The chunk number the cursor is pointing to. Kept in sync with pos. /// /// This is within the range [0, self.start_pos.len()). It is self.start_pos.len() - 1 iff pos @@ -255,36 +251,23 @@ struct CursorData { } impl CursorData { - fn new(inner: &BufList) -> Self { - let mut start_pos = Vec::with_capacity(inner.num_chunks() + 1); - let mut next = 0u64; - for chunk in inner.iter() { - start_pos.push(next); - next += chunk.len() as u64; - } - // Add the length of the chunk at the end. - start_pos.push(next); - - Self { - start_pos: start_pos.into_boxed_slice(), - chunk: 0, - pos: 0, - } + fn new() -> Self { + Self { chunk: 0, pos: 0 } } #[cfg(test)] - fn assert_invariants(&self) -> anyhow::Result<()> { + fn assert_invariants(&self, list: &BufList) -> anyhow::Result<()> { use anyhow::ensure; ensure!( - self.pos >= self.start_pos[self.chunk], + self.pos >= list.get_start_pos()[self.chunk], "invariant failed: current position {} >= start position {} (chunk = {})", self.pos, - self.start_pos[self.chunk], + list.get_start_pos()[self.chunk], self.chunk ); - let next_pos = self.start_pos.get(self.chunk + 1).copied().into(); + let next_pos = list.get_start_pos().get(self.chunk + 1).copied().into(); ensure!( Offset::Value(self.pos) < next_pos, "invariant failed: next start position {:?} > current position {} (chunk = {})", @@ -296,13 +279,13 @@ impl CursorData { Ok(()) } - fn seek_impl(&mut self, style: SeekFrom) -> io::Result { + fn seek_impl(&mut self, list: &BufList, style: SeekFrom) -> io::Result { let (base_pos, offset) = match style { SeekFrom::Start(n) => { - self.set_pos(n); + self.set_pos(list, n); return Ok(n); } - SeekFrom::End(n) => (self.num_bytes(), n), + SeekFrom::End(n) => (self.num_bytes(list), n), SeekFrom::Current(n) => (self.pos, n), }; // Can't use checked_add_signed since it was only stabilized in Rust 1.66. This is adapted @@ -315,7 +298,7 @@ impl CursorData { }; match new_pos { Some(n) => { - self.set_pos(n); + self.set_pos(list, n); Ok(self.pos) } None => Err(io::Error::new( @@ -370,7 +353,7 @@ impl CursorData { fn read_exact_impl(&mut self, list: &BufList, buf: &mut [u8]) -> io::Result<()> { // This is the same as read_impl as long as there's enough space. - let remaining = self.num_bytes().saturating_sub(self.pos); + let remaining = self.num_bytes(list).saturating_sub(self.pos); let buf_len = buf.len(); if remaining < buf_len as u64 { return Err(io::Error::new( @@ -383,7 +366,7 @@ impl CursorData { Ok(()) } - fn fill_buf_impl<'a>(&self, list: &'a BufList) -> &'a [u8] { + fn fill_buf_impl<'a>(&'a self, list: &'a BufList) -> &[u8] { const EMPTY_SLICE: &[u8] = &[]; match self.get_chunk_and_pos(list) { Some((chunk, chunk_pos)) => &chunk.as_ref()[chunk_pos..], @@ -392,14 +375,15 @@ impl CursorData { } } - fn consume_impl(&mut self, amt: usize) { - self.set_pos(self.pos + amt as u64); + fn consume_impl(&mut self, list: &BufList, amt: usize) { + self.set_pos(list, self.pos + amt as u64); } - fn set_pos(&mut self, new_pos: u64) { + fn set_pos(&mut self, list: &BufList, new_pos: u64) { match new_pos.cmp(&self.pos) { Ordering::Greater => { - let next_start = self.start_pos.get(self.chunk + 1).copied().into(); + let start_pos = list.get_start_pos(); + let next_start = start_pos.get(self.chunk + 1).copied().into(); if Offset::Value(new_pos) < next_start { // Within the same chunk. } else { @@ -408,7 +392,7 @@ impl CursorData { // n). // // Do a binary search for this element. - match self.start_pos[self.chunk + 1..].binary_search(&new_pos) { + match start_pos[self.chunk + 1..].binary_search(&new_pos) { // We're starting the search from self.chunk + 1, which means that the value // returned from binary_search is 1 less than the actual delta. Ok(delta_minus_one) => { @@ -431,10 +415,11 @@ impl CursorData { } Ordering::Equal => {} Ordering::Less => { - if self.start_pos.get(self.chunk).copied() <= Some(new_pos) { + let start_pos = list.get_start_pos(); + if start_pos.get(self.chunk).copied() <= Some(new_pos) { // Within the same chunk. } else { - match self.start_pos[..self.chunk].binary_search(&new_pos) { + match start_pos[..self.chunk].binary_search(&new_pos) { Ok(chunk) => { // Exactly at the start point of a chunk. self.chunk = chunk; @@ -456,17 +441,20 @@ impl CursorData { } #[inline] - fn get_chunk_and_pos<'a>(&self, list: &'a BufList) -> Option<(&'a Bytes, usize)> { + fn get_chunk_and_pos<'b>(&self, list: &'b BufList) -> Option<(&'b Bytes, usize)> { match list.get_chunk(self.chunk) { Some(chunk) => { // This guarantees that pos is not past the end of the list. debug_assert!( - self.pos < self.num_bytes(), + self.pos < self.num_bytes(list), "self.pos ({}) is less than num_bytes ({})", self.pos, - self.num_bytes() + self.num_bytes(list) ); - Some((chunk, (self.pos - self.start_pos[self.chunk]) as usize)) + Some(( + chunk, + (self.pos - list.get_start_pos()[self.chunk]) as usize, + )) } None => { // pos is past the end of the list. @@ -475,9 +463,9 @@ impl CursorData { } } - fn num_bytes(&self) -> u64 { - *self - .start_pos + fn num_bytes(&self, list: &BufList) -> u64 { + *list + .get_start_pos() .last() .expect("start_pos always has at least one element") } diff --git a/src/imp.rs b/src/imp.rs index 63e22ff..6f564be 100644 --- a/src/imp.rs +++ b/src/imp.rs @@ -3,6 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 use bytes::{Buf, BufMut, Bytes, BytesMut}; +use once_cell::sync::OnceCell; use std::{ collections::VecDeque, io::IoSlice, @@ -16,6 +17,10 @@ use std::{ pub struct BufList { // Invariant: none of the bufs in this queue are zero-length. bufs: VecDeque, + + /// An index of chunks and their start positions. There's an additional index at the end, which + /// is the length of the list (list.num_bytes()). + start_pos: OnceCell>, } impl BufList { @@ -25,11 +30,27 @@ impl BufList { Self::default() } + #[inline] + pub(crate) fn get_start_pos(&self) -> &[u64] { + self.start_pos.get_or_init(|| { + let mut start_pos = Vec::with_capacity(self.bufs.len() + 1); + let mut next = 0u64; + for chunk in self.bufs.iter() { + start_pos.push(next); + next += chunk.len() as u64; + } + // Add the length of the chunk at the end. + start_pos.push(next); + start_pos.into_boxed_slice() + }) + } + /// Creates a new, empty, `BufList` with the given capacity. #[inline] pub fn with_capacity(capacity: usize) -> Self { Self { bufs: VecDeque::with_capacity(capacity), + start_pos: OnceCell::new(), } } @@ -112,6 +133,9 @@ impl BufList { /// assert_eq!(buf_list.num_chunks(), 2); /// ``` pub fn push_chunk(&mut self, mut data: B) -> Bytes { + // mutable borrow acquired, invalidate oncecell + self.start_pos = OnceCell::new(); + let len = data.remaining(); // `data` is (almost) certainly a `Bytes`, so `copy_to_bytes` should // internally be a cheap refcount bump almost all of the time. @@ -131,6 +155,9 @@ impl BufList { impl Extend for BufList { fn extend>(&mut self, iter: T) { + // mutable borrow acquired, invalidate oncecell + self.start_pos = OnceCell::new(); + for buf in iter.into_iter() { self.push_chunk(buf); } @@ -204,6 +231,9 @@ impl Buf for BufList { } fn advance(&mut self, mut amt: usize) { + // mutable borrow acquired, invalidate oncecell + self.start_pos = OnceCell::new(); + while amt > 0 { let rem = self.bufs[0].remaining(); // If the amount to advance by is less than the first buffer in @@ -224,6 +254,9 @@ impl Buf for BufList { } fn copy_to_bytes(&mut self, len: usize) -> Bytes { + // mutable borrow acquired, invalidate oncecell + self.start_pos = OnceCell::new(); + // If the length of the requested `Bytes` is <= the length of the front // buffer, we can just use its `copy_to_bytes` implementation (which is // just a reference count bump).