Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ rustdoc-args = ["--cfg=doc_cfg"]
[dependencies]
bytes = "1.3.0"
futures-io-03 = { package = "futures-io", version = "0.3.25", optional = true }
once_cell = "1.4.0"
tokio = { version = "1.0.0", features = ["io-std"], optional = true }

[dev-dependencies]
Expand Down
84 changes: 36 additions & 48 deletions src/cursor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl<T: AsRef<BufList>> Cursor<T> {
/// let cursor = Cursor::new(BufList::new());
/// ```
pub fn new(inner: T) -> Cursor<T> {
let data = CursorData::new(inner.as_ref());
let data = CursorData::new();
Cursor { inner, data }
}

Expand Down Expand Up @@ -170,15 +170,15 @@ impl<T: AsRef<BufList>> Cursor<T> {
/// assert_eq!(cursor.position(), 4);
/// ```
pub fn set_position(&mut self, pos: u64) {
self.data.set_pos(pos);
self.data.set_pos(self.inner.as_ref(), pos);
}

// ---
// Helper methods
// ---
#[cfg(test)]
fn assert_invariants(&self) -> anyhow::Result<()> {
self.data.assert_invariants()
self.data.assert_invariants(self.inner.as_ref())
}
}

Expand All @@ -203,7 +203,7 @@ where

impl<T: AsRef<BufList>> io::Seek for Cursor<T> {
fn seek(&mut self, style: SeekFrom) -> io::Result<u64> {
self.data.seek_impl(style)
self.data.seek_impl(self.inner.as_ref(), style)
}

#[cfg(seek_convenience)]
Expand Down Expand Up @@ -234,16 +234,12 @@ impl<T: AsRef<BufList>> io::BufRead for Cursor<T> {
}

fn consume(&mut self, amt: usize) {
self.data.consume_impl(amt);
self.data.consume_impl(self.inner.as_ref(), amt);
}
}

#[derive(Clone, Debug)]
struct CursorData {
/// An index of chunks and their start positions. There's an additional index at the end, which
/// is the length of the list (list.num_bytes()).
start_pos: Box<[u64]>,

/// The chunk number the cursor is pointing to. Kept in sync with pos.
///
/// This is within the range [0, self.start_pos.len()). It is self.start_pos.len() - 1 iff pos
Expand All @@ -255,36 +251,23 @@ struct CursorData {
}

impl CursorData {
fn new(inner: &BufList) -> Self {
let mut start_pos = Vec::with_capacity(inner.num_chunks() + 1);
let mut next = 0u64;
for chunk in inner.iter() {
start_pos.push(next);
next += chunk.len() as u64;
}
// Add the length of the chunk at the end.
start_pos.push(next);

Self {
start_pos: start_pos.into_boxed_slice(),
chunk: 0,
pos: 0,
}
fn new() -> Self {
Self { chunk: 0, pos: 0 }
}

#[cfg(test)]
fn assert_invariants(&self) -> anyhow::Result<()> {
fn assert_invariants(&self, list: &BufList) -> anyhow::Result<()> {
use anyhow::ensure;

ensure!(
self.pos >= self.start_pos[self.chunk],
self.pos >= list.get_start_pos()[self.chunk],
"invariant failed: current position {} >= start position {} (chunk = {})",
self.pos,
self.start_pos[self.chunk],
list.get_start_pos()[self.chunk],
self.chunk
);

let next_pos = self.start_pos.get(self.chunk + 1).copied().into();
let next_pos = list.get_start_pos().get(self.chunk + 1).copied().into();
ensure!(
Offset::Value(self.pos) < next_pos,
"invariant failed: next start position {:?} > current position {} (chunk = {})",
Expand All @@ -296,13 +279,13 @@ impl CursorData {
Ok(())
}

fn seek_impl(&mut self, style: SeekFrom) -> io::Result<u64> {
fn seek_impl(&mut self, list: &BufList, style: SeekFrom) -> io::Result<u64> {
let (base_pos, offset) = match style {
SeekFrom::Start(n) => {
self.set_pos(n);
self.set_pos(list, n);
return Ok(n);
}
SeekFrom::End(n) => (self.num_bytes(), n),
SeekFrom::End(n) => (self.num_bytes(list), n),
SeekFrom::Current(n) => (self.pos, n),
};
// Can't use checked_add_signed since it was only stabilized in Rust 1.66. This is adapted
Expand All @@ -315,7 +298,7 @@ impl CursorData {
};
match new_pos {
Some(n) => {
self.set_pos(n);
self.set_pos(list, n);
Ok(self.pos)
}
None => Err(io::Error::new(
Expand Down Expand Up @@ -370,7 +353,7 @@ impl CursorData {

fn read_exact_impl(&mut self, list: &BufList, buf: &mut [u8]) -> io::Result<()> {
// This is the same as read_impl as long as there's enough space.
let remaining = self.num_bytes().saturating_sub(self.pos);
let remaining = self.num_bytes(list).saturating_sub(self.pos);
let buf_len = buf.len();
if remaining < buf_len as u64 {
return Err(io::Error::new(
Expand All @@ -383,7 +366,7 @@ impl CursorData {
Ok(())
}

fn fill_buf_impl<'a>(&self, list: &'a BufList) -> &'a [u8] {
fn fill_buf_impl<'a>(&'a self, list: &'a BufList) -> &[u8] {
const EMPTY_SLICE: &[u8] = &[];
match self.get_chunk_and_pos(list) {
Some((chunk, chunk_pos)) => &chunk.as_ref()[chunk_pos..],
Expand All @@ -392,14 +375,15 @@ impl CursorData {
}
}

fn consume_impl(&mut self, amt: usize) {
self.set_pos(self.pos + amt as u64);
fn consume_impl(&mut self, list: &BufList, amt: usize) {
self.set_pos(list, self.pos + amt as u64);
}

fn set_pos(&mut self, new_pos: u64) {
fn set_pos(&mut self, list: &BufList, new_pos: u64) {
match new_pos.cmp(&self.pos) {
Ordering::Greater => {
let next_start = self.start_pos.get(self.chunk + 1).copied().into();
let start_pos = list.get_start_pos();
let next_start = start_pos.get(self.chunk + 1).copied().into();
if Offset::Value(new_pos) < next_start {
// Within the same chunk.
} else {
Expand All @@ -408,7 +392,7 @@ impl CursorData {
// n).
//
// Do a binary search for this element.
match self.start_pos[self.chunk + 1..].binary_search(&new_pos) {
match start_pos[self.chunk + 1..].binary_search(&new_pos) {
// We're starting the search from self.chunk + 1, which means that the value
// returned from binary_search is 1 less than the actual delta.
Ok(delta_minus_one) => {
Expand All @@ -431,10 +415,11 @@ impl CursorData {
}
Ordering::Equal => {}
Ordering::Less => {
if self.start_pos.get(self.chunk).copied() <= Some(new_pos) {
let start_pos = list.get_start_pos();
if start_pos.get(self.chunk).copied() <= Some(new_pos) {
// Within the same chunk.
} else {
match self.start_pos[..self.chunk].binary_search(&new_pos) {
match start_pos[..self.chunk].binary_search(&new_pos) {
Ok(chunk) => {
// Exactly at the start point of a chunk.
self.chunk = chunk;
Expand All @@ -456,17 +441,20 @@ impl CursorData {
}

#[inline]
fn get_chunk_and_pos<'a>(&self, list: &'a BufList) -> Option<(&'a Bytes, usize)> {
fn get_chunk_and_pos<'b>(&self, list: &'b BufList) -> Option<(&'b Bytes, usize)> {
match list.get_chunk(self.chunk) {
Some(chunk) => {
// This guarantees that pos is not past the end of the list.
debug_assert!(
self.pos < self.num_bytes(),
self.pos < self.num_bytes(list),
"self.pos ({}) is less than num_bytes ({})",
self.pos,
self.num_bytes()
self.num_bytes(list)
);
Some((chunk, (self.pos - self.start_pos[self.chunk]) as usize))
Some((
chunk,
(self.pos - list.get_start_pos()[self.chunk]) as usize,
))
}
None => {
// pos is past the end of the list.
Expand All @@ -475,9 +463,9 @@ impl CursorData {
}
}

fn num_bytes(&self) -> u64 {
*self
.start_pos
fn num_bytes(&self, list: &BufList) -> u64 {
*list
.get_start_pos()
.last()
.expect("start_pos always has at least one element")
}
Expand Down
33 changes: 33 additions & 0 deletions src/imp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// SPDX-License-Identifier: Apache-2.0

use bytes::{Buf, BufMut, Bytes, BytesMut};
use once_cell::sync::OnceCell;
use std::{
collections::VecDeque,
io::IoSlice,
Expand All @@ -16,6 +17,10 @@ use std::{
pub struct BufList {
// Invariant: none of the bufs in this queue are zero-length.
bufs: VecDeque<Bytes>,

/// An index of chunks and their start positions. There's an additional index at the end, which
/// is the length of the list (list.num_bytes()).
///
/// The index is built lazily: `get_start_pos` fills this cell on first access. The `&mut self`
/// methods visible in this file (`push_chunk`, `extend`, `advance`, `copy_to_bytes`) replace the
/// cell with an empty one, so the index is rebuilt the next time it is requested.
start_pos: OnceCell<Box<[u64]>>,
}

impl BufList {
Expand All @@ -25,11 +30,27 @@ impl BufList {
Self::default()
}

/// Returns the start offset of every chunk, followed by the total byte length of the list.
///
/// For `n` chunks the slice holds `n + 1` entries: entry `i` is the offset at which chunk `i`
/// begins, and the final entry is the sum of all chunk lengths. The index is computed on the
/// first call and cached in `self.start_pos`; later calls return the cached slice.
#[inline]
pub(crate) fn get_start_pos(&self) -> &[u64] {
    self.start_pos.get_or_init(|| {
        let mut offsets = Vec::with_capacity(self.bufs.len() + 1);
        let mut running_total = 0u64;
        // The first chunk (or the end of an empty list) is always at offset 0.
        offsets.push(running_total);
        for chunk in &self.bufs {
            running_total += chunk.len() as u64;
            // Start of the next chunk, or the total length once the last chunk is counted.
            offsets.push(running_total);
        }
        offsets.into_boxed_slice()
    })
}

/// Creates a new, empty `BufList` whose chunk queue has space for at least `capacity` chunks.
///
/// The cached start-position index starts out uninitialized.
#[inline]
pub fn with_capacity(capacity: usize) -> Self {
    let bufs = VecDeque::with_capacity(capacity);
    let start_pos = OnceCell::new();
    Self { bufs, start_pos }
}

Expand Down Expand Up @@ -112,6 +133,9 @@ impl BufList {
/// assert_eq!(buf_list.num_chunks(), 2);
/// ```
pub fn push_chunk<B: Buf>(&mut self, mut data: B) -> Bytes {
// mutable borrow acquired, invalidate oncecell
self.start_pos = OnceCell::new();

let len = data.remaining();
// `data` is (almost) certainly a `Bytes`, so `copy_to_bytes` should
// internally be a cheap refcount bump almost all of the time.
Expand All @@ -131,6 +155,9 @@ impl BufList {

impl<B: Buf> Extend<B> for BufList {
fn extend<T: IntoIterator<Item = B>>(&mut self, iter: T) {
// mutable borrow acquired, invalidate oncecell
self.start_pos = OnceCell::new();

for buf in iter.into_iter() {
self.push_chunk(buf);
}
Expand Down Expand Up @@ -204,6 +231,9 @@ impl Buf for BufList {
}

fn advance(&mut self, mut amt: usize) {
// mutable borrow acquired, invalidate oncecell
self.start_pos = OnceCell::new();

while amt > 0 {
let rem = self.bufs[0].remaining();
// If the amount to advance by is less than the first buffer in
Expand All @@ -224,6 +254,9 @@ impl Buf for BufList {
}

fn copy_to_bytes(&mut self, len: usize) -> Bytes {
// mutable borrow acquired, invalidate oncecell
self.start_pos = OnceCell::new();

// If the length of the requested `Bytes` is <= the length of the front
// buffer, we can just use its `copy_to_bytes` implementation (which is
// just a reference count bump).
Expand Down