From 9ca88af867250131f78ae801abb1cfcb1260cd1d Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Wed, 4 Mar 2026 11:13:57 -0500 Subject: [PATCH 1/4] Remove old routing table; Take both disk and memory WAL readings --- .../broadcast/ingester_capacity_score.rs | 9 +- .../quickwit-ingest/src/ingest_v2/ingester.rs | 13 +- quickwit/quickwit-ingest/src/ingest_v2/mod.rs | 4 +- .../src/ingest_v2/routing_table.rs | 1121 ----------------- .../quickwit-ingest/src/ingest_v2/state.rs | 18 +- ..._timeseries.rs => wal_capacity_tracker.rs} | 75 +- 6 files changed, 82 insertions(+), 1158 deletions(-) delete mode 100644 quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs rename quickwit/quickwit-ingest/src/ingest_v2/{wal_capacity_timeseries.rs => wal_capacity_tracker.rs} (76%) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs index 482f5f58886..d9f456b7201 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs @@ -69,7 +69,10 @@ impl BroadcastIngesterCapacityScoreTask { .map_err(|_| anyhow::anyhow!("failed to acquire ingester state lock"))?; let usage = guard.mrecordlog.resource_usage(); let disk_used = ByteSize::b(usage.disk_used_bytes as u64); - let capacity_score = guard.wal_capacity_time_series.record_and_score(disk_used); + let memory_used = ByteSize::b(usage.memory_used_bytes as u64); + let capacity_score = guard + .wal_capacity_tracker + .record_and_score(disk_used, memory_used); let (open_shard_counts, _) = guard.get_shard_snapshot(); Ok(Some((capacity_score, open_shard_counts))) @@ -218,8 +221,8 @@ mod tests { state_guard.shards.insert(shard.queue_id(), shard); let (open_shard_counts, _) = state_guard.get_shard_snapshot(); let capacity_score = state_guard - .wal_capacity_time_series - .record_and_score(ByteSize::b(500)); + .wal_capacity_tracker + .record_and_score(ByteSize::b(500), ByteSize::b(0)); drop(state_guard); assert_eq!(capacity_score, 6); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index df51758a4ca..b77cefa7f38 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -131,7 +131,12 @@ impl Ingester { idle_shard_timeout: Duration, ) -> IngestV2Result { let self_node_id: NodeId = cluster.self_node_id().into(); - let state = IngesterState::load(wal_dir_path, disk_capacity, rate_limiter_settings); + let state = IngesterState::load( + wal_dir_path, + disk_capacity, + memory_capacity, + rate_limiter_settings, + ); let weak_state = state.weak(); BroadcastLocalShardsTask::spawn(cluster.clone(), weak_state.clone()); @@ -784,10 +789,12 @@ impl Ingester { } let wal_usage = state_guard.mrecordlog.resource_usage(); let disk_used = wal_usage.disk_used_bytes as u64; + let memory_used = wal_usage.memory_used_bytes as u64; let (open_shard_counts, closed_shards) = state_guard.get_shard_snapshot(); let capacity_score = state_guard - .wal_capacity_time_series - .score(ByteSize::b(disk_used)) as u32; + .wal_capacity_tracker + .score(ByteSize::b(disk_used), ByteSize::b(memory_used)) + as u32; drop(state_guard); if disk_used >= self.disk_capacity.as_u64() * 90 / 100 { diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs index d5432936e58..4003f8d1d0e 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs @@ -27,10 +27,8 @@ mod publish_tracker; mod rate_meter; mod replication; mod router; -#[allow(dead_code)] -mod routing_table; mod state; -mod wal_capacity_timeseries; +mod wal_capacity_tracker; mod workbench; use std::collections::HashMap; diff --git a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs deleted file mode 100644 index 4b4150d6e98..00000000000 --- a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs +++ /dev/null @@ -1,1121 +0,0 @@ -// Copyright 2021-Present Datadog, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::hash_map::Entry; -use std::collections::{HashMap, HashSet}; -use std::sync::atomic::{AtomicUsize, Ordering}; - -use quickwit_proto::ingest::{Shard, ShardIds, ShardState}; -use quickwit_proto::types::{IndexId, IndexUid, NodeId, ShardId, SourceId}; -use serde_json::{Value as JsonValue, json}; -use tracing::{info, warn}; - -use crate::IngesterPool; - -#[derive(Debug)] -pub(super) struct RoutingEntry { - pub index_uid: IndexUid, - pub source_id: SourceId, - pub shard_id: ShardId, - pub shard_state: ShardState, - pub leader_id: NodeId, -} - -impl From for RoutingEntry { - fn from(shard: Shard) -> Self { - let shard_id = shard.shard_id().clone(); - let shard_state = shard.shard_state(); - Self { - index_uid: shard.index_uid().clone(), - source_id: shard.source_id, - shard_id, - shard_state, - leader_id: shard.leader_id.into(), - } - } -} - -/// The set of shards the router is aware of for the given index and source. -#[derive(Debug, Default)] -pub(super) struct RoutingTableEntry { - /// Index UID of the shards. - pub index_uid: IndexUid, - /// Source ID of the shards. - pub source_id: SourceId, - /// Shards located on this node. - pub local_shards: Vec, - pub local_round_robin_idx: AtomicUsize, - /// Shards located on remote nodes. - pub remote_shards: Vec, - pub remote_round_robin_idx: AtomicUsize, -} - -impl RoutingTableEntry { - /// Creates a new entry and ensures that the shards are open, unique, and sorted by shard ID. - fn new( - self_node_id: &NodeId, - index_uid: IndexUid, - source_id: SourceId, - mut shards: Vec, - ) -> Self { - let num_shards = shards.len(); - - shards.sort_unstable_by(|left, right| left.shard_id.cmp(&right.shard_id)); - shards.dedup_by(|left, right| left.shard_id == right.shard_id); - - let (local_shards, remote_shards): (Vec<_>, Vec<_>) = shards - .into_iter() - .filter(|shard| shard.is_open()) - .map(RoutingEntry::from) - .partition(|shard| *self_node_id == shard.leader_id); - - if num_shards > local_shards.len() + remote_shards.len() { - warn!("input shards should not contain closed shards or duplicates"); - } - - Self { - index_uid, - source_id, - local_shards, - remote_shards, - ..Default::default() - } - } - - fn empty(index_uid: IndexUid, source_id: SourceId) -> Self { - Self { - index_uid, - source_id, - ..Default::default() - } - } - - /// Returns `true` if at least one shard in the table entry is open and has a leader available. - /// As it goes through the list of shards in the entry, it populates `closed_shard_ids` and - /// `unavailable_leaders` with the shard IDs of the closed shards and the node ID of the - /// unavailable ingesters encountered along the way. - pub fn has_open_shards( - &self, - ingester_pool: &IngesterPool, - closed_shard_ids: &mut Vec, - unavailable_leaders: &mut HashSet, - ) -> bool { - let shards = self.local_shards.iter().chain(self.remote_shards.iter()); - - for shard in shards { - match shard.shard_state { - ShardState::Closed => { - closed_shard_ids.push(shard.shard_id.clone()); - continue; - } - ShardState::Unavailable | ShardState::Unspecified => { - continue; - } - ShardState::Open => { - if unavailable_leaders.contains(&shard.leader_id) { - continue; - } - if ingester_pool.contains_key(&shard.leader_id) { - return true; - } else { - let leader_id: NodeId = shard.leader_id.clone(); - unavailable_leaders.insert(leader_id); - } - } - } - } - false - } - - /// Returns the next open and available shard in the table entry in a round-robin fashion. - pub fn next_open_shard_round_robin( - &self, - ingester_pool: &IngesterPool, - rate_limited_shards: &HashSet, - ) -> Result<&RoutingEntry, NextOpenShardError> { - let mut error = NextOpenShardError::NoShardsAvailable; - - for (shards, round_robin_idx) in [ - (&self.local_shards, &self.local_round_robin_idx), - (&self.remote_shards, &self.remote_round_robin_idx), - ] { - if shards.is_empty() { - continue; - } - for _attempt in 0..shards.len() { - let shard_idx = round_robin_idx.fetch_add(1, Ordering::Relaxed); - let shard_routing_entry: &RoutingEntry = &shards[shard_idx % shards.len()]; - - if !shard_routing_entry.shard_state.is_open() { - continue; - } - if rate_limited_shards.contains(&shard_routing_entry.shard_id) { - error = NextOpenShardError::RateLimited; - continue; - } - if ingester_pool.contains_key(&shard_routing_entry.leader_id) { - return Ok(shard_routing_entry); - } - } - } - Err(error) - } - - /// Inserts the open shards the routing table is not aware of. - fn insert_open_shards( - &mut self, - self_node_id: &NodeId, - leader_id: &NodeId, - index_uid: &IndexUid, - shard_ids: &[ShardId], - ) { - match self.index_uid.cmp(index_uid) { - // If we receive an update for a new incarnation of the index, then we clear the entry - // and insert all the shards. - std::cmp::Ordering::Less => { - self.index_uid = index_uid.clone(); - self.clear_shards(); - } - // If we receive an update for a previous incarnation of the index, then we ignore it. - std::cmp::Ordering::Greater => { - return; - } - std::cmp::Ordering::Equal => {} - }; - let target_shards = if self_node_id == leader_id { - &mut self.local_shards - } else { - &mut self.remote_shards - }; - let mut num_inserted_shards = 0; - let num_target_shards = target_shards.len(); - - if num_target_shards == 0 { - target_shards.reserve(num_target_shards); - target_shards.extend(shard_ids.iter().map(|shard_id| RoutingEntry { - index_uid: self.index_uid.clone(), - source_id: self.source_id.clone(), - shard_id: shard_id.clone(), - shard_state: ShardState::Open, - leader_id: leader_id.clone(), - })); - num_inserted_shards = target_shards.len(); - } else { - let shard_ids_range = target_shards[0].shard_id.clone() - ..=target_shards[num_target_shards - 1].shard_id.clone(); - - for shard_id in shard_ids { - // If we can't find the shard, then we insert it. - if shard_ids_range.contains(shard_id) { - continue; - } - if target_shards[..num_target_shards] - .binary_search_by(|shard| shard.shard_id.cmp(shard_id)) - .is_err() - { - target_shards.push(RoutingEntry { - index_uid: self.index_uid.clone(), - source_id: self.source_id.clone(), - shard_id: shard_id.clone(), - shard_state: ShardState::Open, - leader_id: leader_id.clone(), - }); - num_inserted_shards += 1; - } - } - } - if num_inserted_shards > 0 { - target_shards.sort_unstable_by(|left, right| left.shard_id.cmp(&right.shard_id)); - - info!( - index_uid=%self.index_uid, - source_id=%self.source_id, - "inserted {num_inserted_shards} shards into routing table" - ); - } - } - - /// Clears local and remote shards. - fn clear_shards(&mut self) { - self.local_shards.clear(); - self.local_round_robin_idx = AtomicUsize::default(); - self.remote_shards.clear(); - self.remote_round_robin_idx = AtomicUsize::default(); - } - - /// Closes the shards identified by their shard IDs. - fn close_shards(&mut self, index_uid: &IndexUid, shard_ids: &[ShardId]) { - // If the shard table was just recently updated with shards for a new index UID, then we can - // safely discard this request. - if self.index_uid != *index_uid { - return; - } - for shards in [&mut self.local_shards, &mut self.remote_shards] { - if shards.is_empty() { - continue; - } - let num_shards = shards.len(); - let shard_ids_range = - shards[0].shard_id.clone()..=shards[num_shards - 1].shard_id.clone(); - - for shard_id in shard_ids { - if !shard_ids_range.contains(shard_id) { - continue; - } - if let Ok(shard_idx) = shards.binary_search_by(|shard| shard.shard_id.cmp(shard_id)) - { - shards[shard_idx].shard_state = ShardState::Closed; - } - } - } - } - - /// Shards the shards identified by their shard IDs. - fn delete_shards(&mut self, index_uid: &IndexUid, shard_ids: &[ShardId]) { - // If the shard table was just recently updated with shards for a new index UID, then we can - // safely discard this request. - if self.index_uid != *index_uid { - return; - } - for shards in [&mut self.local_shards, &mut self.remote_shards] { - if shards.is_empty() { - continue; - } - let num_shards = shards.len(); - let shard_ids_range = - shards[0].shard_id.clone()..=shards[num_shards - 1].shard_id.clone(); - let mut deleted_any = false; - - for shard_id in shard_ids { - if !shard_ids_range.contains(shard_id) { - continue; - } - if let Ok(shard_idx) = shards.binary_search_by(|shard| shard.shard_id.cmp(shard_id)) - { - // We use `Unspecified` as a tombstone. - shards[shard_idx].shard_state = ShardState::Unspecified; - deleted_any = true; - } - } - if deleted_any { - shards.retain(|shard| shard.shard_state != ShardState::Unspecified); - } - } - } - - #[cfg(test)] - pub fn len(&self) -> usize { - self.local_shards.len() + self.remote_shards.len() - } - - #[cfg(test)] - pub fn all_shards(&self) -> Vec<&RoutingEntry> { - let mut shards = Vec::with_capacity(self.len()); - shards.extend(&self.local_shards); - shards.extend(&self.remote_shards); - shards - } -} - -#[derive(Debug, PartialEq, Eq)] -pub(super) enum NextOpenShardError { - NoShardsAvailable, - RateLimited, -} - -/// Stores the list of shards the router is aware of for each index and source. The resolution from -/// index and source to shards is performed using index ID (not index UID) and source ID. -#[derive(Debug)] -pub(super) struct RoutingTable { - pub self_node_id: NodeId, - pub table: HashMap<(IndexId, SourceId), RoutingTableEntry>, -} - -impl RoutingTable { - pub fn find_entry( - &self, - index_id: impl Into, - source_id: impl Into, - ) -> Option<&RoutingTableEntry> { - let key = (index_id.into(), source_id.into()); - self.table.get(&key) - } - - /// Returns `true` if the router already knows about a shard for a given source that has - /// an available `leader`. - /// - /// If this function returns false, it populates the set of unavailable leaders and closed - /// shards. These will be joined to the GetOrCreate shard request emitted to the control - /// plane. - pub fn has_open_shards( - &self, - index_id: impl Into, - source_id: impl Into, - ingester_pool: &IngesterPool, - closed_shards: &mut Vec, - unavailable_leaders: &mut HashSet, - ) -> bool { - let Some(entry) = self.find_entry(index_id, source_id) else { - return false; - }; - let mut closed_shard_ids: Vec = Vec::new(); - - let result = - entry.has_open_shards(ingester_pool, &mut closed_shard_ids, unavailable_leaders); - - if !closed_shard_ids.is_empty() { - closed_shards.push(ShardIds { - index_uid: entry.index_uid.clone().into(), - source_id: entry.source_id.clone(), - shard_ids: closed_shard_ids, - }); - } - result - } - - /// Replaces the routing table entry for the source with the provided shards. - pub fn replace_shards( - &mut self, - index_uid: IndexUid, - source_id: impl Into, - shards: Vec, - ) { - let index_id: IndexId = index_uid.index_id.to_string(); - let source_id: SourceId = source_id.into(); - let key = (index_id, source_id.clone()); - - match self.table.entry(key) { - Entry::Vacant(entry) => { - entry.insert(RoutingTableEntry::new( - &self.self_node_id, - index_uid, - source_id, - shards, - )); - } - Entry::Occupied(mut entry) => { - assert!( - entry.get().index_uid <= index_uid, - "new index incarnation should be greater or equal" - ); - - entry.insert(RoutingTableEntry::new( - &self.self_node_id, - index_uid, - source_id, - shards, - )); - } - }; - } - - /// Inserts the shards the routing table is not aware of. - pub fn insert_open_shards( - &mut self, - leader_id: &NodeId, - index_uid: IndexUid, - source_id: impl Into, - shard_ids: &[ShardId], - ) { - let index_id: IndexId = index_uid.index_id.to_string(); - let source_id: SourceId = source_id.into(); - let key = (index_id, source_id.clone()); - - self.table - .entry(key.clone()) - .or_insert_with(|| RoutingTableEntry::empty(index_uid.clone(), source_id)) - .insert_open_shards(&self.self_node_id, leader_id, &index_uid, shard_ids); - } - - /// Closes the targeted shards. - pub fn close_shards( - &mut self, - index_uid: &IndexUid, - source_id: impl Into, - shard_ids: &[ShardId], - ) { - let key = (index_uid.index_id.clone(), source_id.into()); - if let Some(entry) = self.table.get_mut(&key) { - entry.close_shards(index_uid, shard_ids); - } - } - - /// Deletes the targeted shards. - pub fn delete_shards( - &mut self, - index_uid: &IndexUid, - source_id: impl Into, - shard_ids: &[ShardId], - ) { - let key = (index_uid.index_id.clone(), source_id.into()); - if let Some(entry) = self.table.get_mut(&key) { - entry.delete_shards(index_uid, shard_ids); - } - } - - pub fn debug_info(&self) -> HashMap> { - let mut per_index_shards_json: HashMap> = HashMap::new(); - - for ((index_id, source_id), entry) in &self.table { - for (shards, is_local) in &[(&entry.local_shards, true), (&entry.remote_shards, false)] - { - let shards_json = shards.iter().map(|shard| { - json!({ - "index_uid": shard.index_uid, - "source_id": source_id, - "shard_id": shard.shard_id, - "shard_state": shard.shard_state.as_json_str_name(), - "is_local": is_local, - }) - }); - per_index_shards_json - .entry(index_id.clone()) - .or_default() - .extend(shards_json); - } - } - per_index_shards_json - } - - #[cfg(test)] - pub fn len(&self) -> usize { - self.table.len() - } -} - -#[cfg(test)] -mod tests { - use quickwit_proto::ingest::ShardState; - use quickwit_proto::ingest::ingester::IngesterServiceClient; - - use super::*; - - #[test] - fn test_routing_table_entry_new() { - let self_node_id: NodeId = "test-node-0".into(); - let index_uid = IndexUid::for_test("test-index", 0); - let source_id: SourceId = "test-source".into(); - let table_entry = RoutingTableEntry::new( - &self_node_id, - index_uid.clone(), - source_id.clone(), - Vec::new(), - ); - assert_eq!(table_entry.len(), 0); - - let index_uid: IndexUid = IndexUid::for_test("test-index", 0); - let shards = vec![ - Shard { - index_uid: Some(index_uid.clone()), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(3)), - shard_state: ShardState::Open as i32, - leader_id: "test-node-0".to_string(), - ..Default::default() - }, - Shard { - index_uid: Some(index_uid.clone()), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(1)), - shard_state: ShardState::Open as i32, - leader_id: "test-node-0".to_string(), - ..Default::default() - }, - Shard { - index_uid: Some(index_uid.clone()), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(2)), - shard_state: ShardState::Open as i32, - leader_id: "test-node-1".to_string(), - ..Default::default() - }, - Shard { - index_uid: Some(index_uid.clone()), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(1)), - shard_state: ShardState::Open as i32, - leader_id: "test-node-0".to_string(), - ..Default::default() - }, - Shard { - index_uid: Some(index_uid.clone()), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(4)), - shard_state: ShardState::Closed as i32, - leader_id: "test-node-0".to_string(), - ..Default::default() - }, - ]; - let table_entry = RoutingTableEntry::new(&self_node_id, index_uid, source_id, shards); - assert_eq!(table_entry.local_shards.len(), 2); - assert_eq!(table_entry.local_shards[0].shard_id, ShardId::from(1)); - assert_eq!(table_entry.local_shards[1].shard_id, ShardId::from(3)); - - assert_eq!(table_entry.remote_shards.len(), 1); - assert_eq!(table_entry.remote_shards[0].shard_id, ShardId::from(2)); - } - - #[test] - fn test_routing_table_entry_has_open_shards() { - let index_uid = IndexUid::for_test("test-index", 0); - let source_id: SourceId = "test-source".into(); - let table_entry = RoutingTableEntry::empty(index_uid.clone(), source_id.clone()); - - let mut closed_shard_ids = Vec::new(); - let ingester_pool = IngesterPool::default(); - let mut unavailable_leaders = HashSet::new(); - - assert!(!table_entry.has_open_shards( - &ingester_pool, - &mut closed_shard_ids, - &mut unavailable_leaders - )); - assert!(closed_shard_ids.is_empty()); - assert!(unavailable_leaders.is_empty()); - - ingester_pool.insert( - "test-ingester-0".into(), - crate::IngesterPoolEntry { - client: IngesterServiceClient::mocked(), - availability_zone: None, - }, - ); - ingester_pool.insert( - "test-ingester-1".into(), - crate::IngesterPoolEntry { - client: IngesterServiceClient::mocked(), - availability_zone: None, - }, - ); - - let table_entry = RoutingTableEntry { - index_uid: index_uid.clone(), - source_id: source_id.clone(), - local_shards: vec![ - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(1), - shard_state: ShardState::Closed, - leader_id: "test-ingester-0".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(2), - shard_state: ShardState::Open, - leader_id: "test-ingester-0".into(), - }, - ], - local_round_robin_idx: AtomicUsize::default(), - remote_shards: Vec::new(), - remote_round_robin_idx: AtomicUsize::default(), - }; - assert!(table_entry.has_open_shards( - &ingester_pool, - &mut closed_shard_ids, - &mut unavailable_leaders - )); - assert_eq!(closed_shard_ids.len(), 1); - assert_eq!(closed_shard_ids[0], ShardId::from(1)); - assert!(unavailable_leaders.is_empty()); - - closed_shard_ids.clear(); - - let table_entry = RoutingTableEntry { - index_uid: index_uid.clone(), - source_id, - local_shards: Vec::new(), - local_round_robin_idx: AtomicUsize::default(), - remote_shards: vec![ - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(1), - shard_state: ShardState::Closed, - leader_id: "test-ingester-1".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(2), - shard_state: ShardState::Open, - leader_id: "test-ingester-2".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(3), - shard_state: ShardState::Open, - leader_id: "test-ingester-1".into(), - }, - ], - remote_round_robin_idx: AtomicUsize::default(), - }; - assert!(table_entry.has_open_shards( - &ingester_pool, - &mut closed_shard_ids, - &mut unavailable_leaders - )); - assert_eq!(closed_shard_ids.len(), 1); - assert_eq!(closed_shard_ids[0], ShardId::from(1)); - assert_eq!(unavailable_leaders.len(), 1); - assert!(unavailable_leaders.contains("test-ingester-2")); - } - - #[test] - fn test_routing_table_entry_next_open_shard_round_robin() { - let index_uid = IndexUid::for_test("test-index", 0); - let source_id: SourceId = "test-source".into(); - let table_entry = RoutingTableEntry::empty(index_uid.clone(), source_id.clone()); - let ingester_pool = IngesterPool::default(); - let mut rate_limited_shards = HashSet::new(); - - let error = table_entry - .next_open_shard_round_robin(&ingester_pool, &rate_limited_shards) - .unwrap_err(); - assert_eq!(error, NextOpenShardError::NoShardsAvailable); - - ingester_pool.insert( - "test-ingester-0".into(), - crate::IngesterPoolEntry { - client: IngesterServiceClient::mocked(), - availability_zone: None, - }, - ); - ingester_pool.insert( - "test-ingester-1".into(), - crate::IngesterPoolEntry { - client: IngesterServiceClient::mocked(), - availability_zone: None, - }, - ); - - let table_entry = RoutingTableEntry { - index_uid: index_uid.clone(), - source_id: source_id.clone(), - local_shards: vec![ - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(1), - shard_state: ShardState::Closed, - leader_id: "test-ingester-0".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(2), - shard_state: ShardState::Open, - leader_id: "test-ingester-0".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(3), - shard_state: ShardState::Open, - leader_id: "test-ingester-0".into(), - }, - ], - local_round_robin_idx: AtomicUsize::default(), - remote_shards: Vec::new(), - remote_round_robin_idx: AtomicUsize::default(), - }; - let shard = table_entry - .next_open_shard_round_robin(&ingester_pool, &rate_limited_shards) - .unwrap(); - assert_eq!(shard.shard_id, ShardId::from(2)); - - let shard = table_entry - .next_open_shard_round_robin(&ingester_pool, &rate_limited_shards) - .unwrap(); - assert_eq!(shard.shard_id, ShardId::from(3)); - - let shard = table_entry - .next_open_shard_round_robin(&ingester_pool, &rate_limited_shards) - .unwrap(); - assert_eq!(shard.shard_id, ShardId::from(2)); - - let table_entry = RoutingTableEntry { - index_uid: index_uid.clone(), - source_id: source_id.clone(), - local_shards: vec![RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(1), - shard_state: ShardState::Closed, - leader_id: "test-ingester-0".into(), - }], - local_round_robin_idx: AtomicUsize::default(), - remote_shards: vec![ - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(2), - shard_state: ShardState::Open, - leader_id: "test-ingester-1".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(3), - shard_state: ShardState::Closed, - leader_id: "test-ingester-1".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(4), - shard_state: ShardState::Open, - leader_id: "test-ingester-2".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(5), - shard_state: ShardState::Open, - leader_id: "test-ingester-1".into(), - }, - ], - remote_round_robin_idx: AtomicUsize::default(), - }; - let shard = table_entry - .next_open_shard_round_robin(&ingester_pool, &rate_limited_shards) - .unwrap(); - assert_eq!(shard.shard_id, ShardId::from(2)); - - let shard = table_entry - .next_open_shard_round_robin(&ingester_pool, &rate_limited_shards) - .unwrap(); - assert_eq!(shard.shard_id, ShardId::from(5)); - - let shard = table_entry - .next_open_shard_round_robin(&ingester_pool, &rate_limited_shards) - .unwrap(); - assert_eq!(shard.shard_id, ShardId::from(2)); - - rate_limited_shards.insert(ShardId::from(5)); - - let shard = table_entry - .next_open_shard_round_robin(&ingester_pool, &rate_limited_shards) - .unwrap(); - assert_eq!(shard.shard_id, ShardId::from(2)); - } - - #[test] - fn test_routing_table_entry_next_open_shard_round_robin_rate_limited_error() { - let index_uid = IndexUid::for_test("test-index", 0); - let source_id: SourceId = "test-source".into(); - - let ingester_pool = IngesterPool::default(); - ingester_pool.insert( - "test-ingester-0".into(), - crate::IngesterPoolEntry { - client: IngesterServiceClient::mocked(), - availability_zone: None, - }, - ); - - let rate_limited_shards = HashSet::from_iter([ShardId::from(1)]); - - let table_entry = RoutingTableEntry { - index_uid: index_uid.clone(), - source_id: source_id.clone(), - local_shards: vec![RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(1), - shard_state: ShardState::Open, - leader_id: "test-ingester-0".into(), - }], - local_round_robin_idx: AtomicUsize::default(), - remote_shards: Vec::new(), - remote_round_robin_idx: AtomicUsize::default(), - }; - let error = table_entry - .next_open_shard_round_robin(&ingester_pool, &rate_limited_shards) - .unwrap_err(); - assert_eq!(error, NextOpenShardError::RateLimited); - } - - #[test] - fn test_routing_table_entry_insert_open_shards() { - let index_uid_0 = IndexUid::for_test("test-index", 0); - let source_id: SourceId = "test-source".into(); - let mut table_entry = RoutingTableEntry::empty(index_uid_0.clone(), source_id.clone()); - - let local_node_id: NodeId = "test-ingester-0".into(); - let remote_node_id: NodeId = "test-ingester-1".into(); - table_entry.insert_open_shards(&local_node_id, &local_node_id, &index_uid_0, &[]); - - assert_eq!(table_entry.local_shards.len(), 0); - assert_eq!(table_entry.remote_shards.len(), 0); - - table_entry.insert_open_shards( - &local_node_id, - &local_node_id, - &index_uid_0, - &[ShardId::from(2)], - ); - - assert_eq!(table_entry.local_shards.len(), 1); - assert_eq!(table_entry.remote_shards.len(), 0); - - assert_eq!(table_entry.local_shards[0].index_uid, index_uid_0); - assert_eq!(table_entry.local_shards[0].source_id, source_id); - assert_eq!(table_entry.local_shards[0].shard_id, ShardId::from(2)); - assert_eq!(table_entry.local_shards[0].shard_state, ShardState::Open); - assert_eq!(table_entry.local_shards[0].leader_id, local_node_id); - - table_entry.local_shards[0].shard_state = ShardState::Closed; - table_entry.insert_open_shards( - &local_node_id, - &local_node_id, - &index_uid_0, - &[ShardId::from(1), ShardId::from(2)], - ); - - assert_eq!(table_entry.local_shards.len(), 2); - assert_eq!(table_entry.remote_shards.len(), 0); - - assert_eq!(table_entry.local_shards[0].shard_id, ShardId::from(1)); - assert_eq!(table_entry.local_shards[0].shard_state, ShardState::Open); - assert_eq!(table_entry.local_shards[1].shard_id, ShardId::from(2)); - assert_eq!(table_entry.local_shards[1].shard_state, ShardState::Closed); - - table_entry.local_shards.clear(); - table_entry.insert_open_shards( - &local_node_id, - &remote_node_id, - &index_uid_0, - &[ShardId::from(2)], - ); - - assert_eq!(table_entry.local_shards.len(), 0); - assert_eq!(table_entry.remote_shards.len(), 1); - - assert_eq!(table_entry.remote_shards[0].index_uid, index_uid_0); - assert_eq!(table_entry.remote_shards[0].source_id, source_id); - assert_eq!(table_entry.remote_shards[0].shard_id, ShardId::from(2)); - assert_eq!(table_entry.remote_shards[0].shard_state, ShardState::Open); - assert_eq!(table_entry.remote_shards[0].leader_id, remote_node_id); - - table_entry.remote_shards[0].shard_state = ShardState::Closed; - table_entry.insert_open_shards( - &local_node_id, - &remote_node_id, - &index_uid_0, - &[ShardId::from(1), ShardId::from(2)], - ); - - assert_eq!(table_entry.local_shards.len(), 0); - assert_eq!(table_entry.remote_shards.len(), 2); - - assert_eq!(table_entry.remote_shards[0].shard_id, ShardId::from(1)); - assert_eq!(table_entry.remote_shards[0].shard_state, ShardState::Open); - assert_eq!(table_entry.remote_shards[1].shard_id, ShardId::from(2)); - assert_eq!(table_entry.remote_shards[1].shard_state, ShardState::Closed); - - // Update index incarnation. - let index_uid_1 = IndexUid::for_test("test-index", 1); - table_entry.insert_open_shards( - &local_node_id, - &local_node_id, - &index_uid_1, - &[ShardId::from(1)], - ); - - assert_eq!(table_entry.index_uid, index_uid_1); - assert_eq!(table_entry.local_shards.len(), 1); - assert_eq!(table_entry.remote_shards.len(), 0); - - assert_eq!(table_entry.local_shards[0].index_uid, index_uid_1); - assert_eq!(table_entry.local_shards[0].source_id, source_id); - assert_eq!(table_entry.local_shards[0].shard_id, ShardId::from(1)); - assert_eq!(table_entry.local_shards[0].shard_state, ShardState::Open); - assert_eq!(table_entry.local_shards[0].leader_id, local_node_id); - - // Ignore previous index incarnation. - table_entry.insert_open_shards( - &local_node_id, - &local_node_id, - &index_uid_0, - &[ShardId::from(12), ShardId::from(42), ShardId::from(1337)], - ); - assert_eq!(table_entry.index_uid, index_uid_1); - assert_eq!(table_entry.local_shards.len(), 1); - assert_eq!(table_entry.remote_shards.len(), 0); - } - - #[test] - fn test_routing_table_entry_close_shards() { - let index_uid = IndexUid::for_test("test-index", 0); - let source_id: SourceId = "test-source".into(); - - let mut table_entry = RoutingTableEntry::empty(index_uid.clone(), source_id.clone()); - table_entry.close_shards(&index_uid, &[]); - table_entry.close_shards(&index_uid, &[ShardId::from(1)]); - assert!(table_entry.local_shards.is_empty()); - assert!(table_entry.remote_shards.is_empty()); - - let mut table_entry = RoutingTableEntry { - index_uid: index_uid.clone(), - source_id: source_id.clone(), - local_shards: vec![ - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(1), - shard_state: ShardState::Open, - leader_id: "test-ingester-0".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(2), - shard_state: ShardState::Open, - leader_id: "test-ingester-0".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(3), - shard_state: ShardState::Open, - leader_id: "test-ingester-0".into(), - }, - ], - local_round_robin_idx: AtomicUsize::default(), - remote_shards: vec![ - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(5), - shard_state: ShardState::Open, - leader_id: "test-ingester-1".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(6), - shard_state: ShardState::Open, - leader_id: "test-ingester-1".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(7), - shard_state: ShardState::Open, - leader_id: "test-ingester-1".into(), - }, - ], - remote_round_robin_idx: AtomicUsize::default(), - }; - table_entry.close_shards( - &index_uid, - &[ - ShardId::from(1), - ShardId::from(3), - ShardId::from(4), - ShardId::from(6), - ShardId::from(8), - ], - ); - assert!(table_entry.local_shards[0].shard_state.is_closed()); - assert!(table_entry.local_shards[1].shard_state.is_open()); - assert!(table_entry.local_shards[2].shard_state.is_closed()); - assert!(table_entry.remote_shards[0].shard_state.is_open()); - assert!(table_entry.remote_shards[1].shard_state.is_closed()); - assert!(table_entry.remote_shards[2].shard_state.is_open()); - } - - #[test] - fn test_routing_table_entry_delete_shards() { - let index_uid = IndexUid::for_test("test-index", 0); - let source_id: SourceId = "test-source".into(); - - let mut table_entry = RoutingTableEntry::empty(index_uid.clone(), source_id.clone()); - table_entry.delete_shards(&index_uid, &[]); - table_entry.delete_shards(&index_uid, &[ShardId::from(1)]); - assert!(table_entry.local_shards.is_empty()); - assert!(table_entry.remote_shards.is_empty()); - - let mut table_entry = RoutingTableEntry { - index_uid: index_uid.clone(), - source_id: source_id.clone(), - local_shards: vec![ - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(1), - shard_state: ShardState::Open, - leader_id: "test-ingester-0".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(2), - shard_state: ShardState::Open, - leader_id: "test-ingester-0".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(3), - shard_state: ShardState::Open, - leader_id: "test-ingester-0".into(), - }, - ], - local_round_robin_idx: AtomicUsize::default(), - remote_shards: vec![ - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(5), - shard_state: ShardState::Open, - leader_id: "test-ingester-1".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(6), - shard_state: ShardState::Open, - leader_id: "test-ingester-1".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(7), - shard_state: ShardState::Open, - leader_id: "test-ingester-1".into(), - }, - ], - remote_round_robin_idx: AtomicUsize::default(), - }; - table_entry.delete_shards( - &index_uid, - &[ - ShardId::from(1), - ShardId::from(3), - ShardId::from(4), - ShardId::from(6), - ShardId::from(8), - ], - ); - assert_eq!(table_entry.local_shards.len(), 1); - assert_eq!(table_entry.local_shards[0].shard_id, ShardId::from(2)); - assert_eq!(table_entry.remote_shards.len(), 2); - assert_eq!(table_entry.remote_shards[0].shard_id, ShardId::from(5)); - assert_eq!(table_entry.remote_shards[1].shard_id, ShardId::from(7)); - } -} diff --git a/quickwit/quickwit-ingest/src/ingest_v2/state.rs b/quickwit/quickwit-ingest/src/ingest_v2/state.rs index e158bce7c58..b77de8d608a 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/state.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/state.rs @@ -35,7 +35,7 @@ use tracing::{error, info}; use super::models::IngesterShard; use super::rate_meter::RateMeter; use super::replication::{ReplicationStreamTaskHandle, ReplicationTaskHandle}; -use super::wal_capacity_timeseries::WalDiskCapacityTimeSeries; +use super::wal_capacity_tracker::WalCapacityTracker; use crate::ingest_v2::mrecordlog_utils::{force_delete_queue, queue_position_range}; use crate::mrecordlog_async::MultiRecordLogAsync; use crate::{FollowerId, LeaderId, OpenShardCounts}; @@ -61,7 +61,7 @@ pub(super) struct InnerIngesterState { pub replication_streams: HashMap, // Replication tasks running for each replication stream opened with leaders. pub replication_tasks: HashMap, - pub wal_capacity_time_series: WalDiskCapacityTimeSeries, + pub wal_capacity_tracker: WalCapacityTracker, status: IngesterStatus, status_tx: watch::Sender, } @@ -130,7 +130,7 @@ impl InnerIngesterState { } impl IngesterState { - fn new(disk_capacity: ByteSize) -> Self { + fn new(disk_capacity: ByteSize, memory_capacity: ByteSize) -> Self { let status = IngesterStatus::Initializing; let (status_tx, status_rx) = watch::channel(status); let inner = InnerIngesterState { @@ -138,7 +138,7 @@ impl IngesterState { doc_mappers: Default::default(), replication_streams: Default::default(), replication_tasks: Default::default(), - wal_capacity_time_series: WalDiskCapacityTimeSeries::new(disk_capacity), + wal_capacity_tracker: WalCapacityTracker::new(disk_capacity, memory_capacity), status, status_tx, }; @@ -155,9 +155,10 @@ impl IngesterState { pub fn load( wal_dir_path: &Path, disk_capacity: ByteSize, + memory_capacity: ByteSize, rate_limiter_settings: RateLimiterSettings, ) -> Self { - let state = Self::new(disk_capacity); + let state = Self::new(disk_capacity, memory_capacity); let state_clone = state.clone(); let wal_dir_path = wal_dir_path.to_path_buf(); @@ -180,6 +181,7 @@ impl IngesterState { let mut state = IngesterState::load( temp_dir.path(), disk_capacity, + ByteSize::mb(256), RateLimiterSettings::default(), ); @@ -530,7 +532,7 @@ mod tests { #[tokio::test] async fn test_ingester_state_does_not_lock_while_initializing() { - let state = IngesterState::new(ByteSize::mb(256)); + let state = IngesterState::new(ByteSize::mb(256), ByteSize::mb(256)); let inner_guard = state.inner.lock().await; assert_eq!(inner_guard.status(), IngesterStatus::Initializing); @@ -545,7 +547,7 @@ mod tests { #[tokio::test] async fn test_ingester_state_failed() { - let state = IngesterState::new(ByteSize::mb(256)); + let state = IngesterState::new(ByteSize::mb(256), ByteSize::mb(256)); state.inner.lock().await.set_status(IngesterStatus::Failed); @@ -558,7 +560,7 @@ mod tests { #[tokio::test] async fn test_ingester_state_init() { - let mut state = IngesterState::new(ByteSize::mb(256)); + let mut state = IngesterState::new(ByteSize::mb(256), ByteSize::mb(256)); let temp_dir = tempfile::tempdir().unwrap(); state diff --git a/quickwit/quickwit-ingest/src/ingest_v2/wal_capacity_timeseries.rs b/quickwit/quickwit-ingest/src/ingest_v2/wal_capacity_tracker.rs similarity index 76% rename from quickwit/quickwit-ingest/src/ingest_v2/wal_capacity_timeseries.rs rename to quickwit/quickwit-ingest/src/ingest_v2/wal_capacity_tracker.rs index 58f030cbf74..f24e8254053 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/wal_capacity_timeseries.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/wal_capacity_tracker.rs @@ -25,38 +25,36 @@ const WAL_CAPACITY_LOOKBACK_WINDOW_LEN: usize = 6; /// reading would be discarded when the next reading is inserted. const WAL_CAPACITY_READINGS_LEN: usize = WAL_CAPACITY_LOOKBACK_WINDOW_LEN + 1; -pub struct WalDiskCapacityTimeSeries { - disk_capacity: ByteSize, +struct WalCapacityTimeSeries { + capacity: ByteSize, readings: RingBuffer, } -impl WalDiskCapacityTimeSeries { - pub fn new(disk_capacity: ByteSize) -> Self { +impl WalCapacityTimeSeries { + fn new(capacity: ByteSize) -> Self { #[cfg(not(test))] - assert!(disk_capacity.as_u64() > 0); + assert!(capacity.as_u64() > 0); Self { - disk_capacity, + capacity, readings: RingBuffer::default(), } } - /// Records a disk usage reading and returns the resulting capacity score. - pub fn record_and_score(&mut self, disk_used: ByteSize) -> usize { - self.record(disk_used); + fn record_and_score(&mut self, used: ByteSize) -> usize { + self.record(used); let remaining = self.current().unwrap_or(1.0); let delta = self.delta().unwrap_or(0.0); compute_capacity_score(remaining, delta) } - /// Computes a capacity score for the given disk usage without recording it. - pub fn score(&self, disk_used: ByteSize) -> usize { - let remaining = 1.0 - (disk_used.as_u64() as f64 / self.disk_capacity.as_u64() as f64); + fn score(&self, used: ByteSize) -> usize { + let remaining = 1.0 - (used.as_u64() as f64 / self.capacity.as_u64() as f64); let delta = self.delta().unwrap_or(0.0); compute_capacity_score(remaining, delta) } - fn record(&mut self, disk_used: ByteSize) { - let remaining = 1.0 - (disk_used.as_u64() as f64 / self.disk_capacity.as_u64() as f64); + fn record(&mut self, used: ByteSize) { + let remaining = 1.0 - (used.as_u64() as f64 / self.capacity.as_u64() as f64); self.readings.push_back(remaining.clamp(0.0, 1.0)); } @@ -64,8 +62,6 @@ impl WalDiskCapacityTimeSeries { self.readings.last() } - /// How much remaining capacity changed between the oldest and newest readings. - /// Positive = improving, negative = draining. fn delta(&self) -> Option { let current = self.readings.last()?; let oldest = self.readings.front()?; @@ -73,6 +69,35 @@ impl WalDiskCapacityTimeSeries { } } +pub struct WalCapacityTracker { + disk: WalCapacityTimeSeries, + memory: WalCapacityTimeSeries, +} + +impl WalCapacityTracker { + pub fn new(disk_capacity: ByteSize, memory_capacity: ByteSize) -> Self { + Self { + disk: WalCapacityTimeSeries::new(disk_capacity), + memory: WalCapacityTimeSeries::new(memory_capacity), + } + } + + /// Records disk and memory usage readings and returns the resulting capacity score. + /// The score is the minimum of the individual disk and memory scores. + pub fn record_and_score(&mut self, disk_used: ByteSize, memory_used: ByteSize) -> usize { + let disk_score = self.disk.record_and_score(disk_used); + let memory_score = self.memory.record_and_score(memory_used); + disk_score.min(memory_score) + } + + /// Computes a capacity score for the given usage without recording it. + pub fn score(&self, disk_used: ByteSize, memory_used: ByteSize) -> usize { + let disk_score = self.disk.score(disk_used); + let memory_score = self.memory.score(memory_used); + disk_score.min(memory_score) + } +} + /// Computes a capacity score from 0 to 10 using a PD controller. /// /// The score has two components: @@ -115,18 +140,18 @@ fn compute_capacity_score(remaining_capacity: f64, capacity_delta: f64) -> usize mod tests { use super::*; - fn ts() -> WalDiskCapacityTimeSeries { - WalDiskCapacityTimeSeries::new(ByteSize::b(100)) + fn ts() -> WalCapacityTimeSeries { + WalCapacityTimeSeries::new(ByteSize::b(100)) } /// Helper: record a reading with `used` bytes against the series' fixed capacity. - fn record(series: &mut WalDiskCapacityTimeSeries, used: u64) { + fn record(series: &mut WalCapacityTimeSeries, used: u64) { series.record(ByteSize::b(used)); } #[test] fn test_wal_disk_capacity_current_after_record() { - let mut series = WalDiskCapacityTimeSeries::new(ByteSize::b(256)); + let mut series = WalCapacityTimeSeries::new(ByteSize::b(256)); // 192 of 256 used => 25% remaining series.record(ByteSize::b(192)); assert_eq!(series.current(), Some(0.25)); @@ -211,4 +236,14 @@ mod tests { record(&mut series, 0); assert_eq!(series.delta(), Some(0.50)); } + + #[test] + fn test_wal_capacity_tracker_returns_min() { + let mut tracker = WalCapacityTracker::new(ByteSize::b(100), ByteSize::b(100)); + // Disk 10% used (score 9), memory 90% used (score 2) → returns 2. + assert_eq!( + tracker.record_and_score(ByteSize::b(10), ByteSize::b(90)), + 2 + ); + } } From 4860c28c643917394b704810d16ec4e4670a5841 Mon Sep 17 00:00:00 2001 From: nadav-govari Date: Thu, 5 Mar 2026 15:50:21 -0500 Subject: [PATCH 2/4] Add az-aware ingest attempts metric (#6194) * Add metrics for az-aware ingest * finally fix that lint issue * emit a simple metric instead * rename * lints --- quickwit/quickwit-ingest/src/ingest_v2/metrics.rs | 8 ++++++++ quickwit/quickwit-ingest/src/ingest_v2/router.rs | 11 +++++++++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs b/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs index 1fb32c0b2fd..87975a3c462 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs @@ -82,12 +82,20 @@ pub(super) struct IngestV2Metrics { pub wal_disk_used_bytes: IntGauge, pub wal_memory_used_bytes: IntGauge, pub ingest_results: IngestResultMetrics, + pub ingest_attempts: IntCounterVec<1>, } impl Default for IngestV2Metrics { fn default() -> Self { Self { ingest_results: IngestResultMetrics::default(), + ingest_attempts: new_counter_vec::<1>( + "ingest_attempts", + "Number of routing attempts by AZ locality", + "ingest", + &[], + ["az_routing"], + ), reset_shards_operations_total: new_counter_vec( "reset_shards_operations_total", "Total number of reset shards operations performed.", diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index a1f5bf86302..77c4adb330f 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -50,6 +50,7 @@ use super::node_routing_table::NodeBasedRoutingTable; use super::workbench::IngestWorkbench; use super::{IngesterPool, pending_subrequests}; use crate::get_ingest_router_buffer_size; +use crate::ingest_v2::metrics::INGEST_V2_METRICS; /// Duration after which ingest requests time out with [`IngestV2Error::Timeout`]. fn ingest_request_timeout() -> Duration { @@ -367,6 +368,13 @@ impl IngestRouter { continue; } }; + let az_locality = state_guard + .node_routing_table + .classify_az_locality(&ingester_node.node_id, &self.self_node_id); + INGEST_V2_METRICS + .ingest_attempts + .with_label_values([az_locality]) + .inc(); let persist_subrequest = PersistSubrequest { subrequest_id: subrequest.subrequest_id, index_uid: Some(ingester_node.index_uid.clone()), @@ -486,8 +494,7 @@ impl IngestRouter { fn update_ingest_metrics(ingest_result: &IngestV2Result, num_subrequests: usize) { let num_subrequests = num_subrequests as u64; - let ingest_results_metrics: &IngestResultMetrics = - &crate::ingest_v2::metrics::INGEST_V2_METRICS.ingest_results; + let ingest_results_metrics: &IngestResultMetrics = &INGEST_V2_METRICS.ingest_results; match ingest_result { Ok(ingest_response) => { ingest_results_metrics From fae3fbb3d2891909db98d9114f36fdfd3fd4f8af Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Mon, 9 Mar 2026 14:27:36 -0400 Subject: [PATCH 3/4] Address PR comments --- ...er_capacity_score.rs => capacity_score.rs} | 0 .../src/ingest_v2/broadcast/mod.rs | 4 +- quickwit/quickwit-ingest/src/ingest_v2/mod.rs | 2 +- .../quickwit-ingest/src/ingest_v2/router.rs | 40 +++++++------- ...node_routing_table.rs => routing_table.rs} | 52 +++++++++++++++---- 5 files changed, 66 insertions(+), 32 deletions(-) rename quickwit/quickwit-ingest/src/ingest_v2/broadcast/{ingester_capacity_score.rs => capacity_score.rs} (100%) rename quickwit/quickwit-ingest/src/ingest_v2/{node_routing_table.rs => routing_table.rs} (90%) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/capacity_score.rs similarity index 100% rename from quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs rename to quickwit/quickwit-ingest/src/ingest_v2/broadcast/capacity_score.rs diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs index 18a00209de1..b579382af78 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs @@ -13,7 +13,7 @@ // limitations under the License. #[allow(dead_code)] -mod ingester_capacity_score; +mod capacity_score; mod local_shards; use std::time::Duration; @@ -26,7 +26,7 @@ pub(in crate::ingest_v2) const BROADCAST_INTERVAL_PERIOD: Duration = if cfg!(tes Duration::from_secs(5) }; -pub use ingester_capacity_score::{ +pub use capacity_score::{ BroadcastIngesterCapacityScoreTask, IngesterCapacityScoreUpdate, setup_ingester_capacity_update_listener, }; diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs index 4003f8d1d0e..471b15e95e2 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs @@ -22,7 +22,7 @@ mod metrics; mod models; mod mrecord; mod mrecordlog_utils; -mod node_routing_table; +mod routing_table; mod publish_tracker; mod rate_meter; mod replication; diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index 77c4adb330f..a803b242c4f 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -46,7 +46,7 @@ use super::debouncing::{ }; use super::ingester::PERSIST_REQUEST_TIMEOUT; use super::metrics::IngestResultMetrics; -use super::node_routing_table::NodeBasedRoutingTable; +use super::routing_table::RoutingTable; use super::workbench::IngestWorkbench; use super::{IngesterPool, pending_subrequests}; use crate::get_ingest_router_buffer_size; @@ -102,7 +102,7 @@ struct RouterState { // Debounces `GetOrCreateOpenShardsRequest` requests to the control plane. debouncer: GetOrCreateOpenShardsRequestDebouncer, // Routing table of nodes, their WAL capacity, and the number of open shards per source. - node_routing_table: NodeBasedRoutingTable, + routing_table: RoutingTable, } impl fmt::Debug for IngestRouter { @@ -125,7 +125,7 @@ impl IngestRouter { ) -> Self { let state = Arc::new(Mutex::new(RouterState { debouncer: GetOrCreateOpenShardsRequestDebouncer::default(), - node_routing_table: NodeBasedRoutingTable::new(self_availability_zone), + routing_table: RoutingTable::new(self_availability_zone), })); let ingest_semaphore_permits = get_ingest_router_buffer_size().as_u64() as usize; let ingest_semaphore = Arc::new(Semaphore::new(ingest_semaphore_permits)); @@ -161,7 +161,7 @@ impl IngestRouter { let mut state_guard = self.state.lock().await; for subrequest in pending_subrequests(&workbench.subworkbenches) { - if !state_guard.node_routing_table.has_open_nodes( + if !state_guard.routing_table.has_open_nodes( &subrequest.index_id, &subrequest.source_id, ingester_pool, @@ -252,7 +252,7 @@ impl IngestRouter { let mut state_guard = self.state.lock().await; for success in response.successes { - state_guard.node_routing_table.merge_from_shards( + state_guard.routing_table.merge_from_shards( success.index_uid().clone(), success.source_id, success.open_shards, @@ -303,7 +303,7 @@ impl IngestRouter { // opportunity to get a fresh routing update. let mut state_guard = self.state.lock().await; for shard_update in routing_update.source_shard_updates { - state_guard.node_routing_table.apply_capacity_update( + state_guard.routing_table.apply_capacity_update( leader_id.clone(), shard_update.index_uid().clone(), shard_update.source_id, @@ -354,7 +354,7 @@ impl IngestRouter { let state_guard = self.state.lock().await; for subrequest in pending_subrequests(&workbench.subworkbenches) { - let ingester_node = state_guard.node_routing_table.pick_node( + let ingester_node = state_guard.routing_table.pick_node( &subrequest.index_id, &subrequest.source_id, &self.ingester_pool, @@ -369,8 +369,8 @@ impl IngestRouter { } }; let az_locality = state_guard - .node_routing_table - .classify_az_locality(&ingester_node.node_id, &self.self_node_id); + .routing_table + .classify_az_locality(&ingester_node.node_id, &self.ingester_pool); INGEST_V2_METRICS .ingest_attempts .with_label_values([az_locality]) @@ -483,7 +483,7 @@ impl IngestRouter { pub async fn debug_info(&self) -> JsonValue { let state_guard = self.state.lock().await; let routing_table_json = state_guard - .node_routing_table + .routing_table .debug_info(&self.ingester_pool); json!({ @@ -614,7 +614,7 @@ impl EventSubscriber for WeakRouterState { return; }; let mut state_guard = state.lock().await; - state_guard.node_routing_table.apply_capacity_update( + state_guard.routing_table.apply_capacity_update( update.node_id, update.source_uid.index_uid, update.source_uid.source_id, @@ -681,7 +681,7 @@ mod tests { { let mut state_guard = router.state.lock().await; - state_guard.node_routing_table.apply_capacity_update( + state_guard.routing_table.apply_capacity_update( "test-ingester-0".into(), IndexUid::for_test("test-index-0", 0), "test-source".to_string(), @@ -1280,7 +1280,7 @@ mod tests { let index_uid_1: IndexUid = IndexUid::for_test("test-index-1", 0); { let mut state_guard = router.state.lock().await; - state_guard.node_routing_table.merge_from_shards( + state_guard.routing_table.merge_from_shards( index_uid_0.clone(), "test-source".to_string(), vec![Shard { @@ -1292,7 +1292,7 @@ mod tests { ..Default::default() }], ); - state_guard.node_routing_table.merge_from_shards( + state_guard.routing_table.merge_from_shards( index_uid_1.clone(), "test-source".to_string(), vec![Shard { @@ -1427,7 +1427,7 @@ mod tests { let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0); { let mut state_guard = router.state.lock().await; - state_guard.node_routing_table.merge_from_shards( + state_guard.routing_table.merge_from_shards( index_uid.clone(), "test-source".to_string(), vec![Shard { @@ -1536,7 +1536,7 @@ mod tests { { let mut state_guard = router.state.lock().await; - state_guard.node_routing_table.merge_from_shards( + state_guard.routing_table.merge_from_shards( index_uid_0.clone(), "test-source".to_string(), vec![Shard { @@ -1547,7 +1547,7 @@ mod tests { ..Default::default() }], ); - state_guard.node_routing_table.merge_from_shards( + state_guard.routing_table.merge_from_shards( index_uid_1.clone(), "test-source".to_string(), vec![Shard { @@ -1591,7 +1591,7 @@ mod tests { let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0); { let mut state_guard = router.state.lock().await; - state_guard.node_routing_table.merge_from_shards( + state_guard.routing_table.merge_from_shards( index_uid.clone(), "test-source".to_string(), vec![Shard { @@ -1696,7 +1696,7 @@ mod tests { ingester_pool.insert("test-ingester-0".into(), mocked_ingester()); let state_guard = router.state.lock().await; let node = state_guard - .node_routing_table + .routing_table .pick_node("test-index", "test-source", &ingester_pool, &HashSet::new()) .unwrap(); assert_eq!(node.node_id, NodeId::from("test-ingester-0")); @@ -1843,7 +1843,7 @@ mod tests { ingester_pool.insert("test-ingester-0".into(), mocked_ingester()); let state_guard = router.state.lock().await; let node = state_guard - .node_routing_table + .routing_table .pick_node("test-index", "test-source", &ingester_pool, &HashSet::new()) .unwrap(); assert_eq!(node.node_id, NodeId::from("test-ingester-0")); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/node_routing_table.rs b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs similarity index 90% rename from quickwit/quickwit-ingest/src/ingest_v2/node_routing_table.rs rename to quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs index 68299358f77..4468747e172 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/node_routing_table.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs @@ -96,12 +96,12 @@ impl RoutingEntry { } #[derive(Debug, Default)] -pub(super) struct NodeBasedRoutingTable { +pub(super) struct RoutingTable { table: HashMap<(IndexId, SourceId), RoutingEntry>, self_availability_zone: Option, } -impl NodeBasedRoutingTable { +impl RoutingTable { pub fn new(self_availability_zone: Option) -> Self { Self { self_availability_zone, @@ -125,6 +125,24 @@ impl NodeBasedRoutingTable { ) } + pub fn classify_az_locality( + &self, + target_node_id: &NodeId, + ingester_pool: &IngesterPool, + ) -> &'static str { + let Some(self_az) = &self.self_availability_zone else { + return "az_unaware"; + }; + let target_az = ingester_pool + .get(target_node_id) + .and_then(|entry| entry.availability_zone); + match target_az { + Some(ref az) if az == self_az => "same_az", + Some(_) => "cross_az", + None => "az_unaware", + } + } + pub fn debug_info( &self, ingester_pool: &IngesterPool, @@ -247,7 +265,7 @@ mod tests { #[test] fn test_apply_capacity_update() { - let mut table = NodeBasedRoutingTable::default(); + let mut table = RoutingTable::default(); let key = ("test-index".to_string(), "test-source".into()); // Insert first node. @@ -300,7 +318,7 @@ mod tests { #[test] fn test_has_open_nodes() { - let mut table = NodeBasedRoutingTable::default(); + let mut table = RoutingTable::default(); let pool = IngesterPool::default(); // Empty table. @@ -348,7 +366,7 @@ mod tests { #[test] fn test_pick_node_prefers_same_az() { - let mut table = NodeBasedRoutingTable::new(Some("az-1".to_string())); + let mut table = RoutingTable::new(Some("az-1".to_string())); let pool = IngesterPool::default(); table.apply_capacity_update( @@ -376,7 +394,7 @@ mod tests { #[test] fn test_pick_node_falls_back_to_cross_az() { - let mut table = NodeBasedRoutingTable::new(Some("az-1".to_string())); + let mut table = RoutingTable::new(Some("az-1".to_string())); let pool = IngesterPool::default(); table.apply_capacity_update( @@ -396,7 +414,7 @@ mod tests { #[test] fn test_pick_node_no_az_awareness() { - let mut table = NodeBasedRoutingTable::default(); + let mut table = RoutingTable::default(); let pool = IngesterPool::default(); table.apply_capacity_update( @@ -416,7 +434,7 @@ mod tests { #[test] fn test_pick_node_missing_entry() { - let table = NodeBasedRoutingTable::new(Some("az-1".to_string())); + let table = RoutingTable::new(Some("az-1".to_string())); let pool = IngesterPool::default(); assert!( @@ -465,7 +483,7 @@ mod tests { #[test] fn test_merge_from_shards() { - let mut table = NodeBasedRoutingTable::default(); + let mut table = RoutingTable::default(); let index_uid = IndexUid::for_test("test-index", 0); let key = ("test-index".to_string(), "test-source".to_string()); @@ -516,4 +534,20 @@ mod tests { assert!(entry.nodes.contains_key("node-3")); assert!(entry.nodes.contains_key("node-4")); } + + #[test] + fn test_classify_az_locality() { + let table = RoutingTable::new(Some("az-1".to_string())); + let pool = IngesterPool::default(); + pool.insert("node-local".into(), mocked_ingester(Some("az-1"))); + pool.insert("node-remote".into(), mocked_ingester(Some("az-2"))); + pool.insert("node-no-az".into(), mocked_ingester(None)); + + assert_eq!(table.classify_az_locality(&"node-local".into(), &pool), "same_az"); + assert_eq!(table.classify_az_locality(&"node-remote".into(), &pool), "cross_az"); + assert_eq!(table.classify_az_locality(&"node-no-az".into(), &pool), "az_unaware"); + + let table_no_az = RoutingTable::default(); + assert_eq!(table_no_az.classify_az_locality(&"node-local".into(), &pool), "az_unaware"); + } } From 77e6cb667a38b8e9cebdb0feb344179e5178d66b Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Mon, 9 Mar 2026 14:39:27 -0400 Subject: [PATCH 4/4] clippy --- quickwit/quickwit-ingest/src/ingest_v2/mod.rs | 2 +- .../quickwit-ingest/src/ingest_v2/router.rs | 4 +--- .../src/ingest_v2/routing_table.rs | 20 +++++++++++++++---- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs index 471b15e95e2..151bf219fb7 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs @@ -22,11 +22,11 @@ mod metrics; mod models; mod mrecord; mod mrecordlog_utils; -mod routing_table; mod publish_tracker; mod rate_meter; mod replication; mod router; +mod routing_table; mod state; mod wal_capacity_tracker; mod workbench; diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index a803b242c4f..46a476431f9 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -482,9 +482,7 @@ impl IngestRouter { pub async fn debug_info(&self) -> JsonValue { let state_guard = self.state.lock().await; - let routing_table_json = state_guard - .routing_table - .debug_info(&self.ingester_pool); + let routing_table_json = state_guard.routing_table.debug_info(&self.ingester_pool); json!({ "routing_table": routing_table_json, diff --git a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs index 4468747e172..670822a6ae3 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs @@ -543,11 +543,23 @@ mod tests { pool.insert("node-remote".into(), mocked_ingester(Some("az-2"))); pool.insert("node-no-az".into(), mocked_ingester(None)); - assert_eq!(table.classify_az_locality(&"node-local".into(), &pool), "same_az"); - assert_eq!(table.classify_az_locality(&"node-remote".into(), &pool), "cross_az"); - assert_eq!(table.classify_az_locality(&"node-no-az".into(), &pool), "az_unaware"); + assert_eq!( + table.classify_az_locality(&"node-local".into(), &pool), + "same_az" + ); + assert_eq!( + table.classify_az_locality(&"node-remote".into(), &pool), + "cross_az" + ); + assert_eq!( + table.classify_az_locality(&"node-no-az".into(), &pool), + "az_unaware" + ); let table_no_az = RoutingTable::default(); - assert_eq!(table_no_az.classify_az_locality(&"node-local".into(), &pool), "az_unaware"); + assert_eq!( + table_no_az.classify_az_locality(&"node-local".into(), &pool), + "az_unaware" + ); } }