From a6488c9a09bb029ddcfa7fd33716cda6d3e86d6a Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 19 Jun 2026 10:01:58 +0200 Subject: [PATCH 1/3] Add split_key_prefix_len to index config to shard S3 object keys MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Recent splits share a ULID timestamp prefix, causing S3 key hotspots under high read load. Setting split_key_prefix_len (e.g. 2) on an index extracts N characters from the ULID random portion (positions 10–25) as a subdirectory prefix, distributing new splits across 32^N S3 partitions. Old splits (prefix not set in SplitMetadata) continue using the legacy flat path; no migration needed. SplitMetadata will store a `prefix` string computed once at creation time via compute_split_key_prefix(). --- config/tutorials/hdfs-logs/index-config.yaml | 6 ++ quickwit/quickwit-common/src/lib.rs | 78 +++++++++++++++++++ .../quickwit-config/src/index_config/mod.rs | 19 +++++ .../src/index_config/serialize.rs | 36 +++++++++ .../quickwit-config/src/index_template/mod.rs | 9 +++ .../src/index_template/serialize.rs | 4 + .../src/actors/indexing_pipeline.rs | 7 ++ .../src/actors/indexing_service.rs | 2 + .../src/actors/merge_pipeline.rs | 3 + .../src/actors/merge_split_downloader.rs | 7 +- .../quickwit-indexing/src/actors/uploader.rs | 16 +++- .../src/models/split_attrs.rs | 2 + .../src/split_store/indexing_split_store.rs | 28 ++++--- .../src/actors/delete_task_pipeline.rs | 1 + .../quickwit-metastore/src/split_metadata.rs | 15 +++- .../src/split_metadata_version.rs | 6 ++ .../protos/quickwit/search.proto | 4 + .../src/codegen/quickwit/quickwit.search.rs | 6 ++ .../quickwit-search/src/cluster_client.rs | 3 + quickwit/quickwit-search/src/leaf.rs | 10 ++- quickwit/quickwit-search/src/leaf_cache.rs | 5 ++ quickwit/quickwit-search/src/lib.rs | 1 + quickwit/quickwit-search/src/retry/mod.rs | 1 + quickwit/quickwit-search/src/retry/search.rs | 2 + quickwit/quickwit-search/src/root.rs | 2 + .../src/split_cache/download_task.rs | 9 ++- .../quickwit-storage/src/split_cache/mod.rs | 29 ++++--- .../src/split_cache/split_table.rs | 48 +++++++----- .../quickwit-storage/src/split_cache/tests.rs | 26 +++---- 29 files changed, 325 insertions(+), 60 deletions(-) diff --git a/config/tutorials/hdfs-logs/index-config.yaml b/config/tutorials/hdfs-logs/index-config.yaml index 6b949dfe079..6e9713c1962 100644 --- a/config/tutorials/hdfs-logs/index-config.yaml +++ b/config/tutorials/hdfs-logs/index-config.yaml @@ -32,3 +32,9 @@ doc_mapping: search_settings: default_search_fields: [severity_text, body] + +# Uncomment to distribute S3 object keys across 1024 prefixes and reduce rate limiting +# on high-throughput indexes where recent splits would otherwise cluster on the same +# S3 partition. Affects only newly created splits; existing splits continue to use +# the legacy flat naming scheme. +# split_key_prefix_len: 2 diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs index 50dcdeec846..ae0aa5e37c2 100644 --- a/quickwit/quickwit-common/src/lib.rs +++ b/quickwit/quickwit-common/src/lib.rs @@ -112,6 +112,84 @@ pub fn split_file(split_id: impl Display) -> String { format!("{split_id}.split") } +/// Computes the S3 key prefix for a split from its ULID string and the configured length. +/// +/// Extracts `prefix_len` characters starting at position 10 of the ULID (the first characters +/// of the random portion, after the 10-character timestamp). Returns an empty string when +/// `prefix_len` is 0 (legacy flat scheme). +/// +/// The returned prefix does NOT contain the trailing `/` separator; that separator is added by +/// [`split_storage_path`] when building the full storage path. +/// +/// If `split_id` is too short to extract the requested prefix (hidden contract: must be a +/// valid 26-character ULID and `prefix_len <= 16`), logs a rate-limited warning and returns +/// an empty string, falling back to the legacy flat scheme. +pub fn compute_split_key_prefix(split_id: &str, prefix_len: u8) -> String { + if prefix_len == 0 { + return String::new(); + } + let end = 10 + prefix_len as usize; + if split_id.len() < end { + crate::rate_limited_warn!( + limit_per_min = 1, + split_id = split_id, + prefix_len = prefix_len, + "split ID is too short to extract prefix; falling back to flat storage path" + ); + return String::new(); + } + split_id[10..end].to_string() +} + +/// Returns the storage path for a split given its ID and key prefix. +/// +/// - Empty `prefix`: legacy flat scheme `{split_id}.split` +/// - Non-empty `prefix`: `{prefix}/{split_id}.split` — the `/` separator is inserted here, so the +/// `prefix` itself must NOT contain a trailing `/` (as produced by [`compute_split_key_prefix`]) +/// +/// The prefix is typically computed once at split creation time via +/// [`compute_split_key_prefix`] and stored in `SplitMetadata`. +pub fn split_storage_path(split_id: &str, prefix: &str) -> String { + if prefix.is_empty() { + return format!("{split_id}.split"); + } + format!("{prefix}/{split_id}.split") +} + +#[cfg(test)] +mod split_path_tests { + use super::*; + + const SAMPLE_ULID: &str = "01ARZ3NDEKTSV4RRFFQ69G5FAV"; + + #[test] + fn test_compute_split_key_prefix_zero() { + assert_eq!(compute_split_key_prefix(SAMPLE_ULID, 0), ""); + } + + #[test] + fn test_compute_split_key_prefix_four() { + // Chars 10–13 of the ULID are the first 4 chars of the random portion. + assert_eq!(compute_split_key_prefix(SAMPLE_ULID, 4), "TSV4"); + } + + #[test] + fn test_split_storage_path_empty_prefix() { + assert_eq!( + split_storage_path(SAMPLE_ULID, ""), + "01ARZ3NDEKTSV4RRFFQ69G5FAV.split" + ); + } + + #[test] + fn test_split_storage_path_with_prefix() { + assert_eq!( + split_storage_path(SAMPLE_ULID, "TS"), + "TS/01ARZ3NDEKTSV4RRFFQ69G5FAV.split" + ); + } +} + fn get_from_env_opt_aux( key: &str, parse_fn: impl FnOnce(&str) -> Option, diff --git a/quickwit/quickwit-config/src/index_config/mod.rs b/quickwit/quickwit-config/src/index_config/mod.rs index d536e4edab5..281dd64e984 100644 --- a/quickwit/quickwit-config/src/index_config/mod.rs +++ b/quickwit/quickwit-config/src/index_config/mod.rs @@ -380,6 +380,10 @@ fn prepend_at_char(schedule: &str) -> String { trimmed_schedule.to_string() } +fn is_zero_u8(v: &u8) -> bool { + *v == 0 +} + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[serde(deny_unknown_fields)] #[serde(into = "VersionedIndexConfig")] @@ -392,6 +396,19 @@ pub struct IndexConfig { pub ingest_settings: IngestSettings, pub search_settings: SearchSettings, pub retention_policy_opt: Option, + /// Number of characters from the ULID random portion (positions 10–25) used as an S3 key + /// prefix directory to distribute object keys across S3 partitions and avoid hotspots. + /// + /// - `0` (default): legacy flat scheme `{split_id}.split` + /// - `N > 0`: `{split_id[10..10+N]}/{split_id}.split`, giving 32^N distinct prefixes + /// + /// Setting this to `2` creates 1024 buckets, which is sufficient for most workloads. + /// Maximum value is 16 (length of the ULID random portion). + /// + /// See [S3 performance guidelines](https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance.html) + /// for background on key prefix partitioning. + #[serde(default, skip_serializing_if = "is_zero_u8")] + pub split_key_prefix_len: u8, } impl IndexConfig { @@ -501,6 +518,7 @@ impl IndexConfig { ingest_settings: IngestSettings::default(), search_settings, retention_policy_opt: None, + split_key_prefix_len: 0, } } } @@ -611,6 +629,7 @@ impl crate::TestableForRegression for IndexConfig { ingest_settings, search_settings, retention_policy_opt, + split_key_prefix_len: 0, } } diff --git a/quickwit/quickwit-config/src/index_config/serialize.rs b/quickwit/quickwit-config/src/index_config/serialize.rs index 24fcc6d1ac2..b7eb64458ad 100644 --- a/quickwit/quickwit-config/src/index_config/serialize.rs +++ b/quickwit/quickwit-config/src/index_config/serialize.rs @@ -126,6 +126,15 @@ impl IndexConfigForSerialization { let index_uri = self.index_uri_or_fallback_to_default(default_index_root_uri)?; + const MAX_SPLIT_KEY_PREFIX_LEN: u8 = 16; + ensure!( + self.split_key_prefix_len <= MAX_SPLIT_KEY_PREFIX_LEN, + "`split_key_prefix_len` must be at most {} (the length of the ULID random portion), \ + but {} was provided", + MAX_SPLIT_KEY_PREFIX_LEN, + self.split_key_prefix_len, + ); + let index_config = IndexConfig { index_id: self.index_id, index_uri, @@ -134,6 +143,7 @@ impl IndexConfigForSerialization { ingest_settings: self.ingest_settings, search_settings: self.search_settings, retention_policy_opt: self.retention_policy_opt, + split_key_prefix_len: self.split_key_prefix_len, }; validate_index_config( &index_config.doc_mapping, @@ -184,6 +194,8 @@ pub struct IndexConfigV0_8 { #[serde(rename = "retention")] #[serde(default)] pub retention_policy_opt: Option, + #[serde(default, skip_serializing_if = "super::is_zero_u8")] + pub split_key_prefix_len: u8, } impl From for IndexConfigV0_8 { @@ -196,6 +208,7 @@ impl From for IndexConfigV0_8 { ingest_settings: index_config.ingest_settings, search_settings: index_config.search_settings, retention_policy_opt: index_config.retention_policy_opt, + split_key_prefix_len: index_config.split_key_prefix_len, } } } @@ -558,4 +571,27 @@ mod test { ) .expect_err("field required for default search is absent"); } + + #[test] + fn test_validate_split_key_prefix_len_too_long() { + let mut invalid_index_config = minimal_index_config_for_serialization(); + invalid_index_config.split_key_prefix_len = 17; + let validation_err = invalid_index_config + .build_and_validate(None) + .unwrap_err() + .to_string(); + assert!( + validation_err.contains("split_key_prefix_len"), + "unexpected error: {validation_err}" + ); + } + + #[test] + fn test_validate_split_key_prefix_len_max_valid() { + let mut index_config = minimal_index_config_for_serialization(); + index_config.split_key_prefix_len = 16; + index_config + .build_and_validate(Some(&Uri::for_test("s3://quickwit-indexes"))) + .expect("prefix_len of 16 should be valid"); + } } diff --git a/quickwit/quickwit-config/src/index_template/mod.rs b/quickwit/quickwit-config/src/index_template/mod.rs index d312edab1be..1186f91153c 100644 --- a/quickwit/quickwit-config/src/index_template/mod.rs +++ b/quickwit/quickwit-config/src/index_template/mod.rs @@ -29,6 +29,10 @@ use crate::{ pub type IndexTemplateId = String; pub type IndexIdPattern = String; +fn is_zero_u8(v: &u8) -> bool { + *v == 0 +} + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(into = "VersionedIndexTemplate")] #[serde(from = "VersionedIndexTemplate")] @@ -51,6 +55,8 @@ pub struct IndexTemplate { #[serde(rename = "retention")] #[serde(default)] pub retention_policy_opt: Option, + #[serde(default, skip_serializing_if = "is_zero_u8")] + pub split_key_prefix_len: u8, } impl IndexTemplate { @@ -77,6 +83,7 @@ impl IndexTemplate { ingest_settings: self.ingest_settings.clone(), search_settings: self.search_settings.clone(), retention_policy_opt: self.retention_policy_opt.clone(), + split_key_prefix_len: self.split_key_prefix_len, }; Ok(index_config) } @@ -134,6 +141,7 @@ impl IndexTemplate { ingest_settings: IngestSettings::default(), search_settings: SearchSettings::default(), retention_policy_opt: None, + split_key_prefix_len: 0, } } } @@ -178,6 +186,7 @@ impl crate::TestableForRegression for IndexTemplate { retention_period: "42 days".to_string(), evaluation_schedule: "daily".to_string(), }), + split_key_prefix_len: 0, } } diff --git a/quickwit/quickwit-config/src/index_template/serialize.rs b/quickwit/quickwit-config/src/index_template/serialize.rs index 087d47b7856..c8953987e01 100644 --- a/quickwit/quickwit-config/src/index_template/serialize.rs +++ b/quickwit/quickwit-config/src/index_template/serialize.rs @@ -55,6 +55,8 @@ pub struct IndexTemplateV0_8 { pub search_settings: SearchSettings, #[serde(default)] pub retention: Option, + #[serde(default, skip_serializing_if = "super::is_zero_u8")] + pub split_key_prefix_len: u8, } impl From for IndexTemplate { @@ -84,6 +86,7 @@ impl From for IndexTemplate { ingest_settings: index_template_v0_8.ingest_settings, search_settings: index_template_v0_8.search_settings, retention_policy_opt: index_template_v0_8.retention, + split_key_prefix_len: index_template_v0_8.split_key_prefix_len, } } } @@ -101,6 +104,7 @@ impl From for IndexTemplateV0_8 { ingest_settings: index_template.ingest_settings, search_settings: index_template.search_settings, retention: index_template.retention_policy_opt, + split_key_prefix_len: index_template.split_key_prefix_len, } } } diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index 9d6287245ac..39b1902ee4d 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -330,6 +330,7 @@ impl IndexingPipeline { SplitsUpdateMailbox::Sequencer(sequencer_mailbox), self.params.max_concurrent_split_uploads_index, self.params.event_broker.clone(), + self.params.split_key_prefix_len, ); let (uploader_mailbox, uploader_handle) = ctx .spawn_actor() @@ -526,6 +527,7 @@ pub struct IndexingPipelineParams { pub split_store: IndexingSplitStore, pub max_concurrent_split_uploads_index: usize, pub cooperative_indexing_permits: Option>, + pub split_key_prefix_len: u8, // Merge-related parameters pub merge_policy: Arc, @@ -668,6 +670,7 @@ mod tests { max_concurrent_split_uploads_index: 4, max_concurrent_split_uploads_merge: 5, cooperative_indexing_permits: None, + split_key_prefix_len: 0, merge_planner_mailbox, event_broker: EventBroker::default(), params_fingerprint: 42u64, @@ -792,6 +795,7 @@ mod tests { max_concurrent_split_uploads_index: 4, max_concurrent_split_uploads_merge: 5, cooperative_indexing_permits: None, + split_key_prefix_len: 0, merge_planner_mailbox, event_broker: Default::default(), params_fingerprint: 42u64, @@ -871,6 +875,7 @@ mod tests { merge_io_throughput_limiter_opt: None, merge_scheduler_service: universe.get_or_spawn_one(), event_broker: Default::default(), + split_key_prefix_len: 0, }; let merge_pipeline = MergePipeline::new(merge_pipeline_params, None, universe.spawn_ctx()); let merge_planner_mailbox = merge_pipeline.merge_planner_mailbox().clone(); @@ -893,6 +898,7 @@ mod tests { max_concurrent_split_uploads_index: 4, max_concurrent_split_uploads_merge: 5, cooperative_indexing_permits: None, + split_key_prefix_len: 0, merge_planner_mailbox: merge_planner_mailbox.clone(), event_broker: Default::default(), params_fingerprint: 42u64, @@ -1021,6 +1027,7 @@ mod tests { max_concurrent_split_uploads_index: 4, max_concurrent_split_uploads_merge: 5, cooperative_indexing_permits: None, + split_key_prefix_len: 0, merge_planner_mailbox, params_fingerprint: 42u64, event_broker: Default::default(), diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 57197a8fc58..536276d62aa 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -383,6 +383,7 @@ impl IndexingService { merge_io_throughput_limiter_opt: self.merge_io_throughput_limiter_opt.clone(), max_concurrent_split_uploads: self.max_concurrent_split_uploads, event_broker: self.event_broker.clone(), + split_key_prefix_len: index_config.split_key_prefix_len, }; let merge_planner_mailbox = self.get_or_create_merge_pipeline(merge_pipeline_params, immature_splits_opt, ctx)?; @@ -402,6 +403,7 @@ impl IndexingService { split_store, max_concurrent_split_uploads_index, cooperative_indexing_permits: self.cooperative_indexing_permits.clone(), + split_key_prefix_len: index_config.split_key_prefix_len, merge_policy, retention_policy, max_concurrent_split_uploads_merge, diff --git a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs index 53ece0a09fa..2bb30c1ba1b 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs @@ -287,6 +287,7 @@ impl MergePipeline { merge_publisher_mailbox.into(), self.params.max_concurrent_split_uploads, self.params.event_broker.clone(), + self.params.split_key_prefix_len, ); let (merge_uploader_mailbox, merge_uploader_handle) = ctx .spawn_actor() @@ -563,6 +564,7 @@ pub struct MergePipelineParams { pub max_concurrent_split_uploads: usize, //< TODO share with the indexing pipeline. pub merge_io_throughput_limiter_opt: Option, pub event_broker: EventBroker, + pub split_key_prefix_len: u8, } #[cfg(test)] @@ -627,6 +629,7 @@ mod tests { max_concurrent_split_uploads: 2, merge_io_throughput_limiter_opt: None, event_broker: Default::default(), + split_key_prefix_len: 0, }; let pipeline = MergePipeline::new(pipeline_params, None, universe.spawn_ctx()); let _merge_planner_mailbox = pipeline.merge_planner_mailbox().clone(); diff --git a/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs b/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs index 5d68bb59285..e5f154c72cb 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs @@ -114,7 +114,12 @@ impl MergeSplitDownloader { let _protect_guard = ctx.protect_zone(); let tantivy_dir = self .split_store - .fetch_and_open_split(split.split_id(), download_directory, &io_controls) + .fetch_and_open_split( + split.split_id(), + &split.prefix, + download_directory, + &io_controls, + ) .await .map_err(|error| { let split_id = split.split_id(); diff --git a/quickwit/quickwit-indexing/src/actors/uploader.rs b/quickwit/quickwit-indexing/src/actors/uploader.rs index 0b2901f26d5..7b8f62b52c6 100644 --- a/quickwit/quickwit-indexing/src/actors/uploader.rs +++ b/quickwit/quickwit-indexing/src/actors/uploader.rs @@ -169,6 +169,7 @@ pub struct Uploader { max_concurrent_split_uploads: usize, counters: UploaderCounters, event_broker: EventBroker, + split_key_prefix_len: u8, } impl Uploader { @@ -182,6 +183,7 @@ impl Uploader { split_update_mailbox: SplitsUpdateMailbox, max_concurrent_split_uploads: usize, event_broker: EventBroker, + split_key_prefix_len: u8, ) -> Uploader { Uploader { uploader_type, @@ -193,6 +195,7 @@ impl Uploader { max_concurrent_split_uploads, counters: Default::default(), event_broker, + split_key_prefix_len, } } async fn acquire_semaphore( @@ -297,6 +300,7 @@ impl Handler for Uploader { let ctx_clone = ctx.clone(); let merge_policy = self.merge_policy.clone(); let retention_policy = self.retention_policy.clone(); + let split_key_prefix_len = self.split_key_prefix_len; debug!(split_ids=?split_ids, "start-stage-and-store-splits"); let event_broker = self.event_broker.clone(); spawn_named_task( @@ -327,17 +331,22 @@ impl Handler for Uploader { return; } }; - let split_metadata = create_split_metadata( + let mut split_metadata = create_split_metadata( &merge_policy, retention_policy.as_ref(), &packaged_split.split_attrs, packaged_split.tags.clone(), split_streamer.footer_range.start..split_streamer.footer_range.end, ); + split_metadata.prefix = quickwit_common::compute_split_key_prefix( + split_metadata.split_id(), + split_key_prefix_len, + ); report_splits.push(ReportSplit { storage_uri: split_store.remote_uri().to_string(), split_id: packaged_split.split_id().to_string(), + prefix: split_metadata.prefix.clone(), }); split_metadata_list.push(split_metadata); @@ -563,6 +572,7 @@ mod tests { SplitsUpdateMailbox::Sequencer(sequencer_mailbox), 4, event_broker, + 0, ); let (uploader_mailbox, uploader_handle) = universe.spawn_builder().spawn(uploader); let split_scratch_directory = TempDirectory::for_test(); @@ -679,6 +689,7 @@ mod tests { SplitsUpdateMailbox::Sequencer(sequencer_mailbox), 4, EventBroker::default(), + 0, ); let (uploader_mailbox, uploader_handle) = universe.spawn_builder().spawn(uploader); let split_scratch_directory_1 = TempDirectory::for_test(); @@ -827,6 +838,7 @@ mod tests { SplitsUpdateMailbox::Publisher(publisher_mailbox), 4, EventBroker::default(), + 0, ); let (uploader_mailbox, uploader_handle) = universe.spawn_builder().spawn(uploader); let split_scratch_directory = TempDirectory::for_test(); @@ -901,6 +913,7 @@ mod tests { SplitsUpdateMailbox::Sequencer(sequencer_mailbox), 4, EventBroker::default(), + 0, ); let (uploader_mailbox, uploader_handle) = universe.spawn_builder().spawn(uploader); let checkpoint_delta = IndexCheckpointDelta { @@ -1006,6 +1019,7 @@ mod tests { SplitsUpdateMailbox::Publisher(publisher_mailbox), 4, event_broker, + 0, ); let (uploader_mailbox, uploader_handle) = universe.spawn_builder().spawn(uploader); let split_scratch_directory = TempDirectory::for_test(); diff --git a/quickwit/quickwit-indexing/src/models/split_attrs.rs b/quickwit/quickwit-indexing/src/models/split_attrs.rs index 2e0504bdd35..fa9b30e4b6b 100644 --- a/quickwit/quickwit-indexing/src/models/split_attrs.rs +++ b/quickwit/quickwit-indexing/src/models/split_attrs.rs @@ -125,6 +125,8 @@ pub fn create_split_metadata( footer_offsets, delete_opstamp: split_attrs.delete_opstamp, num_merge_ops: split_attrs.num_merge_ops, + // prefix is set by the Uploader after calling create_split_metadata + prefix: String::new(), } } diff --git a/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs b/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs index 88904cbd6dd..d6cf782edb7 100644 --- a/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs +++ b/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs @@ -93,8 +93,8 @@ impl IndexingSplitStore { self.inner.remote_storage.uri() } - fn split_path(&self, split_id: &str) -> PathBuf { - PathBuf::from(quickwit_common::split_file(split_id)) + fn split_path(&self, split_id: &str, prefix: &str) -> PathBuf { + PathBuf::from(quickwit_common::split_storage_path(split_id, prefix)) } /// Stores a split. @@ -116,7 +116,7 @@ impl IndexingSplitStore { let start = Instant::now(); let split_num_bytes = put_payload.len(); - let key = self.split_path(split.split_id()); + let key = self.split_path(split.split_id(), &split.prefix); let is_mature = split.is_mature(OffsetDateTime::now_utc()); self.inner .remote_storage @@ -179,10 +179,13 @@ impl IndexingSplitStore { pub async fn fetch_and_open_split( &self, split_id: &str, + prefix: &str, output_dir_path: &Path, io_controls: &IoControls, ) -> StorageResult> { - let path = PathBuf::from(quickwit_common::split_file(split_id)); + // The local filename is always flat (no prefix directory) — the prefix only affects the + // remote S3 key. + let local_filename = quickwit_common::split_file(split_id); if let Some(split_path) = self .inner .split_cache @@ -198,13 +201,14 @@ impl IndexingSplitStore { } else { tracing::Span::current().record("cache_hit", false); } - let dest_filepath = output_dir_path.join(&path); + let remote_path = self.split_path(split_id, prefix); + let dest_filepath = output_dir_path.join(&local_filename); let dest_file = tokio::fs::File::create(&dest_filepath).await?; let mut dest_file_with_write_limit = io_controls.clone().wrap_write(dest_file); self.inner .remote_storage - .copy_to(&path, &mut dest_file_with_write_limit) - .instrument(info_span!("fetch_split_from_remote_storage", path=?path)) + .copy_to(&remote_path, &mut dest_file_with_write_limit) + .instrument(info_span!("fetch_split_from_remote_storage", path=?remote_path)) .await?; get_tantivy_directory_from_split_bundle(&dest_filepath) } @@ -314,7 +318,7 @@ mod tests { { let output = tempfile::tempdir()?; let split1 = split_store - .fetch_and_open_split(&split_id1, output.path(), &io_controls) + .fetch_and_open_split(&split_id1, "", output.path(), &io_controls) .await?; let local_store_stats = split_store.inspect_split_cache().await; assert_eq!(local_store_stats.len(), 1); @@ -323,7 +327,7 @@ mod tests { { let output = tempfile::tempdir()?; let split2 = split_store - .fetch_and_open_split(&split_id2, output.path(), &io_controls) + .fetch_and_open_split(&split_id2, "", output.path(), &io_controls) .await?; let local_store_stats = split_store.inspect_split_cache().await; assert_eq!(local_store_stats.len(), 0); @@ -410,7 +414,7 @@ mod tests { // get from remote storage because split_id1 was evicted by split_id2 let output = tempfile::tempdir()?; let _split1 = split_store - .fetch_and_open_split(&split_id1, output.path(), &io_controls) + .fetch_and_open_split(&split_id1, "", output.path(), &io_controls) .await?; assert_eq!(io_controls.num_bytes(), split_payload1.len()); } @@ -418,7 +422,7 @@ mod tests { // get from cache let output = tempfile::tempdir()?; let _split2 = split_store - .fetch_and_open_split(&split_id2, output.path(), &io_controls) + .fetch_and_open_split(&split_id2, "", output.path(), &io_controls) .await?; // the number of downloaded by didn't change (still the size of split_payload1) assert_eq!(io_controls.num_bytes(), split_payload1.len()); @@ -427,7 +431,7 @@ mod tests { // get from remote because getting from cache removes the split from the cache let output = tempfile::tempdir()?; let _split2 = split_store - .fetch_and_open_split(&split_id2, output.path(), &io_controls) + .fetch_and_open_split(&split_id2, "", output.path(), &io_controls) .await?; assert_eq!( io_controls.num_bytes(), diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs index 861c0bde62f..a266ee39433 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs @@ -182,6 +182,7 @@ impl DeleteTaskPipeline { SplitsUpdateMailbox::Publisher(publisher_mailbox), self.max_concurrent_split_uploads, self.event_broker.clone(), + index_config.split_key_prefix_len, ); let (uploader_mailbox, uploader_supervisor_handler) = ctx.spawn_actor().supervise(uploader); diff --git a/quickwit/quickwit-metastore/src/split_metadata.rs b/quickwit/quickwit-metastore/src/split_metadata.rs index fe88fe379d3..8733c961ba6 100644 --- a/quickwit/quickwit-metastore/src/split_metadata.rs +++ b/quickwit/quickwit-metastore/src/split_metadata.rs @@ -132,6 +132,17 @@ pub struct SplitMetadata { /// Doc mapping UID used when creating this split. This split may only be merged with other /// splits using the same doc mapping UID. pub doc_mapping_uid: DocMappingUid, + + /// S3 key prefix directory used when this split was stored (e.g., `"ND"`). + /// + /// An empty string means the legacy flat scheme: `{split_id}.split` directly under the index + /// URI. A non-empty prefix (with no trailing `/`) means the split is stored at + /// `{prefix}/{split_id}.split`. + /// + /// This value is set once at split creation time from `IndexConfig.split_key_prefix_len` + /// and never changed afterwards. + #[serde(default)] + pub prefix: String, } impl fmt::Debug for SplitMetadata { @@ -229,7 +240,7 @@ impl SplitMetadata { /// Converts the split metadata into a [`SplitInfo`]. pub fn as_split_info(&self) -> SplitInfo { - let file_name = quickwit_common::split_file(self.split_id()); + let file_name = quickwit_common::split_storage_path(self.split_id(), &self.prefix); SplitInfo { uncompressed_docs_size_bytes: ByteSize(self.uncompressed_docs_size_in_bytes), @@ -281,6 +292,7 @@ impl quickwit_config::TestableForRegression for SplitMetadata { footer_offsets: 1000..2000, num_merge_ops: 3, doc_mapping_uid: DocMappingUid::default(), + prefix: String::new(), } } @@ -421,6 +433,7 @@ mod tests { delete_opstamp: 0, num_merge_ops: 0, doc_mapping_uid: DocMappingUid::default(), + prefix: String::new(), }; let expected_output = "SplitMetadata { split_id: \"split-1\", index_uid: IndexUid { \ diff --git a/quickwit/quickwit-metastore/src/split_metadata_version.rs b/quickwit/quickwit-metastore/src/split_metadata_version.rs index 8325290be92..e2b050c0d54 100644 --- a/quickwit/quickwit-metastore/src/split_metadata_version.rs +++ b/quickwit/quickwit-metastore/src/split_metadata_version.rs @@ -92,6 +92,10 @@ pub(crate) struct SplitMetadataV0_8 { // splits before when updates first appeared are compatible with each other. #[serde(default)] doc_mapping_uid: DocMappingUid, + + #[serde(default)] + #[serde(skip_serializing_if = "String::is_empty")] + pub prefix: String, } impl From for SplitMetadata { @@ -128,6 +132,7 @@ impl From for SplitMetadata { footer_offsets: v8.footer_offsets, num_merge_ops: v8.num_merge_ops, doc_mapping_uid: v8.doc_mapping_uid, + prefix: v8.prefix, } } } @@ -150,6 +155,7 @@ impl From for SplitMetadataV0_8 { footer_offsets: split.footer_offsets, num_merge_ops: split.num_merge_ops, doc_mapping_uid: split.doc_mapping_uid, + prefix: split.prefix, } } } diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index d50c8b529e6..61bb314d952 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -105,6 +105,8 @@ message ReportSplit { string split_id = 2; // The storage uri. This URI does NOT include the split id. string storage_uri = 1; + // S3 key prefix directory with no trailing "/" (e.g. "ND"). Empty string means the legacy flat scheme. + string prefix = 3; } message ReportSplitsRequest { @@ -517,6 +519,8 @@ message SplitIdAndFooterOffsets { optional int64 timestamp_end = 5; // The number of docs in the split uint64 num_docs = 6; + // S3 key prefix directory with no trailing "/" (e.g. "ND"). Empty string means the legacy flat scheme. + string prefix = 7; } // Hits returned by a FetchDocRequest. diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index 665ecdf5f96..78b7fb9960f 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -43,6 +43,9 @@ pub struct ReportSplit { /// The storage uri. This URI does NOT include the split id. #[prost(string, tag = "1")] pub storage_uri: ::prost::alloc::string::String, + /// S3 key prefix directory with no trailing "/" (e.g. "ND"). Empty string means the legacy flat scheme. + #[prost(string, tag = "3")] + pub prefix: ::prost::alloc::string::String, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -480,6 +483,9 @@ pub struct SplitIdAndFooterOffsets { /// The number of docs in the split #[prost(uint64, tag = "6")] pub num_docs: u64, + /// S3 key prefix directory with no trailing "/" (e.g. "ND"). Empty string means the legacy flat scheme. + #[prost(string, tag = "7")] + pub prefix: ::prost::alloc::string::String, } /// Hits returned by a FetchDocRequest. /// diff --git a/quickwit/quickwit-search/src/cluster_client.rs b/quickwit/quickwit-search/src/cluster_client.rs index 168ab5993b0..2acaebfb83d 100644 --- a/quickwit/quickwit-search/src/cluster_client.rs +++ b/quickwit/quickwit-search/src/cluster_client.rs @@ -336,6 +336,7 @@ mod tests { timestamp_start: None, timestamp_end: None, num_docs: 0, + prefix: String::new(), }], ..Default::default() } @@ -363,6 +364,7 @@ mod tests { timestamp_start: None, timestamp_end: None, num_docs: 0, + prefix: String::new(), }, SplitIdAndFooterOffsets { split_id: "split_2".to_string(), @@ -371,6 +373,7 @@ mod tests { timestamp_start: None, timestamp_end: None, num_docs: 0, + prefix: String::new(), }, ], }], diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index d632742bd28..4a0fa81d6a7 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -132,7 +132,10 @@ async fn get_split_footer_from_cache_or_fetch( return Ok(footer_data); } } - let split_file = PathBuf::from(format!("{}.split", split_and_footer_offsets.split_id)); + let split_file = PathBuf::from(quickwit_common::split_storage_path( + &split_and_footer_offsets.split_id, + &split_and_footer_offsets.prefix, + )); let footer_data_opt = index_storage .get_slice( &split_file, @@ -163,7 +166,10 @@ pub(crate) async fn open_split_bundle( index_storage: Arc, split_and_footer_offsets: &SplitIdAndFooterOffsets, ) -> anyhow::Result<(FileSlice, BundleStorage)> { - let split_file = PathBuf::from(format!("{}.split", split_and_footer_offsets.split_id)); + let split_file = PathBuf::from(quickwit_common::split_storage_path( + &split_and_footer_offsets.split_id, + &split_and_footer_offsets.prefix, + )); let footer_data = get_split_footer_from_cache_or_fetch( index_storage.clone(), split_and_footer_offsets, diff --git a/quickwit/quickwit-search/src/leaf_cache.rs b/quickwit/quickwit-search/src/leaf_cache.rs index ad0a30cc4e6..2b739a4e482 100644 --- a/quickwit/quickwit-search/src/leaf_cache.rs +++ b/quickwit/quickwit-search/src/leaf_cache.rs @@ -261,6 +261,7 @@ mod tests { timestamp_start: None, timestamp_end: None, num_docs: 0, + prefix: String::new(), }; let split_2 = SplitIdAndFooterOffsets { @@ -270,6 +271,7 @@ mod tests { timestamp_start: None, timestamp_end: None, num_docs: 0, + prefix: String::new(), }; let query_1 = SearchRequest { @@ -340,6 +342,7 @@ mod tests { timestamp_start: Some(100), timestamp_end: Some(199), num_docs: 0, + prefix: String::new(), }; let split_2 = SplitIdAndFooterOffsets { split_id: "split_2".to_string(), @@ -348,6 +351,7 @@ mod tests { timestamp_start: Some(150), timestamp_end: Some(249), num_docs: 0, + prefix: String::new(), }; let split_3 = SplitIdAndFooterOffsets { split_id: "split_3".to_string(), @@ -356,6 +360,7 @@ mod tests { timestamp_start: Some(150), timestamp_end: Some(249), num_docs: 0, + prefix: String::new(), }; let query_1 = SearchRequest { diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index b7c5afd206c..9596dd36e32 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -176,6 +176,7 @@ fn extract_split_and_footer_offsets(split_metadata: &SplitMetadata) -> SplitIdAn .as_ref() .map(|time_range| *time_range.end()), num_docs: split_metadata.num_docs as u64, + prefix: split_metadata.prefix.clone(), } } diff --git a/quickwit/quickwit-search/src/retry/mod.rs b/quickwit/quickwit-search/src/retry/mod.rs index 996665717cf..6a5e7a73b83 100644 --- a/quickwit/quickwit-search/src/retry/mod.rs +++ b/quickwit/quickwit-search/src/retry/mod.rs @@ -128,6 +128,7 @@ mod tests { timestamp_start: None, timestamp_end: None, num_docs: 0, + prefix: String::new(), }; let client_for_retry = retry_client( &search_job_placer, diff --git a/quickwit/quickwit-search/src/retry/search.rs b/quickwit/quickwit-search/src/retry/search.rs index 696a352de94..9c08337e5f1 100644 --- a/quickwit/quickwit-search/src/retry/search.rs +++ b/quickwit/quickwit-search/src/retry/search.rs @@ -93,6 +93,7 @@ mod tests { timestamp_start: None, timestamp_end: None, num_docs: 0, + prefix: String::new(), }, SplitIdAndFooterOffsets { split_id: "split_2".to_string(), @@ -101,6 +102,7 @@ mod tests { timestamp_start: None, timestamp_end: None, num_docs: 0, + prefix: String::new(), }, ], }], diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 06c436b58a8..c4c7e5beba7 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -1988,6 +1988,7 @@ mod tests { ingest_settings, search_settings, retention_policy_opt: None, + split_key_prefix_len: 0, }) } @@ -2162,6 +2163,7 @@ mod tests { indexing_settings, search_settings, retention_policy_opt: None, + split_key_prefix_len: 0, }) } diff --git a/quickwit/quickwit-storage/src/split_cache/download_task.rs b/quickwit/quickwit-storage/src/split_cache/download_task.rs index 9af0390bfb5..f40555582df 100644 --- a/quickwit/quickwit-storage/src/split_cache/download_task.rs +++ b/quickwit/quickwit-storage/src/split_cache/download_task.rs @@ -32,12 +32,15 @@ async fn download_split( split_ulid, storage_uri, living_token: _, + remote_split_path, } = candidate_split; - let split_filename = split_file(*split_ulid); - let target_filepath = root_path.join(&split_filename); + // Local cache always uses the flat filename (no prefix directory); the remote key may be + // sharded under a prefix directory (carried verbatim in `remote_split_path`). + let local_filename = split_file(*split_ulid); + let target_filepath = root_path.join(&local_filename); let storage = storage_resolver.resolve(storage_uri).await?; let num_bytes = storage - .copy_to_file(Path::new(&split_filename), &target_filepath) + .copy_to_file(Path::new(remote_split_path), &target_filepath) .await?; Ok(num_bytes) } diff --git a/quickwit/quickwit-storage/src/split_cache/mod.rs b/quickwit/quickwit-storage/src/split_cache/mod.rs index 1c0de124d73..ef5ecb68c2e 100644 --- a/quickwit/quickwit-storage/src/split_cache/mod.rs +++ b/quickwit/quickwit-storage/src/split_cache/mod.rs @@ -146,20 +146,27 @@ impl SplitCache { error!(storage_uri=%report_split.storage_uri, "received invalid storage uri: ignoring"); continue; }; - split_table.report(split_ulid, storage_uri); + let remote_split_path = + quickwit_common::split_storage_path(&report_split.split_id, &report_split.prefix); + split_table.report(split_ulid, storage_uri, remote_split_path); } } // Returns a split guard object. As long as it is not dropped, the // split won't be evinced from the cache. - async fn get_split_file(&self, split_id: Ulid, storage_uri: &Uri) -> Option { + async fn get_split_file( + &self, + split_id: Ulid, + storage_uri: &Uri, + remote_split_path: String, + ) -> Option { // We touch before even checking the fd cache in order to update the file's last access time // for the file cache. - let num_bytes_opt: Option = self - .split_table - .lock() - .unwrap() - .touch(split_id, storage_uri); + let num_bytes_opt: Option = + self.split_table + .lock() + .unwrap() + .touch(split_id, storage_uri, remote_split_path); let num_bytes = num_bytes_opt?; self.fd_cache @@ -200,18 +207,22 @@ struct SplitCacheBackingStorage { impl SplitCacheBackingStorage { async fn get_impl(&self, path: &Path, byte_range: Range) -> Option { let split_id = split_id_from_path(path)?; + // The path we are handed is exactly the relative storage key of the split. + let remote_split_path = path.to_string_lossy().into_owned(); let split_file: SplitFile = self .split_cache - .get_split_file(split_id, &self.storage_root_uri) + .get_split_file(split_id, &self.storage_root_uri, remote_split_path) .await?; split_file.get_range(byte_range).await.ok() } async fn get_all_impl(&self, path: &Path) -> Option { let split_id = split_id_from_path(path)?; + // The path we are handed is exactly the relative storage key of the split. + let remote_split_path = path.to_string_lossy().into_owned(); let split_file = self .split_cache - .get_split_file(split_id, &self.storage_root_uri) + .get_split_file(split_id, &self.storage_root_uri, remote_split_path) .await?; split_file.get_all().await.ok() } diff --git a/quickwit/quickwit-storage/src/split_cache/split_table.rs b/quickwit/quickwit-storage/src/split_cache/split_table.rs index ee24a393e84..7b9b53f9786 100644 --- a/quickwit/quickwit-storage/src/split_cache/split_table.rs +++ b/quickwit/quickwit-storage/src/split_cache/split_table.rs @@ -237,7 +237,12 @@ impl SplitTable { /// If the file is already on the disk cache, return `Some(num_bytes)`. /// If the file is not in cache, return `None`, and register the file in the candidate for /// download list. - pub fn touch(&mut self, split_ulid: Ulid, storage_uri: &Uri) -> Option { + pub fn touch( + &mut self, + split_ulid: Ulid, + storage_uri: &Uri, + remote_split_path: String, + ) -> Option { let timestamp = compute_timestamp(self.origin_time); let status = self.mutate_split(split_ulid, |old_split_info| { if let Some(mut split_info) = old_split_info { @@ -253,6 +258,7 @@ impl SplitTable { storage_uri: storage_uri.clone(), split_ulid, living_token: Arc::new(()), + remote_split_path, }), } } @@ -298,7 +304,7 @@ impl SplitTable { }); } - pub(crate) fn report(&mut self, split_ulid: Ulid, storage_uri: Uri) { + pub(crate) fn report(&mut self, split_ulid: Ulid, storage_uri: Uri, remote_split_path: String) { let origin_time = self.origin_time; self.mutate_split(split_ulid, move |split_info_opt| { if let Some(split_info) = split_info_opt { @@ -314,6 +320,7 @@ impl SplitTable { storage_uri, split_ulid, living_token: Arc::new(()), + remote_split_path, }), } }); @@ -435,6 +442,10 @@ pub(crate) struct CandidateSplit { pub storage_uri: Uri, pub split_ulid: Ulid, pub living_token: Arc<()>, + /// Relative storage key of the split within `storage_uri`, used as-is to fetch it from remote + /// storage. E.g. `"ND/01ARZ….split"` for a sharded split, or `"01ARZ….split"` for the legacy + /// flat scheme. The local cache filename is always flat and derived from `split_ulid`. + pub remote_split_path: String, } pub(crate) struct DownloadOpportunity { @@ -481,8 +492,8 @@ mod tests { let ulids = sorted_split_ulids(2); let ulid1 = ulids[0]; let ulid2 = ulids[1]; - split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI)); - split_table.report(ulid2, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI), String::new()); + split_table.report(ulid2, Uri::for_test(TEST_STORAGE_URI), String::new()); let candidate = split_table.best_candidate().unwrap(); assert_eq!(candidate.split_ulid, ulid2); } @@ -501,9 +512,9 @@ mod tests { let ulids = sorted_split_ulids(2); let ulid1 = ulids[0]; let ulid2 = ulids[1]; - split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI)); - split_table.report(ulid2, Uri::for_test(TEST_STORAGE_URI)); - let num_bytes_opt = split_table.touch(ulid1, &Uri::for_test("s3://test1/")); + split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI), String::new()); + split_table.report(ulid2, Uri::for_test(TEST_STORAGE_URI), String::new()); + let num_bytes_opt = split_table.touch(ulid1, &Uri::for_test("s3://test1/"), String::new()); assert!(num_bytes_opt.is_none()); let candidate = split_table.best_candidate().unwrap(); assert_eq!(candidate.split_ulid, ulid1); @@ -521,7 +532,7 @@ mod tests { Default::default(), ); let ulid1 = Ulid::new(); - split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI), String::new()); assert_eq!(split_table.num_bytes(), 0); let download = split_table.start_download(ulid1); assert!(download.is_some()); @@ -529,11 +540,11 @@ mod tests { split_table.register_as_downloaded(ulid1, 10_000_000); assert_eq!(split_table.num_bytes(), 10_000_000); assert_eq!( - split_table.touch(ulid1, &Uri::for_test(TEST_STORAGE_URI)), + split_table.touch(ulid1, &Uri::for_test(TEST_STORAGE_URI), String::new()), Some(10_000_000) ); let ulid2 = Ulid::new(); - split_table.report(ulid2, Uri::for_test("s3://test`/")); + split_table.report(ulid2, Uri::for_test("s3://test`/"), String::new()); let download = split_table.start_download(ulid2); assert!(download.is_some()); assert!(split_table.start_download(ulid2).is_none()); @@ -564,11 +575,11 @@ mod tests { (split_ulids[5], 300_000), ]; for (split_ulid, num_bytes) in splits { - split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI), String::new()); split_table.register_as_downloaded(split_ulid, num_bytes); } let new_ulid = Ulid::new(); - split_table.report(new_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(new_ulid, Uri::for_test(TEST_STORAGE_URI), String::new()); let DownloadOpportunity { splits_to_delete, split_to_download, @@ -602,11 +613,11 @@ mod tests { (split_ulids[5], 300_000), ]; for (split_ulid, num_bytes) in splits { - split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI), String::new()); split_table.register_as_downloaded(split_ulid, num_bytes); } let new_ulid = Ulid::new(); - split_table.report(new_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(new_ulid, Uri::for_test(TEST_STORAGE_URI), String::new()); let DownloadOpportunity { splits_to_delete, split_to_download, @@ -627,10 +638,10 @@ mod tests { Default::default(), ); let split_ulid = Ulid::new(); - split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI), String::new()); let candidate = split_table.start_download(split_ulid).unwrap(); // This report should be cancelled as we have a download currently running. - split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI), String::new()); assert!(split_table.start_download(split_ulid).is_none()); std::mem::drop(candidate); @@ -639,7 +650,7 @@ mod tests { assert!(split_table.start_download(split_ulid).is_none()); // This report should be considered as our candidate (and its alive token has been dropped) - split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI), String::new()); let candidate2 = split_table.start_download(split_ulid).unwrap(); assert_eq!(candidate2.split_ulid, split_ulid); @@ -658,7 +669,7 @@ mod tests { ); for i in 1..2_000 { let split_ulid = Ulid::new(); - split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI), String::new()); assert_eq!( split_table.candidate_splits.len(), i.min(super::MAX_NUM_CANDIDATES) @@ -684,6 +695,7 @@ mod tests { storage_uri: Uri::for_test(TEST_STORAGE_URI), split_ulid, living_token: Arc::new(()), + remote_split_path: String::new(), }; let split_info = SplitInfo { split_key: SplitKey { diff --git a/quickwit/quickwit-storage/src/split_cache/tests.rs b/quickwit/quickwit-storage/src/split_cache/tests.rs index e08639dc9aa..e3dfa76db94 100644 --- a/quickwit/quickwit-storage/src/split_cache/tests.rs +++ b/quickwit/quickwit-storage/src/split_cache/tests.rs @@ -32,8 +32,8 @@ fn test_split_table() { }); let ulid1 = Ulid::new(); let ulid2 = Ulid::new(); - split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI)); - split_table.report(ulid2, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI), String::new()); + split_table.report(ulid2, Uri::for_test(TEST_STORAGE_URI), String::new()); let candidate = split_table.best_candidate().unwrap(); assert_eq!(candidate.split_ulid, ulid2); } @@ -47,8 +47,8 @@ fn test_split_table_prefer_last_touched() { }); let ulid1 = Ulid::new(); let ulid2 = Ulid::new(); - split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI)); - split_table.report(ulid2, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI), String::new()); + split_table.report(ulid2, Uri::for_test(TEST_STORAGE_URI), String::new()); let split_guard_opt = split_table.get_split_guard(ulid1, &Uri::for_test("s3://test1/")); assert!(split_guard_opt.is_none()); let candidate = split_table.best_candidate().unwrap(); @@ -63,7 +63,7 @@ fn test_split_table_prefer_start_download_prevent_new_report() { num_concurrent_downloads: NonZeroU32::new(1).unwrap(), }); let ulid1 = Ulid::new(); - split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI), String::new()); assert_eq!(split_table.num_bytes(), 0); let download = split_table.start_download(ulid1); assert!(download.is_some()); @@ -72,7 +72,7 @@ fn test_split_table_prefer_start_download_prevent_new_report() { assert_eq!(split_table.num_bytes(), 10_000_000); split_table.get_split_guard(ulid1, &Uri::for_test(TEST_STORAGE_URI)); let ulid2 = Ulid::new(); - split_table.report(ulid2, Uri::for_test("s3://test`/")); + split_table.report(ulid2, Uri::for_test("s3://test`/"), String::new()); let download = split_table.start_download(ulid2); assert!(download.is_some()); assert!(split_table.start_download(ulid2).is_none()); @@ -99,11 +99,11 @@ fn test_eviction_due_to_size() { (split_ulids[5], 300_000), ]; for (split_ulid, num_bytes) in splits { - split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI), String::new()); split_table.register_as_downloaded(split_ulid, num_bytes); } let new_ulid = Ulid::new(); - split_table.report(new_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(new_ulid, Uri::for_test(TEST_STORAGE_URI), String::new()); let DownloadOpportunity { splits_to_delete, split_to_download, @@ -133,11 +133,11 @@ fn test_eviction_due_to_num_splits() { (split_ulids[5], 300_000), ]; for (split_ulid, num_bytes) in splits { - split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI), String::new()); split_table.register_as_downloaded(split_ulid, num_bytes); } let new_ulid = Ulid::new(); - split_table.report(new_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(new_ulid, Uri::for_test(TEST_STORAGE_URI), String::new()); let DownloadOpportunity { splits_to_delete, split_to_download, @@ -154,10 +154,10 @@ fn test_failed_download_can_be_re_reported() { num_concurrent_downloads: NonZeroU32::new(1).unwrap(), }); let split_ulid = Ulid::new(); - split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI), String::new()); let candidate = split_table.start_download(split_ulid).unwrap(); // This report should be cancelled as we have a download currently running. - split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI), String::new()); assert!(split_table.start_download(split_ulid).is_none()); std::mem::drop(candidate); @@ -166,7 +166,7 @@ fn test_failed_download_can_be_re_reported() { assert!(split_table.start_download(split_ulid).is_none()); // This report should be considered as our candidate (and its alive token has been dropped) - split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI), String::new()); let candidate2 = split_table.start_download(split_ulid).unwrap(); assert_eq!(candidate2.split_ulid, split_ulid); From 015081d6a3cd8ff8421a38c5298c56e2aad66743 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Sat, 20 Jun 2026 08:59:50 +0200 Subject: [PATCH 2/3] Move split_key_prefix_len into IndexingSettings --- .../quickwit-config/src/index_config/mod.rs | 29 +++++++++---------- .../src/index_config/serialize.rs | 12 +++----- .../quickwit-config/src/index_template/mod.rs | 8 ----- .../src/index_template/serialize.rs | 4 --- .../src/actors/indexing_service.rs | 4 +-- .../src/actors/delete_task_pipeline.rs | 2 +- quickwit/quickwit-search/src/root.rs | 2 -- 7 files changed, 21 insertions(+), 40 deletions(-) diff --git a/quickwit/quickwit-config/src/index_config/mod.rs b/quickwit/quickwit-config/src/index_config/mod.rs index 281dd64e984..fed80142e7f 100644 --- a/quickwit/quickwit-config/src/index_config/mod.rs +++ b/quickwit/quickwit-config/src/index_config/mod.rs @@ -130,6 +130,19 @@ pub struct IndexingSettings { pub parquet_indexing: Option, #[serde(default)] pub resources: IndexingResources, + /// Number of characters from the ULID random portion (positions 10–25) used as an S3 key + /// prefix directory to distribute object keys across S3 partitions and avoid hotspots. + /// + /// - `0` (default): legacy flat scheme `{split_id}.split` + /// - `N > 0`: `{split_id[10..10+N]}/{split_id}.split`, giving 32^N distinct prefixes + /// + /// Setting this to `2` creates 1024 buckets, which is sufficient for most workloads. + /// Maximum value is 16 (length of the ULID random portion). + /// + /// See [S3 performance guidelines](https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance.html) + /// for background on key prefix partitioning. + #[serde(default, skip_serializing_if = "is_zero_u8")] + pub split_key_prefix_len: u8, } /// Configuration for the Parquet indexing pipeline (metrics, sketches). @@ -260,6 +273,7 @@ impl Default for IndexingSettings { #[cfg(feature = "metrics")] parquet_indexing: None, resources: IndexingResources::default(), + split_key_prefix_len: 0, } } } @@ -396,19 +410,6 @@ pub struct IndexConfig { pub ingest_settings: IngestSettings, pub search_settings: SearchSettings, pub retention_policy_opt: Option, - /// Number of characters from the ULID random portion (positions 10–25) used as an S3 key - /// prefix directory to distribute object keys across S3 partitions and avoid hotspots. - /// - /// - `0` (default): legacy flat scheme `{split_id}.split` - /// - `N > 0`: `{split_id[10..10+N]}/{split_id}.split`, giving 32^N distinct prefixes - /// - /// Setting this to `2` creates 1024 buckets, which is sufficient for most workloads. - /// Maximum value is 16 (length of the ULID random portion). - /// - /// See [S3 performance guidelines](https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance.html) - /// for background on key prefix partitioning. - #[serde(default, skip_serializing_if = "is_zero_u8")] - pub split_key_prefix_len: u8, } impl IndexConfig { @@ -518,7 +519,6 @@ impl IndexConfig { ingest_settings: IngestSettings::default(), search_settings, retention_policy_opt: None, - split_key_prefix_len: 0, } } } @@ -629,7 +629,6 @@ impl crate::TestableForRegression for IndexConfig { ingest_settings, search_settings, retention_policy_opt, - split_key_prefix_len: 0, } } diff --git a/quickwit/quickwit-config/src/index_config/serialize.rs b/quickwit/quickwit-config/src/index_config/serialize.rs index b7eb64458ad..fdbf4eeb002 100644 --- a/quickwit/quickwit-config/src/index_config/serialize.rs +++ b/quickwit/quickwit-config/src/index_config/serialize.rs @@ -128,11 +128,11 @@ impl IndexConfigForSerialization { const MAX_SPLIT_KEY_PREFIX_LEN: u8 = 16; ensure!( - self.split_key_prefix_len <= MAX_SPLIT_KEY_PREFIX_LEN, + self.indexing_settings.split_key_prefix_len <= MAX_SPLIT_KEY_PREFIX_LEN, "`split_key_prefix_len` must be at most {} (the length of the ULID random portion), \ but {} was provided", MAX_SPLIT_KEY_PREFIX_LEN, - self.split_key_prefix_len, + self.indexing_settings.split_key_prefix_len, ); let index_config = IndexConfig { @@ -143,7 +143,6 @@ impl IndexConfigForSerialization { ingest_settings: self.ingest_settings, search_settings: self.search_settings, retention_policy_opt: self.retention_policy_opt, - split_key_prefix_len: self.split_key_prefix_len, }; validate_index_config( &index_config.doc_mapping, @@ -194,8 +193,6 @@ pub struct IndexConfigV0_8 { #[serde(rename = "retention")] #[serde(default)] pub retention_policy_opt: Option, - #[serde(default, skip_serializing_if = "super::is_zero_u8")] - pub split_key_prefix_len: u8, } impl From for IndexConfigV0_8 { @@ -208,7 +205,6 @@ impl From for IndexConfigV0_8 { ingest_settings: index_config.ingest_settings, search_settings: index_config.search_settings, retention_policy_opt: index_config.retention_policy_opt, - split_key_prefix_len: index_config.split_key_prefix_len, } } } @@ -575,7 +571,7 @@ mod test { #[test] fn test_validate_split_key_prefix_len_too_long() { let mut invalid_index_config = minimal_index_config_for_serialization(); - invalid_index_config.split_key_prefix_len = 17; + invalid_index_config.indexing_settings.split_key_prefix_len = 17; let validation_err = invalid_index_config .build_and_validate(None) .unwrap_err() @@ -589,7 +585,7 @@ mod test { #[test] fn test_validate_split_key_prefix_len_max_valid() { let mut index_config = minimal_index_config_for_serialization(); - index_config.split_key_prefix_len = 16; + index_config.indexing_settings.split_key_prefix_len = 16; index_config .build_and_validate(Some(&Uri::for_test("s3://quickwit-indexes"))) .expect("prefix_len of 16 should be valid"); diff --git a/quickwit/quickwit-config/src/index_template/mod.rs b/quickwit/quickwit-config/src/index_template/mod.rs index 1186f91153c..367b5a4cd3f 100644 --- a/quickwit/quickwit-config/src/index_template/mod.rs +++ b/quickwit/quickwit-config/src/index_template/mod.rs @@ -29,9 +29,6 @@ use crate::{ pub type IndexTemplateId = String; pub type IndexIdPattern = String; -fn is_zero_u8(v: &u8) -> bool { - *v == 0 -} #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(into = "VersionedIndexTemplate")] @@ -55,8 +52,6 @@ pub struct IndexTemplate { #[serde(rename = "retention")] #[serde(default)] pub retention_policy_opt: Option, - #[serde(default, skip_serializing_if = "is_zero_u8")] - pub split_key_prefix_len: u8, } impl IndexTemplate { @@ -83,7 +78,6 @@ impl IndexTemplate { ingest_settings: self.ingest_settings.clone(), search_settings: self.search_settings.clone(), retention_policy_opt: self.retention_policy_opt.clone(), - split_key_prefix_len: self.split_key_prefix_len, }; Ok(index_config) } @@ -141,7 +135,6 @@ impl IndexTemplate { ingest_settings: IngestSettings::default(), search_settings: SearchSettings::default(), retention_policy_opt: None, - split_key_prefix_len: 0, } } } @@ -186,7 +179,6 @@ impl crate::TestableForRegression for IndexTemplate { retention_period: "42 days".to_string(), evaluation_schedule: "daily".to_string(), }), - split_key_prefix_len: 0, } } diff --git a/quickwit/quickwit-config/src/index_template/serialize.rs b/quickwit/quickwit-config/src/index_template/serialize.rs index c8953987e01..087d47b7856 100644 --- a/quickwit/quickwit-config/src/index_template/serialize.rs +++ b/quickwit/quickwit-config/src/index_template/serialize.rs @@ -55,8 +55,6 @@ pub struct IndexTemplateV0_8 { pub search_settings: SearchSettings, #[serde(default)] pub retention: Option, - #[serde(default, skip_serializing_if = "super::is_zero_u8")] - pub split_key_prefix_len: u8, } impl From for IndexTemplate { @@ -86,7 +84,6 @@ impl From for IndexTemplate { ingest_settings: index_template_v0_8.ingest_settings, search_settings: index_template_v0_8.search_settings, retention_policy_opt: index_template_v0_8.retention, - split_key_prefix_len: index_template_v0_8.split_key_prefix_len, } } } @@ -104,7 +101,6 @@ impl From for IndexTemplateV0_8 { ingest_settings: index_template.ingest_settings, search_settings: index_template.search_settings, retention: index_template.retention_policy_opt, - split_key_prefix_len: index_template.split_key_prefix_len, } } } diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 536276d62aa..5150735e6b6 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -383,7 +383,7 @@ impl IndexingService { merge_io_throughput_limiter_opt: self.merge_io_throughput_limiter_opt.clone(), max_concurrent_split_uploads: self.max_concurrent_split_uploads, event_broker: self.event_broker.clone(), - split_key_prefix_len: index_config.split_key_prefix_len, + split_key_prefix_len: index_config.indexing_settings.split_key_prefix_len, }; let merge_planner_mailbox = self.get_or_create_merge_pipeline(merge_pipeline_params, immature_splits_opt, ctx)?; @@ -403,7 +403,7 @@ impl IndexingService { split_store, max_concurrent_split_uploads_index, cooperative_indexing_permits: self.cooperative_indexing_permits.clone(), - split_key_prefix_len: index_config.split_key_prefix_len, + split_key_prefix_len: index_config.indexing_settings.split_key_prefix_len, merge_policy, retention_policy, max_concurrent_split_uploads_merge, diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs index a266ee39433..8a141b30a29 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs @@ -182,7 +182,7 @@ impl DeleteTaskPipeline { SplitsUpdateMailbox::Publisher(publisher_mailbox), self.max_concurrent_split_uploads, self.event_broker.clone(), - index_config.split_key_prefix_len, + index_config.indexing_settings.split_key_prefix_len, ); let (uploader_mailbox, uploader_supervisor_handler) = ctx.spawn_actor().supervise(uploader); diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index c4c7e5beba7..06c436b58a8 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -1988,7 +1988,6 @@ mod tests { ingest_settings, search_settings, retention_policy_opt: None, - split_key_prefix_len: 0, }) } @@ -2163,7 +2162,6 @@ mod tests { indexing_settings, search_settings, retention_policy_opt: None, - split_key_prefix_len: 0, }) } From 7cb2d2ba44bc3be37aa0e19b2a79101cca1c0c1f Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Sat, 20 Jun 2026 09:05:10 +0200 Subject: [PATCH 3/3] Replace split_key_prefix_len config field with QW_SPLIT_KEY_PREFIX_LEN env var --- quickwit/quickwit-common/src/shared_consts.rs | 24 ++++++++++++++ .../quickwit-config/src/index_config/mod.rs | 17 ---------- .../src/index_config/serialize.rs | 31 ------------------- .../src/actors/indexing_pipeline.rs | 7 ----- .../src/actors/indexing_service.rs | 2 -- .../src/actors/merge_pipeline.rs | 3 -- .../quickwit-indexing/src/actors/uploader.rs | 11 +------ .../src/actors/delete_task_pipeline.rs | 1 - .../quickwit-metastore/src/split_metadata.rs | 4 +-- 9 files changed, 27 insertions(+), 73 deletions(-) diff --git a/quickwit/quickwit-common/src/shared_consts.rs b/quickwit/quickwit-common/src/shared_consts.rs index 95b3a531aaf..0f1453968d2 100644 --- a/quickwit/quickwit-common/src/shared_consts.rs +++ b/quickwit/quickwit-common/src/shared_consts.rs @@ -82,5 +82,29 @@ pub const DEFAULT_SHARD_BURST_LIMIT: ByteSize = ByteSize::mib(50); /// A compromise between "exponential" scale up and moderate shard count increase. pub const DEFAULT_SHARD_SCALE_UP_FACTOR: f32 = 1.5; +/// Returns the number of characters from the ULID random portion used as an S3 key prefix +/// directory to distribute split object keys across S3 partitions and avoid hotspots. +/// +/// Controlled by `QW_SPLIT_KEY_PREFIX_LEN` (default: 0, i.e. legacy flat scheme). +/// Maximum value is 16 (length of the ULID random portion). +/// Setting this to `2` creates 1024 buckets, which is sufficient for most workloads. +pub fn split_key_prefix_len() -> u8 { + const MAX_SPLIT_KEY_PREFIX_LEN: u8 = 16; + + static SPLIT_KEY_PREFIX_LEN: LazyLock = LazyLock::new(|| { + let value: u8 = crate::get_from_env("QW_SPLIT_KEY_PREFIX_LEN", 0u8, false); + if value > MAX_SPLIT_KEY_PREFIX_LEN { + warn!( + value, + max = MAX_SPLIT_KEY_PREFIX_LEN, + "QW_SPLIT_KEY_PREFIX_LEN exceeds maximum, clamping to {MAX_SPLIT_KEY_PREFIX_LEN}" + ); + return MAX_SPLIT_KEY_PREFIX_LEN; + } + value + }); + *SPLIT_KEY_PREFIX_LEN +} + // (Just a reexport). pub use bytesize::MIB; diff --git a/quickwit/quickwit-config/src/index_config/mod.rs b/quickwit/quickwit-config/src/index_config/mod.rs index fed80142e7f..7c3a5afa0b8 100644 --- a/quickwit/quickwit-config/src/index_config/mod.rs +++ b/quickwit/quickwit-config/src/index_config/mod.rs @@ -130,19 +130,6 @@ pub struct IndexingSettings { pub parquet_indexing: Option, #[serde(default)] pub resources: IndexingResources, - /// Number of characters from the ULID random portion (positions 10–25) used as an S3 key - /// prefix directory to distribute object keys across S3 partitions and avoid hotspots. - /// - /// - `0` (default): legacy flat scheme `{split_id}.split` - /// - `N > 0`: `{split_id[10..10+N]}/{split_id}.split`, giving 32^N distinct prefixes - /// - /// Setting this to `2` creates 1024 buckets, which is sufficient for most workloads. - /// Maximum value is 16 (length of the ULID random portion). - /// - /// See [S3 performance guidelines](https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance.html) - /// for background on key prefix partitioning. - #[serde(default, skip_serializing_if = "is_zero_u8")] - pub split_key_prefix_len: u8, } /// Configuration for the Parquet indexing pipeline (metrics, sketches). @@ -273,7 +260,6 @@ impl Default for IndexingSettings { #[cfg(feature = "metrics")] parquet_indexing: None, resources: IndexingResources::default(), - split_key_prefix_len: 0, } } } @@ -394,9 +380,6 @@ fn prepend_at_char(schedule: &str) -> String { trimmed_schedule.to_string() } -fn is_zero_u8(v: &u8) -> bool { - *v == 0 -} #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[serde(deny_unknown_fields)] diff --git a/quickwit/quickwit-config/src/index_config/serialize.rs b/quickwit/quickwit-config/src/index_config/serialize.rs index fdbf4eeb002..5ab5a265d4a 100644 --- a/quickwit/quickwit-config/src/index_config/serialize.rs +++ b/quickwit/quickwit-config/src/index_config/serialize.rs @@ -126,15 +126,6 @@ impl IndexConfigForSerialization { let index_uri = self.index_uri_or_fallback_to_default(default_index_root_uri)?; - const MAX_SPLIT_KEY_PREFIX_LEN: u8 = 16; - ensure!( - self.indexing_settings.split_key_prefix_len <= MAX_SPLIT_KEY_PREFIX_LEN, - "`split_key_prefix_len` must be at most {} (the length of the ULID random portion), \ - but {} was provided", - MAX_SPLIT_KEY_PREFIX_LEN, - self.indexing_settings.split_key_prefix_len, - ); - let index_config = IndexConfig { index_id: self.index_id, index_uri, @@ -568,26 +559,4 @@ mod test { .expect_err("field required for default search is absent"); } - #[test] - fn test_validate_split_key_prefix_len_too_long() { - let mut invalid_index_config = minimal_index_config_for_serialization(); - invalid_index_config.indexing_settings.split_key_prefix_len = 17; - let validation_err = invalid_index_config - .build_and_validate(None) - .unwrap_err() - .to_string(); - assert!( - validation_err.contains("split_key_prefix_len"), - "unexpected error: {validation_err}" - ); - } - - #[test] - fn test_validate_split_key_prefix_len_max_valid() { - let mut index_config = minimal_index_config_for_serialization(); - index_config.indexing_settings.split_key_prefix_len = 16; - index_config - .build_and_validate(Some(&Uri::for_test("s3://quickwit-indexes"))) - .expect("prefix_len of 16 should be valid"); - } } diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index 39b1902ee4d..9d6287245ac 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -330,7 +330,6 @@ impl IndexingPipeline { SplitsUpdateMailbox::Sequencer(sequencer_mailbox), self.params.max_concurrent_split_uploads_index, self.params.event_broker.clone(), - self.params.split_key_prefix_len, ); let (uploader_mailbox, uploader_handle) = ctx .spawn_actor() @@ -527,7 +526,6 @@ pub struct IndexingPipelineParams { pub split_store: IndexingSplitStore, pub max_concurrent_split_uploads_index: usize, pub cooperative_indexing_permits: Option>, - pub split_key_prefix_len: u8, // Merge-related parameters pub merge_policy: Arc, @@ -670,7 +668,6 @@ mod tests { max_concurrent_split_uploads_index: 4, max_concurrent_split_uploads_merge: 5, cooperative_indexing_permits: None, - split_key_prefix_len: 0, merge_planner_mailbox, event_broker: EventBroker::default(), params_fingerprint: 42u64, @@ -795,7 +792,6 @@ mod tests { max_concurrent_split_uploads_index: 4, max_concurrent_split_uploads_merge: 5, cooperative_indexing_permits: None, - split_key_prefix_len: 0, merge_planner_mailbox, event_broker: Default::default(), params_fingerprint: 42u64, @@ -875,7 +871,6 @@ mod tests { merge_io_throughput_limiter_opt: None, merge_scheduler_service: universe.get_or_spawn_one(), event_broker: Default::default(), - split_key_prefix_len: 0, }; let merge_pipeline = MergePipeline::new(merge_pipeline_params, None, universe.spawn_ctx()); let merge_planner_mailbox = merge_pipeline.merge_planner_mailbox().clone(); @@ -898,7 +893,6 @@ mod tests { max_concurrent_split_uploads_index: 4, max_concurrent_split_uploads_merge: 5, cooperative_indexing_permits: None, - split_key_prefix_len: 0, merge_planner_mailbox: merge_planner_mailbox.clone(), event_broker: Default::default(), params_fingerprint: 42u64, @@ -1027,7 +1021,6 @@ mod tests { max_concurrent_split_uploads_index: 4, max_concurrent_split_uploads_merge: 5, cooperative_indexing_permits: None, - split_key_prefix_len: 0, merge_planner_mailbox, params_fingerprint: 42u64, event_broker: Default::default(), diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 5150735e6b6..57197a8fc58 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -383,7 +383,6 @@ impl IndexingService { merge_io_throughput_limiter_opt: self.merge_io_throughput_limiter_opt.clone(), max_concurrent_split_uploads: self.max_concurrent_split_uploads, event_broker: self.event_broker.clone(), - split_key_prefix_len: index_config.indexing_settings.split_key_prefix_len, }; let merge_planner_mailbox = self.get_or_create_merge_pipeline(merge_pipeline_params, immature_splits_opt, ctx)?; @@ -403,7 +402,6 @@ impl IndexingService { split_store, max_concurrent_split_uploads_index, cooperative_indexing_permits: self.cooperative_indexing_permits.clone(), - split_key_prefix_len: index_config.indexing_settings.split_key_prefix_len, merge_policy, retention_policy, max_concurrent_split_uploads_merge, diff --git a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs index 2bb30c1ba1b..53ece0a09fa 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs @@ -287,7 +287,6 @@ impl MergePipeline { merge_publisher_mailbox.into(), self.params.max_concurrent_split_uploads, self.params.event_broker.clone(), - self.params.split_key_prefix_len, ); let (merge_uploader_mailbox, merge_uploader_handle) = ctx .spawn_actor() @@ -564,7 +563,6 @@ pub struct MergePipelineParams { pub max_concurrent_split_uploads: usize, //< TODO share with the indexing pipeline. pub merge_io_throughput_limiter_opt: Option, pub event_broker: EventBroker, - pub split_key_prefix_len: u8, } #[cfg(test)] @@ -629,7 +627,6 @@ mod tests { max_concurrent_split_uploads: 2, merge_io_throughput_limiter_opt: None, event_broker: Default::default(), - split_key_prefix_len: 0, }; let pipeline = MergePipeline::new(pipeline_params, None, universe.spawn_ctx()); let _merge_planner_mailbox = pipeline.merge_planner_mailbox().clone(); diff --git a/quickwit/quickwit-indexing/src/actors/uploader.rs b/quickwit/quickwit-indexing/src/actors/uploader.rs index 7b8f62b52c6..62151b8ed9a 100644 --- a/quickwit/quickwit-indexing/src/actors/uploader.rs +++ b/quickwit/quickwit-indexing/src/actors/uploader.rs @@ -169,7 +169,6 @@ pub struct Uploader { max_concurrent_split_uploads: usize, counters: UploaderCounters, event_broker: EventBroker, - split_key_prefix_len: u8, } impl Uploader { @@ -183,7 +182,6 @@ impl Uploader { split_update_mailbox: SplitsUpdateMailbox, max_concurrent_split_uploads: usize, event_broker: EventBroker, - split_key_prefix_len: u8, ) -> Uploader { Uploader { uploader_type, @@ -195,7 +193,6 @@ impl Uploader { max_concurrent_split_uploads, counters: Default::default(), event_broker, - split_key_prefix_len, } } async fn acquire_semaphore( @@ -300,7 +297,6 @@ impl Handler for Uploader { let ctx_clone = ctx.clone(); let merge_policy = self.merge_policy.clone(); let retention_policy = self.retention_policy.clone(); - let split_key_prefix_len = self.split_key_prefix_len; debug!(split_ids=?split_ids, "start-stage-and-store-splits"); let event_broker = self.event_broker.clone(); spawn_named_task( @@ -340,7 +336,7 @@ impl Handler for Uploader { ); split_metadata.prefix = quickwit_common::compute_split_key_prefix( split_metadata.split_id(), - split_key_prefix_len, + quickwit_common::shared_consts::split_key_prefix_len(), ); report_splits.push(ReportSplit { @@ -572,7 +568,6 @@ mod tests { SplitsUpdateMailbox::Sequencer(sequencer_mailbox), 4, event_broker, - 0, ); let (uploader_mailbox, uploader_handle) = universe.spawn_builder().spawn(uploader); let split_scratch_directory = TempDirectory::for_test(); @@ -689,7 +684,6 @@ mod tests { SplitsUpdateMailbox::Sequencer(sequencer_mailbox), 4, EventBroker::default(), - 0, ); let (uploader_mailbox, uploader_handle) = universe.spawn_builder().spawn(uploader); let split_scratch_directory_1 = TempDirectory::for_test(); @@ -838,7 +832,6 @@ mod tests { SplitsUpdateMailbox::Publisher(publisher_mailbox), 4, EventBroker::default(), - 0, ); let (uploader_mailbox, uploader_handle) = universe.spawn_builder().spawn(uploader); let split_scratch_directory = TempDirectory::for_test(); @@ -913,7 +906,6 @@ mod tests { SplitsUpdateMailbox::Sequencer(sequencer_mailbox), 4, EventBroker::default(), - 0, ); let (uploader_mailbox, uploader_handle) = universe.spawn_builder().spawn(uploader); let checkpoint_delta = IndexCheckpointDelta { @@ -1019,7 +1011,6 @@ mod tests { SplitsUpdateMailbox::Publisher(publisher_mailbox), 4, event_broker, - 0, ); let (uploader_mailbox, uploader_handle) = universe.spawn_builder().spawn(uploader); let split_scratch_directory = TempDirectory::for_test(); diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs index 8a141b30a29..861c0bde62f 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs @@ -182,7 +182,6 @@ impl DeleteTaskPipeline { SplitsUpdateMailbox::Publisher(publisher_mailbox), self.max_concurrent_split_uploads, self.event_broker.clone(), - index_config.indexing_settings.split_key_prefix_len, ); let (uploader_mailbox, uploader_supervisor_handler) = ctx.spawn_actor().supervise(uploader); diff --git a/quickwit/quickwit-metastore/src/split_metadata.rs b/quickwit/quickwit-metastore/src/split_metadata.rs index 8733c961ba6..d9723a838f6 100644 --- a/quickwit/quickwit-metastore/src/split_metadata.rs +++ b/quickwit/quickwit-metastore/src/split_metadata.rs @@ -139,8 +139,8 @@ pub struct SplitMetadata { /// URI. A non-empty prefix (with no trailing `/`) means the split is stored at /// `{prefix}/{split_id}.split`. /// - /// This value is set once at split creation time from `IndexConfig.split_key_prefix_len` - /// and never changed afterwards. + /// This value is set once at split creation time from the `QW_SPLIT_KEY_PREFIX_LEN` + /// environment variable and never changed afterwards. #[serde(default)] pub prefix: String, }