Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions config/tutorials/hdfs-logs/index-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,9 @@ doc_mapping:

search_settings:
default_search_fields: [severity_text, body]

# Uncomment to distribute S3 object keys across 1024 prefixes and reduce rate limiting
# on high-throughput indexes where recent splits would otherwise cluster on the same
# S3 partition. Affects only newly created splits; existing splits continue to use
# the legacy flat naming scheme.
# split_key_prefix_len: 2
78 changes: 78 additions & 0 deletions quickwit/quickwit-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,84 @@ pub fn split_file(split_id: impl Display) -> String {
format!("{split_id}.split")
}

/// Computes the S3 key prefix for a split from its ULID string and the configured length.
///
/// Extracts `prefix_len` characters starting at position 10 of the ULID (the first characters
/// of the random portion, after the 10-character timestamp). Returns an empty string when
/// `prefix_len` is 0 (legacy flat scheme).
///
/// The returned prefix does NOT contain the trailing `/` separator; that separator is added by
/// [`split_storage_path`] when building the full storage path.
///
/// If `split_id` is too short to extract the requested prefix (hidden contract: must be a
/// valid 26-character ULID and `prefix_len <= 16`), logs a rate-limited warning and returns
/// an empty string, falling back to the legacy flat scheme.
pub fn compute_split_key_prefix(split_id: &str, prefix_len: u8) -> String {
if prefix_len == 0 {
return String::new();
}
let end = 10 + prefix_len as usize;
if split_id.len() < end {
crate::rate_limited_warn!(
limit_per_min = 1,
split_id = split_id,
prefix_len = prefix_len,
"split ID is too short to extract prefix; falling back to flat storage path"
);
return String::new();
}
split_id[10..end].to_string()
}

/// Returns the storage path for a split given its ID and key prefix.
///
/// - Empty `prefix`: legacy flat scheme `{split_id}.split`
/// - Non-empty `prefix`: `{prefix}/{split_id}.split` — the `/` separator is inserted here, so the
/// `prefix` itself must NOT contain a trailing `/` (as produced by [`compute_split_key_prefix`])
///
/// The prefix is typically computed once at split creation time via
/// [`compute_split_key_prefix`] and stored in `SplitMetadata`.
pub fn split_storage_path(split_id: &str, prefix: &str) -> String {
if prefix.is_empty() {
return format!("{split_id}.split");
}
format!("{prefix}/{split_id}.split")
}

#[cfg(test)]
mod split_path_tests {
use super::*;

const SAMPLE_ULID: &str = "01ARZ3NDEKTSV4RRFFQ69G5FAV";

#[test]
fn test_compute_split_key_prefix_zero() {
assert_eq!(compute_split_key_prefix(SAMPLE_ULID, 0), "");
}

#[test]
fn test_compute_split_key_prefix_four() {
// Chars 10–13 of the ULID are the first 4 chars of the random portion.
assert_eq!(compute_split_key_prefix(SAMPLE_ULID, 4), "TSV4");
}

#[test]
fn test_split_storage_path_empty_prefix() {
assert_eq!(
split_storage_path(SAMPLE_ULID, ""),
"01ARZ3NDEKTSV4RRFFQ69G5FAV.split"
);
}

#[test]
fn test_split_storage_path_with_prefix() {
assert_eq!(
split_storage_path(SAMPLE_ULID, "TS"),
"TS/01ARZ3NDEKTSV4RRFFQ69G5FAV.split"
);
}
}

