Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
fce6481
Initial (broken) copy
criccomini May 6, 2025
9c72689
Strip `ObjectStoreUrl` and use instead of
criccomini May 6, 2025
85dbce4
Remove wasm32
criccomini May 6, 2025
ff551ea
Create new \s if not exist on get
criccomini May 6, 2025
0bedcac
Add registry tests
criccomini May 6, 2025
395f32d
Add \
criccomini May 6, 2025
48fd0bf
Fix rustdocs and wasm
criccomini May 6, 2025
9b44140
cargo fmt
criccomini May 6, 2025
81a05b0
Remove dashmap
criccomini May 7, 2025
a09bd7e
Placate clippy
criccomini May 7, 2025
658ba5e
Expose parse_url_opts options when calling get
criccomini May 7, 2025
9905e92
Revert "Expose parse_url_opts options when calling get"
criccomini May 8, 2025
0a61733
Update src/registry.rs
criccomini May 9, 2025
fb4c955
Update src/registry.rs
criccomini May 9, 2025
e5ec53a
Rename list_urls
criccomini May 9, 2025
6a51b0b
Fix doc formatting for fmt
criccomini May 9, 2025
005c9f2
Add more tests for file scheme checking
criccomini May 9, 2025
bdb8194
Make wasm32 happy
criccomini May 9, 2025
5a01bd8
Remove get_store_key and make DefaultObjectStore really dumb
criccomini May 12, 2025
b19c26e
Some more docs
criccomini May 12, 2025
5e622d7
Add a get_url method as well
criccomini May 12, 2025
3ef24cd
Clippy!
criccomini May 12, 2025
ac7f6ea
Clarify how the default registry works
criccomini May 12, 2025
b667f3e
Test url match behavior
criccomini May 12, 2025
72b1875
Add prefix object store registry
criccomini May 12, 2025
5cb90ae
Docs and pub
criccomini May 12, 2025
a00db85
Fix rustdocs
criccomini May 12, 2025
e468e6c
Add tests for PrefixObjectStoreRegistry
criccomini May 12, 2025
4ef9923
Clippy
criccomini May 12, 2025
fe04c89
Add a more builder-ish pattern for prefix reg
criccomini May 12, 2025
e8decc8
Add Parsing object store registry
criccomini May 12, 2025
2145159
Add proper test for parser object store
criccomini May 13, 2025
39efe63
Misc cleanup
criccomini May 13, 2025
970dc13
Clippy!
criccomini May 13, 2025
2f87a22
Revert silly clone
criccomini May 13, 2025
227dcba
More clippy sigh
criccomini May 13, 2025
b7937cc
Fix test_url_http
criccomini May 13, 2025
a87ecd2
Remove get_prefix method
criccomini May 13, 2025
b9a97b6
Remove prefix and parser object store registries
criccomini May 13, 2025
25a156d
Use parse_opts with default object store registry
criccomini May 13, 2025
5d16b15
Add a test for map_url_to_key
criccomini May 13, 2025
1d530fd
Clean up docs
criccomini May 13, 2025
c81f155
Clippy
criccomini May 13, 2025
2a076c9
More clippy
criccomini May 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ pub mod local;
pub mod memory;
pub mod path;
pub mod prefix;
pub mod registry;
#[cfg(feature = "cloud")]
pub mod signer;
pub mod throttle;
Expand Down
12 changes: 7 additions & 5 deletions src/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,10 +354,10 @@ mod tests {
use http::{header::USER_AGENT, Response};

let server = MockServer::new().await;

server.push_fn(|r| {
assert_eq!(r.uri().path(), "/foo/bar");
assert_eq!(r.headers().get(USER_AGENT).unwrap(), "test_url");
let (tx, rx) = tokio::sync::oneshot::channel();
server.push_fn(move |req| {
let hdr = req.headers().get(USER_AGENT).cloned();
let _ = tx.send(hdr);
Response::new(String::new())
});

Expand All @@ -366,7 +366,9 @@ mod tests {
let url = test.parse().unwrap();
let (store, path) = parse_url_opts(&url, opts).unwrap();
assert_eq!(path.as_ref(), "foo/bar");
store.get(&path).await.unwrap();
let _ = store.get(&Path::from("/foo/bar")).await.unwrap();
let got = rx.await.expect("handler never ran");
assert_eq!(got.unwrap(), "test_url");

server.shutdown().await;
}
Expand Down
305 changes: 305 additions & 0 deletions src/registry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,305 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! ObjectStoreRegistry holds object stores at runtime with a URL for each store.
//! The registry serves as a cache for object stores to avoid repeated creation.
use crate::{parse_url, Error, ObjectStore};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use url::Url;

type GetStoreResult = Result<Option<(Arc<dyn ObjectStore>, Url)>, Error>;

/// [`ObjectStoreRegistry`] maps a URL to an [`ObjectStore`] instance. The meaning of
/// a URL mapping depends on the [`ObjectStoreRegistry`] implementation. See implementation
/// docs for more details.
pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great to add some examples of these two use cases -- we don't have to do it in the first PR, but maybe we could do it in a follow on

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do! I think it's fine as part of this PR.

/// Register a new store for the provided URL
///
/// ## Returns
///
/// If a store with the same URL mapping exists before, it is replaced and returned along
/// with the mapped URL.
fn register_store(
&self,
url: &Url,
store: Arc<dyn ObjectStore>,
) -> Option<(Arc<dyn ObjectStore>, Url)>;

/// Get a store for the provided URL. The input URL is mapped to an [`ObjectStore`]
/// instance based on the [`ObjectStoreRegistry`] implementation. See implementation docs
/// for more details.
///
/// If no [`ObjectStore`] is found for the `url`, an [`ObjectStore`] may be lazily be
/// created and registered. The logic for doing so is left to each [`ObjectStoreRegistry`]
/// implementation.
///
/// ## Returns
///
/// If a store is found for the `url`, it is returned along with the mapped URL.
///
/// If no store is found for the `url`, `None` is returned.
///
/// ## Errors
///
/// Returns an error if an implementation can't parse a URL or create a store.
fn get_store(&self, url: &Url) -> GetStoreResult;

/// List all registered store URLs. These are the URL mappings for all registered stores.
///
/// ## Returns
///
/// A vector of all registered store URLs.
fn get_store_urls(&self) -> Vec<Url>;
}

/// An [`ObjectStoreRegistry`] implementation that maps URLs to object stores using
/// `scheme://host:port`.
///
/// ## Examples
///
/// Registering a store:
///
/// ```
/// # use std::sync::Arc;
/// # use url::Url;
/// # use object_store::ObjectStore;
/// # use object_store::memory::InMemory;
/// # use object_store::registry::{ObjectStoreRegistry, DefaultObjectStoreRegistry};
/// let registry = DefaultObjectStoreRegistry::new();
/// let url = Url::parse("memory://path/to/store").unwrap();
/// let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
/// registry.register_store(&url, Arc::clone(&store));
/// let (retrieved_store, mapped_url) = registry.get_store(&url).unwrap().unwrap();
/// assert_eq!(mapped_url.as_str(), "memory://");
/// assert!(Arc::ptr_eq(&retrieved_store, &store));
/// ```
///
/// Dynamically creating a store:
///
/// ```
/// # use std::sync::Arc;
/// # use url::Url;
/// # use object_store::ObjectStore;
/// # use object_store::registry::{ObjectStoreRegistry, DefaultObjectStoreRegistry};
/// let registry = DefaultObjectStoreRegistry::new();
/// let url = Url::parse("memory://path/to/store").unwrap();
/// let (store, mapped_url) = registry.get_store(&url).unwrap().unwrap();
/// assert_eq!(mapped_url.as_str(), "memory://");
/// ```
pub struct DefaultObjectStoreRegistry {
/// A map from URL to object store that serve list / read operations for the store
object_stores: RwLock<HashMap<Url, Arc<dyn ObjectStore>>>,
}

impl std::fmt::Debug for DefaultObjectStoreRegistry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let stores = self.object_stores.read().unwrap();
f.debug_struct("DefaultObjectStoreRegistry")
.field("urls", &stores.keys().cloned().collect::<Vec<_>>())
.finish()
}
}

