address comments
XiangpengHao committed Jan 16, 2025
commit 077e99b346b9c46cdc7bb0b9b8cd281835a9ce55
10 changes: 5 additions & 5 deletions object_store/src/chunked.rs
@@ -44,12 +44,12 @@ use crate::{PutPayload, Result};
#[derive(Debug)]
pub struct ChunkedStore {
inner: Arc<dyn ObjectStore>,
chunk_size: u64,
chunk_size: usize, // chunks are in memory, so we use usize not u64
}

impl ChunkedStore {
/// Creates a new [`ChunkedStore`] with the specified chunk_size
pub fn new(inner: Arc<dyn ObjectStore>, chunk_size: u64) -> Self {
pub fn new(inner: Arc<dyn ObjectStore>, chunk_size: usize) -> Self {
Self { inner, chunk_size }
}
}
@@ -100,7 +100,7 @@ impl ObjectStore for ChunkedStore
if exhausted {
return None;
}
while buffer.len() < chunk_size as usize {
while buffer.len() < chunk_size {
match stream.next().await {
None => {
exhausted = true;
@@ -125,7 +125,7 @@ impl ObjectStore for ChunkedStore {
};
}
// Return the chunked values as the next value in the stream
let slice = buffer.split_to(chunk_size as usize).freeze();
let slice = buffer.split_to(chunk_size).freeze();
Some((Ok(slice), (stream, buffer, exhausted, chunk_size)))
},
)
@@ -204,7 +204,7 @@ mod tests {
let mut remaining = 1001;
while let Some(next) = s.next().await {
let size = next.unwrap().len() as u64;
let expected = remaining.min(chunk_size);
let expected = remaining.min(chunk_size as u64);
assert_eq!(size, expected);
remaining -= expected;
}
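Since each chunk is buffered entirely in memory before being yielded, the chunk size is now expressed as usize rather than u64. A minimal sketch of calling the new constructor signature, not part of the commit, assuming the chunked module is reachable from the crate root and using InMemory purely as a stand-in backend:

use std::sync::Arc;
use object_store::{chunked::ChunkedStore, memory::InMemory, ObjectStore};

fn make_store() -> Arc<dyn ObjectStore> {
    // 8 KiB chunks; the size is a usize because each chunk lives in memory.
    Arc::new(ChunkedStore::new(Arc::new(InMemory::new()), 8 * 1024))
}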
10 changes: 7 additions & 3 deletions object_store/src/lib.rs
@@ -903,7 +903,7 @@ pub struct ObjectMeta {
pub location: Path,
/// The last modified time
pub last_modified: DateTime<Utc>,
/// The size in bytes of the object.
/// The size in bytes of the object.
///
/// Note this is not `usize` as `object_store` supports 32-bit architectures such as WASM
pub size: u64,
@@ -1064,7 +1064,11 @@ impl GetResult {
path: path.clone(),
})?;

let mut buffer = Vec::with_capacity(len as usize);
let mut buffer = if let Ok(len) = len.try_into() {
Vec::with_capacity(len)
} else {
Vec::new()
};
file.take(len as _)
.read_to_end(&mut buffer)
.map_err(|source| local::Error::UnableToReadBytes { source, path })?;
@@ -1097,7 +1101,7 @@ impl GetResult {
match self.payload {
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
GetResultPayload::File(file, path) => {
const CHUNK_SIZE: u64 = 8 * 1024;
const CHUNK_SIZE: usize = 8 * 1024;
local::chunked_stream(file, path, self.range, CHUNK_SIZE)
}
GetResultPayload::Stream(s) => s,
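The hunk above replaces a plain `len as usize` cast, which would silently truncate on 32-bit targets, with a fallible conversion that falls back to an unallocated buffer. A standalone sketch of the same pattern, not the crate's actual helper:

// Pre-allocate only when the u64 length fits in usize; otherwise start empty
// and let read_to_end grow the buffer as bytes arrive.
fn buffer_for(len: u64) -> Vec<u8> {
    match usize::try_from(len) {
        Ok(cap) => Vec::with_capacity(cap),
        Err(_) => Vec::new(), // len > usize::MAX, only possible on 32-bit targets
    }
}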
12 changes: 9 additions & 3 deletions object_store/src/local.rs
@@ -820,7 +820,7 @@ pub(crate) fn chunked_stream(
mut file: File,
path: PathBuf,
range: Range<u64>,
chunk_size: u64,
chunk_size: usize,
) -> BoxStream<'static, Result<Bytes, super::Error>> {
futures::stream::once(async move {
let (file, path) = maybe_spawn_blocking(move || {
@@ -841,8 +841,14 @@
return Ok(None);
}

let to_read = remaining.min(chunk_size);
let mut buffer = Vec::with_capacity(to_read as usize);
let to_read = remaining.min(chunk_size as u64);
let cap = usize::try_from(to_read).map_err(|_e| Error::InvalidRange {
source: InvalidGetRange::TooLarge {
requested: to_read,
max: usize::MAX as u64,
},
})?;
let mut buffer = Vec::with_capacity(cap);
let read = (&mut file)
.take(to_read)
.read_to_end(&mut buffer)
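Rather than casting, local.rs now reports an explicit error when a chunk length cannot be represented as usize. A hedged sketch of that conversion in isolation, with RangeError standing in for the crate's real error enums:

#[derive(Debug)]
enum RangeError {
    TooLarge { requested: u64, max: u64 },
}

// Turn a u64 byte count into a usize capacity, surfacing an error instead of
// silently wrapping when the value does not fit (e.g. on 32-bit or WASM targets).
fn checked_capacity(to_read: u64) -> Result<usize, RangeError> {
    usize::try_from(to_read).map_err(|_| RangeError::TooLarge {
        requested: to_read,
        max: usize::MAX as u64,
    })
}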
15 changes: 13 additions & 2 deletions object_store/src/memory.rs
@@ -277,8 +277,19 @@ impl ObjectStore for InMemory {
let r = GetRange::Bounded(range.clone())
.as_range(entry.data.len() as u64)
.map_err(|source| Error::Range { source })?;
let r = r.start as usize..r.end as usize;
Ok(entry.data.slice(r))
let r_end = usize::try_from(r.end).map_err(|_e| Error::Range {
source: InvalidGetRange::TooLarge {
requested: r.end,
max: usize::MAX as u64,
},
})?;
let r_start = usize::try_from(r.start).map_err(|_e| Error::Range {
source: InvalidGetRange::TooLarge {
requested: r.start,
max: usize::MAX as u64,
},
})?;
Ok(entry.data.slice(r_start..r_end))
})
.collect()
}
27 changes: 19 additions & 8 deletions object_store/src/util.rs
@@ -197,6 +197,9 @@ pub enum GetRange {
/// an error will be returned. Additionally, if the range ends after the end
/// of the object, the entire remainder of the object will be returned.
/// Otherwise, the exact requested range will be returned.
///
/// Note that range is u64 (i.e., not usize),
/// as `object_store` supports 32-bit architectures such as WASM
Bounded(Range<u64>),
/// Request all bytes starting from a given byte offset
Offset(u64),
@@ -211,19 +214,27 @@ pub(crate) enum InvalidGetRange {

#[error("Range started at {start} and ended at {end}")]
Inconsistent { start: u64, end: u64 },

#[error("Range {requested} is larger than system memory limit {max}")]
TooLarge { requested: u64, max: u64 },
}

impl GetRange {
pub(crate) fn is_valid(&self) -> Result<(), InvalidGetRange> {
match self {
Self::Bounded(r) if r.end <= r.start => {
if let Self::Bounded(r) = self {
if r.end <= r.start {
return Err(InvalidGetRange::Inconsistent {
start: r.start,
end: r.end,
});
}
_ => (),
};
if (r.end - r.start) > usize::MAX as u64 {
return Err(InvalidGetRange::TooLarge {
requested: r.start,
max: usize::MAX as u64,
});
}
}
Ok(())
}
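With the new TooLarge variant, a bounded range is rejected both when it is inverted and when its span cannot be addressed on the current target. A rough standalone sketch of that kind of validation, using a simplified error type rather than the crate's own and reporting the span size for illustration:

use std::ops::Range;

#[derive(Debug)]
enum InvalidRange {
    Inconsistent { start: u64, end: u64 },
    TooLarge { requested: u64, max: u64 },
}

// A bounded range is usable only if it is non-empty, well ordered, and its
// span fits in a usize on the current target.
fn validate(r: &Range<u64>) -> Result<(), InvalidRange> {
    if r.end <= r.start {
        return Err(InvalidRange::Inconsistent { start: r.start, end: r.end });
    }
    if r.end - r.start > usize::MAX as u64 {
        return Err(InvalidRange::TooLarge {
            requested: r.end - r.start,
            max: usize::MAX as u64,
        });
    }
    Ok(())
}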

@@ -333,9 +344,9 @@ mod tests {
&ranges,
|range| {
fetches.push(range.clone());
futures::future::ready(Ok(Bytes::from(
src[range.start as usize..range.end as usize].to_vec(),
)))
let start = usize::try_from(range.start).unwrap();
let end = usize::try_from(range.end).unwrap();
futures::future::ready(Ok(Bytes::from(src[start..end].to_vec())))
},
coalesce,
)
@@ -346,7 +357,7 @@
for (range, bytes) in ranges.iter().zip(coalesced) {
assert_eq!(
bytes.as_ref(),
&src[range.start as usize..range.end as usize]
&src[usize::try_from(range.start).unwrap()..usize::try_from(range.end).unwrap()]
);
}
fetches