fn get_from_env_opt_aux<T: Debug>(
key: &str,
parse_fn: impl FnOnce(&str) -> Option<T>,
Expand Down
19 changes: 19 additions & 0 deletions quickwit/quickwit-config/src/index_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,10 @@ fn prepend_at_char(schedule: &str) -> String {
trimmed_schedule.to_string()
}

fn is_zero_u8(v: &u8) -> bool {
*v == 0
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(deny_unknown_fields)]
#[serde(into = "VersionedIndexConfig")]
Expand All @@ -392,6 +396,19 @@ pub struct IndexConfig {
pub ingest_settings: IngestSettings,
pub search_settings: SearchSettings,
pub retention_policy_opt: Option<RetentionPolicy>,
/// Number of characters from the ULID random portion (positions 10–25) used as an S3 key
/// prefix directory to distribute object keys across S3 partitions and avoid hotspots.
///
/// - `0` (default): legacy flat scheme `{split_id}.split`
/// - `N > 0`: `{split_id[10..10+N]}/{split_id}.split`, giving 32^N distinct prefixes
///
/// Setting this to `2` creates 1024 buckets, which is sufficient for most workloads.
/// Maximum value is 16 (length of the ULID random portion).
///
/// See [S3 performance guidelines](https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance.html)
/// for background on key prefix partitioning.
#[serde(default, skip_serializing_if = "is_zero_u8")]
pub split_key_prefix_len: u8,
}

impl IndexConfig {
Expand Down Expand Up @@ -501,6 +518,7 @@ impl IndexConfig {
ingest_settings: IngestSettings::default(),
search_settings,
retention_policy_opt: None,
split_key_prefix_len: 0,
}
}
}
Expand Down Expand Up @@ -611,6 +629,7 @@ impl crate::TestableForRegression for IndexConfig {
ingest_settings,
search_settings,
retention_policy_opt,
split_key_prefix_len: 0,
}
}

Expand Down
36 changes: 36 additions & 0 deletions quickwit/quickwit-config/src/index_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,15 @@ impl IndexConfigForSerialization {

let index_uri = self.index_uri_or_fallback_to_default(default_index_root_uri)?;

const MAX_SPLIT_KEY_PREFIX_LEN: u8 = 16;
ensure!(
self.split_key_prefix_len <= MAX_SPLIT_KEY_PREFIX_LEN,
"`split_key_prefix_len` must be at most {} (the length of the ULID random portion), \
but {} was provided",
MAX_SPLIT_KEY_PREFIX_LEN,
self.split_key_prefix_len,
);

let index_config = IndexConfig {
index_id: self.index_id,
index_uri,
Expand All @@ -134,6 +143,7 @@ impl IndexConfigForSerialization {
ingest_settings: self.ingest_settings,
search_settings: self.search_settings,
retention_policy_opt: self.retention_policy_opt,
split_key_prefix_len: self.split_key_prefix_len,
};
validate_index_config(
&index_config.doc_mapping,
Expand Down Expand Up @@ -184,6 +194,8 @@ pub struct IndexConfigV0_8 {
#[serde(rename = "retention")]
#[serde(default)]
pub retention_policy_opt: Option<RetentionPolicy>,
#[serde(default, skip_serializing_if = "super::is_zero_u8")]
pub split_key_prefix_len: u8,
}

impl From<IndexConfig> for IndexConfigV0_8 {
Expand All @@ -196,6 +208,7 @@ impl From<IndexConfig> for IndexConfigV0_8 {
ingest_settings: index_config.ingest_settings,
search_settings: index_config.search_settings,
retention_policy_opt: index_config.retention_policy_opt,
split_key_prefix_len: index_config.split_key_prefix_len,
}
}
}
Expand Down Expand Up @@ -558,4 +571,27 @@ mod test {
)
.expect_err("field required for default search is absent");
}

#[test]
fn test_validate_split_key_prefix_len_too_long() {
let mut invalid_index_config = minimal_index_config_for_serialization();
invalid_index_config.split_key_prefix_len = 17;
let validation_err = invalid_index_config
.build_and_validate(None)
.unwrap_err()
.to_string();
assert!(
validation_err.contains("split_key_prefix_len"),
"unexpected error: {validation_err}"
);
}

#[test]
fn test_validate_split_key_prefix_len_max_valid() {
let mut index_config = minimal_index_config_for_serialization();
index_config.split_key_prefix_len = 16;
index_config
.build_and_validate(Some(&Uri::for_test("s3://quickwit-indexes")))
.expect("prefix_len of 16 should be valid");
}
}
9 changes: 9 additions & 0 deletions quickwit/quickwit-config/src/index_template/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ use crate::{
pub type IndexTemplateId = String;
pub type IndexIdPattern = String;

fn is_zero_u8(v: &u8) -> bool {
*v == 0
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(into = "VersionedIndexTemplate")]
#[serde(from = "VersionedIndexTemplate")]
Expand All @@ -51,6 +55,8 @@ pub struct IndexTemplate {
#[serde(rename = "retention")]
#[serde(default)]
pub retention_policy_opt: Option<RetentionPolicy>,
#[serde(default, skip_serializing_if = "is_zero_u8")]
pub split_key_prefix_len: u8,
}

impl IndexTemplate {
Expand All @@ -77,6 +83,7 @@ impl IndexTemplate {
ingest_settings: self.ingest_settings.clone(),
search_settings: self.search_settings.clone(),
retention_policy_opt: self.retention_policy_opt.clone(),
split_key_prefix_len: self.split_key_prefix_len,
};
Ok(index_config)
}
Expand Down Expand Up @@ -134,6 +141,7 @@ impl IndexTemplate {
ingest_settings: IngestSettings::default(),
search_settings: SearchSettings::default(),
retention_policy_opt: None,
split_key_prefix_len: 0,
}
}
}
Expand Down Expand Up @@ -178,6 +186,7 @@ impl crate::TestableForRegression for IndexTemplate {
retention_period: "42 days".to_string(),
evaluation_schedule: "daily".to_string(),
}),
split_key_prefix_len: 0,
}
}