impl Default for DefaultObjectStoreRegistry {
fn default() -> Self {
Self::new()
}
}

impl DefaultObjectStoreRegistry {
/// Create a new [`DefaultObjectStoreRegistry`] with no registered stores.
pub fn new() -> Self {
let object_stores = RwLock::new(HashMap::new());
Self { object_stores }
}

/// Get the key of a url for object store registration. Mapping rules are as follows:
///
/// - Any URL with a `file` scheme is mapped to `file:///`
/// - Any URL with a `memory` scheme is mapped to `memory://`
/// - All other URLs are mapped to `scheme://host:port`
///
/// ## Returns
///
/// A [`Url`] with the same scheme and host as the input, but with an empty path.
///
/// ## Errors
///
/// Returns an error if the input is not a valid URL.
fn map_url_to_key(url: &Url) -> Url {
match url.scheme() {
// Don't include the host for memory or path. Just hard code it
// since [`crate::parse::parse_url`] expects these to never have
// a "host" component.
"memory" => Url::parse(&format!("{}://", url.scheme())),
// Note this will handle file://path/to/file as well
// as file:///path/to/file even though file://path/to/file
// is not technically a valid URL.
"file" => Url::parse(&format!("{}:///", url.scheme())),
_ => Url::parse(&format!(
"{}://{}",
url.scheme(),
&url[url::Position::BeforeHost..url::Position::AfterPort],
)),
}
.unwrap()
}
}

