From fce6481d121f93b65d62509023799de72ca24e31 Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Tue, 6 May 2025 09:52:48 -0700 Subject: [PATCH 01/44] Initial (broken) copy --- src/lib.rs | 1 + src/registry.rs | 302 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 303 insertions(+) create mode 100644 src/registry.rs diff --git a/src/lib.rs b/src/lib.rs index 66575b81..086fb3cd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -516,6 +516,7 @@ pub mod local; pub mod memory; pub mod path; pub mod prefix; +pub mod registry; #[cfg(feature = "cloud")] pub mod signer; pub mod throttle; diff --git a/src/registry.rs b/src/registry.rs new file mode 100644 index 00000000..b74ce39a --- /dev/null +++ b/src/registry.rs @@ -0,0 +1,302 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! ObjectStoreRegistry holds all the object stores at Runtime with a scheme for each store. +//! This allows the user to extend DataFusion with different storage systems such as S3 or HDFS +//! and query data inside these systems. + +use dashmap::DashMap; +use datafusion_common::{exec_err, DataFusionError, Result}; +#[cfg(not(target_arch = "wasm32"))] +use object_store::local::LocalFileSystem; +use object_store::ObjectStore; +use std::sync::Arc; +use url::Url; + +/// A parsed URL identifying a particular [`ObjectStore`] instance +/// +/// For example: +/// * `file://` for local file system +/// * `s3://bucket` for AWS S3 bucket +/// * `oss://bucket` for Aliyun OSS bucket +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct ObjectStoreUrl { + url: Url, +} + +impl ObjectStoreUrl { + /// Parse an [`ObjectStoreUrl`] from a string + /// + /// # Example + /// ``` + /// # use url::Url; + /// # use datafusion_execution::object_store::ObjectStoreUrl; + /// let object_store_url = ObjectStoreUrl::parse("s3://bucket").unwrap(); + /// assert_eq!(object_store_url.as_str(), "s3://bucket/"); + /// // can also access the underlying `Url` + /// let url: &Url = object_store_url.as_ref(); + /// assert_eq!(url.scheme(), "s3"); + /// assert_eq!(url.host_str(), Some("bucket")); + /// assert_eq!(url.path(), "/"); + /// ``` + pub fn parse(s: impl AsRef) -> Result { + let mut parsed = + Url::parse(s.as_ref()).map_err(|e| DataFusionError::External(Box::new(e)))?; + + let remaining = &parsed[url::Position::BeforePath..]; + if !remaining.is_empty() && remaining != "/" { + return exec_err!( + "ObjectStoreUrl must only contain scheme and authority, got: {remaining}" + ); + } + + // Always set path for consistency + parsed.set_path("/"); + Ok(Self { url: parsed }) + } + + /// An [`ObjectStoreUrl`] for the local filesystem (`file://`) + /// + /// # Example + /// ``` + /// # use datafusion_execution::object_store::ObjectStoreUrl; + /// let local_fs = ObjectStoreUrl::parse("file://").unwrap(); + /// assert_eq!(local_fs, ObjectStoreUrl::local_filesystem()) + /// ``` + pub fn local_filesystem() -> Self { + Self::parse("file://").unwrap() + } + + /// Returns this [`ObjectStoreUrl`] as a string + pub fn as_str(&self) -> &str { + self.as_ref() + } +} + +impl AsRef for ObjectStoreUrl { + fn as_ref(&self) -> &str { + self.url.as_ref() + } +} + +impl AsRef for ObjectStoreUrl { + fn as_ref(&self) -> &Url { + &self.url + } +} + +impl std::fmt::Display for ObjectStoreUrl { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + self.as_str().fmt(f) + } +} + +/// [`ObjectStoreRegistry`] maps a URL to an [`ObjectStore`] instance, +/// and allows DataFusion to read from different [`ObjectStore`] +/// instances. For example DataFusion might be configured so that +/// +/// 1. `s3://my_bucket/lineitem/` mapped to the `/lineitem` path on an +/// AWS S3 object store bound to `my_bucket` +/// +/// 2. `s3://my_other_bucket/lineitem/` mapped to the (same) +/// `/lineitem` path on a *different* AWS S3 object store bound to +/// `my_other_bucket` +/// +/// When given a [`ListingTableUrl`], DataFusion tries to find an +/// appropriate [`ObjectStore`]. For example +/// +/// ```sql +/// create external table unicorns stored as parquet location 's3://my_bucket/lineitem/'; +/// ``` +/// +/// In this particular case, the url `s3://my_bucket/lineitem/` will be provided to +/// [`ObjectStoreRegistry::get_store`] and one of three things will happen: +/// +/// - If an [`ObjectStore`] has been registered with [`ObjectStoreRegistry::register_store`] with +/// `s3://my_bucket`, that [`ObjectStore`] will be returned +/// +/// - If an AWS S3 object store can be ad-hoc discovered by the url `s3://my_bucket/lineitem/`, this +/// object store will be registered with key `s3://my_bucket` and returned. +/// +/// - Otherwise an error will be returned, indicating that no suitable [`ObjectStore`] could +/// be found +/// +/// This allows for two different use-cases: +/// +/// 1. Systems where object store buckets are explicitly created using DDL, can register these +/// buckets using [`ObjectStoreRegistry::register_store`] +/// +/// 2. Systems relying on ad-hoc discovery, without corresponding DDL, can create [`ObjectStore`] +/// lazily by providing a custom implementation of [`ObjectStoreRegistry`] +/// +/// +/// [`ListingTableUrl`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTableUrl.html +/// [`ObjectStore`]: object_store::ObjectStore +pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static { + /// If a store with the same key existed before, it is replaced and returned + fn register_store( + &self, + url: &Url, + store: Arc, + ) -> Option>; + + /// Get a suitable store for the provided URL. For example: + /// + /// - URL with scheme `file:///` or no scheme will return the default LocalFS store + /// - URL with scheme `s3://bucket/` will return the S3 store + /// - URL with scheme `hdfs://hostname:port/` will return the hdfs store + /// + /// If no [`ObjectStore`] found for the `url`, ad-hoc discovery may be executed depending on + /// the `url` and [`ObjectStoreRegistry`] implementation. An [`ObjectStore`] may be lazily + /// created and registered. + fn get_store(&self, url: &Url) -> Result>; +} + +/// The default [`ObjectStoreRegistry`] +pub struct DefaultObjectStoreRegistry { + /// A map from scheme to object store that serve list / read operations for the store + object_stores: DashMap>, +} + +impl std::fmt::Debug for DefaultObjectStoreRegistry { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("DefaultObjectStoreRegistry") + .field( + "schemes", + &self + .object_stores + .iter() + .map(|o| o.key().clone()) + .collect::>(), + ) + .finish() + } +} + +impl Default for DefaultObjectStoreRegistry { + fn default() -> Self { + Self::new() + } +} + +impl DefaultObjectStoreRegistry { + /// This will register [`LocalFileSystem`] to handle `file://` paths + #[cfg(not(target_arch = "wasm32"))] + pub fn new() -> Self { + let object_stores: DashMap> = DashMap::new(); + object_stores.insert("file://".to_string(), Arc::new(LocalFileSystem::new())); + Self { object_stores } + } + + /// Default without any backend registered. + #[cfg(target_arch = "wasm32")] + pub fn new() -> Self { + let object_stores: DashMap> = DashMap::new(); + Self { object_stores } + } +} + +/// +/// Stores are registered based on the scheme, host and port of the provided URL +/// with a [`LocalFileSystem::new`] automatically registered for `file://` (if the +/// target arch is not `wasm32`). +/// +/// For example: +/// +/// - `file:///my_path` will return the default LocalFS store +/// - `s3://bucket/path` will return a store registered with `s3://bucket` if any +/// - `hdfs://host:port/path` will return a store registered with `hdfs://host:port` if any +impl ObjectStoreRegistry for DefaultObjectStoreRegistry { + fn register_store( + &self, + url: &Url, + store: Arc, + ) -> Option> { + let s = get_url_key(url); + self.object_stores.insert(s, store) + } + + fn get_store(&self, url: &Url) -> Result> { + let s = get_url_key(url); + self.object_stores + .get(&s) + .map(|o| Arc::clone(o.value())) + .ok_or_else(|| { + DataFusionError::Internal(format!( + "No suitable object store found for {url}. See `RuntimeEnv::register_object_store`" + )) + }) + } +} + +/// Get the key of a url for object store registration. +/// The credential info will be removed +fn get_url_key(url: &Url) -> String { + format!( + "{}://{}", + url.scheme(), + &url[url::Position::BeforeHost..url::Position::AfterPort], + ) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_object_store_url() { + let file = ObjectStoreUrl::parse("file://").unwrap(); + assert_eq!(file.as_str(), "file:///"); + + let url = ObjectStoreUrl::parse("s3://bucket").unwrap(); + assert_eq!(url.as_str(), "s3://bucket/"); + + let url = ObjectStoreUrl::parse("s3://username:password@host:123").unwrap(); + assert_eq!(url.as_str(), "s3://username:password@host:123/"); + + let err = ObjectStoreUrl::parse("s3://bucket:invalid").unwrap_err(); + assert_eq!(err.strip_backtrace(), "External error: invalid port number"); + + let err = ObjectStoreUrl::parse("s3://bucket?").unwrap_err(); + assert_eq!(err.strip_backtrace(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: ?"); + + let err = ObjectStoreUrl::parse("s3://bucket?foo=bar").unwrap_err(); + assert_eq!(err.strip_backtrace(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: ?foo=bar"); + + let err = ObjectStoreUrl::parse("s3://host:123/foo").unwrap_err(); + assert_eq!(err.strip_backtrace(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: /foo"); + + let err = + ObjectStoreUrl::parse("s3://username:password@host:123/foo").unwrap_err(); + assert_eq!(err.strip_backtrace(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: /foo"); + } + + #[test] + fn test_get_url_key() { + let file = ObjectStoreUrl::parse("file://").unwrap(); + let key = get_url_key(&file.url); + assert_eq!(key.as_str(), "file://"); + + let url = ObjectStoreUrl::parse("s3://bucket").unwrap(); + let key = get_url_key(&url.url); + assert_eq!(key.as_str(), "s3://bucket"); + + let url = ObjectStoreUrl::parse("s3://username:password@host:123").unwrap(); + let key = get_url_key(&url.url); + assert_eq!(key.as_str(), "s3://host:123"); + } +} \ No newline at end of file From 9c72689beaee1e00e7e680af9bdafc95ad52f2a7 Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Tue, 6 May 2025 10:09:34 -0700 Subject: [PATCH 02/44] Strip `ObjectStoreUrl` and use instead of --- Cargo.toml | 1 + src/registry.rs | 158 ++++++------------------------------------------ 2 files changed, 20 insertions(+), 139 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b3fd34a9..ab9b2a82 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ all-features = true async-trait = "0.1.53" bytes = "1.0" chrono = { version = "0.4.34", default-features = false, features = ["clock"] } +dashmap = "6.1.0" futures = "0.3" http = "1.2.0" humantime = "2.1" diff --git a/src/registry.rs b/src/registry.rs index b74ce39a..ba20435d 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -16,98 +16,22 @@ // under the License. //! ObjectStoreRegistry holds all the object stores at Runtime with a scheme for each store. -//! This allows the user to extend DataFusion with different storage systems such as S3 or HDFS -//! and query data inside these systems. +//! This allows the user to resolve a URL to an ObjectStore at runtime. Unlike +//! [`object_store::parse:parse_url`], this allows for custom logic to be executed +//! when a URL is resolved to an ObjectStore. It also serves as a cache for object stores +//! to avoid repeated creation. use dashmap::DashMap; -use datafusion_common::{exec_err, DataFusionError, Result}; #[cfg(not(target_arch = "wasm32"))] -use object_store::local::LocalFileSystem; -use object_store::ObjectStore; +use crate::local::LocalFileSystem; +use crate::ObjectStore; use std::sync::Arc; use url::Url; -/// A parsed URL identifying a particular [`ObjectStore`] instance -/// -/// For example: -/// * `file://` for local file system -/// * `s3://bucket` for AWS S3 bucket -/// * `oss://bucket` for Aliyun OSS bucket -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct ObjectStoreUrl { - url: Url, -} - -impl ObjectStoreUrl { - /// Parse an [`ObjectStoreUrl`] from a string - /// - /// # Example - /// ``` - /// # use url::Url; - /// # use datafusion_execution::object_store::ObjectStoreUrl; - /// let object_store_url = ObjectStoreUrl::parse("s3://bucket").unwrap(); - /// assert_eq!(object_store_url.as_str(), "s3://bucket/"); - /// // can also access the underlying `Url` - /// let url: &Url = object_store_url.as_ref(); - /// assert_eq!(url.scheme(), "s3"); - /// assert_eq!(url.host_str(), Some("bucket")); - /// assert_eq!(url.path(), "/"); - /// ``` - pub fn parse(s: impl AsRef) -> Result { - let mut parsed = - Url::parse(s.as_ref()).map_err(|e| DataFusionError::External(Box::new(e)))?; - - let remaining = &parsed[url::Position::BeforePath..]; - if !remaining.is_empty() && remaining != "/" { - return exec_err!( - "ObjectStoreUrl must only contain scheme and authority, got: {remaining}" - ); - } - - // Always set path for consistency - parsed.set_path("/"); - Ok(Self { url: parsed }) - } - - /// An [`ObjectStoreUrl`] for the local filesystem (`file://`) - /// - /// # Example - /// ``` - /// # use datafusion_execution::object_store::ObjectStoreUrl; - /// let local_fs = ObjectStoreUrl::parse("file://").unwrap(); - /// assert_eq!(local_fs, ObjectStoreUrl::local_filesystem()) - /// ``` - pub fn local_filesystem() -> Self { - Self::parse("file://").unwrap() - } - - /// Returns this [`ObjectStoreUrl`] as a string - pub fn as_str(&self) -> &str { - self.as_ref() - } -} - -impl AsRef for ObjectStoreUrl { - fn as_ref(&self) -> &str { - self.url.as_ref() - } -} - -impl AsRef for ObjectStoreUrl { - fn as_ref(&self) -> &Url { - &self.url - } -} - -impl std::fmt::Display for ObjectStoreUrl { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - self.as_str().fmt(f) - } -} /// [`ObjectStoreRegistry`] maps a URL to an [`ObjectStore`] instance, -/// and allows DataFusion to read from different [`ObjectStore`] -/// instances. For example DataFusion might be configured so that +/// and allows users to read from different [`ObjectStore`] +/// instances. For example the registry might be configured so that /// /// 1. `s3://my_bucket/lineitem/` mapped to the `/lineitem` path on an /// AWS S3 object store bound to `my_bucket` @@ -116,13 +40,6 @@ impl std::fmt::Display for ObjectStoreUrl { /// `/lineitem` path on a *different* AWS S3 object store bound to /// `my_other_bucket` /// -/// When given a [`ListingTableUrl`], DataFusion tries to find an -/// appropriate [`ObjectStore`]. For example -/// -/// ```sql -/// create external table unicorns stored as parquet location 's3://my_bucket/lineitem/'; -/// ``` -/// /// In this particular case, the url `s3://my_bucket/lineitem/` will be provided to /// [`ObjectStoreRegistry::get_store`] and one of three things will happen: /// @@ -132,7 +49,7 @@ impl std::fmt::Display for ObjectStoreUrl { /// - If an AWS S3 object store can be ad-hoc discovered by the url `s3://my_bucket/lineitem/`, this /// object store will be registered with key `s3://my_bucket` and returned. /// -/// - Otherwise an error will be returned, indicating that no suitable [`ObjectStore`] could +/// - Otherwise `None` will be returned, indicating that no suitable [`ObjectStore`] could /// be found /// /// This allows for two different use-cases: @@ -143,8 +60,6 @@ impl std::fmt::Display for ObjectStoreUrl { /// 2. Systems relying on ad-hoc discovery, without corresponding DDL, can create [`ObjectStore`] /// lazily by providing a custom implementation of [`ObjectStoreRegistry`] /// -/// -/// [`ListingTableUrl`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTableUrl.html /// [`ObjectStore`]: object_store::ObjectStore pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static { /// If a store with the same key existed before, it is replaced and returned @@ -163,7 +78,7 @@ pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static { /// If no [`ObjectStore`] found for the `url`, ad-hoc discovery may be executed depending on /// the `url` and [`ObjectStoreRegistry`] implementation. An [`ObjectStore`] may be lazily /// created and registered. - fn get_store(&self, url: &Url) -> Result>; + fn get_store(&self, url: &Url) -> Option>; } /// The default [`ObjectStoreRegistry`] @@ -173,7 +88,7 @@ pub struct DefaultObjectStoreRegistry { } impl std::fmt::Debug for DefaultObjectStoreRegistry { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("DefaultObjectStoreRegistry") .field( "schemes", @@ -230,16 +145,9 @@ impl ObjectStoreRegistry for DefaultObjectStoreRegistry { self.object_stores.insert(s, store) } - fn get_store(&self, url: &Url) -> Result> { + fn get_store(&self, url: &Url) -> Option> { let s = get_url_key(url); - self.object_stores - .get(&s) - .map(|o| Arc::clone(o.value())) - .ok_or_else(|| { - DataFusionError::Internal(format!( - "No suitable object store found for {url}. See `RuntimeEnv::register_object_store`" - )) - }) + self.object_stores.get(&s).map(|o| Arc::clone(o.value())) } } @@ -257,46 +165,18 @@ fn get_url_key(url: &Url) -> String { mod tests { use super::*; - #[test] - fn test_object_store_url() { - let file = ObjectStoreUrl::parse("file://").unwrap(); - assert_eq!(file.as_str(), "file:///"); - - let url = ObjectStoreUrl::parse("s3://bucket").unwrap(); - assert_eq!(url.as_str(), "s3://bucket/"); - - let url = ObjectStoreUrl::parse("s3://username:password@host:123").unwrap(); - assert_eq!(url.as_str(), "s3://username:password@host:123/"); - - let err = ObjectStoreUrl::parse("s3://bucket:invalid").unwrap_err(); - assert_eq!(err.strip_backtrace(), "External error: invalid port number"); - - let err = ObjectStoreUrl::parse("s3://bucket?").unwrap_err(); - assert_eq!(err.strip_backtrace(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: ?"); - - let err = ObjectStoreUrl::parse("s3://bucket?foo=bar").unwrap_err(); - assert_eq!(err.strip_backtrace(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: ?foo=bar"); - - let err = ObjectStoreUrl::parse("s3://host:123/foo").unwrap_err(); - assert_eq!(err.strip_backtrace(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: /foo"); - - let err = - ObjectStoreUrl::parse("s3://username:password@host:123/foo").unwrap_err(); - assert_eq!(err.strip_backtrace(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: /foo"); - } - #[test] fn test_get_url_key() { - let file = ObjectStoreUrl::parse("file://").unwrap(); - let key = get_url_key(&file.url); + let file = Url::parse("file://").unwrap(); + let key = get_url_key(&file); assert_eq!(key.as_str(), "file://"); - let url = ObjectStoreUrl::parse("s3://bucket").unwrap(); - let key = get_url_key(&url.url); + let url = Url::parse("s3://bucket").unwrap(); + let key = get_url_key(&url); assert_eq!(key.as_str(), "s3://bucket"); - let url = ObjectStoreUrl::parse("s3://username:password@host:123").unwrap(); - let key = get_url_key(&url.url); + let url = Url::parse("s3://username:password@host:123").unwrap(); + let key = get_url_key(&url); assert_eq!(key.as_str(), "s3://host:123"); } } \ No newline at end of file From 85dbce45c026e1281febebfb8811e5dceef31f16 Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Tue, 6 May 2025 10:13:04 -0700 Subject: [PATCH 03/44] Remove wasm32 --- src/registry.rs | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/src/registry.rs b/src/registry.rs index ba20435d..1157fd53 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -22,8 +22,6 @@ //! to avoid repeated creation. use dashmap::DashMap; -#[cfg(not(target_arch = "wasm32"))] -use crate::local::LocalFileSystem; use crate::ObjectStore; use std::sync::Arc; use url::Url; @@ -111,14 +109,6 @@ impl Default for DefaultObjectStoreRegistry { impl DefaultObjectStoreRegistry { /// This will register [`LocalFileSystem`] to handle `file://` paths #[cfg(not(target_arch = "wasm32"))] - pub fn new() -> Self { - let object_stores: DashMap> = DashMap::new(); - object_stores.insert("file://".to_string(), Arc::new(LocalFileSystem::new())); - Self { object_stores } - } - - /// Default without any backend registered. - #[cfg(target_arch = "wasm32")] pub fn new() -> Self { let object_stores: DashMap> = DashMap::new(); Self { object_stores } @@ -126,9 +116,7 @@ impl DefaultObjectStoreRegistry { } /// -/// Stores are registered based on the scheme, host and port of the provided URL -/// with a [`LocalFileSystem::new`] automatically registered for `file://` (if the -/// target arch is not `wasm32`). +/// Stores are registered based on the scheme, host and port of the provided URL. /// /// For example: /// From ff551ea3312da17949f80ba6560390784c019bb6 Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Tue, 6 May 2025 10:31:21 -0700 Subject: [PATCH 04/44] Create new \s if not exist on get --- src/registry.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/registry.rs b/src/registry.rs index 1157fd53..074ac0a4 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -22,7 +22,7 @@ //! to avoid repeated creation. use dashmap::DashMap; -use crate::ObjectStore; +use crate::{parse_url, ObjectStore}; use std::sync::Arc; use url::Url; @@ -135,7 +135,16 @@ impl ObjectStoreRegistry for DefaultObjectStoreRegistry { fn get_store(&self, url: &Url) -> Option> { let s = get_url_key(url); - self.object_stores.get(&s).map(|o| Arc::clone(o.value())) + self.object_stores + .entry(s) + .or_try_insert_with(|| { + match parse_url(url) { + Ok((store, _)) => Ok(Arc::new(store)), + Err(e) => Err(e), + } + }) + .map(|o| Arc::clone(o.value())) + .ok() } } From 0bedcace31fbad8f842da1daec67b7bcc14f6298 Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Tue, 6 May 2025 10:55:44 -0700 Subject: [PATCH 05/44] Add registry tests --- src/registry.rs | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/src/registry.rs b/src/registry.rs index 074ac0a4..01846c3e 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -176,4 +176,42 @@ mod tests { let key = get_url_key(&url); assert_eq!(key.as_str(), "s3://host:123"); } + + #[cfg(not(target_arch = "wasm32"))] + #[test] + fn test_register_store() { + use crate::local::LocalFileSystem; + let registry = DefaultObjectStoreRegistry::new(); + let url = Url::parse("file:///foo/bar").unwrap(); + let store = Arc::new(LocalFileSystem::new()) as Arc; + let old_store = registry.register_store(&url, Arc::clone(&store)); + assert!(old_store.is_none(), "Should not return a previous store when registering a new one"); + let retrieved_store = registry + .get_store(&url) + .expect("Should retrieve a store when a valid URL is used"); + // Validate that it’s exactly the same Arc we registered above + assert!( + Arc::ptr_eq(&retrieved_store, &store), + "Retrieved store is not the same LocalFileSystem instance" + ); + } + + #[cfg(not(target_arch = "wasm32"))] + #[test] + fn test_register_store_on_get() { + let registry = DefaultObjectStoreRegistry::new(); + let url = Url::parse("file:///foo/bar").unwrap(); + // on first get, should lazily create & register a LocalFileSystem + let retrieved_store = registry + .get_store(&url) + .expect("Should retrieve a store when a valid URL is used"); + // Validate that this really is a LocalFileSystem under the hood by + // looking at its Debug representation + let dbg = format!("{:?}", retrieved_store); + assert!( + dbg.starts_with("LocalFileSystem"), + "Expected a LocalFileSystem, but Debug printed: {}", + dbg + ); + } } \ No newline at end of file From 395f32d24fd9cf62756d27d05565433d08412e55 Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Tue, 6 May 2025 11:03:21 -0700 Subject: [PATCH 06/44] Add \ --- src/registry.rs | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/src/registry.rs b/src/registry.rs index 01846c3e..a43f8ba7 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -77,6 +77,9 @@ pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static { /// the `url` and [`ObjectStoreRegistry`] implementation. An [`ObjectStore`] may be lazily /// created and registered. fn get_store(&self, url: &Url) -> Option>; + + /// List all registered URLs + fn list_urls(&self) -> Vec; } /// The default [`ObjectStoreRegistry`] @@ -146,6 +149,10 @@ impl ObjectStoreRegistry for DefaultObjectStoreRegistry { .map(|o| Arc::clone(o.value())) .ok() } + + fn list_urls(&self) -> Vec { + self.object_stores.iter().map(|o| o.key().clone()).collect() + } } /// Get the key of a url for object store registration. @@ -214,4 +221,24 @@ mod tests { dbg ); } + + #[cfg(not(target_arch = "wasm32"))] + #[test] + fn test_list_urls() { + use crate::local::LocalFileSystem; + let registry = DefaultObjectStoreRegistry::new(); + let url = Url::parse("file:///foo/bar").unwrap(); + let store = Arc::new(LocalFileSystem::new()) as Arc; + registry.register_store(&url, store); + let urls = registry.list_urls(); + assert_eq!(urls.len(), 1); + assert_eq!(urls[0], "file://"); + } + + #[test] + fn test_registry_with_bad_scheme() { + let registry = DefaultObjectStoreRegistry::new(); + let url = Url::parse("unknown://foo/bar").unwrap(); + assert!(registry.get_store(&url).is_none()); + } } \ No newline at end of file From 48fd0bfdd17b89e6d73a49fae787990c56de5e83 Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Tue, 6 May 2025 11:14:40 -0700 Subject: [PATCH 07/44] Fix rustdocs and wasm --- src/registry.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/registry.rs b/src/registry.rs index a43f8ba7..39fa03e3 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -17,7 +17,7 @@ //! ObjectStoreRegistry holds all the object stores at Runtime with a scheme for each store. //! This allows the user to resolve a URL to an ObjectStore at runtime. Unlike -//! [`object_store::parse:parse_url`], this allows for custom logic to be executed +//! [`crate::parse_url`], this allows for custom logic to be executed //! when a URL is resolved to an ObjectStore. It also serves as a cache for object stores //! to avoid repeated creation. @@ -57,8 +57,6 @@ use url::Url; /// /// 2. Systems relying on ad-hoc discovery, without corresponding DDL, can create [`ObjectStore`] /// lazily by providing a custom implementation of [`ObjectStoreRegistry`] -/// -/// [`ObjectStore`]: object_store::ObjectStore pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static { /// If a store with the same key existed before, it is replaced and returned fn register_store( @@ -110,8 +108,7 @@ impl Default for DefaultObjectStoreRegistry { } impl DefaultObjectStoreRegistry { - /// This will register [`LocalFileSystem`] to handle `file://` paths - #[cfg(not(target_arch = "wasm32"))] + /// Create a new [`DefaultObjectStoreRegistry`] with no registered stores pub fn new() -> Self { let object_stores: DashMap> = DashMap::new(); Self { object_stores } From 9b441400386fc2e46210c8a2b9c7c0cd9a678cf2 Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Tue, 6 May 2025 11:16:18 -0700 Subject: [PATCH 08/44] cargo fmt --- src/registry.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/registry.rs b/src/registry.rs index 39fa03e3..272c9c08 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -21,12 +21,11 @@ //! when a URL is resolved to an ObjectStore. It also serves as a cache for object stores //! to avoid repeated creation. -use dashmap::DashMap; use crate::{parse_url, ObjectStore}; +use dashmap::DashMap; use std::sync::Arc; use url::Url; - /// [`ObjectStoreRegistry`] maps a URL to an [`ObjectStore`] instance, /// and allows users to read from different [`ObjectStore`] /// instances. For example the registry might be configured so that @@ -137,11 +136,9 @@ impl ObjectStoreRegistry for DefaultObjectStoreRegistry { let s = get_url_key(url); self.object_stores .entry(s) - .or_try_insert_with(|| { - match parse_url(url) { - Ok((store, _)) => Ok(Arc::new(store)), - Err(e) => Err(e), - } + .or_try_insert_with(|| match parse_url(url) { + Ok((store, _)) => Ok(Arc::new(store)), + Err(e) => Err(e), }) .map(|o| Arc::clone(o.value())) .ok() @@ -189,7 +186,10 @@ mod tests { let url = Url::parse("file:///foo/bar").unwrap(); let store = Arc::new(LocalFileSystem::new()) as Arc; let old_store = registry.register_store(&url, Arc::clone(&store)); - assert!(old_store.is_none(), "Should not return a previous store when registering a new one"); + assert!( + old_store.is_none(), + "Should not return a previous store when registering a new one" + ); let retrieved_store = registry .get_store(&url) .expect("Should retrieve a store when a valid URL is used"); @@ -238,4 +238,4 @@ mod tests { let url = Url::parse("unknown://foo/bar").unwrap(); assert!(registry.get_store(&url).is_none()); } -} \ No newline at end of file +} From 81a05b0f770009260b9bad7b54115eefccd2b712 Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Wed, 7 May 2025 09:15:02 -0700 Subject: [PATCH 09/44] Remove dashmap --- Cargo.toml | 1 - src/registry.rs | 46 ++++++++++++++++++++++++---------------------- 2 files changed, 24 insertions(+), 23 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ab9b2a82..b3fd34a9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,6 @@ all-features = true async-trait = "0.1.53" bytes = "1.0" chrono = { version = "0.4.34", default-features = false, features = ["clock"] } -dashmap = "6.1.0" futures = "0.3" http = "1.2.0" humantime = "2.1" diff --git a/src/registry.rs b/src/registry.rs index 272c9c08..16fe2443 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -22,8 +22,8 @@ //! to avoid repeated creation. use crate::{parse_url, ObjectStore}; -use dashmap::DashMap; -use std::sync::Arc; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; use url::Url; /// [`ObjectStoreRegistry`] maps a URL to an [`ObjectStore`] instance, @@ -82,20 +82,14 @@ pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static { /// The default [`ObjectStoreRegistry`] pub struct DefaultObjectStoreRegistry { /// A map from scheme to object store that serve list / read operations for the store - object_stores: DashMap>, + object_stores: RwLock>>, } impl std::fmt::Debug for DefaultObjectStoreRegistry { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let stores = self.object_stores.read().unwrap(); f.debug_struct("DefaultObjectStoreRegistry") - .field( - "schemes", - &self - .object_stores - .iter() - .map(|o| o.key().clone()) - .collect::>(), - ) + .field("schemes", &stores.keys().cloned().collect::>()) .finish() } } @@ -109,7 +103,7 @@ impl Default for DefaultObjectStoreRegistry { impl DefaultObjectStoreRegistry { /// Create a new [`DefaultObjectStoreRegistry`] with no registered stores pub fn new() -> Self { - let object_stores: DashMap> = DashMap::new(); + let object_stores = RwLock::new(HashMap::new()); Self { object_stores } } } @@ -129,23 +123,31 @@ impl ObjectStoreRegistry for DefaultObjectStoreRegistry { store: Arc, ) -> Option> { let s = get_url_key(url); - self.object_stores.insert(s, store) + let mut stores = self.object_stores.write().unwrap(); + stores.insert(s, store) } fn get_store(&self, url: &Url) -> Option> { let s = get_url_key(url); - self.object_stores - .entry(s) - .or_try_insert_with(|| match parse_url(url) { - Ok((store, _)) => Ok(Arc::new(store)), - Err(e) => Err(e), - }) - .map(|o| Arc::clone(o.value())) - .ok() + let mut stores = self.object_stores.write().unwrap(); + + if let Some(store) = stores.get(&s) { + return Some(Arc::clone(store)); + } + + match parse_url(url) { + Ok((store, _)) => { + let store = Arc::new(store); + stores.insert(s, store.clone()); + Some(store) + } + Err(_) => None, + } } fn list_urls(&self) -> Vec { - self.object_stores.iter().map(|o| o.key().clone()).collect() + let stores = self.object_stores.read().unwrap(); + stores.keys().cloned().collect() } } From a09bd7e416eb7c3336b1161206e8f48aef9351a4 Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Wed, 7 May 2025 09:19:46 -0700 Subject: [PATCH 10/44] Placate clippy --- src/registry.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/registry.rs b/src/registry.rs index 16fe2443..a56b23b0 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -137,8 +137,8 @@ impl ObjectStoreRegistry for DefaultObjectStoreRegistry { match parse_url(url) { Ok((store, _)) => { - let store = Arc::new(store); - stores.insert(s, store.clone()); + let store = Arc::from(store); + stores.insert(s, Arc::clone(&store)); Some(store) } Err(_) => None, From 658ba5edf2fbcc2156d1ef4c27db91b2084d0411 Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Wed, 7 May 2025 12:04:41 -0700 Subject: [PATCH 11/44] Expose parse_url_opts options when calling get --- src/registry.rs | 29 +++++++++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/src/registry.rs b/src/registry.rs index a56b23b0..a076621c 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -21,7 +21,7 @@ //! when a URL is resolved to an ObjectStore. It also serves as a cache for object stores //! to avoid repeated creation. -use crate::{parse_url, ObjectStore}; +use crate::{parse_url_opts, ObjectStore}; use std::collections::HashMap; use std::sync::{Arc, RwLock}; use url::Url; @@ -75,6 +75,22 @@ pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static { /// created and registered. fn get_store(&self, url: &Url) -> Option>; + /// Get a suitable store for the provided URL. For example: + /// + /// - URL with scheme `file:///` or no scheme will return the default LocalFS store + /// - URL with scheme `s3://bucket/` will return the S3 store + /// - URL with scheme `hdfs://hostname:port/` will return the hdfs store + /// + /// If no [`ObjectStore`] found for the `url`, ad-hoc discovery may be executed depending on + /// the `url` and [`ObjectStoreRegistry`] implementation. An [`ObjectStore`] may be lazily + /// created and registered. If `opts` are supplied, they will be passed to the ad-hoc discovery + /// function. + fn get_store_with_opts(&self, url: &Url, opts: I) -> Option> + where + I: IntoIterator, + K: AsRef, + V: Into; + /// List all registered URLs fn list_urls(&self) -> Vec; } @@ -128,6 +144,15 @@ impl ObjectStoreRegistry for DefaultObjectStoreRegistry { } fn get_store(&self, url: &Url) -> Option> { + self.get_store_with_opts(url, std::iter::empty::<(&str, &str)>()) + } + + fn get_store_with_opts(&self, url: &Url, opts: I) -> Option> + where + I: IntoIterator, + K: AsRef, + V: Into, + { let s = get_url_key(url); let mut stores = self.object_stores.write().unwrap(); @@ -135,7 +160,7 @@ impl ObjectStoreRegistry for DefaultObjectStoreRegistry { return Some(Arc::clone(store)); } - match parse_url(url) { + match parse_url_opts(url, opts) { Ok((store, _)) => { let store = Arc::from(store); stores.insert(s, Arc::clone(&store)); From 9905e92b24cdeb787341e1da5f0d2e8cc5dfc736 Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Wed, 7 May 2025 17:35:26 -0700 Subject: [PATCH 12/44] Revert "Expose parse_url_opts options when calling get" This reverts commit 658ba5edf2fbcc2156d1ef4c27db91b2084d0411. --- src/registry.rs | 29 ++--------------------------- 1 file changed, 2 insertions(+), 27 deletions(-) diff --git a/src/registry.rs b/src/registry.rs index a076621c..a56b23b0 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -21,7 +21,7 @@ //! when a URL is resolved to an ObjectStore. It also serves as a cache for object stores //! to avoid repeated creation. -use crate::{parse_url_opts, ObjectStore}; +use crate::{parse_url, ObjectStore}; use std::collections::HashMap; use std::sync::{Arc, RwLock}; use url::Url; @@ -75,22 +75,6 @@ pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static { /// created and registered. fn get_store(&self, url: &Url) -> Option>; - /// Get a suitable store for the provided URL. For example: - /// - /// - URL with scheme `file:///` or no scheme will return the default LocalFS store - /// - URL with scheme `s3://bucket/` will return the S3 store - /// - URL with scheme `hdfs://hostname:port/` will return the hdfs store - /// - /// If no [`ObjectStore`] found for the `url`, ad-hoc discovery may be executed depending on - /// the `url` and [`ObjectStoreRegistry`] implementation. An [`ObjectStore`] may be lazily - /// created and registered. If `opts` are supplied, they will be passed to the ad-hoc discovery - /// function. - fn get_store_with_opts(&self, url: &Url, opts: I) -> Option> - where - I: IntoIterator, - K: AsRef, - V: Into; - /// List all registered URLs fn list_urls(&self) -> Vec; } @@ -144,15 +128,6 @@ impl ObjectStoreRegistry for DefaultObjectStoreRegistry { } fn get_store(&self, url: &Url) -> Option> { - self.get_store_with_opts(url, std::iter::empty::<(&str, &str)>()) - } - - fn get_store_with_opts(&self, url: &Url, opts: I) -> Option> - where - I: IntoIterator, - K: AsRef, - V: Into, - { let s = get_url_key(url); let mut stores = self.object_stores.write().unwrap(); @@ -160,7 +135,7 @@ impl ObjectStoreRegistry for DefaultObjectStoreRegistry { return Some(Arc::clone(store)); } - match parse_url_opts(url, opts) { + match parse_url(url) { Ok((store, _)) => { let store = Arc::from(store); stores.insert(s, Arc::clone(&store)); From 0a617335b871afff1576dab1fd0c15d8f8dddafd Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Fri, 9 May 2025 16:24:00 -0700 Subject: [PATCH 13/44] Update src/registry.rs Co-authored-by: Andrew Lamb --- src/registry.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/registry.rs b/src/registry.rs index a56b23b0..3a04f90d 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -51,7 +51,7 @@ use url::Url; /// /// This allows for two different use-cases: /// -/// 1. Systems where object store buckets are explicitly created using DDL, can register these +/// 1. Systems where object store buckets are explicitly created using DDL, can use the provided [`DefaultObjectStoreRegistry`] to register these /// buckets using [`ObjectStoreRegistry::register_store`] /// /// 2. Systems relying on ad-hoc discovery, without corresponding DDL, can create [`ObjectStore`] From fb4c9555c6753bd37daab0ee3baeb31b81abbf7d Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Fri, 9 May 2025 16:25:17 -0700 Subject: [PATCH 14/44] Update src/registry.rs Co-authored-by: Andrew Lamb --- src/registry.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/registry.rs b/src/registry.rs index 3a04f90d..6343c366 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -57,10 +57,12 @@ use url::Url; /// 2. Systems relying on ad-hoc discovery, without corresponding DDL, can create [`ObjectStore`] /// lazily by providing a custom implementation of [`ObjectStoreRegistry`] pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static { + /// Register a new store for any url's that begin with `prefix` + /// /// If a store with the same key existed before, it is replaced and returned fn register_store( &self, - url: &Url, + prefix: &Url, store: Arc, ) -> Option>; From e5ec53ac73ae3fb5586b690db9c1e2c89a2b05f8 Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Fri, 9 May 2025 16:34:02 -0700 Subject: [PATCH 15/44] Rename list_urls --- src/registry.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/registry.rs b/src/registry.rs index 6343c366..a0ce8bdb 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -77,8 +77,8 @@ pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static { /// created and registered. fn get_store(&self, url: &Url) -> Option>; - /// List all registered URLs - fn list_urls(&self) -> Vec; + /// List all registered store prefixes + fn get_store_prefixes(&self) -> Vec; } /// The default [`ObjectStoreRegistry`] @@ -121,10 +121,10 @@ impl DefaultObjectStoreRegistry { impl ObjectStoreRegistry for DefaultObjectStoreRegistry { fn register_store( &self, - url: &Url, + prefix: &Url, store: Arc, ) -> Option> { - let s = get_url_key(url); + let s = get_url_key(prefix); let mut stores = self.object_stores.write().unwrap(); stores.insert(s, store) } @@ -147,7 +147,7 @@ impl ObjectStoreRegistry for DefaultObjectStoreRegistry { } } - fn list_urls(&self) -> Vec { + fn get_store_prefixes(&self) -> Vec { let stores = self.object_stores.read().unwrap(); stores.keys().cloned().collect() } @@ -231,7 +231,7 @@ mod tests { let url = Url::parse("file:///foo/bar").unwrap(); let store = Arc::new(LocalFileSystem::new()) as Arc; registry.register_store(&url, store); - let urls = registry.list_urls(); + let urls = registry.get_store_prefixes(); assert_eq!(urls.len(), 1); assert_eq!(urls[0], "file://"); } From 6a51b0bd9a0dbf0380db17e6d37a92e38b6e34ac Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Fri, 9 May 2025 16:34:13 -0700 Subject: [PATCH 16/44] Fix doc formatting for fmt --- src/registry.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/registry.rs b/src/registry.rs index a0ce8bdb..63531cd8 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -57,8 +57,8 @@ use url::Url; /// 2. Systems relying on ad-hoc discovery, without corresponding DDL, can create [`ObjectStore`] /// lazily by providing a custom implementation of [`ObjectStoreRegistry`] pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static { - /// Register a new store for any url's that begin with `prefix` - /// + /// Register a new store for any url's that begin with `prefix` + /// /// If a store with the same key existed before, it is replaced and returned fn register_store( &self, From 005c9f243972f017b7bb1b25dc7ef38b946bb106 Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Fri, 9 May 2025 16:45:05 -0700 Subject: [PATCH 17/44] Add more tests for file scheme checking --- src/registry.rs | 46 +++++++++++++++++++++++++++++----------------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/src/registry.rs b/src/registry.rs index 63531cd8..6e64a988 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -166,6 +166,8 @@ fn get_url_key(url: &Url) -> String { #[cfg(test)] mod tests { use super::*; + #[cfg(not(target_arch = "wasm32"))] + use crate::local::LocalFileSystem; #[test] fn test_get_url_key() { @@ -185,23 +187,13 @@ mod tests { #[cfg(not(target_arch = "wasm32"))] #[test] fn test_register_store() { - use crate::local::LocalFileSystem; let registry = DefaultObjectStoreRegistry::new(); let url = Url::parse("file:///foo/bar").unwrap(); let store = Arc::new(LocalFileSystem::new()) as Arc; let old_store = registry.register_store(&url, Arc::clone(&store)); - assert!( - old_store.is_none(), - "Should not return a previous store when registering a new one" - ); - let retrieved_store = registry - .get_store(&url) - .expect("Should retrieve a store when a valid URL is used"); - // Validate that it’s exactly the same Arc we registered above - assert!( - Arc::ptr_eq(&retrieved_store, &store), - "Retrieved store is not the same LocalFileSystem instance" - ); + assert!(old_store.is_none()); + let retrieved_store = registry.get_store(&url).unwrap(); + assert!(Arc::ptr_eq(&retrieved_store, &store)); } #[cfg(not(target_arch = "wasm32"))] @@ -210,9 +202,7 @@ mod tests { let registry = DefaultObjectStoreRegistry::new(); let url = Url::parse("file:///foo/bar").unwrap(); // on first get, should lazily create & register a LocalFileSystem - let retrieved_store = registry - .get_store(&url) - .expect("Should retrieve a store when a valid URL is used"); + let retrieved_store = registry.get_store(&url).unwrap(); // Validate that this really is a LocalFileSystem under the hood by // looking at its Debug representation let dbg = format!("{:?}", retrieved_store); @@ -226,7 +216,6 @@ mod tests { #[cfg(not(target_arch = "wasm32"))] #[test] fn test_list_urls() { - use crate::local::LocalFileSystem; let registry = DefaultObjectStoreRegistry::new(); let url = Url::parse("file:///foo/bar").unwrap(); let store = Arc::new(LocalFileSystem::new()) as Arc; @@ -242,4 +231,27 @@ mod tests { let url = Url::parse("unknown://foo/bar").unwrap(); assert!(registry.get_store(&url).is_none()); } + + #[cfg(not(target_arch = "wasm32"))] + #[test] + fn test_subprefix_url_resolution() { + let registry = DefaultObjectStoreRegistry::new(); + let base_url = Url::parse("file:///foo/bar").unwrap(); + let store = Arc::new(LocalFileSystem::new()) as Arc; + registry.register_store(&base_url, Arc::clone(&store)); + let subprefix_url = Url::parse("file:///foo/bar/baz").unwrap(); + let retrieved_store = registry.get_store(&subprefix_url).unwrap(); + assert!(Arc::ptr_eq(&retrieved_store, &store)); + } + + #[test] + fn test_invalid_file_url_format() { + let registry = DefaultObjectStoreRegistry::new(); + let base_url = Url::parse("file:///foo/bar").unwrap(); + let store = Arc::new(LocalFileSystem::new()) as Arc; + registry.register_store(&base_url, Arc::clone(&store)); + let invalid_url = Url::parse("file://foo/bar").unwrap(); + let result = registry.get_store(&invalid_url); + assert!(result.is_none()); + } } From bdb819498a67e7f67508d39e87e966972e87a4ea Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Fri, 9 May 2025 16:49:48 -0700 Subject: [PATCH 18/44] Make wasm32 happy --- src/registry.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/registry.rs b/src/registry.rs index 6e64a988..6570d76d 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -244,6 +244,7 @@ mod tests { assert!(Arc::ptr_eq(&retrieved_store, &store)); } + #[cfg(not(target_arch = "wasm32"))] #[test] fn test_invalid_file_url_format() { let registry = DefaultObjectStoreRegistry::new(); From 5a01bd8f720b27710df520b9effc12f0f824174c Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Mon, 12 May 2025 09:51:05 -0700 Subject: [PATCH 19/44] Remove get_store_key and make DefaultObjectStore really dumb --- src/registry.rs | 140 ++++++++++++------------------------------------ 1 file changed, 34 insertions(+), 106 deletions(-) diff --git a/src/registry.rs b/src/registry.rs index 6570d76d..4a771243 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -21,70 +21,42 @@ //! when a URL is resolved to an ObjectStore. It also serves as a cache for object stores //! to avoid repeated creation. -use crate::{parse_url, ObjectStore}; +use crate::ObjectStore; use std::collections::HashMap; use std::sync::{Arc, RwLock}; use url::Url; -/// [`ObjectStoreRegistry`] maps a URL to an [`ObjectStore`] instance, -/// and allows users to read from different [`ObjectStore`] -/// instances. For example the registry might be configured so that -/// -/// 1. `s3://my_bucket/lineitem/` mapped to the `/lineitem` path on an -/// AWS S3 object store bound to `my_bucket` -/// -/// 2. `s3://my_other_bucket/lineitem/` mapped to the (same) -/// `/lineitem` path on a *different* AWS S3 object store bound to -/// `my_other_bucket` -/// -/// In this particular case, the url `s3://my_bucket/lineitem/` will be provided to -/// [`ObjectStoreRegistry::get_store`] and one of three things will happen: -/// -/// - If an [`ObjectStore`] has been registered with [`ObjectStoreRegistry::register_store`] with -/// `s3://my_bucket`, that [`ObjectStore`] will be returned -/// -/// - If an AWS S3 object store can be ad-hoc discovered by the url `s3://my_bucket/lineitem/`, this -/// object store will be registered with key `s3://my_bucket` and returned. -/// -/// - Otherwise `None` will be returned, indicating that no suitable [`ObjectStore`] could -/// be found -/// -/// This allows for two different use-cases: -/// -/// 1. Systems where object store buckets are explicitly created using DDL, can use the provided [`DefaultObjectStoreRegistry`] to register these -/// buckets using [`ObjectStoreRegistry::register_store`] -/// -/// 2. Systems relying on ad-hoc discovery, without corresponding DDL, can create [`ObjectStore`] -/// lazily by providing a custom implementation of [`ObjectStoreRegistry`] +/// [`ObjectStoreRegistry`] maps a URL prefix to an [`ObjectStore`] instance. The definition of +/// a URL prefix depends on the [`ObjectStoreRegistry`] implementation. See implementation docs +/// for more details. pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static { - /// Register a new store for any url's that begin with `prefix` + /// Register a new store for any URL that begins with `prefix` /// - /// If a store with the same key existed before, it is replaced and returned + /// If a store with the same prefix existed before, it is replaced and returned fn register_store( &self, prefix: &Url, store: Arc, ) -> Option>; - /// Get a suitable store for the provided URL. For example: - /// - /// - URL with scheme `file:///` or no scheme will return the default LocalFS store - /// - URL with scheme `s3://bucket/` will return the S3 store - /// - URL with scheme `hdfs://hostname:port/` will return the hdfs store + /// Get a suitable store for the provided URL. The definition of a "suitable store" depends on + /// the [`ObjectStoreRegistry`] implementation. See implementation docs for more details. /// /// If no [`ObjectStore`] found for the `url`, ad-hoc discovery may be executed depending on /// the `url` and [`ObjectStoreRegistry`] implementation. An [`ObjectStore`] may be lazily - /// created and registered. + /// created and registered. The logic for doing so is left to each [`ObjectStoreRegistry`] + /// implementation. fn get_store(&self, url: &Url) -> Option>; /// List all registered store prefixes - fn get_store_prefixes(&self) -> Vec; + fn get_store_prefixes(&self) -> Vec; } -/// The default [`ObjectStoreRegistry`] +/// A simple [`ObjectStoreRegistry`] implementation that registers stores based on the provided +/// URL prefix. pub struct DefaultObjectStoreRegistry { - /// A map from scheme to object store that serve list / read operations for the store - object_stores: RwLock>>, + /// A map from URL prefix to object store that serve list / read operations for the store + object_stores: RwLock>>, } impl std::fmt::Debug for DefaultObjectStoreRegistry { @@ -111,79 +83,44 @@ impl DefaultObjectStoreRegistry { } /// -/// Stores are registered based on the scheme, host and port of the provided URL. +/// Stores are registered based on the URL prefix of the provided URL. /// /// For example: /// -/// - `file:///my_path` will return the default LocalFS store -/// - `s3://bucket/path` will return a store registered with `s3://bucket` if any -/// - `hdfs://host:port/path` will return a store registered with `hdfs://host:port` if any +/// - `file:///foo/bar` will return a store registered with `file:///foo/bar` if any +/// - `s3://bucket/path` will return a store registered with `s3://bucket/path` if any +/// - `hdfs://host:port/path` will return a store registered with `hdfs://host:port/path` if any impl ObjectStoreRegistry for DefaultObjectStoreRegistry { fn register_store( &self, prefix: &Url, store: Arc, ) -> Option> { - let s = get_url_key(prefix); let mut stores = self.object_stores.write().unwrap(); - stores.insert(s, store) + stores.insert(prefix.clone(), store) } - fn get_store(&self, url: &Url) -> Option> { - let s = get_url_key(url); - let mut stores = self.object_stores.write().unwrap(); - - if let Some(store) = stores.get(&s) { - return Some(Arc::clone(store)); - } - - match parse_url(url) { - Ok((store, _)) => { - let store = Arc::from(store); - stores.insert(s, Arc::clone(&store)); - Some(store) - } - Err(_) => None, - } + /// Get a store that was registered with the provided URL prefix. + /// + /// If no store was registered with the provided URL prefix, `None` is returned. + fn get_store(&self, prefix: &Url) -> Option> { + let stores = self.object_stores.read().unwrap(); + stores.get(prefix).map(|s| Arc::clone(s)) } - fn get_store_prefixes(&self) -> Vec { + /// Returns a vector of all registered store prefixes. + fn get_store_prefixes(&self) -> Vec { let stores = self.object_stores.read().unwrap(); stores.keys().cloned().collect() } } -/// Get the key of a url for object store registration. -/// The credential info will be removed -fn get_url_key(url: &Url) -> String { - format!( - "{}://{}", - url.scheme(), - &url[url::Position::BeforeHost..url::Position::AfterPort], - ) -} - #[cfg(test)] mod tests { use super::*; #[cfg(not(target_arch = "wasm32"))] use crate::local::LocalFileSystem; - #[test] - fn test_get_url_key() { - let file = Url::parse("file://").unwrap(); - let key = get_url_key(&file); - assert_eq!(key.as_str(), "file://"); - - let url = Url::parse("s3://bucket").unwrap(); - let key = get_url_key(&url); - assert_eq!(key.as_str(), "s3://bucket"); - - let url = Url::parse("s3://username:password@host:123").unwrap(); - let key = get_url_key(&url); - assert_eq!(key.as_str(), "s3://host:123"); - } - #[cfg(not(target_arch = "wasm32"))] #[test] fn test_register_store() { @@ -198,19 +135,10 @@ mod tests { #[cfg(not(target_arch = "wasm32"))] #[test] - fn test_register_store_on_get() { + fn test_get_store_miss() { let registry = DefaultObjectStoreRegistry::new(); let url = Url::parse("file:///foo/bar").unwrap(); - // on first get, should lazily create & register a LocalFileSystem - let retrieved_store = registry.get_store(&url).unwrap(); - // Validate that this really is a LocalFileSystem under the hood by - // looking at its Debug representation - let dbg = format!("{:?}", retrieved_store); - assert!( - dbg.starts_with("LocalFileSystem"), - "Expected a LocalFileSystem, but Debug printed: {}", - dbg - ); + assert!(registry.get_store(&url).is_none()); } #[cfg(not(target_arch = "wasm32"))] @@ -222,7 +150,7 @@ mod tests { registry.register_store(&url, store); let urls = registry.get_store_prefixes(); assert_eq!(urls.len(), 1); - assert_eq!(urls[0], "file://"); + assert_eq!(urls[0], Url::parse("file:///foo/bar").unwrap()); } #[test] @@ -234,14 +162,14 @@ mod tests { #[cfg(not(target_arch = "wasm32"))] #[test] - fn test_subprefix_url_resolution() { + fn test_subprefix_miss() { let registry = DefaultObjectStoreRegistry::new(); let base_url = Url::parse("file:///foo/bar").unwrap(); let store = Arc::new(LocalFileSystem::new()) as Arc; registry.register_store(&base_url, Arc::clone(&store)); let subprefix_url = Url::parse("file:///foo/bar/baz").unwrap(); - let retrieved_store = registry.get_store(&subprefix_url).unwrap(); - assert!(Arc::ptr_eq(&retrieved_store, &store)); + let retrieved_store = registry.get_store(&subprefix_url); + assert!(retrieved_store.is_none()); } #[cfg(not(target_arch = "wasm32"))] From b19c26ea04b8745050cffd9db70cc185e71051a7 Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Mon, 12 May 2025 10:42:42 -0700 Subject: [PATCH 20/44] Some more docs --- src/registry.rs | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/src/registry.rs b/src/registry.rs index 4a771243..ec55aebb 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -15,12 +15,29 @@ // specific language governing permissions and limitations // under the License. -//! ObjectStoreRegistry holds all the object stores at Runtime with a scheme for each store. -//! This allows the user to resolve a URL to an ObjectStore at runtime. Unlike -//! [`crate::parse_url`], this allows for custom logic to be executed -//! when a URL is resolved to an ObjectStore. It also serves as a cache for object stores -//! to avoid repeated creation. - +//! ObjectStoreRegistry holds object stores at runtime with a URL for each store. +//! The registry serves as a cache for object stores to avoid repeated creation. It +//! also simplifies converting an [`ObjectStore`] back to a URL: +//! +//! ```rust +//! use std::sync::Arc; +//! use url::Url; +//! use object_store::ObjectStore; +//! use object_store::memory::InMemory; +//! use object_store::registry::{DefaultObjectStoreRegistry, ObjectStoreRegistry}; +//! +//! let expected_store = Arc::new(InMemory::new()) as Arc; +//! let registry = DefaultObjectStoreRegistry::new(); +//! let url = Url::parse("inmemory://").unwrap(); +//! registry.register_store(&url, expected_store.clone()); +//! let prefixes = registry.get_store_prefixes(); +//! for prefix in prefixes { +//! let store = registry.get_store(&prefix).unwrap(); +//! if Arc::ptr_eq(&store, &expected_store) { +//! assert_eq!(prefix, url); +//! } +//! } +//! ``` use crate::ObjectStore; use std::collections::HashMap; use std::sync::{Arc, RwLock}; From 5e622d790a2e8e1f051e77f159e0f87c56fa0706 Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Mon, 12 May 2025 11:06:27 -0700 Subject: [PATCH 21/44] Add a get_url method as well --- src/registry.rs | 107 +++++++++++++++++++++++++++--------------------- 1 file changed, 60 insertions(+), 47 deletions(-) diff --git a/src/registry.rs b/src/registry.rs index ec55aebb..1aab41d8 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -16,8 +16,8 @@ // under the License. //! ObjectStoreRegistry holds object stores at runtime with a URL for each store. -//! The registry serves as a cache for object stores to avoid repeated creation. It -//! also simplifies converting an [`ObjectStore`] back to a URL: +//! The registry serves as a cache for object stores to avoid repeated creation. +//! It also lets you convert an [`ObjectStore`] back to its registered URL via `get_url`: //! //! ```rust //! use std::sync::Arc; @@ -26,17 +26,14 @@ //! use object_store::memory::InMemory; //! use object_store::registry::{DefaultObjectStoreRegistry, ObjectStoreRegistry}; //! -//! let expected_store = Arc::new(InMemory::new()) as Arc; +//! let store = Arc::new(InMemory::new()) as Arc; //! let registry = DefaultObjectStoreRegistry::new(); //! let url = Url::parse("inmemory://").unwrap(); -//! registry.register_store(&url, expected_store.clone()); -//! let prefixes = registry.get_store_prefixes(); -//! for prefix in prefixes { -//! let store = registry.get_store(&prefix).unwrap(); -//! if Arc::ptr_eq(&store, &expected_store) { -//! assert_eq!(prefix, url); -//! } -//! } +//! registry.register_store(&url, store.clone()); +//! let found_store = registry.get_store(&url).unwrap(); +//! assert!(Arc::ptr_eq(&found_store, &store)); +//! let found_url = registry.get_url(store.clone()).unwrap(); +//! assert_eq!(found_url, url); //! ``` use crate::ObjectStore; use std::collections::HashMap; @@ -56,15 +53,17 @@ pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static { store: Arc, ) -> Option>; - /// Get a suitable store for the provided URL. The definition of a "suitable store" depends on + /// Get a store for the provided URL. The definition of a "suitable store" depends on /// the [`ObjectStoreRegistry`] implementation. See implementation docs for more details. /// - /// If no [`ObjectStore`] found for the `url`, ad-hoc discovery may be executed depending on - /// the `url` and [`ObjectStoreRegistry`] implementation. An [`ObjectStore`] may be lazily + /// If no [`ObjectStore`] is found for the `url`, an [`ObjectStore`] may be lazily be /// created and registered. The logic for doing so is left to each [`ObjectStoreRegistry`] /// implementation. fn get_store(&self, url: &Url) -> Option>; + /// Given one of the `Arc`s you registered, return its URL. + fn get_url(&self, store: Arc) -> Option; + /// List all registered store prefixes fn get_store_prefixes(&self) -> Vec; } @@ -125,6 +124,17 @@ impl ObjectStoreRegistry for DefaultObjectStoreRegistry { stores.get(prefix).map(|s| Arc::clone(s)) } + fn get_url(&self, store: Arc) -> Option { + let map = self.object_stores.read().unwrap(); + // scan for pointer-equal entry + for (url, registered) in map.iter() { + if Arc::ptr_eq(&store, registered) { + return Some(url.clone()); + } + } + None + } + /// Returns a vector of all registered store prefixes. fn get_store_prefixes(&self) -> Vec { let stores = self.object_stores.read().unwrap(); @@ -135,69 +145,72 @@ impl ObjectStoreRegistry for DefaultObjectStoreRegistry { #[cfg(test)] mod tests { use super::*; - #[cfg(not(target_arch = "wasm32"))] - use crate::local::LocalFileSystem; + use crate::memory::InMemory; - #[cfg(not(target_arch = "wasm32"))] #[test] fn test_register_store() { let registry = DefaultObjectStoreRegistry::new(); - let url = Url::parse("file:///foo/bar").unwrap(); - let store = Arc::new(LocalFileSystem::new()) as Arc; + let url = Url::parse("inmemory://foo").unwrap(); + let store = Arc::new(InMemory::new()) as Arc; let old_store = registry.register_store(&url, Arc::clone(&store)); assert!(old_store.is_none()); let retrieved_store = registry.get_store(&url).unwrap(); assert!(Arc::ptr_eq(&retrieved_store, &store)); } - #[cfg(not(target_arch = "wasm32"))] + #[test] + fn test_reregister_store() { + let registry = DefaultObjectStoreRegistry::new(); + let url = Url::parse("inmemory://foo").unwrap(); + let store = Arc::new(InMemory::new()) as Arc; + let old_store = registry.register_store(&url, Arc::clone(&store)); + assert!(old_store.is_none()); + let old_store = registry.register_store(&url, Arc::new(InMemory::new())); + assert!(Arc::ptr_eq(&old_store.unwrap(), &store)); + } + #[test] fn test_get_store_miss() { let registry = DefaultObjectStoreRegistry::new(); - let url = Url::parse("file:///foo/bar").unwrap(); + let url = Url::parse("inmemory://foo").unwrap(); assert!(registry.get_store(&url).is_none()); } - #[cfg(not(target_arch = "wasm32"))] #[test] - fn test_list_urls() { + fn test_get_url_round_trip() { let registry = DefaultObjectStoreRegistry::new(); - let url = Url::parse("file:///foo/bar").unwrap(); - let store = Arc::new(LocalFileSystem::new()) as Arc; - registry.register_store(&url, store); - let urls = registry.get_store_prefixes(); - assert_eq!(urls.len(), 1); - assert_eq!(urls[0], Url::parse("file:///foo/bar").unwrap()); + let url = Url::parse("inmemory://").unwrap(); + let store = Arc::new(InMemory::new()) as Arc; + registry.register_store(&url, store.clone()); + assert_eq!(registry.get_url(store.clone()).unwrap(), url); } #[test] - fn test_registry_with_bad_scheme() { + fn test_get_url_miss() { let registry = DefaultObjectStoreRegistry::new(); - let url = Url::parse("unknown://foo/bar").unwrap(); - assert!(registry.get_store(&url).is_none()); + let store = Arc::new(InMemory::new()) as Arc; + assert!(registry.get_url(store).is_none()); } - #[cfg(not(target_arch = "wasm32"))] #[test] - fn test_subprefix_miss() { + fn test_list_urls() { let registry = DefaultObjectStoreRegistry::new(); - let base_url = Url::parse("file:///foo/bar").unwrap(); - let store = Arc::new(LocalFileSystem::new()) as Arc; - registry.register_store(&base_url, Arc::clone(&store)); - let subprefix_url = Url::parse("file:///foo/bar/baz").unwrap(); - let retrieved_store = registry.get_store(&subprefix_url); - assert!(retrieved_store.is_none()); + let url = Url::parse("inmemory://foo").unwrap(); + let store = Arc::new(InMemory::new()) as Arc; + registry.register_store(&url, store); + let urls = registry.get_store_prefixes(); + assert_eq!(urls.len(), 1); + assert_eq!(urls[0], url); } - #[cfg(not(target_arch = "wasm32"))] #[test] - fn test_invalid_file_url_format() { + fn test_subprefix_miss() { let registry = DefaultObjectStoreRegistry::new(); - let base_url = Url::parse("file:///foo/bar").unwrap(); - let store = Arc::new(LocalFileSystem::new()) as Arc; + let base_url = Url::parse("inmemory://foo").unwrap(); + let store = Arc::new(InMemory::new()) as Arc; registry.register_store(&base_url, Arc::clone(&store)); - let invalid_url = Url::parse("file://foo/bar").unwrap(); - let result = registry.get_store(&invalid_url); - assert!(result.is_none()); + let subprefix_url = Url::parse("inmemory://foo/bar").unwrap(); + let retrieved_store = registry.get_store(&subprefix_url); + assert!(retrieved_store.is_none()); } } From 3ef24cd2c07de5debef6f976fbe5a849856d0793 Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Mon, 12 May 2025 11:08:59 -0700 Subject: [PATCH 22/44] Clippy! --- src/registry.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/registry.rs b/src/registry.rs index 1aab41d8..e568e739 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -121,7 +121,7 @@ impl ObjectStoreRegistry for DefaultObjectStoreRegistry { /// If no store was registered with the provided URL prefix, `None` is returned. fn get_store(&self, prefix: &Url) -> Option> { let stores = self.object_stores.read().unwrap(); - stores.get(prefix).map(|s| Arc::clone(s)) + stores.get(prefix).map(Arc::clone) } fn get_url(&self, store: Arc) -> Option { @@ -182,7 +182,7 @@ mod tests { let url = Url::parse("inmemory://").unwrap(); let store = Arc::new(InMemory::new()) as Arc; registry.register_store(&url, store.clone()); - assert_eq!(registry.get_url(store.clone()).unwrap(), url); + assert_eq!(registry.get_url(Arc::clone(&store)).unwrap(), url); } #[test] From ac7f6ea98b7935690a0481bfed2c6c24db8d530e Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Mon, 12 May 2025 11:18:25 -0700 Subject: [PATCH 23/44] Clarify how the default registry works --- src/registry.rs | 44 ++++++++++++++++++++++++-------------------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/src/registry.rs b/src/registry.rs index e568e739..fea28744 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -29,10 +29,10 @@ //! let store = Arc::new(InMemory::new()) as Arc; //! let registry = DefaultObjectStoreRegistry::new(); //! let url = Url::parse("inmemory://").unwrap(); -//! registry.register_store(&url, store.clone()); +//! registry.register_store(&url, Arc::clone(&store)); //! let found_store = registry.get_store(&url).unwrap(); //! assert!(Arc::ptr_eq(&found_store, &store)); -//! let found_url = registry.get_url(store.clone()).unwrap(); +//! let found_url = registry.get_url(Arc::clone(&store)).unwrap(); //! assert_eq!(found_url, url); //! ``` use crate::ObjectStore; @@ -68,10 +68,16 @@ pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static { fn get_store_prefixes(&self) -> Vec; } -/// A simple [`ObjectStoreRegistry`] implementation that registers stores based on the provided -/// URL prefix. +/// A simple [`ObjectStoreRegistry`] implementation that has no prefix logic. It simply returns +/// a store registered with the provided URL if one exists. For example, if a store is registered +/// with `file:///foo`, then: +/// +/// - `file:///foo` will match +/// - `file://foo` will not match +/// - `file:///foo/bar` will not match +/// - `s3://foo` will not match pub struct DefaultObjectStoreRegistry { - /// A map from URL prefix to object store that serve list / read operations for the store + /// A map from URL to object store that serve list / read operations for the store object_stores: RwLock>>, } @@ -98,32 +104,30 @@ impl DefaultObjectStoreRegistry { } } -/// -/// Stores are registered based on the URL prefix of the provided URL. -/// -/// For example: -/// -/// - `file:///foo/bar` will return a store registered with `file:///foo/bar` if any -/// - `s3://bucket/path` will return a store registered with `s3://bucket/path` if any -/// - `hdfs://host:port/path` will return a store registered with `hdfs://host:port/path` if any impl ObjectStoreRegistry for DefaultObjectStoreRegistry { + /// Register a new store for the provided URL + /// + /// If a store with the same URL existed before, it is replaced and returned fn register_store( &self, - prefix: &Url, + url: &Url, store: Arc, ) -> Option> { let mut stores = self.object_stores.write().unwrap(); - stores.insert(prefix.clone(), store) + stores.insert(url.clone(), store) } - /// Get a store that was registered with the provided URL prefix. + /// Get a store that was registered with the provided URL. /// - /// If no store was registered with the provided URL prefix, `None` is returned. - fn get_store(&self, prefix: &Url) -> Option> { + /// If no store was registered with the provided URL, `None` is returned. + fn get_store(&self, url: &Url) -> Option> { let stores = self.object_stores.read().unwrap(); - stores.get(prefix).map(Arc::clone) + stores.get(url).map(Arc::clone) } + /// Given one of the `Arc`s you registered, return its URL. + /// + /// If no store was registered with the provided `Arc`, `None` is returned. fn get_url(&self, store: Arc) -> Option { let map = self.object_stores.read().unwrap(); // scan for pointer-equal entry @@ -181,7 +185,7 @@ mod tests { let registry = DefaultObjectStoreRegistry::new(); let url = Url::parse("inmemory://").unwrap(); let store = Arc::new(InMemory::new()) as Arc; - registry.register_store(&url, store.clone()); + registry.register_store(&url, Arc::clone(&store)); assert_eq!(registry.get_url(Arc::clone(&store)).unwrap(), url); } From b667f3ed5e15fd5e5245a860015f4c6e6657579f Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Mon, 12 May 2025 11:22:42 -0700 Subject: [PATCH 24/44] Test url match behavior --- src/registry.rs | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/src/registry.rs b/src/registry.rs index fea28744..4597874d 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -217,4 +217,33 @@ mod tests { let retrieved_store = registry.get_store(&subprefix_url); assert!(retrieved_store.is_none()); } + + #[test] + fn test_exact_url_match_behavior() { + let registry = DefaultObjectStoreRegistry::new(); + let base_url = Url::parse("file:///foo").unwrap(); + let store = Arc::new(InMemory::new()) as Arc; + registry.register_store(&base_url, Arc::clone(&store)); + + // Case 1: Exact match should work + let exact_match = Url::parse("file:///foo").unwrap(); + let retrieved_store = registry.get_store(&exact_match); + assert!(retrieved_store.is_some()); + assert!(Arc::ptr_eq(&retrieved_store.unwrap(), &store)); + + // Case 2: Different URL format should not match + let different_format = Url::parse("file://foo").unwrap(); + let retrieved_store = registry.get_store(&different_format); + assert!(retrieved_store.is_none()); + + // Case 3: Subpath should not match + let subpath = Url::parse("file:///foo/bar").unwrap(); + let retrieved_store = registry.get_store(&subpath); + assert!(retrieved_store.is_none()); + + // Case 4: Different scheme should not match + let different_scheme = Url::parse("s3://foo").unwrap(); + let retrieved_store = registry.get_store(&different_scheme); + assert!(retrieved_store.is_none()); + } } From 72b1875baa7764d97864ea7605c5b78f548c823f Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Mon, 12 May 2025 11:48:09 -0700 Subject: [PATCH 25/44] Add prefix object store registry --- src/registry.rs | 85 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 77 insertions(+), 8 deletions(-) diff --git a/src/registry.rs b/src/registry.rs index 4597874d..cb5fecb0 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -17,7 +17,8 @@ //! ObjectStoreRegistry holds object stores at runtime with a URL for each store. //! The registry serves as a cache for object stores to avoid repeated creation. -//! It also lets you convert an [`ObjectStore`] back to its registered URL via `get_url`: +//! It also lets you convert an [`ObjectStore`] back to its registered URL via +//! `get_prefix`: //! //! ```rust //! use std::sync::Arc; @@ -32,7 +33,7 @@ //! registry.register_store(&url, Arc::clone(&store)); //! let found_store = registry.get_store(&url).unwrap(); //! assert!(Arc::ptr_eq(&found_store, &store)); -//! let found_url = registry.get_url(Arc::clone(&store)).unwrap(); +//! let found_url = registry.get_prefix(Arc::clone(&store)).unwrap(); //! assert_eq!(found_url, url); //! ``` use crate::ObjectStore; @@ -62,7 +63,7 @@ pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static { fn get_store(&self, url: &Url) -> Option>; /// Given one of the `Arc`s you registered, return its URL. - fn get_url(&self, store: Arc) -> Option; + fn get_prefix(&self, store: Arc) -> Option; /// List all registered store prefixes fn get_store_prefixes(&self) -> Vec; @@ -128,7 +129,7 @@ impl ObjectStoreRegistry for DefaultObjectStoreRegistry { /// Given one of the `Arc`s you registered, return its URL. /// /// If no store was registered with the provided `Arc`, `None` is returned. - fn get_url(&self, store: Arc) -> Option { + fn get_prefix(&self, store: Arc) -> Option { let map = self.object_stores.read().unwrap(); // scan for pointer-equal entry for (url, registered) in map.iter() { @@ -146,6 +147,74 @@ impl ObjectStoreRegistry for DefaultObjectStoreRegistry { } } +struct PrefixObjectStoreRegistry { + inner: DefaultObjectStoreRegistry, + prefix_fn: Box Result + Send + Sync>, +} + +impl std::fmt::Debug for PrefixObjectStoreRegistry { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PrefixObjectStoreRegistry") + .field("inner", &self.inner) + .finish() + } +} + +impl Default for PrefixObjectStoreRegistry { + fn default() -> Self { + Self::new() + } +} + +impl PrefixObjectStoreRegistry { + fn new() -> Self { + Self::with_prefix_fn(Box::new(Self::default_prefix_fn)) + } + + fn with_prefix_fn( + prefix_fn: Box Result + Send + Sync>, + ) -> Self { + Self { + inner: DefaultObjectStoreRegistry::new(), + prefix_fn, + } + } + + fn default_prefix_fn(url: &Url) -> Result { + let prefix = format!( + "{}://{}", + url.scheme(), + &url[url::Position::BeforeHost..url::Position::AfterPort], + ); + Url::parse(&prefix) + } +} + +impl ObjectStoreRegistry for PrefixObjectStoreRegistry { + fn register_store( + &self, + prefix: &Url, + store: Arc, + ) -> Option> { + self.inner.register_store(prefix, store) + } + + fn get_store(&self, url: &Url) -> Option> { + (self.prefix_fn)(url) + .ok() + .map(|prefix| self.inner.get_store(&prefix)) + .flatten() + } + + fn get_prefix(&self, store: Arc) -> Option { + self.inner.get_prefix(store) + } + + fn get_store_prefixes(&self) -> Vec { + self.inner.get_store_prefixes() + } +} + #[cfg(test)] mod tests { use super::*; @@ -181,19 +250,19 @@ mod tests { } #[test] - fn test_get_url_round_trip() { + fn test_get_prefix_round_trip() { let registry = DefaultObjectStoreRegistry::new(); let url = Url::parse("inmemory://").unwrap(); let store = Arc::new(InMemory::new()) as Arc; registry.register_store(&url, Arc::clone(&store)); - assert_eq!(registry.get_url(Arc::clone(&store)).unwrap(), url); + assert_eq!(registry.get_prefix(Arc::clone(&store)).unwrap(), url); } #[test] - fn test_get_url_miss() { + fn test_get_prefix_miss() { let registry = DefaultObjectStoreRegistry::new(); let store = Arc::new(InMemory::new()) as Arc; - assert!(registry.get_url(store).is_none()); + assert!(registry.get_prefix(store).is_none()); } #[test] From 5cb90ae0e389dd3c468c810c85b8dd94b963cad8 Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Mon, 12 May 2025 11:55:43 -0700 Subject: [PATCH 26/44] Docs and pub --- src/registry.rs | 45 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 38 insertions(+), 7 deletions(-) diff --git a/src/registry.rs b/src/registry.rs index cb5fecb0..cf5641be 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -147,9 +147,34 @@ impl ObjectStoreRegistry for DefaultObjectStoreRegistry { } } -struct PrefixObjectStoreRegistry { +/// A function that takes a URL and returns a prefix for that URL +pub type PrefixFn = Box Result + Send + Sync>; + +/// A [`ObjectStoreRegistry`] that uses a prefix function to determine the prefix for a URL when +/// retrieving a store. Stores are registered with a prefix. When a user calls `get_store`, the +/// prefix function is applied to the supplied URL to determine the prefix for the URL. The +/// registered store with the matching prefix is then returned. +/// +/// ```rust +/// use std::sync::Arc; +/// use url::Url; +/// use object_store::ObjectStore; +/// use object_store::memory::InMemory; +/// use object_store::registry::{PrefixObjectStoreRegistry, ObjectStoreRegistry}; +/// +/// let store = Arc::new(InMemory::new()) as Arc; +/// let registry = PrefixObjectStoreRegistry::new(); +/// let parent_url = Url::parse("memory://").unwrap(); +/// let child_url = Url::parse("memory://child").unwrap(); +/// registry.register_store(&parent_url, Arc::clone(&store)); +/// let found_store = registry.get_store(&child_url).unwrap(); +/// assert!(Arc::ptr_eq(&found_store, &store)); +/// let found_url = registry.get_prefix(Arc::clone(&store)).unwrap(); +/// assert_eq!(found_url, parent_url); +/// ``` +pub struct PrefixObjectStoreRegistry { inner: DefaultObjectStoreRegistry, - prefix_fn: Box Result + Send + Sync>, + prefix_fn: PrefixFn, } impl std::fmt::Debug for PrefixObjectStoreRegistry { @@ -167,20 +192,26 @@ impl Default for PrefixObjectStoreRegistry { } impl PrefixObjectStoreRegistry { - fn new() -> Self { + /// Create a new [`PrefixObjectStoreRegistry`] with the default prefix function + pub fn new() -> Self { Self::with_prefix_fn(Box::new(Self::default_prefix_fn)) } - fn with_prefix_fn( - prefix_fn: Box Result + Send + Sync>, - ) -> Self { + /// Create a new [`PrefixObjectStoreRegistry`] with the provided prefix function + pub fn with_prefix_fn(prefix_fn: PrefixFn) -> Self { Self { inner: DefaultObjectStoreRegistry::new(), prefix_fn, } } - fn default_prefix_fn(url: &Url) -> Result { + /// The default prefix function. Returns a URL with in the format + /// `scheme://host:port`. For example: + /// + /// - `memory://` -> `memory://` + /// - `memory://child` -> `memory://child` + /// - `memory://child/grandchild` -> `memory://child` + pub fn default_prefix_fn(url: &Url) -> Result { let prefix = format!( "{}://{}", url.scheme(), From a00db859b14eab035efcdcf44aea816df6b12f8d Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Mon, 12 May 2025 11:57:58 -0700 Subject: [PATCH 27/44] Fix rustdocs --- src/registry.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/registry.rs b/src/registry.rs index cf5641be..92566c55 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -164,8 +164,8 @@ pub type PrefixFn = Box Result + Send + Sy /// /// let store = Arc::new(InMemory::new()) as Arc; /// let registry = PrefixObjectStoreRegistry::new(); -/// let parent_url = Url::parse("memory://").unwrap(); -/// let child_url = Url::parse("memory://child").unwrap(); +/// let parent_url = Url::parse("memory://parent").unwrap(); +/// let child_url = Url::parse("memory://parent/child").unwrap(); /// registry.register_store(&parent_url, Arc::clone(&store)); /// let found_store = registry.get_store(&child_url).unwrap(); /// assert!(Arc::ptr_eq(&found_store, &store)); From e468e6c71ef4330832698c3f095c1e15caad0067 Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Mon, 12 May 2025 12:10:35 -0700 Subject: [PATCH 28/44] Add tests for PrefixObjectStoreRegistry --- src/registry.rs | 159 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 159 insertions(+) diff --git a/src/registry.rs b/src/registry.rs index 92566c55..885a19eb 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -346,4 +346,163 @@ mod tests { let retrieved_store = registry.get_store(&different_scheme); assert!(retrieved_store.is_none()); } + + #[test] + fn test_default_prefix_fn() { + // Test simple URLs without hosts + let url = Url::parse("memory://").unwrap(); + let prefix = PrefixObjectStoreRegistry::default_prefix_fn(&url).unwrap(); + assert_eq!(prefix.as_str(), "memory://"); + + // Test s3 URLs + let url = Url::parse("s3://bucket").unwrap(); + let prefix = PrefixObjectStoreRegistry::default_prefix_fn(&url).unwrap(); + assert_eq!(prefix.as_str(), "s3://bucket"); + + // Test s3 URLs with path + let url = Url::parse("s3://bucket/path/to/file").unwrap(); + let prefix = PrefixObjectStoreRegistry::default_prefix_fn(&url).unwrap(); + assert_eq!(prefix.as_str(), "s3://bucket"); + + // Test http URLs with port + let url = Url::parse("http://example.com:8080/path").unwrap(); + let prefix = PrefixObjectStoreRegistry::default_prefix_fn(&url).unwrap(); + assert_eq!(prefix.as_str(), "http://example.com:8080/"); + + // Test http URLs user, pass, and port + let url = Url::parse("http://user:pass@example.com:8080/path").unwrap(); + let prefix = PrefixObjectStoreRegistry::default_prefix_fn(&url).unwrap(); + assert_eq!(prefix.as_str(), "http://example.com:8080/"); + + // Test file URLs + let url = Url::parse("file:///path/to/file").unwrap(); + let prefix = PrefixObjectStoreRegistry::default_prefix_fn(&url).unwrap(); + assert_eq!(prefix.as_str(), "file:///"); + } + + #[test] + fn test_prefix_registry_register_store() { + let registry = PrefixObjectStoreRegistry::new(); + let url = Url::parse("memory://foo").unwrap(); + let store = Arc::new(InMemory::new()) as Arc; + + // Test initial registration + let old_store = registry.register_store(&url, Arc::clone(&store)); + assert!(old_store.is_none()); + + // Test retrieval + let child_url = Url::parse("memory://foo/bar").unwrap(); + let retrieved_store = registry.get_store(&child_url).unwrap(); + assert!(Arc::ptr_eq(&retrieved_store, &store)); + + // Test parent miss + let parent_url = Url::parse("memory://").unwrap(); + assert!(registry.get_store(&parent_url).is_none()); + } + + #[test] + fn test_prefix_registry_get_store() { + let registry = PrefixObjectStoreRegistry::new(); + + // Register a store with a prefix + let prefix_url = Url::parse("s3://mybucket").unwrap(); + let store = Arc::new(InMemory::new()) as Arc; + registry.register_store(&prefix_url, Arc::clone(&store)); + + // Test with exact match + let exact_url = Url::parse("s3://mybucket").unwrap(); + let retrieved_store = registry.get_store(&exact_url).unwrap(); + assert!(Arc::ptr_eq(&retrieved_store, &store)); + + // Test with path - should still match because of prefix + let path_url = Url::parse("s3://mybucket/path/to/object").unwrap(); + let retrieved_store = registry.get_store(&path_url).unwrap(); + assert!(Arc::ptr_eq(&retrieved_store, &store)); + + // Test with different bucket - should not match + let different_url = Url::parse("s3://otherbucket/path").unwrap(); + assert!(registry.get_store(&different_url).is_none()); + + // Test with different scheme - should not match + let different_scheme = Url::parse("file:///path").unwrap(); + assert!(registry.get_store(&different_scheme).is_none()); + } + + #[test] + fn test_prefix_registry_get_prefix() { + let registry = PrefixObjectStoreRegistry::new(); + let url = Url::parse("memory://foo").unwrap(); + let store = Arc::new(InMemory::new()) as Arc; + + // Register store + registry.register_store(&url, Arc::clone(&store)); + + // Test get_prefix retrieves the correct URL + let retrieved_url = registry.get_prefix(Arc::clone(&store)).unwrap(); + assert_eq!(retrieved_url, url); + + // Test with unregistered store + let unregistered_store = Arc::new(InMemory::new()) as Arc; + assert!(registry.get_prefix(unregistered_store).is_none()); + } + + #[test] + fn test_prefix_registry_get_store_prefixes() { + let registry = PrefixObjectStoreRegistry::new(); + + // Register multiple stores + let url1 = Url::parse("s3://bucket1").unwrap(); + let url2 = Url::parse("s3://bucket2").unwrap(); + let url3 = Url::parse("inmemory://test").unwrap(); + + registry.register_store(&url1, Arc::new(InMemory::new())); + registry.register_store(&url2, Arc::new(InMemory::new())); + registry.register_store(&url3, Arc::new(InMemory::new())); + + // Get all prefixes + let prefixes = registry.get_store_prefixes(); + + // Verify number of prefixes + assert_eq!(prefixes.len(), 3); + + // Verify all registered URLs are in the result + assert!(prefixes.contains(&url1)); + assert!(prefixes.contains(&url2)); + assert!(prefixes.contains(&url3)); + } + + #[test] + fn test_prefix_registry_with_custom_prefix_fn() { + // Create a custom prefix function that adds a path segment + let custom_prefix_fn: PrefixFn = Box::new(|url| { + let mut prefix = PrefixObjectStoreRegistry::default_prefix_fn(url)?; + prefix.set_path("/custom"); + Ok(prefix) + }); + + let registry = PrefixObjectStoreRegistry::with_prefix_fn(custom_prefix_fn); + + // Register a store with this prefix + let url = Url::parse("s3://bucket/custom").unwrap(); + let store = Arc::new(InMemory::new()) as Arc; + registry.register_store(&url, Arc::clone(&store)); + + // Test lookup with a URL that should map to the custom prefix + let lookup_url = Url::parse("s3://bucket/path/to/object").unwrap(); + + // When using our custom_prefix_fn, the URL for lookup_url maps to s3://bucket/custom + // which is our registered URL, so it should return the store + let retrieved_store = registry.get_store(&lookup_url); + assert!(retrieved_store.is_some()); + assert!(Arc::ptr_eq(&retrieved_store.unwrap(), &store)); + + // Another URL that should map to the same store + let custom_path_url = Url::parse("s3://bucket/custom/file.txt").unwrap(); + let retrieved_store = registry.get_store(&custom_path_url); + + // This URL will map to s3://bucket/custom via our custom prefix function, + // which matches our registered URL, so it should return the store + assert!(retrieved_store.is_some()); + assert!(Arc::ptr_eq(&retrieved_store.unwrap(), &store)); + } } From 4ef9923b088b852c90c003310b7a946dd1138fd8 Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Mon, 12 May 2025 12:12:32 -0700 Subject: [PATCH 29/44] Clippy --- src/registry.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/registry.rs b/src/registry.rs index 885a19eb..cd3feeee 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -233,8 +233,7 @@ impl ObjectStoreRegistry for PrefixObjectStoreRegistry { fn get_store(&self, url: &Url) -> Option> { (self.prefix_fn)(url) .ok() - .map(|prefix| self.inner.get_store(&prefix)) - .flatten() + .and_then(|prefix| self.inner.get_store(&prefix)) } fn get_prefix(&self, store: Arc) -> Option { From fe04c89f31f41cc715d20b203993af84559140c9 Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Mon, 12 May 2025 14:30:56 -0700 Subject: [PATCH 30/44] Add a more builder-ish pattern for prefix reg --- src/registry.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/registry.rs b/src/registry.rs index cd3feeee..72a1325c 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -194,17 +194,18 @@ impl Default for PrefixObjectStoreRegistry { impl PrefixObjectStoreRegistry { /// Create a new [`PrefixObjectStoreRegistry`] with the default prefix function pub fn new() -> Self { - Self::with_prefix_fn(Box::new(Self::default_prefix_fn)) - } - - /// Create a new [`PrefixObjectStoreRegistry`] with the provided prefix function - pub fn with_prefix_fn(prefix_fn: PrefixFn) -> Self { Self { inner: DefaultObjectStoreRegistry::new(), - prefix_fn, + prefix_fn: Box::new(Self::default_prefix_fn), } } + /// Create a new [`PrefixObjectStoreRegistry`] with the provided prefix function + pub fn with_prefix_fn(mut self, prefix_fn: PrefixFn) -> Self { + self.prefix_fn = prefix_fn; + self + } + /// The default prefix function. Returns a URL with in the format /// `scheme://host:port`. For example: /// @@ -479,7 +480,7 @@ mod tests { Ok(prefix) }); - let registry = PrefixObjectStoreRegistry::with_prefix_fn(custom_prefix_fn); + let registry = PrefixObjectStoreRegistry::new().with_prefix_fn(custom_prefix_fn); // Register a store with this prefix let url = Url::parse("s3://bucket/custom").unwrap(); From e8decc82e84a43dff98e593accb7f4c4d99a7caf Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Mon, 12 May 2025 16:59:35 -0700 Subject: [PATCH 31/44] Add Parsing object store registry --- src/registry.rs | 220 ++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 192 insertions(+), 28 deletions(-) diff --git a/src/registry.rs b/src/registry.rs index 72a1325c..00a84522 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -18,25 +18,8 @@ //! ObjectStoreRegistry holds object stores at runtime with a URL for each store. //! The registry serves as a cache for object stores to avoid repeated creation. //! It also lets you convert an [`ObjectStore`] back to its registered URL via -//! `get_prefix`: -//! -//! ```rust -//! use std::sync::Arc; -//! use url::Url; -//! use object_store::ObjectStore; -//! use object_store::memory::InMemory; -//! use object_store::registry::{DefaultObjectStoreRegistry, ObjectStoreRegistry}; -//! -//! let store = Arc::new(InMemory::new()) as Arc; -//! let registry = DefaultObjectStoreRegistry::new(); -//! let url = Url::parse("inmemory://").unwrap(); -//! registry.register_store(&url, Arc::clone(&store)); -//! let found_store = registry.get_store(&url).unwrap(); -//! assert!(Arc::ptr_eq(&found_store, &store)); -//! let found_url = registry.get_prefix(Arc::clone(&store)).unwrap(); -//! assert_eq!(found_url, url); -//! ``` -use crate::ObjectStore; +//! `get_prefix`. +use crate::{parse_url, ObjectStore}; use std::collections::HashMap; use std::sync::{Arc, RwLock}; use url::Url; @@ -246,15 +229,147 @@ impl ObjectStoreRegistry for PrefixObjectStoreRegistry { } } +type ParserFn = Box Result, super::Error> + Send + Sync>; + +/// An [`ObjectStoreRegistry`] implementation that uses prefix matching and dynamic +/// parsing to construct ObjectStores on demand. +/// +/// This registry builds on top of [`PrefixObjectStoreRegistry`] but will automatically +/// create and register new object stores using the [`crate::parse::parse_url_opts`] +/// function when a URL doesn't match any registered prefix. +/// +/// # Examples +/// +/// ```rust +/// # #[cfg(feature = "http")] +/// use std::sync::Arc; +/// use url::Url; +/// use object_store::ObjectStore; +/// use object_store::registry::{ParserObjectStoreRegistry, ObjectStoreRegistry}; +/// +/// let registry = ParserObjectStoreRegistry::new(); +/// let url = "http://localhost:8080".parse::().unwrap(); +/// let store = registry.get_store(&url).unwrap(); +/// let prefix = registry.get_prefix(store).unwrap(); +/// assert_eq!(prefix.as_str(), "http://localhost:8080/"); +/// ``` +/// +/// And using HTTP with custom opts: +/// +/// ```rust +/// # #[cfg(feature = "http")] +/// use std::sync::Arc; +/// use url::Url; +/// use object_store::{parse_url_opts, ObjectStore}; +/// use object_store::registry::{ParserObjectStoreRegistry, ObjectStoreRegistry}; +/// +/// let registry = ParserObjectStoreRegistry::default().with_parser_fn(Box::new( +/// |url| match parse_url_opts( +/// url, +/// vec![("user_agent", "test_url"), ("allow_http", "true")], +/// ) { +/// Ok((store, _)) => Ok(store), +/// Err(e) => Err(e), +/// }, +/// )); +/// let url = "http://foo:bar@host:123/path".parse::().unwrap(); +/// // Custom `user_agent` and `allow_http` options are passed to the HttpStore +/// let store = registry.get_store(&url).unwrap(); +/// ``` +pub struct ParserObjectStoreRegistry { + /// Inner registry for prefix based lookup + inner: PrefixObjectStoreRegistry, + /// Options to be passed to parse_url_opts + parser_fn: ParserFn, +} + +impl std::fmt::Debug for ParserObjectStoreRegistry { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ParserObjectStoreRegistry") + .field("inner", &self.inner) + .finish() + } +} + +impl Default for ParserObjectStoreRegistry { + fn default() -> Self { + Self::new() + } +} + +impl ParserObjectStoreRegistry { + /// Create a new [`ParserObjectStoreRegistry`] with the default prefix function + pub fn new() -> Self { + Self { + inner: PrefixObjectStoreRegistry::new(), + parser_fn: Self::default_parser_fn(), + } + } + + /// Create a new [`ParserObjectStoreRegistry`] with the provided prefix function + pub fn with_prefix_fn(mut self, prefix_fn: PrefixFn) -> Self { + self.inner = self.inner.with_prefix_fn(prefix_fn); + self + } + + /// Register options to be used with a specific URL prefix when creating stores dynamically + pub fn with_parser_fn( + mut self, + parser_fn: Box Result, super::Error> + Send + Sync>, + ) -> Self { + self.parser_fn = parser_fn; + self + } + + fn default_parser_fn() -> ParserFn { + Box::new(|url| parse_url(url).map(|(store, _)| store)) + } +} + +impl ObjectStoreRegistry for ParserObjectStoreRegistry { + fn register_store( + &self, + prefix: &Url, + store: Arc, + ) -> Option> { + self.inner.register_store(prefix, store) + } + + fn get_store(&self, url: &Url) -> Option> { + let prefix = (self.inner.prefix_fn)(url).unwrap(); + + if let Some(store) = self.inner.get_store(&prefix) { + return Some(store); + } + + match (self.parser_fn)(url) { + Ok(store) => { + let store = Arc::new(store); + self.register_store(&prefix, store.clone()); + Some(store) + } + Err(_) => None, + } + } + + fn get_prefix(&self, store: Arc) -> Option { + self.inner.get_prefix(store) + } + + fn get_store_prefixes(&self) -> Vec { + self.inner.get_store_prefixes() + } +} + #[cfg(test)] mod tests { use super::*; - use crate::memory::InMemory; + use crate::{memory::InMemory, path::Path}; #[test] fn test_register_store() { let registry = DefaultObjectStoreRegistry::new(); - let url = Url::parse("inmemory://foo").unwrap(); + let url = Url::parse("memory://foo").unwrap(); let store = Arc::new(InMemory::new()) as Arc; let old_store = registry.register_store(&url, Arc::clone(&store)); assert!(old_store.is_none()); @@ -265,7 +380,7 @@ mod tests { #[test] fn test_reregister_store() { let registry = DefaultObjectStoreRegistry::new(); - let url = Url::parse("inmemory://foo").unwrap(); + let url = Url::parse("memory://foo").unwrap(); let store = Arc::new(InMemory::new()) as Arc; let old_store = registry.register_store(&url, Arc::clone(&store)); assert!(old_store.is_none()); @@ -276,14 +391,14 @@ mod tests { #[test] fn test_get_store_miss() { let registry = DefaultObjectStoreRegistry::new(); - let url = Url::parse("inmemory://foo").unwrap(); + let url = Url::parse("memory://foo").unwrap(); assert!(registry.get_store(&url).is_none()); } #[test] fn test_get_prefix_round_trip() { let registry = DefaultObjectStoreRegistry::new(); - let url = Url::parse("inmemory://").unwrap(); + let url = Url::parse("memory://").unwrap(); let store = Arc::new(InMemory::new()) as Arc; registry.register_store(&url, Arc::clone(&store)); assert_eq!(registry.get_prefix(Arc::clone(&store)).unwrap(), url); @@ -299,7 +414,7 @@ mod tests { #[test] fn test_list_urls() { let registry = DefaultObjectStoreRegistry::new(); - let url = Url::parse("inmemory://foo").unwrap(); + let url = Url::parse("memory://foo").unwrap(); let store = Arc::new(InMemory::new()) as Arc; registry.register_store(&url, store); let urls = registry.get_store_prefixes(); @@ -310,10 +425,10 @@ mod tests { #[test] fn test_subprefix_miss() { let registry = DefaultObjectStoreRegistry::new(); - let base_url = Url::parse("inmemory://foo").unwrap(); + let base_url = Url::parse("memory://foo").unwrap(); let store = Arc::new(InMemory::new()) as Arc; registry.register_store(&base_url, Arc::clone(&store)); - let subprefix_url = Url::parse("inmemory://foo/bar").unwrap(); + let subprefix_url = Url::parse("memory://foo/bar").unwrap(); let retrieved_store = registry.get_store(&subprefix_url); assert!(retrieved_store.is_none()); } @@ -453,7 +568,7 @@ mod tests { // Register multiple stores let url1 = Url::parse("s3://bucket1").unwrap(); let url2 = Url::parse("s3://bucket2").unwrap(); - let url3 = Url::parse("inmemory://test").unwrap(); + let url3 = Url::parse("memory://test").unwrap(); registry.register_store(&url1, Arc::new(InMemory::new())); registry.register_store(&url2, Arc::new(InMemory::new())); @@ -505,4 +620,53 @@ mod tests { assert!(retrieved_store.is_some()); assert!(Arc::ptr_eq(&retrieved_store.unwrap(), &store)); } + + #[test] + fn test_parser_object_store_registry() { + let registry = ParserObjectStoreRegistry::default(); + let url = Url::parse("memory://").unwrap(); + let store = registry.get_store(&url); + assert!(store.is_some()); + } + + #[test] + fn test_parser_bad_scheme() { + let registry = ParserObjectStoreRegistry::default(); + let url = Url::parse("bad://").unwrap(); + let store = registry.get_store(&url); + assert!(store.is_none()); + } + + #[cfg(all(feature = "http", feature = "cloud", not(target_arch = "wasm32")))] + #[tokio::test] + async fn test_parser_object_store_registry_with_opts() { + use crate::client::mock_server::MockServer; + use crate::http::HttpStore; + use crate::parse::parse_url_opts; + use http::{header::USER_AGENT, Response}; + + let server = MockServer::new().await; + + server.push_fn(|r| { + assert_eq!(r.uri().path(), "/foo/bar"); + assert_eq!(r.headers().get(USER_AGENT).unwrap(), "test_url"); + Response::new(String::new()) + }); + + let registry = + ParserObjectStoreRegistry::default().with_parser_fn(Box::new( + |url| match parse_url_opts( + url, + vec![("user_agent", "test_url"), ("allow_http", "true")], + ) { + Ok((store, _)) => Ok(store), + Err(e) => Err(e), + }, + )); + let url = Url::parse(format!("{}/foo/bar", server.url()).as_str()).unwrap(); + let store = registry.get_store(&url).unwrap(); + let prefix = registry.get_prefix(store.clone()).unwrap(); + store.get(&Path::from("/foo/bar")).await.unwrap(); + server.shutdown().await; + } } From 21451599f2b0b42d4a55556d0bd5ae86e8b82553 Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Mon, 12 May 2025 18:18:39 -0700 Subject: [PATCH 32/44] Add proper test for parser object store --- src/registry.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/registry.rs b/src/registry.rs index 00a84522..d21e3df4 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -273,7 +273,7 @@ type ParserFn = Box Result, super::Error> + /// }, /// )); /// let url = "http://foo:bar@host:123/path".parse::().unwrap(); -/// // Custom `user_agent` and `allow_http` options are passed to the HttpStore +/// // Custom `user_agent` and `allow_http` options are set in this store /// let store = registry.get_store(&url).unwrap(); /// ``` pub struct ParserObjectStoreRegistry { @@ -646,13 +646,12 @@ mod tests { use http::{header::USER_AGENT, Response}; let server = MockServer::new().await; - - server.push_fn(|r| { - assert_eq!(r.uri().path(), "/foo/bar"); - assert_eq!(r.headers().get(USER_AGENT).unwrap(), "test_url"); + let (tx, rx) = tokio::sync::oneshot::channel(); + server.push_fn(move |req| { + let hdr = req.headers().get(USER_AGENT).cloned(); + let _ = tx.send(hdr); Response::new(String::new()) }); - let registry = ParserObjectStoreRegistry::default().with_parser_fn(Box::new( |url| match parse_url_opts( @@ -665,8 +664,9 @@ mod tests { )); let url = Url::parse(format!("{}/foo/bar", server.url()).as_str()).unwrap(); let store = registry.get_store(&url).unwrap(); - let prefix = registry.get_prefix(store.clone()).unwrap(); - store.get(&Path::from("/foo/bar")).await.unwrap(); + let _ = store.get(&Path::from("/foo/bar")).await.unwrap(); + let got = rx.await.expect("handler never ran"); + assert_eq!(got.unwrap(), "test_url"); server.shutdown().await; } } From 39efe633e8e5711a1ceb68308c3c5657af4f586b Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Mon, 12 May 2025 18:31:30 -0700 Subject: [PATCH 33/44] Misc cleanup --- src/registry.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/registry.rs b/src/registry.rs index d21e3df4..f2940cac 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -138,7 +138,7 @@ pub type PrefixFn = Box Result + Send + Sy /// prefix function is applied to the supplied URL to determine the prefix for the URL. The /// registered store with the matching prefix is then returned. /// -/// ```rust +/// ``` /// use std::sync::Arc; /// use url::Url; /// use object_store::ObjectStore; @@ -240,8 +240,8 @@ type ParserFn = Box Result, super::Error> + /// /// # Examples /// -/// ```rust -/// # #[cfg(feature = "http")] +/// ``` +/// # #[cfg(all(feature = "http", feature = "cloud"))] { /// use std::sync::Arc; /// use url::Url; /// use object_store::ObjectStore; @@ -252,12 +252,13 @@ type ParserFn = Box Result, super::Error> + /// let store = registry.get_store(&url).unwrap(); /// let prefix = registry.get_prefix(store).unwrap(); /// assert_eq!(prefix.as_str(), "http://localhost:8080/"); +/// # } /// ``` /// /// And using HTTP with custom opts: /// -/// ```rust -/// # #[cfg(feature = "http")] +/// ``` +/// # #[cfg(all(feature = "http", feature = "cloud"))] { /// use std::sync::Arc; /// use url::Url; /// use object_store::{parse_url_opts, ObjectStore}; @@ -275,6 +276,7 @@ type ParserFn = Box Result, super::Error> + /// let url = "http://foo:bar@host:123/path".parse::().unwrap(); /// // Custom `user_agent` and `allow_http` options are set in this store /// let store = registry.get_store(&url).unwrap(); +/// # } /// ``` pub struct ParserObjectStoreRegistry { /// Inner registry for prefix based lookup @@ -364,7 +366,7 @@ impl ObjectStoreRegistry for ParserObjectStoreRegistry { #[cfg(test)] mod tests { use super::*; - use crate::{memory::InMemory, path::Path}; + use crate::memory::InMemory; #[test] fn test_register_store() { @@ -643,6 +645,7 @@ mod tests { use crate::client::mock_server::MockServer; use crate::http::HttpStore; use crate::parse::parse_url_opts; + use crate::path::Path; use http::{header::USER_AGENT, Response}; let server = MockServer::new().await; From 970dc13dcdd1852b64bcfbb7a41e2a947c42630c Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Mon, 12 May 2025 18:32:47 -0700 Subject: [PATCH 34/44] Clippy! --- src/registry.rs | 9 +++------ src/throttle.rs | 2 +- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/src/registry.rs b/src/registry.rs index f2940cac..b6b3d482 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -315,10 +315,7 @@ impl ParserObjectStoreRegistry { } /// Register options to be used with a specific URL prefix when creating stores dynamically - pub fn with_parser_fn( - mut self, - parser_fn: Box Result, super::Error> + Send + Sync>, - ) -> Self { + pub fn with_parser_fn(mut self, parser_fn: ParserFn) -> Self { self.parser_fn = parser_fn; self } @@ -347,7 +344,7 @@ impl ObjectStoreRegistry for ParserObjectStoreRegistry { match (self.parser_fn)(url) { Ok(store) => { let store = Arc::new(store); - self.register_store(&prefix, store.clone()); + self.register_store(&prefix, Arc::clone(&store)); Some(store) } Err(_) => None, @@ -643,7 +640,7 @@ mod tests { #[tokio::test] async fn test_parser_object_store_registry_with_opts() { use crate::client::mock_server::MockServer; - use crate::http::HttpStore; + use crate::parse::parse_url_opts; use crate::path::Path; use http::{header::USER_AGENT, Response}; diff --git a/src/throttle.rs b/src/throttle.rs index efe29491..b8ff0652 100644 --- a/src/throttle.rs +++ b/src/throttle.rs @@ -404,7 +404,7 @@ impl MultipartUpload for ThrottledUpload { #[cfg(test)] mod tests { use super::*; - use crate::{integration::*, memory::InMemory, GetResultPayload}; + use crate::{integration::*, memory::InMemory}; use futures::TryStreamExt; use tokio::time::Duration; use tokio::time::Instant; From 2f87a22f344f80df0cd1b6715f39fce8322ef453 Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Mon, 12 May 2025 18:37:12 -0700 Subject: [PATCH 35/44] Revert silly clone --- src/registry.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/registry.rs b/src/registry.rs index b6b3d482..04c92edc 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -344,7 +344,7 @@ impl ObjectStoreRegistry for ParserObjectStoreRegistry { match (self.parser_fn)(url) { Ok(store) => { let store = Arc::new(store); - self.register_store(&prefix, Arc::clone(&store)); + self.register_store(&prefix, store.clone()); Some(store) } Err(_) => None, From 227dcba74aa90f8004c4e82687628c96c9f7baaa Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Mon, 12 May 2025 18:38:11 -0700 Subject: [PATCH 36/44] More clippy sigh --- src/registry.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/registry.rs b/src/registry.rs index 04c92edc..d8faf49f 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -344,7 +344,7 @@ impl ObjectStoreRegistry for ParserObjectStoreRegistry { match (self.parser_fn)(url) { Ok(store) => { let store = Arc::new(store); - self.register_store(&prefix, store.clone()); + self.register_store(&prefix, Arc::>::clone(&store)); Some(store) } Err(_) => None, From b7937cc87ba193b10ada0ace869fd2fa622c2fcf Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Mon, 12 May 2025 18:50:34 -0700 Subject: [PATCH 37/44] Fix test_url_http --- src/parse.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/parse.rs b/src/parse.rs index e37f85b5..3c115c67 100644 --- a/src/parse.rs +++ b/src/parse.rs @@ -354,10 +354,10 @@ mod tests { use http::{header::USER_AGENT, Response}; let server = MockServer::new().await; - - server.push_fn(|r| { - assert_eq!(r.uri().path(), "/foo/bar"); - assert_eq!(r.headers().get(USER_AGENT).unwrap(), "test_url"); + let (tx, rx) = tokio::sync::oneshot::channel(); + server.push_fn(move |req| { + let hdr = req.headers().get(USER_AGENT).cloned(); + let _ = tx.send(hdr); Response::new(String::new()) }); @@ -366,7 +366,9 @@ mod tests { let url = test.parse().unwrap(); let (store, path) = parse_url_opts(&url, opts).unwrap(); assert_eq!(path.as_ref(), "foo/bar"); - store.get(&path).await.unwrap(); + let _ = store.get(&Path::from("/foo/bar")).await.unwrap(); + let got = rx.await.expect("handler never ran"); + assert_eq!(got.unwrap(), "test_url"); server.shutdown().await; } From a87ecd257218dd6661b6a250a855ab87af4f9201 Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Tue, 13 May 2025 11:43:21 -0700 Subject: [PATCH 38/44] Remove get_prefix method --- src/registry.rs | 65 ------------------------------------------------- 1 file changed, 65 deletions(-) diff --git a/src/registry.rs b/src/registry.rs index d8faf49f..4172adf3 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -17,8 +17,6 @@ //! ObjectStoreRegistry holds object stores at runtime with a URL for each store. //! The registry serves as a cache for object stores to avoid repeated creation. -//! It also lets you convert an [`ObjectStore`] back to its registered URL via -//! `get_prefix`. use crate::{parse_url, ObjectStore}; use std::collections::HashMap; use std::sync::{Arc, RwLock}; @@ -45,9 +43,6 @@ pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static { /// implementation. fn get_store(&self, url: &Url) -> Option>; - /// Given one of the `Arc`s you registered, return its URL. - fn get_prefix(&self, store: Arc) -> Option; - /// List all registered store prefixes fn get_store_prefixes(&self) -> Vec; } @@ -109,20 +104,6 @@ impl ObjectStoreRegistry for DefaultObjectStoreRegistry { stores.get(url).map(Arc::clone) } - /// Given one of the `Arc`s you registered, return its URL. - /// - /// If no store was registered with the provided `Arc`, `None` is returned. - fn get_prefix(&self, store: Arc) -> Option { - let map = self.object_stores.read().unwrap(); - // scan for pointer-equal entry - for (url, registered) in map.iter() { - if Arc::ptr_eq(&store, registered) { - return Some(url.clone()); - } - } - None - } - /// Returns a vector of all registered store prefixes. fn get_store_prefixes(&self) -> Vec { let stores = self.object_stores.read().unwrap(); @@ -152,8 +133,6 @@ pub type PrefixFn = Box Result + Send + Sy /// registry.register_store(&parent_url, Arc::clone(&store)); /// let found_store = registry.get_store(&child_url).unwrap(); /// assert!(Arc::ptr_eq(&found_store, &store)); -/// let found_url = registry.get_prefix(Arc::clone(&store)).unwrap(); -/// assert_eq!(found_url, parent_url); /// ``` pub struct PrefixObjectStoreRegistry { inner: DefaultObjectStoreRegistry, @@ -220,10 +199,6 @@ impl ObjectStoreRegistry for PrefixObjectStoreRegistry { .and_then(|prefix| self.inner.get_store(&prefix)) } - fn get_prefix(&self, store: Arc) -> Option { - self.inner.get_prefix(store) - } - fn get_store_prefixes(&self) -> Vec { self.inner.get_store_prefixes() } @@ -250,8 +225,6 @@ type ParserFn = Box Result, super::Error> + /// let registry = ParserObjectStoreRegistry::new(); /// let url = "http://localhost:8080".parse::().unwrap(); /// let store = registry.get_store(&url).unwrap(); -/// let prefix = registry.get_prefix(store).unwrap(); -/// assert_eq!(prefix.as_str(), "http://localhost:8080/"); /// # } /// ``` /// @@ -351,10 +324,6 @@ impl ObjectStoreRegistry for ParserObjectStoreRegistry { } } - fn get_prefix(&self, store: Arc) -> Option { - self.inner.get_prefix(store) - } - fn get_store_prefixes(&self) -> Vec { self.inner.get_store_prefixes() } @@ -394,22 +363,6 @@ mod tests { assert!(registry.get_store(&url).is_none()); } - #[test] - fn test_get_prefix_round_trip() { - let registry = DefaultObjectStoreRegistry::new(); - let url = Url::parse("memory://").unwrap(); - let store = Arc::new(InMemory::new()) as Arc; - registry.register_store(&url, Arc::clone(&store)); - assert_eq!(registry.get_prefix(Arc::clone(&store)).unwrap(), url); - } - - #[test] - fn test_get_prefix_miss() { - let registry = DefaultObjectStoreRegistry::new(); - let store = Arc::new(InMemory::new()) as Arc; - assert!(registry.get_prefix(store).is_none()); - } - #[test] fn test_list_urls() { let registry = DefaultObjectStoreRegistry::new(); @@ -542,24 +495,6 @@ mod tests { assert!(registry.get_store(&different_scheme).is_none()); } - #[test] - fn test_prefix_registry_get_prefix() { - let registry = PrefixObjectStoreRegistry::new(); - let url = Url::parse("memory://foo").unwrap(); - let store = Arc::new(InMemory::new()) as Arc; - - // Register store - registry.register_store(&url, Arc::clone(&store)); - - // Test get_prefix retrieves the correct URL - let retrieved_url = registry.get_prefix(Arc::clone(&store)).unwrap(); - assert_eq!(retrieved_url, url); - - // Test with unregistered store - let unregistered_store = Arc::new(InMemory::new()) as Arc; - assert!(registry.get_prefix(unregistered_store).is_none()); - } - #[test] fn test_prefix_registry_get_store_prefixes() { let registry = PrefixObjectStoreRegistry::new(); From b9a97b623df5de0af0fb418284f32a4e329f529c Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Tue, 13 May 2025 11:44:53 -0700 Subject: [PATCH 39/44] Remove prefix and parser object store registries --- src/registry.rs | 411 +----------------------------------------------- 1 file changed, 1 insertion(+), 410 deletions(-) diff --git a/src/registry.rs b/src/registry.rs index 4172adf3..d2fd6aaa 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -17,7 +17,7 @@ //! ObjectStoreRegistry holds object stores at runtime with a URL for each store. //! The registry serves as a cache for object stores to avoid repeated creation. -use crate::{parse_url, ObjectStore}; +use crate::ObjectStore; use std::collections::HashMap; use std::sync::{Arc, RwLock}; use url::Url; @@ -111,224 +111,6 @@ impl ObjectStoreRegistry for DefaultObjectStoreRegistry { } } -/// A function that takes a URL and returns a prefix for that URL -pub type PrefixFn = Box Result + Send + Sync>; - -/// A [`ObjectStoreRegistry`] that uses a prefix function to determine the prefix for a URL when -/// retrieving a store. Stores are registered with a prefix. When a user calls `get_store`, the -/// prefix function is applied to the supplied URL to determine the prefix for the URL. The -/// registered store with the matching prefix is then returned. -/// -/// ``` -/// use std::sync::Arc; -/// use url::Url; -/// use object_store::ObjectStore; -/// use object_store::memory::InMemory; -/// use object_store::registry::{PrefixObjectStoreRegistry, ObjectStoreRegistry}; -/// -/// let store = Arc::new(InMemory::new()) as Arc; -/// let registry = PrefixObjectStoreRegistry::new(); -/// let parent_url = Url::parse("memory://parent").unwrap(); -/// let child_url = Url::parse("memory://parent/child").unwrap(); -/// registry.register_store(&parent_url, Arc::clone(&store)); -/// let found_store = registry.get_store(&child_url).unwrap(); -/// assert!(Arc::ptr_eq(&found_store, &store)); -/// ``` -pub struct PrefixObjectStoreRegistry { - inner: DefaultObjectStoreRegistry, - prefix_fn: PrefixFn, -} - -impl std::fmt::Debug for PrefixObjectStoreRegistry { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("PrefixObjectStoreRegistry") - .field("inner", &self.inner) - .finish() - } -} - -impl Default for PrefixObjectStoreRegistry { - fn default() -> Self { - Self::new() - } -} - -impl PrefixObjectStoreRegistry { - /// Create a new [`PrefixObjectStoreRegistry`] with the default prefix function - pub fn new() -> Self { - Self { - inner: DefaultObjectStoreRegistry::new(), - prefix_fn: Box::new(Self::default_prefix_fn), - } - } - - /// Create a new [`PrefixObjectStoreRegistry`] with the provided prefix function - pub fn with_prefix_fn(mut self, prefix_fn: PrefixFn) -> Self { - self.prefix_fn = prefix_fn; - self - } - - /// The default prefix function. Returns a URL with in the format - /// `scheme://host:port`. For example: - /// - /// - `memory://` -> `memory://` - /// - `memory://child` -> `memory://child` - /// - `memory://child/grandchild` -> `memory://child` - pub fn default_prefix_fn(url: &Url) -> Result { - let prefix = format!( - "{}://{}", - url.scheme(), - &url[url::Position::BeforeHost..url::Position::AfterPort], - ); - Url::parse(&prefix) - } -} - -impl ObjectStoreRegistry for PrefixObjectStoreRegistry { - fn register_store( - &self, - prefix: &Url, - store: Arc, - ) -> Option> { - self.inner.register_store(prefix, store) - } - - fn get_store(&self, url: &Url) -> Option> { - (self.prefix_fn)(url) - .ok() - .and_then(|prefix| self.inner.get_store(&prefix)) - } - - fn get_store_prefixes(&self) -> Vec { - self.inner.get_store_prefixes() - } -} - -type ParserFn = Box Result, super::Error> + Send + Sync>; - -/// An [`ObjectStoreRegistry`] implementation that uses prefix matching and dynamic -/// parsing to construct ObjectStores on demand. -/// -/// This registry builds on top of [`PrefixObjectStoreRegistry`] but will automatically -/// create and register new object stores using the [`crate::parse::parse_url_opts`] -/// function when a URL doesn't match any registered prefix. -/// -/// # Examples -/// -/// ``` -/// # #[cfg(all(feature = "http", feature = "cloud"))] { -/// use std::sync::Arc; -/// use url::Url; -/// use object_store::ObjectStore; -/// use object_store::registry::{ParserObjectStoreRegistry, ObjectStoreRegistry}; -/// -/// let registry = ParserObjectStoreRegistry::new(); -/// let url = "http://localhost:8080".parse::().unwrap(); -/// let store = registry.get_store(&url).unwrap(); -/// # } -/// ``` -/// -/// And using HTTP with custom opts: -/// -/// ``` -/// # #[cfg(all(feature = "http", feature = "cloud"))] { -/// use std::sync::Arc; -/// use url::Url; -/// use object_store::{parse_url_opts, ObjectStore}; -/// use object_store::registry::{ParserObjectStoreRegistry, ObjectStoreRegistry}; -/// -/// let registry = ParserObjectStoreRegistry::default().with_parser_fn(Box::new( -/// |url| match parse_url_opts( -/// url, -/// vec![("user_agent", "test_url"), ("allow_http", "true")], -/// ) { -/// Ok((store, _)) => Ok(store), -/// Err(e) => Err(e), -/// }, -/// )); -/// let url = "http://foo:bar@host:123/path".parse::().unwrap(); -/// // Custom `user_agent` and `allow_http` options are set in this store -/// let store = registry.get_store(&url).unwrap(); -/// # } -/// ``` -pub struct ParserObjectStoreRegistry { - /// Inner registry for prefix based lookup - inner: PrefixObjectStoreRegistry, - /// Options to be passed to parse_url_opts - parser_fn: ParserFn, -} - -impl std::fmt::Debug for ParserObjectStoreRegistry { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ParserObjectStoreRegistry") - .field("inner", &self.inner) - .finish() - } -} - -impl Default for ParserObjectStoreRegistry { - fn default() -> Self { - Self::new() - } -} - -impl ParserObjectStoreRegistry { - /// Create a new [`ParserObjectStoreRegistry`] with the default prefix function - pub fn new() -> Self { - Self { - inner: PrefixObjectStoreRegistry::new(), - parser_fn: Self::default_parser_fn(), - } - } - - /// Create a new [`ParserObjectStoreRegistry`] with the provided prefix function - pub fn with_prefix_fn(mut self, prefix_fn: PrefixFn) -> Self { - self.inner = self.inner.with_prefix_fn(prefix_fn); - self - } - - /// Register options to be used with a specific URL prefix when creating stores dynamically - pub fn with_parser_fn(mut self, parser_fn: ParserFn) -> Self { - self.parser_fn = parser_fn; - self - } - - fn default_parser_fn() -> ParserFn { - Box::new(|url| parse_url(url).map(|(store, _)| store)) - } -} - -impl ObjectStoreRegistry for ParserObjectStoreRegistry { - fn register_store( - &self, - prefix: &Url, - store: Arc, - ) -> Option> { - self.inner.register_store(prefix, store) - } - - fn get_store(&self, url: &Url) -> Option> { - let prefix = (self.inner.prefix_fn)(url).unwrap(); - - if let Some(store) = self.inner.get_store(&prefix) { - return Some(store); - } - - match (self.parser_fn)(url) { - Ok(store) => { - let store = Arc::new(store); - self.register_store(&prefix, Arc::>::clone(&store)); - Some(store) - } - Err(_) => None, - } - } - - fn get_store_prefixes(&self) -> Vec { - self.inner.get_store_prefixes() - } -} - #[cfg(test)] mod tests { use super::*; @@ -413,195 +195,4 @@ mod tests { let retrieved_store = registry.get_store(&different_scheme); assert!(retrieved_store.is_none()); } - - #[test] - fn test_default_prefix_fn() { - // Test simple URLs without hosts - let url = Url::parse("memory://").unwrap(); - let prefix = PrefixObjectStoreRegistry::default_prefix_fn(&url).unwrap(); - assert_eq!(prefix.as_str(), "memory://"); - - // Test s3 URLs - let url = Url::parse("s3://bucket").unwrap(); - let prefix = PrefixObjectStoreRegistry::default_prefix_fn(&url).unwrap(); - assert_eq!(prefix.as_str(), "s3://bucket"); - - // Test s3 URLs with path - let url = Url::parse("s3://bucket/path/to/file").unwrap(); - let prefix = PrefixObjectStoreRegistry::default_prefix_fn(&url).unwrap(); - assert_eq!(prefix.as_str(), "s3://bucket"); - - // Test http URLs with port - let url = Url::parse("http://example.com:8080/path").unwrap(); - let prefix = PrefixObjectStoreRegistry::default_prefix_fn(&url).unwrap(); - assert_eq!(prefix.as_str(), "http://example.com:8080/"); - - // Test http URLs user, pass, and port - let url = Url::parse("http://user:pass@example.com:8080/path").unwrap(); - let prefix = PrefixObjectStoreRegistry::default_prefix_fn(&url).unwrap(); - assert_eq!(prefix.as_str(), "http://example.com:8080/"); - - // Test file URLs - let url = Url::parse("file:///path/to/file").unwrap(); - let prefix = PrefixObjectStoreRegistry::default_prefix_fn(&url).unwrap(); - assert_eq!(prefix.as_str(), "file:///"); - } - - #[test] - fn test_prefix_registry_register_store() { - let registry = PrefixObjectStoreRegistry::new(); - let url = Url::parse("memory://foo").unwrap(); - let store = Arc::new(InMemory::new()) as Arc; - - // Test initial registration - let old_store = registry.register_store(&url, Arc::clone(&store)); - assert!(old_store.is_none()); - - // Test retrieval - let child_url = Url::parse("memory://foo/bar").unwrap(); - let retrieved_store = registry.get_store(&child_url).unwrap(); - assert!(Arc::ptr_eq(&retrieved_store, &store)); - - // Test parent miss - let parent_url = Url::parse("memory://").unwrap(); - assert!(registry.get_store(&parent_url).is_none()); - } - - #[test] - fn test_prefix_registry_get_store() { - let registry = PrefixObjectStoreRegistry::new(); - - // Register a store with a prefix - let prefix_url = Url::parse("s3://mybucket").unwrap(); - let store = Arc::new(InMemory::new()) as Arc; - registry.register_store(&prefix_url, Arc::clone(&store)); - - // Test with exact match - let exact_url = Url::parse("s3://mybucket").unwrap(); - let retrieved_store = registry.get_store(&exact_url).unwrap(); - assert!(Arc::ptr_eq(&retrieved_store, &store)); - - // Test with path - should still match because of prefix - let path_url = Url::parse("s3://mybucket/path/to/object").unwrap(); - let retrieved_store = registry.get_store(&path_url).unwrap(); - assert!(Arc::ptr_eq(&retrieved_store, &store)); - - // Test with different bucket - should not match - let different_url = Url::parse("s3://otherbucket/path").unwrap(); - assert!(registry.get_store(&different_url).is_none()); - - // Test with different scheme - should not match - let different_scheme = Url::parse("file:///path").unwrap(); - assert!(registry.get_store(&different_scheme).is_none()); - } - - #[test] - fn test_prefix_registry_get_store_prefixes() { - let registry = PrefixObjectStoreRegistry::new(); - - // Register multiple stores - let url1 = Url::parse("s3://bucket1").unwrap(); - let url2 = Url::parse("s3://bucket2").unwrap(); - let url3 = Url::parse("memory://test").unwrap(); - - registry.register_store(&url1, Arc::new(InMemory::new())); - registry.register_store(&url2, Arc::new(InMemory::new())); - registry.register_store(&url3, Arc::new(InMemory::new())); - - // Get all prefixes - let prefixes = registry.get_store_prefixes(); - - // Verify number of prefixes - assert_eq!(prefixes.len(), 3); - - // Verify all registered URLs are in the result - assert!(prefixes.contains(&url1)); - assert!(prefixes.contains(&url2)); - assert!(prefixes.contains(&url3)); - } - - #[test] - fn test_prefix_registry_with_custom_prefix_fn() { - // Create a custom prefix function that adds a path segment - let custom_prefix_fn: PrefixFn = Box::new(|url| { - let mut prefix = PrefixObjectStoreRegistry::default_prefix_fn(url)?; - prefix.set_path("/custom"); - Ok(prefix) - }); - - let registry = PrefixObjectStoreRegistry::new().with_prefix_fn(custom_prefix_fn); - - // Register a store with this prefix - let url = Url::parse("s3://bucket/custom").unwrap(); - let store = Arc::new(InMemory::new()) as Arc; - registry.register_store(&url, Arc::clone(&store)); - - // Test lookup with a URL that should map to the custom prefix - let lookup_url = Url::parse("s3://bucket/path/to/object").unwrap(); - - // When using our custom_prefix_fn, the URL for lookup_url maps to s3://bucket/custom - // which is our registered URL, so it should return the store - let retrieved_store = registry.get_store(&lookup_url); - assert!(retrieved_store.is_some()); - assert!(Arc::ptr_eq(&retrieved_store.unwrap(), &store)); - - // Another URL that should map to the same store - let custom_path_url = Url::parse("s3://bucket/custom/file.txt").unwrap(); - let retrieved_store = registry.get_store(&custom_path_url); - - // This URL will map to s3://bucket/custom via our custom prefix function, - // which matches our registered URL, so it should return the store - assert!(retrieved_store.is_some()); - assert!(Arc::ptr_eq(&retrieved_store.unwrap(), &store)); - } - - #[test] - fn test_parser_object_store_registry() { - let registry = ParserObjectStoreRegistry::default(); - let url = Url::parse("memory://").unwrap(); - let store = registry.get_store(&url); - assert!(store.is_some()); - } - - #[test] - fn test_parser_bad_scheme() { - let registry = ParserObjectStoreRegistry::default(); - let url = Url::parse("bad://").unwrap(); - let store = registry.get_store(&url); - assert!(store.is_none()); - } - - #[cfg(all(feature = "http", feature = "cloud", not(target_arch = "wasm32")))] - #[tokio::test] - async fn test_parser_object_store_registry_with_opts() { - use crate::client::mock_server::MockServer; - - use crate::parse::parse_url_opts; - use crate::path::Path; - use http::{header::USER_AGENT, Response}; - - let server = MockServer::new().await; - let (tx, rx) = tokio::sync::oneshot::channel(); - server.push_fn(move |req| { - let hdr = req.headers().get(USER_AGENT).cloned(); - let _ = tx.send(hdr); - Response::new(String::new()) - }); - let registry = - ParserObjectStoreRegistry::default().with_parser_fn(Box::new( - |url| match parse_url_opts( - url, - vec![("user_agent", "test_url"), ("allow_http", "true")], - ) { - Ok((store, _)) => Ok(store), - Err(e) => Err(e), - }, - )); - let url = Url::parse(format!("{}/foo/bar", server.url()).as_str()).unwrap(); - let store = registry.get_store(&url).unwrap(); - let _ = store.get(&Path::from("/foo/bar")).await.unwrap(); - let got = rx.await.expect("handler never ran"); - assert_eq!(got.unwrap(), "test_url"); - server.shutdown().await; - } } From 25a156d70862b32cedf54bd142bf4f78b42516cc Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Tue, 13 May 2025 13:47:07 -0700 Subject: [PATCH 40/44] Use parse_opts with default object store registry --- src/registry.rs | 195 +++++++++++++++++++++++++++--------------------- 1 file changed, 111 insertions(+), 84 deletions(-) diff --git a/src/registry.rs b/src/registry.rs index d2fd6aaa..d1f2d59d 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -17,44 +17,40 @@ //! ObjectStoreRegistry holds object stores at runtime with a URL for each store. //! The registry serves as a cache for object stores to avoid repeated creation. -use crate::ObjectStore; +use crate::{parse_url, Error, ObjectStore}; use std::collections::HashMap; use std::sync::{Arc, RwLock}; use url::Url; -/// [`ObjectStoreRegistry`] maps a URL prefix to an [`ObjectStore`] instance. The definition of -/// a URL prefix depends on the [`ObjectStoreRegistry`] implementation. See implementation docs -/// for more details. +/// [`ObjectStoreRegistry`] maps a URL to an [`ObjectStore`] instance. The meaning of +/// a URL mapping depends on the [`ObjectStoreRegistry`] implementation. See implementation +/// docs for more details. pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static { - /// Register a new store for any URL that begins with `prefix` + /// Register a new store for the provided URL /// - /// If a store with the same prefix existed before, it is replaced and returned + /// If a store with the same URL mapping exists before, it is replaced and returned along + /// with the mapped URL. fn register_store( &self, - prefix: &Url, + url: &Url, store: Arc, - ) -> Option>; + ) -> Option<(Arc, Url)>; - /// Get a store for the provided URL. The definition of a "suitable store" depends on - /// the [`ObjectStoreRegistry`] implementation. See implementation docs for more details. + /// Get a store for the provided URL. The input URL is mapped to an [`ObjectStore`] + /// instance based on the [`ObjectStoreRegistry`] implementation. See implementation docs + /// for more details. /// /// If no [`ObjectStore`] is found for the `url`, an [`ObjectStore`] may be lazily be /// created and registered. The logic for doing so is left to each [`ObjectStoreRegistry`] /// implementation. - fn get_store(&self, url: &Url) -> Option>; + fn get_store(&self, url: &Url) -> Result, Url)>, Error>; - /// List all registered store prefixes - fn get_store_prefixes(&self) -> Vec; + /// List all registered store URLs. These are the URL mappings for all registered stores. + fn get_store_urls(&self) -> Vec; } -/// A simple [`ObjectStoreRegistry`] implementation that has no prefix logic. It simply returns -/// a store registered with the provided URL if one exists. For example, if a store is registered -/// with `file:///foo`, then: -/// -/// - `file:///foo` will match -/// - `file://foo` will not match -/// - `file:///foo/bar` will not match -/// - `s3://foo` will not match +/// An [`ObjectStoreRegistry`] implementation that maps URLs to object stores using +/// `scheme://host:port`. pub struct DefaultObjectStoreRegistry { /// A map from URL to object store that serve list / read operations for the store object_stores: RwLock>>, @@ -64,7 +60,7 @@ impl std::fmt::Debug for DefaultObjectStoreRegistry { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let stores = self.object_stores.read().unwrap(); f.debug_struct("DefaultObjectStoreRegistry") - .field("schemes", &stores.keys().cloned().collect::>()) + .field("urls", &stores.keys().cloned().collect::>()) .finish() } } @@ -76,11 +72,49 @@ impl Default for DefaultObjectStoreRegistry { } impl DefaultObjectStoreRegistry { - /// Create a new [`DefaultObjectStoreRegistry`] with no registered stores + /// Create a new [`DefaultObjectStoreRegistry`] with no registered stores. pub fn new() -> Self { let object_stores = RwLock::new(HashMap::new()); Self { object_stores } } + + /// Get the key of a url for object store registration. + /// + /// ## Examples + /// + /// - `s3://bucket` maps to `s3://bucket` + /// - `s3://bucket/path` maps to `s3://bucket` + /// - `s3://bucket/path?param=value` maps to `s3://bucket` + /// - `memory://` maps to `memory://` + /// - `memory://path` maps to `memory://` + /// - `file:///` maps to `file:///` + /// - `file:///path` maps to `file:///` + /// - `http://host:port` maps to `http://host:port/` + /// - `http://host:port/path` maps to `http://host:port/` + /// - `http://user:pass@host:port/path/to/file` maps to `http://host:port/` + /// + /// ## Returns + /// + /// A [`Url`] with the same scheme and host as the input, but with an empty path. + /// + /// ## Errors + /// + /// Returns an error if the input is not a valid URL. + fn map_url_to_key(url: &Url) -> Url { + match url.scheme() { + // Don't include the host for memory or path. Just hard code it + // since [`crate::parse::parse_url`] expects these to never have + // a "host" component. + "memory" => Url::parse(&format!("{}://", url.scheme())), + "file" => Url::parse(&format!("{}:///", url.scheme())), + _ => Url::parse(&format!( + "{}://{}", + url.scheme(), + &url[url::Position::BeforeHost..url::Position::AfterPort], + )), + } + .unwrap() + } } impl ObjectStoreRegistry for DefaultObjectStoreRegistry { @@ -91,21 +125,33 @@ impl ObjectStoreRegistry for DefaultObjectStoreRegistry { &self, url: &Url, store: Arc, - ) -> Option> { + ) -> Option<(Arc, Url)> { + let key = DefaultObjectStoreRegistry::map_url_to_key(url); let mut stores = self.object_stores.write().unwrap(); - stores.insert(url.clone(), store) + stores + .insert(key.clone(), store) + .map(|old_store| (old_store, key)) } /// Get a store that was registered with the provided URL. /// /// If no store was registered with the provided URL, `None` is returned. - fn get_store(&self, url: &Url) -> Option> { - let stores = self.object_stores.read().unwrap(); - stores.get(url).map(Arc::clone) + fn get_store(&self, url: &Url) -> Result, Url)>, crate::Error> { + let key = DefaultObjectStoreRegistry::map_url_to_key(url); + eprintln!("key: {key}"); + let mut stores = self.object_stores.write().unwrap(); + if let Some(store) = stores.get(&key) { + Ok(Some((Arc::clone(store), key))) + } else { + let (store, _) = parse_url(&key)?; + let store: Arc = store.into(); + stores.insert(key.clone(), store.clone()); + Ok(Some((store, key))) + } } - /// Returns a vector of all registered store prefixes. - fn get_store_prefixes(&self) -> Vec { + /// Returns a vector of all registered store URLs. + fn get_store_urls(&self) -> Vec { let stores = self.object_stores.read().unwrap(); stores.keys().cloned().collect() } @@ -119,80 +165,61 @@ mod tests { #[test] fn test_register_store() { let registry = DefaultObjectStoreRegistry::new(); - let url = Url::parse("memory://foo").unwrap(); + let url = Url::parse("memory://").unwrap(); let store = Arc::new(InMemory::new()) as Arc; let old_store = registry.register_store(&url, Arc::clone(&store)); assert!(old_store.is_none()); - let retrieved_store = registry.get_store(&url).unwrap(); - assert!(Arc::ptr_eq(&retrieved_store, &store)); + let new_store = Arc::new(InMemory::new()) as Arc; + let (old_store, mapped_url) = registry.register_store(&url, new_store.clone()).unwrap(); + assert_eq!(mapped_url.as_str(), "memory://"); + assert!(Arc::ptr_eq(&old_store, &store)); + let (retrieved_store, mapped_url) = registry.get_store(&url).unwrap().unwrap(); + assert_eq!(mapped_url.as_str(), "memory://"); + assert!(Arc::ptr_eq(&retrieved_store, &new_store)); } - #[test] - fn test_reregister_store() { + #[tokio::test] + async fn test_dynamic_register_store() { let registry = DefaultObjectStoreRegistry::new(); - let url = Url::parse("memory://foo").unwrap(); - let store = Arc::new(InMemory::new()) as Arc; - let old_store = registry.register_store(&url, Arc::clone(&store)); - assert!(old_store.is_none()); - let old_store = registry.register_store(&url, Arc::new(InMemory::new())); - assert!(Arc::ptr_eq(&old_store.unwrap(), &store)); - } - - #[test] - fn test_get_store_miss() { - let registry = DefaultObjectStoreRegistry::new(); - let url = Url::parse("memory://foo").unwrap(); - assert!(registry.get_store(&url).is_none()); + let url = Url::parse("memory://").unwrap(); + let (first_store, mapped_url) = registry.get_store(&url).unwrap().unwrap(); + assert_eq!(mapped_url.as_str(), "memory://"); + first_store.put(&"/foo".into(), "bar".into()).await.unwrap(); + let (second_store, mapped_url) = registry.get_store(&url).unwrap().unwrap(); + assert_eq!(mapped_url.as_str(), "memory://"); + eprintln!("first_store: {:?}", first_store); + eprintln!("second_store: {:?}", second_store); + assert!(Arc::ptr_eq(&second_store, &first_store)); + let val = second_store + .get(&"/foo".into()) + .await + .unwrap() + .bytes() + .await + .unwrap(); + assert_eq!(val.as_ref(), b"bar"); } #[test] fn test_list_urls() { let registry = DefaultObjectStoreRegistry::new(); - let url = Url::parse("memory://foo").unwrap(); + let url = Url::parse("memory://").unwrap(); let store = Arc::new(InMemory::new()) as Arc; registry.register_store(&url, store); - let urls = registry.get_store_prefixes(); + let urls = registry.get_store_urls(); assert_eq!(urls.len(), 1); - assert_eq!(urls[0], url); + assert_eq!(urls[0].as_str(), "memory://"); } #[test] - fn test_subprefix_miss() { + fn test_get_child_url() { let registry = DefaultObjectStoreRegistry::new(); - let base_url = Url::parse("memory://foo").unwrap(); + let base_url = Url::parse("memory://").unwrap(); let store = Arc::new(InMemory::new()) as Arc; registry.register_store(&base_url, Arc::clone(&store)); let subprefix_url = Url::parse("memory://foo/bar").unwrap(); - let retrieved_store = registry.get_store(&subprefix_url); - assert!(retrieved_store.is_none()); - } - - #[test] - fn test_exact_url_match_behavior() { - let registry = DefaultObjectStoreRegistry::new(); - let base_url = Url::parse("file:///foo").unwrap(); - let store = Arc::new(InMemory::new()) as Arc; - registry.register_store(&base_url, Arc::clone(&store)); - - // Case 1: Exact match should work - let exact_match = Url::parse("file:///foo").unwrap(); - let retrieved_store = registry.get_store(&exact_match); - assert!(retrieved_store.is_some()); - assert!(Arc::ptr_eq(&retrieved_store.unwrap(), &store)); - - // Case 2: Different URL format should not match - let different_format = Url::parse("file://foo").unwrap(); - let retrieved_store = registry.get_store(&different_format); - assert!(retrieved_store.is_none()); - - // Case 3: Subpath should not match - let subpath = Url::parse("file:///foo/bar").unwrap(); - let retrieved_store = registry.get_store(&subpath); - assert!(retrieved_store.is_none()); - - // Case 4: Different scheme should not match - let different_scheme = Url::parse("s3://foo").unwrap(); - let retrieved_store = registry.get_store(&different_scheme); - assert!(retrieved_store.is_none()); + let (retrieved_store, mapped_url) = registry.get_store(&subprefix_url).unwrap().unwrap(); + assert_eq!(mapped_url.as_str(), "memory://"); + assert!(Arc::ptr_eq(&retrieved_store, &store)); } } From 5d16b15fc5181a6a937db9fec5d7951632a854a5 Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Tue, 13 May 2025 13:51:14 -0700 Subject: [PATCH 41/44] Add a test for map_url_to_key --- src/registry.rs | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/src/registry.rs b/src/registry.rs index d1f2d59d..1fc19320 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -106,6 +106,9 @@ impl DefaultObjectStoreRegistry { // since [`crate::parse::parse_url`] expects these to never have // a "host" component. "memory" => Url::parse(&format!("{}://", url.scheme())), + // Note this will handle file://path/to/file as well + // as file:///path/to/file even though file://path/to/file + // is not technically a valid URL. "file" => Url::parse(&format!("{}:///", url.scheme())), _ => Url::parse(&format!( "{}://{}", @@ -222,4 +225,43 @@ mod tests { assert_eq!(mapped_url.as_str(), "memory://"); assert!(Arc::ptr_eq(&retrieved_store, &store)); } + + #[test] + fn test_map_url_to_key() { + // Test all documented examples + let test_cases = [ + // s3 examples + ("s3://bucket", "s3://bucket"), + ("s3://bucket/path", "s3://bucket"), + ("s3://bucket/path?param=value", "s3://bucket"), + // memory examples + ("memory://", "memory://"), + ("memory://path", "memory://"), + // file examples + ("file:///", "file:///"), + ("file:///path", "file:///"), + // http examples + ("http://host:1234", "http://host:1234"), + ("http://host:1234/path", "http://host:1234"), + ( + "http://user:pass@host:1234/path/to/file", + "http://host:1234", + ), + ]; + + for (input, expected) in test_cases { + let input_url = Url::parse(input).unwrap(); + let expected_url = Url::parse(expected).unwrap(); + let result = DefaultObjectStoreRegistry::map_url_to_key(&input_url); + + assert_eq!( + result.as_str(), + expected_url.as_str(), + "Expected '{}' to map to '{}', but got '{}'", + input, + expected, + result + ); + } + } } From 1d530fdf45e3c89b9621f43b4c04481e3741149f Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Tue, 13 May 2025 14:03:42 -0700 Subject: [PATCH 42/44] Clean up docs --- src/registry.rs | 76 ++++++++++++++++++++++++++++++++++++------------- 1 file changed, 56 insertions(+), 20 deletions(-) diff --git a/src/registry.rs b/src/registry.rs index 1fc19320..18ab8b3c 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -28,6 +28,8 @@ use url::Url; pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static { /// Register a new store for the provided URL /// + /// ## Returns + /// /// If a store with the same URL mapping exists before, it is replaced and returned along /// with the mapped URL. fn register_store( @@ -43,14 +45,60 @@ pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static { /// If no [`ObjectStore`] is found for the `url`, an [`ObjectStore`] may be lazily be /// created and registered. The logic for doing so is left to each [`ObjectStoreRegistry`] /// implementation. + /// + /// ## Returns + /// + /// If a store is found for the `url`, it is returned along with the mapped URL. + /// + /// If no store is found for the `url`, `None` is returned. + /// + /// ## Errors + /// + /// Returns an error if an implementation can't parse a URL or create a store. fn get_store(&self, url: &Url) -> Result, Url)>, Error>; /// List all registered store URLs. These are the URL mappings for all registered stores. + /// + /// ## Returns + /// + /// A vector of all registered store URLs. fn get_store_urls(&self) -> Vec; } /// An [`ObjectStoreRegistry`] implementation that maps URLs to object stores using /// `scheme://host:port`. +/// +/// ## Examples +/// +/// Registering a store: +/// +/// ``` +/// # use std::sync::Arc; +/// # use url::Url; +/// # use object_store::ObjectStore; +/// # use object_store::memory::InMemory; +/// # use object_store::registry::{ObjectStoreRegistry, DefaultObjectStoreRegistry}; +/// let registry = DefaultObjectStoreRegistry::new(); +/// let url = Url::parse("memory://path/to/store").unwrap(); +/// let store = Arc::new(InMemory::new()) as Arc; +/// registry.register_store(&url, Arc::clone(&store)); +/// let (retrieved_store, mapped_url) = registry.get_store(&url).unwrap().unwrap(); +/// assert_eq!(mapped_url.as_str(), "memory://"); +/// assert!(Arc::ptr_eq(&retrieved_store, &store)); +/// ``` +/// +/// Dynamically creating a store: +/// +/// ``` +/// # use std::sync::Arc; +/// # use url::Url; +/// # use object_store::ObjectStore; +/// # use object_store::registry::{ObjectStoreRegistry, DefaultObjectStoreRegistry}; +/// let registry = DefaultObjectStoreRegistry::new(); +/// let url = Url::parse("memory://path/to/store").unwrap(); +/// let (store, mapped_url) = registry.get_store(&url).unwrap().unwrap(); +/// assert_eq!(mapped_url.as_str(), "memory://"); +/// ``` pub struct DefaultObjectStoreRegistry { /// A map from URL to object store that serve list / read operations for the store object_stores: RwLock>>, @@ -78,20 +126,11 @@ impl DefaultObjectStoreRegistry { Self { object_stores } } - /// Get the key of a url for object store registration. - /// - /// ## Examples + /// Get the key of a url for object store registration. Mapping rules are as follows: /// - /// - `s3://bucket` maps to `s3://bucket` - /// - `s3://bucket/path` maps to `s3://bucket` - /// - `s3://bucket/path?param=value` maps to `s3://bucket` - /// - `memory://` maps to `memory://` - /// - `memory://path` maps to `memory://` - /// - `file:///` maps to `file:///` - /// - `file:///path` maps to `file:///` - /// - `http://host:port` maps to `http://host:port/` - /// - `http://host:port/path` maps to `http://host:port/` - /// - `http://user:pass@host:port/path/to/file` maps to `http://host:port/` + /// - Any URL with a `file` scheme is mapped to `file:///` + /// - Any URL with a `memory` scheme is mapped to `memory://` + /// - All other URLs are mapped to `scheme://host:port` /// /// ## Returns /// @@ -148,7 +187,7 @@ impl ObjectStoreRegistry for DefaultObjectStoreRegistry { } else { let (store, _) = parse_url(&key)?; let store: Arc = store.into(); - stores.insert(key.clone(), store.clone()); + stores.insert(key.clone(), Arc::clone(&store)); Ok(Some((store, key))) } } @@ -173,7 +212,9 @@ mod tests { let old_store = registry.register_store(&url, Arc::clone(&store)); assert!(old_store.is_none()); let new_store = Arc::new(InMemory::new()) as Arc; - let (old_store, mapped_url) = registry.register_store(&url, new_store.clone()).unwrap(); + let (old_store, mapped_url) = registry + .register_store(&url, Arc::clone(&new_store)) + .unwrap(); assert_eq!(mapped_url.as_str(), "memory://"); assert!(Arc::ptr_eq(&old_store, &store)); let (retrieved_store, mapped_url) = registry.get_store(&url).unwrap().unwrap(); @@ -228,19 +269,14 @@ mod tests { #[test] fn test_map_url_to_key() { - // Test all documented examples let test_cases = [ - // s3 examples ("s3://bucket", "s3://bucket"), ("s3://bucket/path", "s3://bucket"), ("s3://bucket/path?param=value", "s3://bucket"), - // memory examples ("memory://", "memory://"), ("memory://path", "memory://"), - // file examples ("file:///", "file:///"), ("file:///path", "file:///"), - // http examples ("http://host:1234", "http://host:1234"), ("http://host:1234/path", "http://host:1234"), ( From c81f155add00ff62c981b3be3641c05dcac3ddc6 Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Tue, 13 May 2025 14:04:22 -0700 Subject: [PATCH 43/44] Clippy --- src/registry.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/registry.rs b/src/registry.rs index 18ab8b3c..1302163c 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -168,7 +168,7 @@ impl ObjectStoreRegistry for DefaultObjectStoreRegistry { url: &Url, store: Arc, ) -> Option<(Arc, Url)> { - let key = DefaultObjectStoreRegistry::map_url_to_key(url); + let key = Self::map_url_to_key(url); let mut stores = self.object_stores.write().unwrap(); stores .insert(key.clone(), store) @@ -179,7 +179,7 @@ impl ObjectStoreRegistry for DefaultObjectStoreRegistry { /// /// If no store was registered with the provided URL, `None` is returned. fn get_store(&self, url: &Url) -> Result, Url)>, crate::Error> { - let key = DefaultObjectStoreRegistry::map_url_to_key(url); + let key = Self::map_url_to_key(url); eprintln!("key: {key}"); let mut stores = self.object_stores.write().unwrap(); if let Some(store) = stores.get(&key) { From 2a076c99f0fab7d0f5ed7d445eecda2db09f560a Mon Sep 17 00:00:00 2001 From: Chris Riccomini Date: Tue, 13 May 2025 14:14:10 -0700 Subject: [PATCH 44/44] More clippy --- src/registry.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/registry.rs b/src/registry.rs index 1302163c..ba5e00eb 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -22,6 +22,8 @@ use std::collections::HashMap; use std::sync::{Arc, RwLock}; use url::Url; +type GetStoreResult = Result, Url)>, Error>; + /// [`ObjectStoreRegistry`] maps a URL to an [`ObjectStore`] instance. The meaning of /// a URL mapping depends on the [`ObjectStoreRegistry`] implementation. See implementation /// docs for more details. @@ -55,7 +57,7 @@ pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static { /// ## Errors /// /// Returns an error if an implementation can't parse a URL or create a store. - fn get_store(&self, url: &Url) -> Result, Url)>, Error>; + fn get_store(&self, url: &Url) -> GetStoreResult; /// List all registered store URLs. These are the URL mappings for all registered stores. ///