Expand Down
4 changes: 4 additions & 0 deletions quickwit/quickwit-config/src/index_template/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ pub struct IndexTemplateV0_8 {
pub search_settings: SearchSettings,
#[serde(default)]
pub retention: Option<RetentionPolicy>,
#[serde(default, skip_serializing_if = "super::is_zero_u8")]
pub split_key_prefix_len: u8,
}

impl From<VersionedIndexTemplate> for IndexTemplate {
Expand Down Expand Up @@ -84,6 +86,7 @@ impl From<IndexTemplateV0_8> for IndexTemplate {
ingest_settings: index_template_v0_8.ingest_settings,
search_settings: index_template_v0_8.search_settings,
retention_policy_opt: index_template_v0_8.retention,
split_key_prefix_len: index_template_v0_8.split_key_prefix_len,
}
}
}
Expand All @@ -101,6 +104,7 @@ impl From<IndexTemplate> for IndexTemplateV0_8 {
ingest_settings: index_template.ingest_settings,
search_settings: index_template.search_settings,
retention: index_template.retention_policy_opt,
split_key_prefix_len: index_template.split_key_prefix_len,
}
}
}
7 changes: 7 additions & 0 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ impl IndexingPipeline {
SplitsUpdateMailbox::Sequencer(sequencer_mailbox),
self.params.max_concurrent_split_uploads_index,
self.params.event_broker.clone(),
self.params.split_key_prefix_len,
);
let (uploader_mailbox, uploader_handle) = ctx
.spawn_actor()
Expand Down Expand Up @@ -526,6 +527,7 @@ pub struct IndexingPipelineParams {
pub split_store: IndexingSplitStore,
pub max_concurrent_split_uploads_index: usize,
pub cooperative_indexing_permits: Option<Arc<Semaphore>>,
pub split_key_prefix_len: u8,

// Merge-related parameters
pub merge_policy: Arc<dyn MergePolicy>,
Expand Down Expand Up @@ -668,6 +670,7 @@ mod tests {
max_concurrent_split_uploads_index: 4,
max_concurrent_split_uploads_merge: 5,
cooperative_indexing_permits: None,
split_key_prefix_len: 0,
merge_planner_mailbox,
event_broker: EventBroker::default(),
params_fingerprint: 42u64,
Expand Down Expand Up @@ -792,6 +795,7 @@ mod tests {
max_concurrent_split_uploads_index: 4,
max_concurrent_split_uploads_merge: 5,
cooperative_indexing_permits: None,
split_key_prefix_len: 0,
merge_planner_mailbox,
event_broker: Default::default(),
params_fingerprint: 42u64,
Expand Down Expand Up @@ -871,6 +875,7 @@ mod tests {
merge_io_throughput_limiter_opt: None,
merge_scheduler_service: universe.get_or_spawn_one(),
event_broker: Default::default(),
split_key_prefix_len: 0,
};
let merge_pipeline = MergePipeline::new(merge_pipeline_params, None, universe.spawn_ctx());
let merge_planner_mailbox = merge_pipeline.merge_planner_mailbox().clone();
Expand All @@ -893,6 +898,7 @@ mod tests {
max_concurrent_split_uploads_index: 4,
max_concurrent_split_uploads_merge: 5,
cooperative_indexing_permits: None,
split_key_prefix_len: 0,
merge_planner_mailbox: merge_planner_mailbox.clone(),
event_broker: Default::default(),
params_fingerprint: 42u64,
Expand Down Expand Up @@ -1021,6 +1027,7 @@ mod tests {
max_concurrent_split_uploads_index: 4,
max_concurrent_split_uploads_merge: 5,
cooperative_indexing_permits: None,
split_key_prefix_len: 0,
merge_planner_mailbox,
params_fingerprint: 42u64,
event_broker: Default::default(),
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ impl IndexingService {
merge_io_throughput_limiter_opt: self.merge_io_throughput_limiter_opt.clone(),
max_concurrent_split_uploads: self.max_concurrent_split_uploads,
event_broker: self.event_broker.clone(),
split_key_prefix_len: index_config.split_key_prefix_len,
};
let merge_planner_mailbox =
self.get_or_create_merge_pipeline(merge_pipeline_params, immature_splits_opt, ctx)?;
Expand All @@ -402,6 +403,7 @@ impl IndexingService {
split_store,
max_concurrent_split_uploads_index,
cooperative_indexing_permits: self.cooperative_indexing_permits.clone(),
split_key_prefix_len: index_config.split_key_prefix_len,
merge_policy,
retention_policy,
max_concurrent_split_uploads_merge,
Expand Down
3 changes: 3 additions & 0 deletions quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ impl MergePipeline {
merge_publisher_mailbox.into(),
self.params.max_concurrent_split_uploads,
self.params.event_broker.clone(),
self.params.split_key_prefix_len,
);
let (merge_uploader_mailbox, merge_uploader_handle) = ctx
.spawn_actor()
Expand Down Expand Up @@ -563,6 +564,7 @@ pub struct MergePipelineParams {
pub max_concurrent_split_uploads: usize, //< TODO share with the indexing pipeline.
pub merge_io_throughput_limiter_opt: Option<Limiter>,
pub event_broker: EventBroker,
pub split_key_prefix_len: u8,
}

#[cfg(test)]
Expand Down Expand Up @@ -627,6 +629,7 @@ mod tests {
max_concurrent_split_uploads: 2,
merge_io_throughput_limiter_opt: None,
event_broker: Default::default(),
split_key_prefix_len: 0,
};
let pipeline = MergePipeline::new(pipeline_params, None, universe.spawn_ctx());
let _merge_planner_mailbox = pipeline.merge_planner_mailbox().clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,12 @@ impl MergeSplitDownloader {
let _protect_guard = ctx.protect_zone();
let tantivy_dir = self
.split_store
.fetch_and_open_split(split.split_id(), download_directory, &io_controls)
.fetch_and_open_split(
split.split_id(),
&split.prefix,
download_directory,
&io_controls,
)
.await
.map_err(|error| {
let split_id = split.split_id();
Expand Down
Loading
Loading