impl ObjectStoreRegistry for DefaultObjectStoreRegistry {
/// Register a new store for the provided URL
///
/// If a store with the same URL existed before, it is replaced and returned
fn register_store(
&self,
url: &Url,
store: Arc<dyn ObjectStore>,
) -> Option<(Arc<dyn ObjectStore>, Url)> {
let key = Self::map_url_to_key(url);
let mut stores = self.object_stores.write().unwrap();
stores
.insert(key.clone(), store)
.map(|old_store| (old_store, key))
}

/// Get a store that was registered with the provided URL.
///
/// If no store was registered with the provided URL, `None` is returned.
fn get_store(&self, url: &Url) -> Result<Option<(Arc<dyn ObjectStore>, Url)>, crate::Error> {
let key = Self::map_url_to_key(url);
eprintln!("key: {key}");
let mut stores = self.object_stores.write().unwrap();
if let Some(store) = stores.get(&key) {
Ok(Some((Arc::clone(store), key)))
} else {
let (store, _) = parse_url(&key)?;
let store: Arc<dyn ObjectStore> = store.into();
stores.insert(key.clone(), Arc::clone(&store));
Ok(Some((store, key)))
}
}

/// Returns a vector of all registered store URLs.
fn get_store_urls(&self) -> Vec<Url> {
let stores = self.object_stores.read().unwrap();
stores.keys().cloned().collect()
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::memory::InMemory;

#[test]
fn test_register_store() {
let registry = DefaultObjectStoreRegistry::new();
let url = Url::parse("memory://").unwrap();
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
let old_store = registry.register_store(&url, Arc::clone(&store));
assert!(old_store.is_none());
let new_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
let (old_store, mapped_url) = registry
.register_store(&url, Arc::clone(&new_store))
.unwrap();
assert_eq!(mapped_url.as_str(), "memory://");
assert!(Arc::ptr_eq(&old_store, &store));
let (retrieved_store, mapped_url) = registry.get_store(&url).unwrap().unwrap();
assert_eq!(mapped_url.as_str(), "memory://");
assert!(Arc::ptr_eq(&retrieved_store, &new_store));
}

#[tokio::test]
async fn test_dynamic_register_store() {
let registry = DefaultObjectStoreRegistry::new();
let url = Url::parse("memory://").unwrap();
let (first_store, mapped_url) = registry.get_store(&url).unwrap().unwrap();
assert_eq!(mapped_url.as_str(), "memory://");
first_store.put(&"/foo".into(), "bar".into()).await.unwrap();
let (second_store, mapped_url) = registry.get_store(&url).unwrap().unwrap();
assert_eq!(mapped_url.as_str(), "memory://");
eprintln!("first_store: {:?}", first_store);
eprintln!("second_store: {:?}", second_store);
assert!(Arc::ptr_eq(&second_store, &first_store));
let val = second_store
.get(&"/foo".into())
.await
.unwrap()
.bytes()
.await
.unwrap();
assert_eq!(val.as_ref(), b"bar");
}

#[test]
fn test_list_urls() {
let registry = DefaultObjectStoreRegistry::new();
let url = Url::parse("memory://").unwrap();
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
registry.register_store(&url, store);
let urls = registry.get_store_urls();
assert_eq!(urls.len(), 1);
assert_eq!(urls[0].as_str(), "memory://");
}

#[test]
fn test_get_child_url() {
let registry = DefaultObjectStoreRegistry::new();
let base_url = Url::parse("memory://").unwrap();
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
registry.register_store(&base_url, Arc::clone(&store));
let subprefix_url = Url::parse("memory://foo/bar").unwrap();
let (retrieved_store, mapped_url) = registry.get_store(&subprefix_url).unwrap().unwrap();
assert_eq!(mapped_url.as_str(), "memory://");
assert!(Arc::ptr_eq(&retrieved_store, &store));
}

#[test]
fn test_map_url_to_key() {
let test_cases = [
("s3://bucket", "s3://bucket"),
("s3://bucket/path", "s3://bucket"),
("s3://bucket/path?param=value", "s3://bucket"),
("memory://", "memory://"),
("memory://path", "memory://"),
("file:///", "file:///"),
("file:///path", "file:///"),
("http://host:1234", "http://host:1234"),
("http://host:1234/path", "http://host:1234"),
(
"http://user:pass@host:1234/path/to/file",
"http://host:1234",
),
];

for (input, expected) in test_cases {
let input_url = Url::parse(input).unwrap();
let expected_url = Url::parse(expected).unwrap();
let result = DefaultObjectStoreRegistry::map_url_to_key(&input_url);

assert_eq!(
result.as_str(),
expected_url.as_str(),
"Expected '{}' to map to '{}', but got '{}'",
input,
expected,
result
);
}
}
}
2 changes: 1 addition & 1 deletion src/throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ impl MultipartUpload for ThrottledUpload {
#[cfg(test)]
mod tests {
use super::*;
use crate::{integration::*, memory::InMemory, GetResultPayload};
use crate::{integration::*, memory::InMemory};
use futures::TryStreamExt;
use tokio::time::Duration;
use tokio::time::Instant;
Expand Down
Loading