diff --git a/config/tutorials/hdfs-logs/index-config.yaml b/config/tutorials/hdfs-logs/index-config.yaml index 6b949dfe079..6e9713c1962 100644 --- a/config/tutorials/hdfs-logs/index-config.yaml +++ b/config/tutorials/hdfs-logs/index-config.yaml @@ -32,3 +32,9 @@ doc_mapping: search_settings: default_search_fields: [severity_text, body] + +# Uncomment to distribute S3 object keys across 1024 prefixes and reduce rate limiting +# on high-throughput indexes where recent splits would otherwise cluster on the same +# S3 partition. Affects only newly created splits; existing splits continue to use +# the legacy flat naming scheme. +# split_key_prefix_len: 2 diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs index 50dcdeec846..ae0aa5e37c2 100644 --- a/quickwit/quickwit-common/src/lib.rs +++ b/quickwit/quickwit-common/src/lib.rs @@ -112,6 +112,84 @@ pub fn split_file(split_id: impl Display) -> String { format!("{split_id}.split") } +/// Computes the S3 key prefix for a split from its ULID string and the configured length. +/// +/// Extracts `prefix_len` characters starting at position 10 of the ULID (the first characters +/// of the random portion, after the 10-character timestamp). Returns an empty string when +/// `prefix_len` is 0 (legacy flat scheme). +/// +/// The returned prefix does NOT contain the trailing `/` separator; that separator is added by +/// [`split_storage_path`] when building the full storage path. +/// +/// If `split_id` is too short to extract the requested prefix (hidden contract: must be a +/// valid 26-character ULID and `prefix_len <= 16`), logs a rate-limited warning and returns +/// an empty string, falling back to the legacy flat scheme. +pub fn compute_split_key_prefix(split_id: &str, prefix_len: u8) -> String { + if prefix_len == 0 { + return String::new(); + } + let end = 10 + prefix_len as usize; + if split_id.len() < end { + crate::rate_limited_warn!( + limit_per_min = 1, + split_id = split_id, + prefix_len = prefix_len, + "split ID is too short to extract prefix; falling back to flat storage path" + ); + return String::new(); + } + split_id[10..end].to_string() +} + +/// Returns the storage path for a split given its ID and key prefix. +/// +/// - Empty `prefix`: legacy flat scheme `{split_id}.split` +/// - Non-empty `prefix`: `{prefix}/{split_id}.split` — the `/` separator is inserted here, so the +/// `prefix` itself must NOT contain a trailing `/` (as produced by [`compute_split_key_prefix`]) +/// +/// The prefix is typically computed once at split creation time via +/// [`compute_split_key_prefix`] and stored in `SplitMetadata`. +pub fn split_storage_path(split_id: &str, prefix: &str) -> String { + if prefix.is_empty() { + return format!("{split_id}.split"); + } + format!("{prefix}/{split_id}.split") +} + +#[cfg(test)] +mod split_path_tests { + use super::*; + + const SAMPLE_ULID: &str = "01ARZ3NDEKTSV4RRFFQ69G5FAV"; + + #[test] + fn test_compute_split_key_prefix_zero() { + assert_eq!(compute_split_key_prefix(SAMPLE_ULID, 0), ""); + } + + #[test] + fn test_compute_split_key_prefix_four() { + // Chars 10–13 of the ULID are the first 4 chars of the random portion. + assert_eq!(compute_split_key_prefix(SAMPLE_ULID, 4), "TSV4"); + } + + #[test] + fn test_split_storage_path_empty_prefix() { + assert_eq!( + split_storage_path(SAMPLE_ULID, ""), + "01ARZ3NDEKTSV4RRFFQ69G5FAV.split" + ); + } + + #[test] + fn test_split_storage_path_with_prefix() { + assert_eq!( + split_storage_path(SAMPLE_ULID, "TS"), + "TS/01ARZ3NDEKTSV4RRFFQ69G5FAV.split" + ); + } +} + fn get_from_env_opt_aux( key: &str, parse_fn: impl FnOnce(&str) -> Option, diff --git a/quickwit/quickwit-common/src/shared_consts.rs b/quickwit/quickwit-common/src/shared_consts.rs index 95b3a531aaf..0f1453968d2 100644 --- a/quickwit/quickwit-common/src/shared_consts.rs +++ b/quickwit/quickwit-common/src/shared_consts.rs @@ -82,5 +82,29 @@ pub const DEFAULT_SHARD_BURST_LIMIT: ByteSize = ByteSize::mib(50); /// A compromise between "exponential" scale up and moderate shard count increase. pub const DEFAULT_SHARD_SCALE_UP_FACTOR: f32 = 1.5; +/// Returns the number of characters from the ULID random portion used as an S3 key prefix +/// directory to distribute split object keys across S3 partitions and avoid hotspots. +/// +/// Controlled by `QW_SPLIT_KEY_PREFIX_LEN` (default: 0, i.e. legacy flat scheme). +/// Maximum value is 16 (length of the ULID random portion). +/// Setting this to `2` creates 1024 buckets, which is sufficient for most workloads. +pub fn split_key_prefix_len() -> u8 { + const MAX_SPLIT_KEY_PREFIX_LEN: u8 = 16; + + static SPLIT_KEY_PREFIX_LEN: LazyLock = LazyLock::new(|| { + let value: u8 = crate::get_from_env("QW_SPLIT_KEY_PREFIX_LEN", 0u8, false); + if value > MAX_SPLIT_KEY_PREFIX_LEN { + warn!( + value, + max = MAX_SPLIT_KEY_PREFIX_LEN, + "QW_SPLIT_KEY_PREFIX_LEN exceeds maximum, clamping to {MAX_SPLIT_KEY_PREFIX_LEN}" + ); + return MAX_SPLIT_KEY_PREFIX_LEN; + } + value + }); + *SPLIT_KEY_PREFIX_LEN +} + // (Just a reexport). pub use bytesize::MIB; diff --git a/quickwit/quickwit-config/src/index_config/mod.rs b/quickwit/quickwit-config/src/index_config/mod.rs index d536e4edab5..7c3a5afa0b8 100644 --- a/quickwit/quickwit-config/src/index_config/mod.rs +++ b/quickwit/quickwit-config/src/index_config/mod.rs @@ -380,6 +380,7 @@ fn prepend_at_char(schedule: &str) -> String { trimmed_schedule.to_string() } + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[serde(deny_unknown_fields)] #[serde(into = "VersionedIndexConfig")] diff --git a/quickwit/quickwit-config/src/index_config/serialize.rs b/quickwit/quickwit-config/src/index_config/serialize.rs index 24fcc6d1ac2..5ab5a265d4a 100644 --- a/quickwit/quickwit-config/src/index_config/serialize.rs +++ b/quickwit/quickwit-config/src/index_config/serialize.rs @@ -558,4 +558,5 @@ mod test { ) .expect_err("field required for default search is absent"); } + } diff --git a/quickwit/quickwit-config/src/index_template/mod.rs b/quickwit/quickwit-config/src/index_template/mod.rs index d312edab1be..367b5a4cd3f 100644 --- a/quickwit/quickwit-config/src/index_template/mod.rs +++ b/quickwit/quickwit-config/src/index_template/mod.rs @@ -29,6 +29,7 @@ use crate::{ pub type IndexTemplateId = String; pub type IndexIdPattern = String; + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(into = "VersionedIndexTemplate")] #[serde(from = "VersionedIndexTemplate")] diff --git a/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs b/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs index 5d68bb59285..e5f154c72cb 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs @@ -114,7 +114,12 @@ impl MergeSplitDownloader { let _protect_guard = ctx.protect_zone(); let tantivy_dir = self .split_store - .fetch_and_open_split(split.split_id(), download_directory, &io_controls) + .fetch_and_open_split( + split.split_id(), + &split.prefix, + download_directory, + &io_controls, + ) .await .map_err(|error| { let split_id = split.split_id(); diff --git a/quickwit/quickwit-indexing/src/actors/uploader.rs b/quickwit/quickwit-indexing/src/actors/uploader.rs index 0b2901f26d5..62151b8ed9a 100644 --- a/quickwit/quickwit-indexing/src/actors/uploader.rs +++ b/quickwit/quickwit-indexing/src/actors/uploader.rs @@ -327,17 +327,22 @@ impl Handler for Uploader { return; } }; - let split_metadata = create_split_metadata( + let mut split_metadata = create_split_metadata( &merge_policy, retention_policy.as_ref(), &packaged_split.split_attrs, packaged_split.tags.clone(), split_streamer.footer_range.start..split_streamer.footer_range.end, ); + split_metadata.prefix = quickwit_common::compute_split_key_prefix( + split_metadata.split_id(), + quickwit_common::shared_consts::split_key_prefix_len(), + ); report_splits.push(ReportSplit { storage_uri: split_store.remote_uri().to_string(), split_id: packaged_split.split_id().to_string(), + prefix: split_metadata.prefix.clone(), }); split_metadata_list.push(split_metadata); diff --git a/quickwit/quickwit-indexing/src/models/split_attrs.rs b/quickwit/quickwit-indexing/src/models/split_attrs.rs index 2e0504bdd35..fa9b30e4b6b 100644 --- a/quickwit/quickwit-indexing/src/models/split_attrs.rs +++ b/quickwit/quickwit-indexing/src/models/split_attrs.rs @@ -125,6 +125,8 @@ pub fn create_split_metadata( footer_offsets, delete_opstamp: split_attrs.delete_opstamp, num_merge_ops: split_attrs.num_merge_ops, + // prefix is set by the Uploader after calling create_split_metadata + prefix: String::new(), } } diff --git a/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs b/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs index 88904cbd6dd..d6cf782edb7 100644 --- a/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs +++ b/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs @@ -93,8 +93,8 @@ impl IndexingSplitStore { self.inner.remote_storage.uri() } - fn split_path(&self, split_id: &str) -> PathBuf { - PathBuf::from(quickwit_common::split_file(split_id)) + fn split_path(&self, split_id: &str, prefix: &str) -> PathBuf { + PathBuf::from(quickwit_common::split_storage_path(split_id, prefix)) } /// Stores a split. @@ -116,7 +116,7 @@ impl IndexingSplitStore { let start = Instant::now(); let split_num_bytes = put_payload.len(); - let key = self.split_path(split.split_id()); + let key = self.split_path(split.split_id(), &split.prefix); let is_mature = split.is_mature(OffsetDateTime::now_utc()); self.inner .remote_storage @@ -179,10 +179,13 @@ impl IndexingSplitStore { pub async fn fetch_and_open_split( &self, split_id: &str, + prefix: &str, output_dir_path: &Path, io_controls: &IoControls, ) -> StorageResult> { - let path = PathBuf::from(quickwit_common::split_file(split_id)); + // The local filename is always flat (no prefix directory) — the prefix only affects the + // remote S3 key. + let local_filename = quickwit_common::split_file(split_id); if let Some(split_path) = self .inner .split_cache @@ -198,13 +201,14 @@ impl IndexingSplitStore { } else { tracing::Span::current().record("cache_hit", false); } - let dest_filepath = output_dir_path.join(&path); + let remote_path = self.split_path(split_id, prefix); + let dest_filepath = output_dir_path.join(&local_filename); let dest_file = tokio::fs::File::create(&dest_filepath).await?; let mut dest_file_with_write_limit = io_controls.clone().wrap_write(dest_file); self.inner .remote_storage - .copy_to(&path, &mut dest_file_with_write_limit) - .instrument(info_span!("fetch_split_from_remote_storage", path=?path)) + .copy_to(&remote_path, &mut dest_file_with_write_limit) + .instrument(info_span!("fetch_split_from_remote_storage", path=?remote_path)) .await?; get_tantivy_directory_from_split_bundle(&dest_filepath) } @@ -314,7 +318,7 @@ mod tests { { let output = tempfile::tempdir()?; let split1 = split_store - .fetch_and_open_split(&split_id1, output.path(), &io_controls) + .fetch_and_open_split(&split_id1, "", output.path(), &io_controls) .await?; let local_store_stats = split_store.inspect_split_cache().await; assert_eq!(local_store_stats.len(), 1); @@ -323,7 +327,7 @@ mod tests { { let output = tempfile::tempdir()?; let split2 = split_store - .fetch_and_open_split(&split_id2, output.path(), &io_controls) + .fetch_and_open_split(&split_id2, "", output.path(), &io_controls) .await?; let local_store_stats = split_store.inspect_split_cache().await; assert_eq!(local_store_stats.len(), 0); @@ -410,7 +414,7 @@ mod tests { // get from remote storage because split_id1 was evicted by split_id2 let output = tempfile::tempdir()?; let _split1 = split_store - .fetch_and_open_split(&split_id1, output.path(), &io_controls) + .fetch_and_open_split(&split_id1, "", output.path(), &io_controls) .await?; assert_eq!(io_controls.num_bytes(), split_payload1.len()); } @@ -418,7 +422,7 @@ mod tests { // get from cache let output = tempfile::tempdir()?; let _split2 = split_store - .fetch_and_open_split(&split_id2, output.path(), &io_controls) + .fetch_and_open_split(&split_id2, "", output.path(), &io_controls) .await?; // the number of downloaded by didn't change (still the size of split_payload1) assert_eq!(io_controls.num_bytes(), split_payload1.len()); @@ -427,7 +431,7 @@ mod tests { // get from remote because getting from cache removes the split from the cache let output = tempfile::tempdir()?; let _split2 = split_store - .fetch_and_open_split(&split_id2, output.path(), &io_controls) + .fetch_and_open_split(&split_id2, "", output.path(), &io_controls) .await?; assert_eq!( io_controls.num_bytes(), diff --git a/quickwit/quickwit-metastore/src/split_metadata.rs b/quickwit/quickwit-metastore/src/split_metadata.rs index fe88fe379d3..d9723a838f6 100644 --- a/quickwit/quickwit-metastore/src/split_metadata.rs +++ b/quickwit/quickwit-metastore/src/split_metadata.rs @@ -132,6 +132,17 @@ pub struct SplitMetadata { /// Doc mapping UID used when creating this split. This split may only be merged with other /// splits using the same doc mapping UID. pub doc_mapping_uid: DocMappingUid, + + /// S3 key prefix directory used when this split was stored (e.g., `"ND"`). + /// + /// An empty string means the legacy flat scheme: `{split_id}.split` directly under the index + /// URI. A non-empty prefix (with no trailing `/`) means the split is stored at + /// `{prefix}/{split_id}.split`. + /// + /// This value is set once at split creation time from the `QW_SPLIT_KEY_PREFIX_LEN` + /// environment variable and never changed afterwards. + #[serde(default)] + pub prefix: String, } impl fmt::Debug for SplitMetadata { @@ -229,7 +240,7 @@ impl SplitMetadata { /// Converts the split metadata into a [`SplitInfo`]. pub fn as_split_info(&self) -> SplitInfo { - let file_name = quickwit_common::split_file(self.split_id()); + let file_name = quickwit_common::split_storage_path(self.split_id(), &self.prefix); SplitInfo { uncompressed_docs_size_bytes: ByteSize(self.uncompressed_docs_size_in_bytes), @@ -281,6 +292,7 @@ impl quickwit_config::TestableForRegression for SplitMetadata { footer_offsets: 1000..2000, num_merge_ops: 3, doc_mapping_uid: DocMappingUid::default(), + prefix: String::new(), } } @@ -421,6 +433,7 @@ mod tests { delete_opstamp: 0, num_merge_ops: 0, doc_mapping_uid: DocMappingUid::default(), + prefix: String::new(), }; let expected_output = "SplitMetadata { split_id: \"split-1\", index_uid: IndexUid { \ diff --git a/quickwit/quickwit-metastore/src/split_metadata_version.rs b/quickwit/quickwit-metastore/src/split_metadata_version.rs index 8325290be92..e2b050c0d54 100644 --- a/quickwit/quickwit-metastore/src/split_metadata_version.rs +++ b/quickwit/quickwit-metastore/src/split_metadata_version.rs @@ -92,6 +92,10 @@ pub(crate) struct SplitMetadataV0_8 { // splits before when updates first appeared are compatible with each other. #[serde(default)] doc_mapping_uid: DocMappingUid, + + #[serde(default)] + #[serde(skip_serializing_if = "String::is_empty")] + pub prefix: String, } impl From for SplitMetadata { @@ -128,6 +132,7 @@ impl From for SplitMetadata { footer_offsets: v8.footer_offsets, num_merge_ops: v8.num_merge_ops, doc_mapping_uid: v8.doc_mapping_uid, + prefix: v8.prefix, } } } @@ -150,6 +155,7 @@ impl From for SplitMetadataV0_8 { footer_offsets: split.footer_offsets, num_merge_ops: split.num_merge_ops, doc_mapping_uid: split.doc_mapping_uid, + prefix: split.prefix, } } } diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index d50c8b529e6..61bb314d952 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -105,6 +105,8 @@ message ReportSplit { string split_id = 2; // The storage uri. This URI does NOT include the split id. string storage_uri = 1; + // S3 key prefix directory with no trailing "/" (e.g. "ND"). Empty string means the legacy flat scheme. + string prefix = 3; } message ReportSplitsRequest { @@ -517,6 +519,8 @@ message SplitIdAndFooterOffsets { optional int64 timestamp_end = 5; // The number of docs in the split uint64 num_docs = 6; + // S3 key prefix directory with no trailing "/" (e.g. "ND"). Empty string means the legacy flat scheme. + string prefix = 7; } // Hits returned by a FetchDocRequest. diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index 665ecdf5f96..78b7fb9960f 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -43,6 +43,9 @@ pub struct ReportSplit { /// The storage uri. This URI does NOT include the split id. #[prost(string, tag = "1")] pub storage_uri: ::prost::alloc::string::String, + /// S3 key prefix directory with no trailing "/" (e.g. "ND"). Empty string means the legacy flat scheme. + #[prost(string, tag = "3")] + pub prefix: ::prost::alloc::string::String, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -480,6 +483,9 @@ pub struct SplitIdAndFooterOffsets { /// The number of docs in the split #[prost(uint64, tag = "6")] pub num_docs: u64, + /// S3 key prefix directory with no trailing "/" (e.g. "ND"). Empty string means the legacy flat scheme. + #[prost(string, tag = "7")] + pub prefix: ::prost::alloc::string::String, } /// Hits returned by a FetchDocRequest. /// diff --git a/quickwit/quickwit-search/src/cluster_client.rs b/quickwit/quickwit-search/src/cluster_client.rs index 168ab5993b0..2acaebfb83d 100644 --- a/quickwit/quickwit-search/src/cluster_client.rs +++ b/quickwit/quickwit-search/src/cluster_client.rs @@ -336,6 +336,7 @@ mod tests { timestamp_start: None, timestamp_end: None, num_docs: 0, + prefix: String::new(), }], ..Default::default() } @@ -363,6 +364,7 @@ mod tests { timestamp_start: None, timestamp_end: None, num_docs: 0, + prefix: String::new(), }, SplitIdAndFooterOffsets { split_id: "split_2".to_string(), @@ -371,6 +373,7 @@ mod tests { timestamp_start: None, timestamp_end: None, num_docs: 0, + prefix: String::new(), }, ], }], diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index d632742bd28..4a0fa81d6a7 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -132,7 +132,10 @@ async fn get_split_footer_from_cache_or_fetch( return Ok(footer_data); } } - let split_file = PathBuf::from(format!("{}.split", split_and_footer_offsets.split_id)); + let split_file = PathBuf::from(quickwit_common::split_storage_path( + &split_and_footer_offsets.split_id, + &split_and_footer_offsets.prefix, + )); let footer_data_opt = index_storage .get_slice( &split_file, @@ -163,7 +166,10 @@ pub(crate) async fn open_split_bundle( index_storage: Arc, split_and_footer_offsets: &SplitIdAndFooterOffsets, ) -> anyhow::Result<(FileSlice, BundleStorage)> { - let split_file = PathBuf::from(format!("{}.split", split_and_footer_offsets.split_id)); + let split_file = PathBuf::from(quickwit_common::split_storage_path( + &split_and_footer_offsets.split_id, + &split_and_footer_offsets.prefix, + )); let footer_data = get_split_footer_from_cache_or_fetch( index_storage.clone(), split_and_footer_offsets, diff --git a/quickwit/quickwit-search/src/leaf_cache.rs b/quickwit/quickwit-search/src/leaf_cache.rs index ad0a30cc4e6..2b739a4e482 100644 --- a/quickwit/quickwit-search/src/leaf_cache.rs +++ b/quickwit/quickwit-search/src/leaf_cache.rs @@ -261,6 +261,7 @@ mod tests { timestamp_start: None, timestamp_end: None, num_docs: 0, + prefix: String::new(), }; let split_2 = SplitIdAndFooterOffsets { @@ -270,6 +271,7 @@ mod tests { timestamp_start: None, timestamp_end: None, num_docs: 0, + prefix: String::new(), }; let query_1 = SearchRequest { @@ -340,6 +342,7 @@ mod tests { timestamp_start: Some(100), timestamp_end: Some(199), num_docs: 0, + prefix: String::new(), }; let split_2 = SplitIdAndFooterOffsets { split_id: "split_2".to_string(), @@ -348,6 +351,7 @@ mod tests { timestamp_start: Some(150), timestamp_end: Some(249), num_docs: 0, + prefix: String::new(), }; let split_3 = SplitIdAndFooterOffsets { split_id: "split_3".to_string(), @@ -356,6 +360,7 @@ mod tests { timestamp_start: Some(150), timestamp_end: Some(249), num_docs: 0, + prefix: String::new(), }; let query_1 = SearchRequest { diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index b7c5afd206c..9596dd36e32 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -176,6 +176,7 @@ fn extract_split_and_footer_offsets(split_metadata: &SplitMetadata) -> SplitIdAn .as_ref() .map(|time_range| *time_range.end()), num_docs: split_metadata.num_docs as u64, + prefix: split_metadata.prefix.clone(), } } diff --git a/quickwit/quickwit-search/src/retry/mod.rs b/quickwit/quickwit-search/src/retry/mod.rs index 996665717cf..6a5e7a73b83 100644 --- a/quickwit/quickwit-search/src/retry/mod.rs +++ b/quickwit/quickwit-search/src/retry/mod.rs @@ -128,6 +128,7 @@ mod tests { timestamp_start: None, timestamp_end: None, num_docs: 0, + prefix: String::new(), }; let client_for_retry = retry_client( &search_job_placer, diff --git a/quickwit/quickwit-search/src/retry/search.rs b/quickwit/quickwit-search/src/retry/search.rs index 696a352de94..9c08337e5f1 100644 --- a/quickwit/quickwit-search/src/retry/search.rs +++ b/quickwit/quickwit-search/src/retry/search.rs @@ -93,6 +93,7 @@ mod tests { timestamp_start: None, timestamp_end: None, num_docs: 0, + prefix: String::new(), }, SplitIdAndFooterOffsets { split_id: "split_2".to_string(), @@ -101,6 +102,7 @@ mod tests { timestamp_start: None, timestamp_end: None, num_docs: 0, + prefix: String::new(), }, ], }], diff --git a/quickwit/quickwit-storage/src/split_cache/download_task.rs b/quickwit/quickwit-storage/src/split_cache/download_task.rs index 9af0390bfb5..f40555582df 100644 --- a/quickwit/quickwit-storage/src/split_cache/download_task.rs +++ b/quickwit/quickwit-storage/src/split_cache/download_task.rs @@ -32,12 +32,15 @@ async fn download_split( split_ulid, storage_uri, living_token: _, + remote_split_path, } = candidate_split; - let split_filename = split_file(*split_ulid); - let target_filepath = root_path.join(&split_filename); + // Local cache always uses the flat filename (no prefix directory); the remote key may be + // sharded under a prefix directory (carried verbatim in `remote_split_path`). + let local_filename = split_file(*split_ulid); + let target_filepath = root_path.join(&local_filename); let storage = storage_resolver.resolve(storage_uri).await?; let num_bytes = storage - .copy_to_file(Path::new(&split_filename), &target_filepath) + .copy_to_file(Path::new(remote_split_path), &target_filepath) .await?; Ok(num_bytes) } diff --git a/quickwit/quickwit-storage/src/split_cache/mod.rs b/quickwit/quickwit-storage/src/split_cache/mod.rs index 1c0de124d73..ef5ecb68c2e 100644 --- a/quickwit/quickwit-storage/src/split_cache/mod.rs +++ b/quickwit/quickwit-storage/src/split_cache/mod.rs @@ -146,20 +146,27 @@ impl SplitCache { error!(storage_uri=%report_split.storage_uri, "received invalid storage uri: ignoring"); continue; }; - split_table.report(split_ulid, storage_uri); + let remote_split_path = + quickwit_common::split_storage_path(&report_split.split_id, &report_split.prefix); + split_table.report(split_ulid, storage_uri, remote_split_path); } } // Returns a split guard object. As long as it is not dropped, the // split won't be evinced from the cache. - async fn get_split_file(&self, split_id: Ulid, storage_uri: &Uri) -> Option { + async fn get_split_file( + &self, + split_id: Ulid, + storage_uri: &Uri, + remote_split_path: String, + ) -> Option { // We touch before even checking the fd cache in order to update the file's last access time // for the file cache. - let num_bytes_opt: Option = self - .split_table - .lock() - .unwrap() - .touch(split_id, storage_uri); + let num_bytes_opt: Option = + self.split_table + .lock() + .unwrap() + .touch(split_id, storage_uri, remote_split_path); let num_bytes = num_bytes_opt?; self.fd_cache @@ -200,18 +207,22 @@ struct SplitCacheBackingStorage { impl SplitCacheBackingStorage { async fn get_impl(&self, path: &Path, byte_range: Range) -> Option { let split_id = split_id_from_path(path)?; + // The path we are handed is exactly the relative storage key of the split. + let remote_split_path = path.to_string_lossy().into_owned(); let split_file: SplitFile = self .split_cache - .get_split_file(split_id, &self.storage_root_uri) + .get_split_file(split_id, &self.storage_root_uri, remote_split_path) .await?; split_file.get_range(byte_range).await.ok() } async fn get_all_impl(&self, path: &Path) -> Option { let split_id = split_id_from_path(path)?; + // The path we are handed is exactly the relative storage key of the split. + let remote_split_path = path.to_string_lossy().into_owned(); let split_file = self .split_cache - .get_split_file(split_id, &self.storage_root_uri) + .get_split_file(split_id, &self.storage_root_uri, remote_split_path) .await?; split_file.get_all().await.ok() } diff --git a/quickwit/quickwit-storage/src/split_cache/split_table.rs b/quickwit/quickwit-storage/src/split_cache/split_table.rs index ee24a393e84..7b9b53f9786 100644 --- a/quickwit/quickwit-storage/src/split_cache/split_table.rs +++ b/quickwit/quickwit-storage/src/split_cache/split_table.rs @@ -237,7 +237,12 @@ impl SplitTable { /// If the file is already on the disk cache, return `Some(num_bytes)`. /// If the file is not in cache, return `None`, and register the file in the candidate for /// download list. - pub fn touch(&mut self, split_ulid: Ulid, storage_uri: &Uri) -> Option { + pub fn touch( + &mut self, + split_ulid: Ulid, + storage_uri: &Uri, + remote_split_path: String, + ) -> Option { let timestamp = compute_timestamp(self.origin_time); let status = self.mutate_split(split_ulid, |old_split_info| { if let Some(mut split_info) = old_split_info { @@ -253,6 +258,7 @@ impl SplitTable { storage_uri: storage_uri.clone(), split_ulid, living_token: Arc::new(()), + remote_split_path, }), } } @@ -298,7 +304,7 @@ impl SplitTable { }); } - pub(crate) fn report(&mut self, split_ulid: Ulid, storage_uri: Uri) { + pub(crate) fn report(&mut self, split_ulid: Ulid, storage_uri: Uri, remote_split_path: String) { let origin_time = self.origin_time; self.mutate_split(split_ulid, move |split_info_opt| { if let Some(split_info) = split_info_opt { @@ -314,6 +320,7 @@ impl SplitTable { storage_uri, split_ulid, living_token: Arc::new(()), + remote_split_path, }), } }); @@ -435,6 +442,10 @@ pub(crate) struct CandidateSplit { pub storage_uri: Uri, pub split_ulid: Ulid, pub living_token: Arc<()>, + /// Relative storage key of the split within `storage_uri`, used as-is to fetch it from remote + /// storage. E.g. `"ND/01ARZ….split"` for a sharded split, or `"01ARZ….split"` for the legacy + /// flat scheme. The local cache filename is always flat and derived from `split_ulid`. + pub remote_split_path: String, } pub(crate) struct DownloadOpportunity { @@ -481,8 +492,8 @@ mod tests { let ulids = sorted_split_ulids(2); let ulid1 = ulids[0]; let ulid2 = ulids[1]; - split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI)); - split_table.report(ulid2, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI), String::new()); + split_table.report(ulid2, Uri::for_test(TEST_STORAGE_URI), String::new()); let candidate = split_table.best_candidate().unwrap(); assert_eq!(candidate.split_ulid, ulid2); } @@ -501,9 +512,9 @@ mod tests { let ulids = sorted_split_ulids(2); let ulid1 = ulids[0]; let ulid2 = ulids[1]; - split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI)); - split_table.report(ulid2, Uri::for_test(TEST_STORAGE_URI)); - let num_bytes_opt = split_table.touch(ulid1, &Uri::for_test("s3://test1/")); + split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI), String::new()); + split_table.report(ulid2, Uri::for_test(TEST_STORAGE_URI), String::new()); + let num_bytes_opt = split_table.touch(ulid1, &Uri::for_test("s3://test1/"), String::new()); assert!(num_bytes_opt.is_none()); let candidate = split_table.best_candidate().unwrap(); assert_eq!(candidate.split_ulid, ulid1); @@ -521,7 +532,7 @@ mod tests { Default::default(), ); let ulid1 = Ulid::new(); - split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI), String::new()); assert_eq!(split_table.num_bytes(), 0); let download = split_table.start_download(ulid1); assert!(download.is_some()); @@ -529,11 +540,11 @@ mod tests { split_table.register_as_downloaded(ulid1, 10_000_000); assert_eq!(split_table.num_bytes(), 10_000_000); assert_eq!( - split_table.touch(ulid1, &Uri::for_test(TEST_STORAGE_URI)), + split_table.touch(ulid1, &Uri::for_test(TEST_STORAGE_URI), String::new()), Some(10_000_000) ); let ulid2 = Ulid::new(); - split_table.report(ulid2, Uri::for_test("s3://test`/")); + split_table.report(ulid2, Uri::for_test("s3://test`/"), String::new()); let download = split_table.start_download(ulid2); assert!(download.is_some()); assert!(split_table.start_download(ulid2).is_none()); @@ -564,11 +575,11 @@ mod tests { (split_ulids[5], 300_000), ]; for (split_ulid, num_bytes) in splits { - split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI), String::new()); split_table.register_as_downloaded(split_ulid, num_bytes); } let new_ulid = Ulid::new(); - split_table.report(new_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(new_ulid, Uri::for_test(TEST_STORAGE_URI), String::new()); let DownloadOpportunity { splits_to_delete, split_to_download, @@ -602,11 +613,11 @@ mod tests { (split_ulids[5], 300_000), ]; for (split_ulid, num_bytes) in splits { - split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI), String::new()); split_table.register_as_downloaded(split_ulid, num_bytes); } let new_ulid = Ulid::new(); - split_table.report(new_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(new_ulid, Uri::for_test(TEST_STORAGE_URI), String::new()); let DownloadOpportunity { splits_to_delete, split_to_download, @@ -627,10 +638,10 @@ mod tests { Default::default(), ); let split_ulid = Ulid::new(); - split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI), String::new()); let candidate = split_table.start_download(split_ulid).unwrap(); // This report should be cancelled as we have a download currently running. - split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI), String::new()); assert!(split_table.start_download(split_ulid).is_none()); std::mem::drop(candidate); @@ -639,7 +650,7 @@ mod tests { assert!(split_table.start_download(split_ulid).is_none()); // This report should be considered as our candidate (and its alive token has been dropped) - split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI), String::new()); let candidate2 = split_table.start_download(split_ulid).unwrap(); assert_eq!(candidate2.split_ulid, split_ulid); @@ -658,7 +669,7 @@ mod tests { ); for i in 1..2_000 { let split_ulid = Ulid::new(); - split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI), String::new()); assert_eq!( split_table.candidate_splits.len(), i.min(super::MAX_NUM_CANDIDATES) @@ -684,6 +695,7 @@ mod tests { storage_uri: Uri::for_test(TEST_STORAGE_URI), split_ulid, living_token: Arc::new(()), + remote_split_path: String::new(), }; let split_info = SplitInfo { split_key: SplitKey { diff --git a/quickwit/quickwit-storage/src/split_cache/tests.rs b/quickwit/quickwit-storage/src/split_cache/tests.rs index e08639dc9aa..e3dfa76db94 100644 --- a/quickwit/quickwit-storage/src/split_cache/tests.rs +++ b/quickwit/quickwit-storage/src/split_cache/tests.rs @@ -32,8 +32,8 @@ fn test_split_table() { }); let ulid1 = Ulid::new(); let ulid2 = Ulid::new(); - split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI)); - split_table.report(ulid2, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI), String::new()); + split_table.report(ulid2, Uri::for_test(TEST_STORAGE_URI), String::new()); let candidate = split_table.best_candidate().unwrap(); assert_eq!(candidate.split_ulid, ulid2); } @@ -47,8 +47,8 @@ fn test_split_table_prefer_last_touched() { }); let ulid1 = Ulid::new(); let ulid2 = Ulid::new(); - split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI)); - split_table.report(ulid2, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI), String::new()); + split_table.report(ulid2, Uri::for_test(TEST_STORAGE_URI), String::new()); let split_guard_opt = split_table.get_split_guard(ulid1, &Uri::for_test("s3://test1/")); assert!(split_guard_opt.is_none()); let candidate = split_table.best_candidate().unwrap(); @@ -63,7 +63,7 @@ fn test_split_table_prefer_start_download_prevent_new_report() { num_concurrent_downloads: NonZeroU32::new(1).unwrap(), }); let ulid1 = Ulid::new(); - split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI), String::new()); assert_eq!(split_table.num_bytes(), 0); let download = split_table.start_download(ulid1); assert!(download.is_some()); @@ -72,7 +72,7 @@ fn test_split_table_prefer_start_download_prevent_new_report() { assert_eq!(split_table.num_bytes(), 10_000_000); split_table.get_split_guard(ulid1, &Uri::for_test(TEST_STORAGE_URI)); let ulid2 = Ulid::new(); - split_table.report(ulid2, Uri::for_test("s3://test`/")); + split_table.report(ulid2, Uri::for_test("s3://test`/"), String::new()); let download = split_table.start_download(ulid2); assert!(download.is_some()); assert!(split_table.start_download(ulid2).is_none()); @@ -99,11 +99,11 @@ fn test_eviction_due_to_size() { (split_ulids[5], 300_000), ]; for (split_ulid, num_bytes) in splits { - split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI), String::new()); split_table.register_as_downloaded(split_ulid, num_bytes); } let new_ulid = Ulid::new(); - split_table.report(new_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(new_ulid, Uri::for_test(TEST_STORAGE_URI), String::new()); let DownloadOpportunity { splits_to_delete, split_to_download, @@ -133,11 +133,11 @@ fn test_eviction_due_to_num_splits() { (split_ulids[5], 300_000), ]; for (split_ulid, num_bytes) in splits { - split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI), String::new()); split_table.register_as_downloaded(split_ulid, num_bytes); } let new_ulid = Ulid::new(); - split_table.report(new_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(new_ulid, Uri::for_test(TEST_STORAGE_URI), String::new()); let DownloadOpportunity { splits_to_delete, split_to_download, @@ -154,10 +154,10 @@ fn test_failed_download_can_be_re_reported() { num_concurrent_downloads: NonZeroU32::new(1).unwrap(), }); let split_ulid = Ulid::new(); - split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI), String::new()); let candidate = split_table.start_download(split_ulid).unwrap(); // This report should be cancelled as we have a download currently running. - split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI), String::new()); assert!(split_table.start_download(split_ulid).is_none()); std::mem::drop(candidate); @@ -166,7 +166,7 @@ fn test_failed_download_can_be_re_reported() { assert!(split_table.start_download(split_ulid).is_none()); // This report should be considered as our candidate (and its alive token has been dropped) - split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI), String::new()); let candidate2 = split_table.start_download(split_ulid).unwrap(); assert_eq!(candidate2.split_ulid, split_ulid);