diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/capacity_score.rs similarity index 97% rename from quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs rename to quickwit/quickwit-ingest/src/ingest_v2/broadcast/capacity_score.rs index 482f5f58886..d9f456b7201 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/capacity_score.rs @@ -69,7 +69,10 @@ impl BroadcastIngesterCapacityScoreTask { .map_err(|_| anyhow::anyhow!("failed to acquire ingester state lock"))?; let usage = guard.mrecordlog.resource_usage(); let disk_used = ByteSize::b(usage.disk_used_bytes as u64); - let capacity_score = guard.wal_capacity_time_series.record_and_score(disk_used); + let memory_used = ByteSize::b(usage.memory_used_bytes as u64); + let capacity_score = guard + .wal_capacity_tracker + .record_and_score(disk_used, memory_used); let (open_shard_counts, _) = guard.get_shard_snapshot(); Ok(Some((capacity_score, open_shard_counts))) @@ -218,8 +221,8 @@ mod tests { state_guard.shards.insert(shard.queue_id(), shard); let (open_shard_counts, _) = state_guard.get_shard_snapshot(); let capacity_score = state_guard - .wal_capacity_time_series - .record_and_score(ByteSize::b(500)); + .wal_capacity_tracker + .record_and_score(ByteSize::b(500), ByteSize::b(0)); drop(state_guard); assert_eq!(capacity_score, 6); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs index 18a00209de1..b579382af78 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs @@ -13,7 +13,7 @@ // limitations under the License. #[allow(dead_code)] -mod ingester_capacity_score; +mod capacity_score; mod local_shards; use std::time::Duration; @@ -26,7 +26,7 @@ pub(in crate::ingest_v2) const BROADCAST_INTERVAL_PERIOD: Duration = if cfg!(tes Duration::from_secs(5) }; -pub use ingester_capacity_score::{ +pub use capacity_score::{ BroadcastIngesterCapacityScoreTask, IngesterCapacityScoreUpdate, setup_ingester_capacity_update_listener, }; diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index df51758a4ca..b77cefa7f38 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -131,7 +131,12 @@ impl Ingester { idle_shard_timeout: Duration, ) -> IngestV2Result { let self_node_id: NodeId = cluster.self_node_id().into(); - let state = IngesterState::load(wal_dir_path, disk_capacity, rate_limiter_settings); + let state = IngesterState::load( + wal_dir_path, + disk_capacity, + memory_capacity, + rate_limiter_settings, + ); let weak_state = state.weak(); BroadcastLocalShardsTask::spawn(cluster.clone(), weak_state.clone()); @@ -784,10 +789,12 @@ impl Ingester { } let wal_usage = state_guard.mrecordlog.resource_usage(); let disk_used = wal_usage.disk_used_bytes as u64; + let memory_used = wal_usage.memory_used_bytes as u64; let (open_shard_counts, closed_shards) = state_guard.get_shard_snapshot(); let capacity_score = state_guard - .wal_capacity_time_series - .score(ByteSize::b(disk_used)) as u32; + .wal_capacity_tracker + .score(ByteSize::b(disk_used), ByteSize::b(memory_used)) + as u32; drop(state_guard); if disk_used >= self.disk_capacity.as_u64() * 90 / 100 { diff --git a/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs b/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs index 1fb32c0b2fd..87975a3c462 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs @@ -82,12 +82,20 @@ pub(super) struct IngestV2Metrics { pub wal_disk_used_bytes: IntGauge, pub wal_memory_used_bytes: IntGauge, pub ingest_results: IngestResultMetrics, + pub ingest_attempts: IntCounterVec<1>, } impl Default for IngestV2Metrics { fn default() -> Self { Self { ingest_results: IngestResultMetrics::default(), + ingest_attempts: new_counter_vec::<1>( + "ingest_attempts", + "Number of routing attempts by AZ locality", + "ingest", + &[], + ["az_routing"], + ), reset_shards_operations_total: new_counter_vec( "reset_shards_operations_total", "Total number of reset shards operations performed.", diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs index d5432936e58..151bf219fb7 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs @@ -22,15 +22,13 @@ mod metrics; mod models; mod mrecord; mod mrecordlog_utils; -mod node_routing_table; mod publish_tracker; mod rate_meter; mod replication; mod router; -#[allow(dead_code)] mod routing_table; mod state; -mod wal_capacity_timeseries; +mod wal_capacity_tracker; mod workbench; use std::collections::HashMap; diff --git a/quickwit/quickwit-ingest/src/ingest_v2/node_routing_table.rs b/quickwit/quickwit-ingest/src/ingest_v2/node_routing_table.rs deleted file mode 100644 index 68299358f77..00000000000 --- a/quickwit/quickwit-ingest/src/ingest_v2/node_routing_table.rs +++ /dev/null @@ -1,519 +0,0 @@ -// Copyright 2021-Present Datadog, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::{HashMap, HashSet}; - -use itertools::Itertools; -use quickwit_proto::ingest::Shard; -use quickwit_proto::types::{IndexId, IndexUid, NodeId, SourceId}; -use rand::rng; -use rand::seq::IndexedRandom; - -use crate::IngesterPool; - -/// A single ingester node's routing-relevant data for a specific (index, source) pair. -/// Each entry is self-describing: it carries its own node_id, index_uid, and source_id -/// so it can always be attributed back to a specific source on a specific node. -#[derive(Debug, Clone)] -pub(super) struct IngesterNode { - pub node_id: NodeId, - pub index_uid: IndexUid, - #[allow(unused)] - pub source_id: SourceId, - /// Score from 0-10. Higher means more available capacity. - pub capacity_score: usize, - /// Number of open shards on this node for this (index, source) pair. Tiebreaker for power of - /// two choices comparison - we favor a node with more open shards. - pub open_shard_count: usize, -} - -#[derive(Debug, Default)] -pub(super) struct RoutingEntry { - pub nodes: HashMap, -} - -/// Given a slice of candidates, picks the better of two random choices. -/// Higher capacity_score wins; tiebreak on more open_shard_count (more landing spots). -fn power_of_two_choices<'a>(candidates: &[&'a IngesterNode]) -> &'a IngesterNode { - debug_assert!(candidates.len() >= 2); - let mut iter = candidates.choose_multiple(&mut rng(), 2); - let (&a, &b) = (iter.next().unwrap(), iter.next().unwrap()); - - if (a.capacity_score, a.open_shard_count) >= (b.capacity_score, b.open_shard_count) { - a - } else { - b - } -} - -fn pick_from(candidates: Vec<&IngesterNode>) -> Option<&IngesterNode> { - match candidates.len() { - 0 => None, - 1 => Some(candidates[0]), - _ => Some(power_of_two_choices(&candidates)), - } -} - -impl RoutingEntry { - /// Pick an ingester node to persist the request to. Uses power of two choices based on reported - /// ingester capacity, if more than one eligible node exists. Prefers nodes in the same - /// availability zone, falling back to remote nodes. - fn pick_node( - &self, - ingester_pool: &IngesterPool, - unavailable_leaders: &HashSet, - self_availability_zone: &Option, - ) -> Option<&IngesterNode> { - let (local_ingesters, remote_ingesters): (Vec<&IngesterNode>, Vec<&IngesterNode>) = self - .nodes - .values() - .filter(|node| { - node.capacity_score > 0 - && node.open_shard_count > 0 - && ingester_pool.contains_key(&node.node_id) - && !unavailable_leaders.contains(&node.node_id) - }) - .partition(|node| { - let node_az = ingester_pool - .get(&node.node_id) - .and_then(|h| h.availability_zone); - node_az == *self_availability_zone - }); - - pick_from(local_ingesters).or_else(|| pick_from(remote_ingesters)) - } -} - -#[derive(Debug, Default)] -pub(super) struct NodeBasedRoutingTable { - table: HashMap<(IndexId, SourceId), RoutingEntry>, - self_availability_zone: Option, -} - -impl NodeBasedRoutingTable { - pub fn new(self_availability_zone: Option) -> Self { - Self { - self_availability_zone, - ..Default::default() - } - } - - pub fn pick_node( - &self, - index_id: &str, - source_id: &str, - ingester_pool: &IngesterPool, - unavailable_leaders: &HashSet, - ) -> Option<&IngesterNode> { - let key = (index_id.to_string(), source_id.to_string()); - let entry = self.table.get(&key)?; - entry.pick_node( - ingester_pool, - unavailable_leaders, - &self.self_availability_zone, - ) - } - - pub fn debug_info( - &self, - ingester_pool: &IngesterPool, - ) -> HashMap> { - let mut per_index: HashMap> = HashMap::new(); - for ((index_id, source_id), entry) in &self.table { - for (node_id, node) in &entry.nodes { - let az = ingester_pool.get(node_id).and_then(|h| h.availability_zone); - per_index - .entry(index_id.clone()) - .or_default() - .push(serde_json::json!({ - "source_id": source_id, - "node_id": node_id, - "capacity_score": node.capacity_score, - "open_shard_count": node.open_shard_count, - "availability_zone": az, - })); - } - } - per_index - } - - pub fn has_open_nodes( - &self, - index_id: &str, - source_id: &str, - ingester_pool: &IngesterPool, - unavailable_leaders: &HashSet, - ) -> bool { - let key = (index_id.to_string(), source_id.to_string()); - let Some(entry) = self.table.get(&key) else { - return false; - }; - entry.nodes.values().any(|node| { - node.capacity_score > 0 - && node.open_shard_count > 0 - && ingester_pool.contains_key(&node.node_id) - && !unavailable_leaders.contains(&node.node_id) - }) - } - - /// Applies a capacity update from the IngesterCapacityScoreUpdate broadcast. This is the - /// primary way the table learns about node availability and capacity. - pub fn apply_capacity_update( - &mut self, - node_id: NodeId, - index_uid: IndexUid, - source_id: SourceId, - capacity_score: usize, - open_shard_count: usize, - ) { - let key = (index_uid.index_id.to_string(), source_id.clone()); - - let entry = self.table.entry(key).or_default(); - let ingester_node = IngesterNode { - node_id: node_id.clone(), - index_uid, - source_id, - capacity_score, - open_shard_count, - }; - entry.nodes.insert(node_id, ingester_node); - } - - /// Merges routing updates from a GetOrCreateOpenShards control plane response into the - /// table. For existing nodes, updates their open shard count, including if the count is 0, from - /// the CP response while preserving capacity scores if they already exist. - /// New nodes get a default capacity_score of 5. - pub fn merge_from_shards( - &mut self, - index_uid: IndexUid, - source_id: SourceId, - shards: Vec, - ) { - let per_leader_count: HashMap = shards - .iter() - .map(|shard| { - let num_open_shards = shard.is_open() as usize; - let leader_id = NodeId::from(shard.leader_id.clone()); - (leader_id, num_open_shards) - }) - .into_grouping_map() - .sum(); - - let key = (index_uid.index_id.to_string(), source_id.clone()); - let entry = self.table.entry(key).or_default(); - - for (node_id, open_shard_count) in per_leader_count { - entry - .nodes - .entry(node_id.clone()) - .and_modify(|node| node.open_shard_count = open_shard_count) - .or_insert_with(|| IngesterNode { - node_id, - index_uid: index_uid.clone(), - source_id: source_id.clone(), - capacity_score: 5, - open_shard_count, - }); - } - } -} - -#[cfg(test)] -mod tests { - use quickwit_proto::ingest::ShardState; - use quickwit_proto::ingest::ingester::IngesterServiceClient; - use quickwit_proto::types::ShardId; - - use super::*; - use crate::IngesterPoolEntry; - - fn mocked_ingester(availability_zone: Option<&str>) -> IngesterPoolEntry { - IngesterPoolEntry { - client: IngesterServiceClient::mocked(), - availability_zone: availability_zone.map(|s| s.to_string()), - } - } - - #[test] - fn test_apply_capacity_update() { - let mut table = NodeBasedRoutingTable::default(); - let key = ("test-index".to_string(), "test-source".into()); - - // Insert first node. - table.apply_capacity_update( - "node-1".into(), - IndexUid::for_test("test-index", 0), - "test-source".into(), - 8, - 3, - ); - let entry = table.table.get(&key).unwrap(); - assert_eq!(entry.nodes.len(), 1); - assert_eq!(entry.nodes.get("node-1").unwrap().capacity_score, 8); - - // Update existing node. - table.apply_capacity_update( - "node-1".into(), - IndexUid::for_test("test-index", 0), - "test-source".into(), - 4, - 5, - ); - let node = table.table.get(&key).unwrap().nodes.get("node-1").unwrap(); - assert_eq!(node.capacity_score, 4); - assert_eq!(node.open_shard_count, 5); - - // Add second node. - table.apply_capacity_update( - "node-2".into(), - IndexUid::for_test("test-index", 0), - "test-source".into(), - 6, - 2, - ); - assert_eq!(table.table.get(&key).unwrap().nodes.len(), 2); - - // Zero shards: node stays in table but becomes ineligible for routing. - table.apply_capacity_update( - "node-1".into(), - IndexUid::for_test("test-index", 0), - "test-source".into(), - 0, - 0, - ); - let entry = table.table.get(&key).unwrap(); - assert_eq!(entry.nodes.len(), 2); - assert_eq!(entry.nodes.get("node-1").unwrap().open_shard_count, 0); - assert_eq!(entry.nodes.get("node-1").unwrap().capacity_score, 0); - } - - #[test] - fn test_has_open_nodes() { - let mut table = NodeBasedRoutingTable::default(); - let pool = IngesterPool::default(); - - // Empty table. - assert!(!table.has_open_nodes("test-index", "test-source", &pool, &HashSet::new())); - - // Node exists but is not in pool. - table.apply_capacity_update( - "node-1".into(), - IndexUid::for_test("test-index", 0), - "test-source".into(), - 8, - 3, - ); - assert!(!table.has_open_nodes("test-index", "test-source", &pool, &HashSet::new())); - - // Node is in pool → true. - pool.insert("node-1".into(), mocked_ingester(None)); - assert!(table.has_open_nodes("test-index", "test-source", &pool, &HashSet::new())); - - // Node is unavailable → false. - let unavailable: HashSet = HashSet::from(["node-1".into()]); - assert!(!table.has_open_nodes("test-index", "test-source", &pool, &unavailable)); - - // Second node available → true despite first being unavailable. - table.apply_capacity_update( - "node-2".into(), - IndexUid::for_test("test-index", 0), - "test-source".into(), - 6, - 2, - ); - pool.insert("node-2".into(), mocked_ingester(None)); - assert!(table.has_open_nodes("test-index", "test-source", &pool, &unavailable)); - - // Node with capacity_score=0 is not eligible. - table.apply_capacity_update( - "node-2".into(), - IndexUid::for_test("test-index", 0), - "test-source".into(), - 0, - 2, - ); - assert!(!table.has_open_nodes("test-index", "test-source", &pool, &unavailable)); - } - - #[test] - fn test_pick_node_prefers_same_az() { - let mut table = NodeBasedRoutingTable::new(Some("az-1".to_string())); - let pool = IngesterPool::default(); - - table.apply_capacity_update( - "node-1".into(), - IndexUid::for_test("test-index", 0), - "test-source".into(), - 5, - 1, - ); - table.apply_capacity_update( - "node-2".into(), - IndexUid::for_test("test-index", 0), - "test-source".into(), - 5, - 1, - ); - pool.insert("node-1".into(), mocked_ingester(Some("az-1"))); - pool.insert("node-2".into(), mocked_ingester(Some("az-2"))); - - let picked = table - .pick_node("test-index", "test-source", &pool, &HashSet::new()) - .unwrap(); - assert_eq!(picked.node_id, NodeId::from("node-1")); - } - - #[test] - fn test_pick_node_falls_back_to_cross_az() { - let mut table = NodeBasedRoutingTable::new(Some("az-1".to_string())); - let pool = IngesterPool::default(); - - table.apply_capacity_update( - "node-2".into(), - IndexUid::for_test("test-index", 0), - "test-source".into(), - 5, - 1, - ); - pool.insert("node-2".into(), mocked_ingester(Some("az-2"))); - - let picked = table - .pick_node("test-index", "test-source", &pool, &HashSet::new()) - .unwrap(); - assert_eq!(picked.node_id, NodeId::from("node-2")); - } - - #[test] - fn test_pick_node_no_az_awareness() { - let mut table = NodeBasedRoutingTable::default(); - let pool = IngesterPool::default(); - - table.apply_capacity_update( - "node-1".into(), - IndexUid::for_test("test-index", 0), - "test-source".into(), - 5, - 1, - ); - pool.insert("node-1".into(), mocked_ingester(Some("az-1"))); - - let picked = table - .pick_node("test-index", "test-source", &pool, &HashSet::new()) - .unwrap(); - assert_eq!(picked.node_id, NodeId::from("node-1")); - } - - #[test] - fn test_pick_node_missing_entry() { - let table = NodeBasedRoutingTable::new(Some("az-1".to_string())); - let pool = IngesterPool::default(); - - assert!( - table - .pick_node("nonexistent", "source", &pool, &HashSet::new()) - .is_none() - ); - } - - #[test] - fn test_power_of_two_choices() { - // 3 candidates: best appears in the random pair 2/3 of the time and always - // wins when it does, so it should win ~67% of 1000 runs. Asserting > 550 - // is ~7.5 standard deviations from the mean — effectively impossible to flake. - let high = IngesterNode { - node_id: "high".into(), - index_uid: IndexUid::for_test("idx", 0), - source_id: "src".into(), - capacity_score: 9, - open_shard_count: 2, - }; - let mid = IngesterNode { - node_id: "mid".into(), - index_uid: IndexUid::for_test("idx", 0), - source_id: "src".into(), - capacity_score: 5, - open_shard_count: 2, - }; - let low = IngesterNode { - node_id: "low".into(), - index_uid: IndexUid::for_test("idx", 0), - source_id: "src".into(), - capacity_score: 1, - open_shard_count: 2, - }; - let candidates: Vec<&IngesterNode> = vec![&high, &mid, &low]; - - let mut high_wins = 0; - for _ in 0..1000 { - if power_of_two_choices(&candidates).node_id == "high" { - high_wins += 1; - } - } - assert!(high_wins > 550, "high won only {high_wins}/1000 times"); - } - - #[test] - fn test_merge_from_shards() { - let mut table = NodeBasedRoutingTable::default(); - let index_uid = IndexUid::for_test("test-index", 0); - let key = ("test-index".to_string(), "test-source".to_string()); - - let make_shard = |id: u64, leader: &str, open: bool| Shard { - index_uid: Some(index_uid.clone()), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(id)), - shard_state: if open { - ShardState::Open as i32 - } else { - ShardState::Closed as i32 - }, - leader_id: leader.to_string(), - ..Default::default() - }; - - // Two open shards on node-1, one open + one closed on node-2, only closed on node-3. - let shards = vec![ - make_shard(1, "node-1", true), - make_shard(2, "node-1", true), - make_shard(3, "node-2", true), - make_shard(4, "node-2", false), - make_shard(5, "node-3", false), - ]; - table.merge_from_shards(index_uid.clone(), "test-source".into(), shards); - - let entry = table.table.get(&key).unwrap(); - assert_eq!(entry.nodes.len(), 3); - - let n1 = entry.nodes.get("node-1").unwrap(); - assert_eq!(n1.open_shard_count, 2); - assert_eq!(n1.capacity_score, 5); - - let n2 = entry.nodes.get("node-2").unwrap(); - assert_eq!(n2.open_shard_count, 1); - - let n3 = entry.nodes.get("node-3").unwrap(); - assert_eq!(n3.open_shard_count, 0); - - // Merging again adds new nodes but preserves existing ones. - let shards = vec![make_shard(10, "node-4", true)]; - table.merge_from_shards(index_uid, "test-source".into(), shards); - - let entry = table.table.get(&key).unwrap(); - assert_eq!(entry.nodes.len(), 4); - assert!(entry.nodes.contains_key("node-1")); - assert!(entry.nodes.contains_key("node-2")); - assert!(entry.nodes.contains_key("node-3")); - assert!(entry.nodes.contains_key("node-4")); - } -} diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index a1f5bf86302..46a476431f9 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -46,10 +46,11 @@ use super::debouncing::{ }; use super::ingester::PERSIST_REQUEST_TIMEOUT; use super::metrics::IngestResultMetrics; -use super::node_routing_table::NodeBasedRoutingTable; +use super::routing_table::RoutingTable; use super::workbench::IngestWorkbench; use super::{IngesterPool, pending_subrequests}; use crate::get_ingest_router_buffer_size; +use crate::ingest_v2::metrics::INGEST_V2_METRICS; /// Duration after which ingest requests time out with [`IngestV2Error::Timeout`]. fn ingest_request_timeout() -> Duration { @@ -101,7 +102,7 @@ struct RouterState { // Debounces `GetOrCreateOpenShardsRequest` requests to the control plane. debouncer: GetOrCreateOpenShardsRequestDebouncer, // Routing table of nodes, their WAL capacity, and the number of open shards per source. - node_routing_table: NodeBasedRoutingTable, + routing_table: RoutingTable, } impl fmt::Debug for IngestRouter { @@ -124,7 +125,7 @@ impl IngestRouter { ) -> Self { let state = Arc::new(Mutex::new(RouterState { debouncer: GetOrCreateOpenShardsRequestDebouncer::default(), - node_routing_table: NodeBasedRoutingTable::new(self_availability_zone), + routing_table: RoutingTable::new(self_availability_zone), })); let ingest_semaphore_permits = get_ingest_router_buffer_size().as_u64() as usize; let ingest_semaphore = Arc::new(Semaphore::new(ingest_semaphore_permits)); @@ -160,7 +161,7 @@ impl IngestRouter { let mut state_guard = self.state.lock().await; for subrequest in pending_subrequests(&workbench.subworkbenches) { - if !state_guard.node_routing_table.has_open_nodes( + if !state_guard.routing_table.has_open_nodes( &subrequest.index_id, &subrequest.source_id, ingester_pool, @@ -251,7 +252,7 @@ impl IngestRouter { let mut state_guard = self.state.lock().await; for success in response.successes { - state_guard.node_routing_table.merge_from_shards( + state_guard.routing_table.merge_from_shards( success.index_uid().clone(), success.source_id, success.open_shards, @@ -302,7 +303,7 @@ impl IngestRouter { // opportunity to get a fresh routing update. let mut state_guard = self.state.lock().await; for shard_update in routing_update.source_shard_updates { - state_guard.node_routing_table.apply_capacity_update( + state_guard.routing_table.apply_capacity_update( leader_id.clone(), shard_update.index_uid().clone(), shard_update.source_id, @@ -353,7 +354,7 @@ impl IngestRouter { let state_guard = self.state.lock().await; for subrequest in pending_subrequests(&workbench.subworkbenches) { - let ingester_node = state_guard.node_routing_table.pick_node( + let ingester_node = state_guard.routing_table.pick_node( &subrequest.index_id, &subrequest.source_id, &self.ingester_pool, @@ -367,6 +368,13 @@ impl IngestRouter { continue; } }; + let az_locality = state_guard + .routing_table + .classify_az_locality(&ingester_node.node_id, &self.ingester_pool); + INGEST_V2_METRICS + .ingest_attempts + .with_label_values([az_locality]) + .inc(); let persist_subrequest = PersistSubrequest { subrequest_id: subrequest.subrequest_id, index_uid: Some(ingester_node.index_uid.clone()), @@ -474,9 +482,7 @@ impl IngestRouter { pub async fn debug_info(&self) -> JsonValue { let state_guard = self.state.lock().await; - let routing_table_json = state_guard - .node_routing_table - .debug_info(&self.ingester_pool); + let routing_table_json = state_guard.routing_table.debug_info(&self.ingester_pool); json!({ "routing_table": routing_table_json, @@ -486,8 +492,7 @@ impl IngestRouter { fn update_ingest_metrics(ingest_result: &IngestV2Result, num_subrequests: usize) { let num_subrequests = num_subrequests as u64; - let ingest_results_metrics: &IngestResultMetrics = - &crate::ingest_v2::metrics::INGEST_V2_METRICS.ingest_results; + let ingest_results_metrics: &IngestResultMetrics = &INGEST_V2_METRICS.ingest_results; match ingest_result { Ok(ingest_response) => { ingest_results_metrics @@ -607,7 +612,7 @@ impl EventSubscriber for WeakRouterState { return; }; let mut state_guard = state.lock().await; - state_guard.node_routing_table.apply_capacity_update( + state_guard.routing_table.apply_capacity_update( update.node_id, update.source_uid.index_uid, update.source_uid.source_id, @@ -674,7 +679,7 @@ mod tests { { let mut state_guard = router.state.lock().await; - state_guard.node_routing_table.apply_capacity_update( + state_guard.routing_table.apply_capacity_update( "test-ingester-0".into(), IndexUid::for_test("test-index-0", 0), "test-source".to_string(), @@ -1273,7 +1278,7 @@ mod tests { let index_uid_1: IndexUid = IndexUid::for_test("test-index-1", 0); { let mut state_guard = router.state.lock().await; - state_guard.node_routing_table.merge_from_shards( + state_guard.routing_table.merge_from_shards( index_uid_0.clone(), "test-source".to_string(), vec![Shard { @@ -1285,7 +1290,7 @@ mod tests { ..Default::default() }], ); - state_guard.node_routing_table.merge_from_shards( + state_guard.routing_table.merge_from_shards( index_uid_1.clone(), "test-source".to_string(), vec![Shard { @@ -1420,7 +1425,7 @@ mod tests { let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0); { let mut state_guard = router.state.lock().await; - state_guard.node_routing_table.merge_from_shards( + state_guard.routing_table.merge_from_shards( index_uid.clone(), "test-source".to_string(), vec![Shard { @@ -1529,7 +1534,7 @@ mod tests { { let mut state_guard = router.state.lock().await; - state_guard.node_routing_table.merge_from_shards( + state_guard.routing_table.merge_from_shards( index_uid_0.clone(), "test-source".to_string(), vec![Shard { @@ -1540,7 +1545,7 @@ mod tests { ..Default::default() }], ); - state_guard.node_routing_table.merge_from_shards( + state_guard.routing_table.merge_from_shards( index_uid_1.clone(), "test-source".to_string(), vec![Shard { @@ -1584,7 +1589,7 @@ mod tests { let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0); { let mut state_guard = router.state.lock().await; - state_guard.node_routing_table.merge_from_shards( + state_guard.routing_table.merge_from_shards( index_uid.clone(), "test-source".to_string(), vec![Shard { @@ -1689,7 +1694,7 @@ mod tests { ingester_pool.insert("test-ingester-0".into(), mocked_ingester()); let state_guard = router.state.lock().await; let node = state_guard - .node_routing_table + .routing_table .pick_node("test-index", "test-source", &ingester_pool, &HashSet::new()) .unwrap(); assert_eq!(node.node_id, NodeId::from("test-ingester-0")); @@ -1836,7 +1841,7 @@ mod tests { ingester_pool.insert("test-ingester-0".into(), mocked_ingester()); let state_guard = router.state.lock().await; let node = state_guard - .node_routing_table + .routing_table .pick_node("test-index", "test-source", &ingester_pool, &HashSet::new()) .unwrap(); assert_eq!(node.node_id, NodeId::from("test-ingester-0")); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs index 4b4150d6e98..670822a6ae3 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs @@ -12,485 +12,238 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; -use std::sync::atomic::{AtomicUsize, Ordering}; -use quickwit_proto::ingest::{Shard, ShardIds, ShardState}; -use quickwit_proto::types::{IndexId, IndexUid, NodeId, ShardId, SourceId}; -use serde_json::{Value as JsonValue, json}; -use tracing::{info, warn}; +use itertools::Itertools; +use quickwit_proto::ingest::Shard; +use quickwit_proto::types::{IndexId, IndexUid, NodeId, SourceId}; +use rand::rng; +use rand::seq::IndexedRandom; use crate::IngesterPool; -#[derive(Debug)] -pub(super) struct RoutingEntry { +/// A single ingester node's routing-relevant data for a specific (index, source) pair. +/// Each entry is self-describing: it carries its own node_id, index_uid, and source_id +/// so it can always be attributed back to a specific source on a specific node. +#[derive(Debug, Clone)] +pub(super) struct IngesterNode { + pub node_id: NodeId, pub index_uid: IndexUid, + #[allow(unused)] pub source_id: SourceId, - pub shard_id: ShardId, - pub shard_state: ShardState, - pub leader_id: NodeId, + /// Score from 0-10. Higher means more available capacity. + pub capacity_score: usize, + /// Number of open shards on this node for this (index, source) pair. Tiebreaker for power of + /// two choices comparison - we favor a node with more open shards. + pub open_shard_count: usize, } -impl From for RoutingEntry { - fn from(shard: Shard) -> Self { - let shard_id = shard.shard_id().clone(); - let shard_state = shard.shard_state(); - Self { - index_uid: shard.index_uid().clone(), - source_id: shard.source_id, - shard_id, - shard_state, - leader_id: shard.leader_id.into(), - } - } +#[derive(Debug, Default)] +pub(super) struct RoutingEntry { + pub nodes: HashMap, } -/// The set of shards the router is aware of for the given index and source. -#[derive(Debug, Default)] -pub(super) struct RoutingTableEntry { - /// Index UID of the shards. - pub index_uid: IndexUid, - /// Source ID of the shards. - pub source_id: SourceId, - /// Shards located on this node. - pub local_shards: Vec, - pub local_round_robin_idx: AtomicUsize, - /// Shards located on remote nodes. - pub remote_shards: Vec, - pub remote_round_robin_idx: AtomicUsize, +/// Given a slice of candidates, picks the better of two random choices. +/// Higher capacity_score wins; tiebreak on more open_shard_count (more landing spots). +fn power_of_two_choices<'a>(candidates: &[&'a IngesterNode]) -> &'a IngesterNode { + debug_assert!(candidates.len() >= 2); + let mut iter = candidates.choose_multiple(&mut rng(), 2); + let (&a, &b) = (iter.next().unwrap(), iter.next().unwrap()); + + if (a.capacity_score, a.open_shard_count) >= (b.capacity_score, b.open_shard_count) { + a + } else { + b + } } -impl RoutingTableEntry { - /// Creates a new entry and ensures that the shards are open, unique, and sorted by shard ID. - fn new( - self_node_id: &NodeId, - index_uid: IndexUid, - source_id: SourceId, - mut shards: Vec, - ) -> Self { - let num_shards = shards.len(); +fn pick_from(candidates: Vec<&IngesterNode>) -> Option<&IngesterNode> { + match candidates.len() { + 0 => None, + 1 => Some(candidates[0]), + _ => Some(power_of_two_choices(&candidates)), + } +} - shards.sort_unstable_by(|left, right| left.shard_id.cmp(&right.shard_id)); - shards.dedup_by(|left, right| left.shard_id == right.shard_id); +impl RoutingEntry { + /// Pick an ingester node to persist the request to. Uses power of two choices based on reported + /// ingester capacity, if more than one eligible node exists. Prefers nodes in the same + /// availability zone, falling back to remote nodes. + fn pick_node( + &self, + ingester_pool: &IngesterPool, + unavailable_leaders: &HashSet, + self_availability_zone: &Option, + ) -> Option<&IngesterNode> { + let (local_ingesters, remote_ingesters): (Vec<&IngesterNode>, Vec<&IngesterNode>) = self + .nodes + .values() + .filter(|node| { + node.capacity_score > 0 + && node.open_shard_count > 0 + && ingester_pool.contains_key(&node.node_id) + && !unavailable_leaders.contains(&node.node_id) + }) + .partition(|node| { + let node_az = ingester_pool + .get(&node.node_id) + .and_then(|h| h.availability_zone); + node_az == *self_availability_zone + }); - let (local_shards, remote_shards): (Vec<_>, Vec<_>) = shards - .into_iter() - .filter(|shard| shard.is_open()) - .map(RoutingEntry::from) - .partition(|shard| *self_node_id == shard.leader_id); + pick_from(local_ingesters).or_else(|| pick_from(remote_ingesters)) + } +} - if num_shards > local_shards.len() + remote_shards.len() { - warn!("input shards should not contain closed shards or duplicates"); - } +#[derive(Debug, Default)] +pub(super) struct RoutingTable { + table: HashMap<(IndexId, SourceId), RoutingEntry>, + self_availability_zone: Option, +} +impl RoutingTable { + pub fn new(self_availability_zone: Option) -> Self { Self { - index_uid, - source_id, - local_shards, - remote_shards, + self_availability_zone, ..Default::default() } } - fn empty(index_uid: IndexUid, source_id: SourceId) -> Self { - Self { - index_uid, - source_id, - ..Default::default() - } - } - - /// Returns `true` if at least one shard in the table entry is open and has a leader available. - /// As it goes through the list of shards in the entry, it populates `closed_shard_ids` and - /// `unavailable_leaders` with the shard IDs of the closed shards and the node ID of the - /// unavailable ingesters encountered along the way. - pub fn has_open_shards( + pub fn pick_node( &self, + index_id: &str, + source_id: &str, ingester_pool: &IngesterPool, - closed_shard_ids: &mut Vec, - unavailable_leaders: &mut HashSet, - ) -> bool { - let shards = self.local_shards.iter().chain(self.remote_shards.iter()); - - for shard in shards { - match shard.shard_state { - ShardState::Closed => { - closed_shard_ids.push(shard.shard_id.clone()); - continue; - } - ShardState::Unavailable | ShardState::Unspecified => { - continue; - } - ShardState::Open => { - if unavailable_leaders.contains(&shard.leader_id) { - continue; - } - if ingester_pool.contains_key(&shard.leader_id) { - return true; - } else { - let leader_id: NodeId = shard.leader_id.clone(); - unavailable_leaders.insert(leader_id); - } - } - } - } - false + unavailable_leaders: &HashSet, + ) -> Option<&IngesterNode> { + let key = (index_id.to_string(), source_id.to_string()); + let entry = self.table.get(&key)?; + entry.pick_node( + ingester_pool, + unavailable_leaders, + &self.self_availability_zone, + ) } - /// Returns the next open and available shard in the table entry in a round-robin fashion. - pub fn next_open_shard_round_robin( + pub fn classify_az_locality( &self, + target_node_id: &NodeId, ingester_pool: &IngesterPool, - rate_limited_shards: &HashSet, - ) -> Result<&RoutingEntry, NextOpenShardError> { - let mut error = NextOpenShardError::NoShardsAvailable; - - for (shards, round_robin_idx) in [ - (&self.local_shards, &self.local_round_robin_idx), - (&self.remote_shards, &self.remote_round_robin_idx), - ] { - if shards.is_empty() { - continue; - } - for _attempt in 0..shards.len() { - let shard_idx = round_robin_idx.fetch_add(1, Ordering::Relaxed); - let shard_routing_entry: &RoutingEntry = &shards[shard_idx % shards.len()]; - - if !shard_routing_entry.shard_state.is_open() { - continue; - } - if rate_limited_shards.contains(&shard_routing_entry.shard_id) { - error = NextOpenShardError::RateLimited; - continue; - } - if ingester_pool.contains_key(&shard_routing_entry.leader_id) { - return Ok(shard_routing_entry); - } - } - } - Err(error) - } - - /// Inserts the open shards the routing table is not aware of. - fn insert_open_shards( - &mut self, - self_node_id: &NodeId, - leader_id: &NodeId, - index_uid: &IndexUid, - shard_ids: &[ShardId], - ) { - match self.index_uid.cmp(index_uid) { - // If we receive an update for a new incarnation of the index, then we clear the entry - // and insert all the shards. - std::cmp::Ordering::Less => { - self.index_uid = index_uid.clone(); - self.clear_shards(); - } - // If we receive an update for a previous incarnation of the index, then we ignore it. - std::cmp::Ordering::Greater => { - return; - } - std::cmp::Ordering::Equal => {} + ) -> &'static str { + let Some(self_az) = &self.self_availability_zone else { + return "az_unaware"; }; - let target_shards = if self_node_id == leader_id { - &mut self.local_shards - } else { - &mut self.remote_shards - }; - let mut num_inserted_shards = 0; - let num_target_shards = target_shards.len(); - - if num_target_shards == 0 { - target_shards.reserve(num_target_shards); - target_shards.extend(shard_ids.iter().map(|shard_id| RoutingEntry { - index_uid: self.index_uid.clone(), - source_id: self.source_id.clone(), - shard_id: shard_id.clone(), - shard_state: ShardState::Open, - leader_id: leader_id.clone(), - })); - num_inserted_shards = target_shards.len(); - } else { - let shard_ids_range = target_shards[0].shard_id.clone() - ..=target_shards[num_target_shards - 1].shard_id.clone(); - - for shard_id in shard_ids { - // If we can't find the shard, then we insert it. - if shard_ids_range.contains(shard_id) { - continue; - } - if target_shards[..num_target_shards] - .binary_search_by(|shard| shard.shard_id.cmp(shard_id)) - .is_err() - { - target_shards.push(RoutingEntry { - index_uid: self.index_uid.clone(), - source_id: self.source_id.clone(), - shard_id: shard_id.clone(), - shard_state: ShardState::Open, - leader_id: leader_id.clone(), - }); - num_inserted_shards += 1; - } - } - } - if num_inserted_shards > 0 { - target_shards.sort_unstable_by(|left, right| left.shard_id.cmp(&right.shard_id)); - - info!( - index_uid=%self.index_uid, - source_id=%self.source_id, - "inserted {num_inserted_shards} shards into routing table" - ); + let target_az = ingester_pool + .get(target_node_id) + .and_then(|entry| entry.availability_zone); + match target_az { + Some(ref az) if az == self_az => "same_az", + Some(_) => "cross_az", + None => "az_unaware", } } - /// Clears local and remote shards. - fn clear_shards(&mut self) { - self.local_shards.clear(); - self.local_round_robin_idx = AtomicUsize::default(); - self.remote_shards.clear(); - self.remote_round_robin_idx = AtomicUsize::default(); - } - - /// Closes the shards identified by their shard IDs. - fn close_shards(&mut self, index_uid: &IndexUid, shard_ids: &[ShardId]) { - // If the shard table was just recently updated with shards for a new index UID, then we can - // safely discard this request. - if self.index_uid != *index_uid { - return; - } - for shards in [&mut self.local_shards, &mut self.remote_shards] { - if shards.is_empty() { - continue; - } - let num_shards = shards.len(); - let shard_ids_range = - shards[0].shard_id.clone()..=shards[num_shards - 1].shard_id.clone(); - - for shard_id in shard_ids { - if !shard_ids_range.contains(shard_id) { - continue; - } - if let Ok(shard_idx) = shards.binary_search_by(|shard| shard.shard_id.cmp(shard_id)) - { - shards[shard_idx].shard_state = ShardState::Closed; - } - } - } - } - - /// Shards the shards identified by their shard IDs. - fn delete_shards(&mut self, index_uid: &IndexUid, shard_ids: &[ShardId]) { - // If the shard table was just recently updated with shards for a new index UID, then we can - // safely discard this request. - if self.index_uid != *index_uid { - return; - } - for shards in [&mut self.local_shards, &mut self.remote_shards] { - if shards.is_empty() { - continue; - } - let num_shards = shards.len(); - let shard_ids_range = - shards[0].shard_id.clone()..=shards[num_shards - 1].shard_id.clone(); - let mut deleted_any = false; - - for shard_id in shard_ids { - if !shard_ids_range.contains(shard_id) { - continue; - } - if let Ok(shard_idx) = shards.binary_search_by(|shard| shard.shard_id.cmp(shard_id)) - { - // We use `Unspecified` as a tombstone. - shards[shard_idx].shard_state = ShardState::Unspecified; - deleted_any = true; - } - } - if deleted_any { - shards.retain(|shard| shard.shard_state != ShardState::Unspecified); + pub fn debug_info( + &self, + ingester_pool: &IngesterPool, + ) -> HashMap> { + let mut per_index: HashMap> = HashMap::new(); + for ((index_id, source_id), entry) in &self.table { + for (node_id, node) in &entry.nodes { + let az = ingester_pool.get(node_id).and_then(|h| h.availability_zone); + per_index + .entry(index_id.clone()) + .or_default() + .push(serde_json::json!({ + "source_id": source_id, + "node_id": node_id, + "capacity_score": node.capacity_score, + "open_shard_count": node.open_shard_count, + "availability_zone": az, + })); } } + per_index } - #[cfg(test)] - pub fn len(&self) -> usize { - self.local_shards.len() + self.remote_shards.len() - } - - #[cfg(test)] - pub fn all_shards(&self) -> Vec<&RoutingEntry> { - let mut shards = Vec::with_capacity(self.len()); - shards.extend(&self.local_shards); - shards.extend(&self.remote_shards); - shards - } -} - -#[derive(Debug, PartialEq, Eq)] -pub(super) enum NextOpenShardError { - NoShardsAvailable, - RateLimited, -} - -/// Stores the list of shards the router is aware of for each index and source. The resolution from -/// index and source to shards is performed using index ID (not index UID) and source ID. -#[derive(Debug)] -pub(super) struct RoutingTable { - pub self_node_id: NodeId, - pub table: HashMap<(IndexId, SourceId), RoutingTableEntry>, -} - -impl RoutingTable { - pub fn find_entry( + pub fn has_open_nodes( &self, - index_id: impl Into, - source_id: impl Into, - ) -> Option<&RoutingTableEntry> { - let key = (index_id.into(), source_id.into()); - self.table.get(&key) - } - - /// Returns `true` if the router already knows about a shard for a given source that has - /// an available `leader`. - /// - /// If this function returns false, it populates the set of unavailable leaders and closed - /// shards. These will be joined to the GetOrCreate shard request emitted to the control - /// plane. - pub fn has_open_shards( - &self, - index_id: impl Into, - source_id: impl Into, + index_id: &str, + source_id: &str, ingester_pool: &IngesterPool, - closed_shards: &mut Vec, - unavailable_leaders: &mut HashSet, + unavailable_leaders: &HashSet, ) -> bool { - let Some(entry) = self.find_entry(index_id, source_id) else { + let key = (index_id.to_string(), source_id.to_string()); + let Some(entry) = self.table.get(&key) else { return false; }; - let mut closed_shard_ids: Vec = Vec::new(); - - let result = - entry.has_open_shards(ingester_pool, &mut closed_shard_ids, unavailable_leaders); - - if !closed_shard_ids.is_empty() { - closed_shards.push(ShardIds { - index_uid: entry.index_uid.clone().into(), - source_id: entry.source_id.clone(), - shard_ids: closed_shard_ids, - }); - } - result + entry.nodes.values().any(|node| { + node.capacity_score > 0 + && node.open_shard_count > 0 + && ingester_pool.contains_key(&node.node_id) + && !unavailable_leaders.contains(&node.node_id) + }) } - /// Replaces the routing table entry for the source with the provided shards. - pub fn replace_shards( + /// Applies a capacity update from the IngesterCapacityScoreUpdate broadcast. This is the + /// primary way the table learns about node availability and capacity. + pub fn apply_capacity_update( &mut self, + node_id: NodeId, index_uid: IndexUid, - source_id: impl Into, - shards: Vec, + source_id: SourceId, + capacity_score: usize, + open_shard_count: usize, ) { - let index_id: IndexId = index_uid.index_id.to_string(); - let source_id: SourceId = source_id.into(); - let key = (index_id, source_id.clone()); - - match self.table.entry(key) { - Entry::Vacant(entry) => { - entry.insert(RoutingTableEntry::new( - &self.self_node_id, - index_uid, - source_id, - shards, - )); - } - Entry::Occupied(mut entry) => { - assert!( - entry.get().index_uid <= index_uid, - "new index incarnation should be greater or equal" - ); + let key = (index_uid.index_id.to_string(), source_id.clone()); - entry.insert(RoutingTableEntry::new( - &self.self_node_id, - index_uid, - source_id, - shards, - )); - } + let entry = self.table.entry(key).or_default(); + let ingester_node = IngesterNode { + node_id: node_id.clone(), + index_uid, + source_id, + capacity_score, + open_shard_count, }; + entry.nodes.insert(node_id, ingester_node); } - /// Inserts the shards the routing table is not aware of. - pub fn insert_open_shards( + /// Merges routing updates from a GetOrCreateOpenShards control plane response into the + /// table. For existing nodes, updates their open shard count, including if the count is 0, from + /// the CP response while preserving capacity scores if they already exist. + /// New nodes get a default capacity_score of 5. + pub fn merge_from_shards( &mut self, - leader_id: &NodeId, index_uid: IndexUid, - source_id: impl Into, - shard_ids: &[ShardId], - ) { - let index_id: IndexId = index_uid.index_id.to_string(); - let source_id: SourceId = source_id.into(); - let key = (index_id, source_id.clone()); - - self.table - .entry(key.clone()) - .or_insert_with(|| RoutingTableEntry::empty(index_uid.clone(), source_id)) - .insert_open_shards(&self.self_node_id, leader_id, &index_uid, shard_ids); - } - - /// Closes the targeted shards. - pub fn close_shards( - &mut self, - index_uid: &IndexUid, - source_id: impl Into, - shard_ids: &[ShardId], - ) { - let key = (index_uid.index_id.clone(), source_id.into()); - if let Some(entry) = self.table.get_mut(&key) { - entry.close_shards(index_uid, shard_ids); - } - } - - /// Deletes the targeted shards. - pub fn delete_shards( - &mut self, - index_uid: &IndexUid, - source_id: impl Into, - shard_ids: &[ShardId], + source_id: SourceId, + shards: Vec, ) { - let key = (index_uid.index_id.clone(), source_id.into()); - if let Some(entry) = self.table.get_mut(&key) { - entry.delete_shards(index_uid, shard_ids); - } - } - - pub fn debug_info(&self) -> HashMap> { - let mut per_index_shards_json: HashMap> = HashMap::new(); - - for ((index_id, source_id), entry) in &self.table { - for (shards, is_local) in &[(&entry.local_shards, true), (&entry.remote_shards, false)] - { - let shards_json = shards.iter().map(|shard| { - json!({ - "index_uid": shard.index_uid, - "source_id": source_id, - "shard_id": shard.shard_id, - "shard_state": shard.shard_state.as_json_str_name(), - "is_local": is_local, - }) + let per_leader_count: HashMap = shards + .iter() + .map(|shard| { + let num_open_shards = shard.is_open() as usize; + let leader_id = NodeId::from(shard.leader_id.clone()); + (leader_id, num_open_shards) + }) + .into_grouping_map() + .sum(); + + let key = (index_uid.index_id.to_string(), source_id.clone()); + let entry = self.table.entry(key).or_default(); + + for (node_id, open_shard_count) in per_leader_count { + entry + .nodes + .entry(node_id.clone()) + .and_modify(|node| node.open_shard_count = open_shard_count) + .or_insert_with(|| IngesterNode { + node_id, + index_uid: index_uid.clone(), + source_id: source_id.clone(), + capacity_score: 5, + open_shard_count, }); - per_index_shards_json - .entry(index_id.clone()) - .or_default() - .extend(shards_json); - } } - per_index_shards_json - } - - #[cfg(test)] - pub fn len(&self) -> usize { - self.table.len() } } @@ -498,624 +251,315 @@ impl RoutingTable { mod tests { use quickwit_proto::ingest::ShardState; use quickwit_proto::ingest::ingester::IngesterServiceClient; + use quickwit_proto::types::ShardId; use super::*; + use crate::IngesterPoolEntry; + + fn mocked_ingester(availability_zone: Option<&str>) -> IngesterPoolEntry { + IngesterPoolEntry { + client: IngesterServiceClient::mocked(), + availability_zone: availability_zone.map(|s| s.to_string()), + } + } #[test] - fn test_routing_table_entry_new() { - let self_node_id: NodeId = "test-node-0".into(); - let index_uid = IndexUid::for_test("test-index", 0); - let source_id: SourceId = "test-source".into(); - let table_entry = RoutingTableEntry::new( - &self_node_id, - index_uid.clone(), - source_id.clone(), - Vec::new(), + fn test_apply_capacity_update() { + let mut table = RoutingTable::default(); + let key = ("test-index".to_string(), "test-source".into()); + + // Insert first node. + table.apply_capacity_update( + "node-1".into(), + IndexUid::for_test("test-index", 0), + "test-source".into(), + 8, + 3, ); - assert_eq!(table_entry.len(), 0); - - let index_uid: IndexUid = IndexUid::for_test("test-index", 0); - let shards = vec![ - Shard { - index_uid: Some(index_uid.clone()), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(3)), - shard_state: ShardState::Open as i32, - leader_id: "test-node-0".to_string(), - ..Default::default() - }, - Shard { - index_uid: Some(index_uid.clone()), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(1)), - shard_state: ShardState::Open as i32, - leader_id: "test-node-0".to_string(), - ..Default::default() - }, - Shard { - index_uid: Some(index_uid.clone()), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(2)), - shard_state: ShardState::Open as i32, - leader_id: "test-node-1".to_string(), - ..Default::default() - }, - Shard { - index_uid: Some(index_uid.clone()), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(1)), - shard_state: ShardState::Open as i32, - leader_id: "test-node-0".to_string(), - ..Default::default() - }, - Shard { - index_uid: Some(index_uid.clone()), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(4)), - shard_state: ShardState::Closed as i32, - leader_id: "test-node-0".to_string(), - ..Default::default() - }, - ]; - let table_entry = RoutingTableEntry::new(&self_node_id, index_uid, source_id, shards); - assert_eq!(table_entry.local_shards.len(), 2); - assert_eq!(table_entry.local_shards[0].shard_id, ShardId::from(1)); - assert_eq!(table_entry.local_shards[1].shard_id, ShardId::from(3)); - - assert_eq!(table_entry.remote_shards.len(), 1); - assert_eq!(table_entry.remote_shards[0].shard_id, ShardId::from(2)); + let entry = table.table.get(&key).unwrap(); + assert_eq!(entry.nodes.len(), 1); + assert_eq!(entry.nodes.get("node-1").unwrap().capacity_score, 8); + + // Update existing node. + table.apply_capacity_update( + "node-1".into(), + IndexUid::for_test("test-index", 0), + "test-source".into(), + 4, + 5, + ); + let node = table.table.get(&key).unwrap().nodes.get("node-1").unwrap(); + assert_eq!(node.capacity_score, 4); + assert_eq!(node.open_shard_count, 5); + + // Add second node. + table.apply_capacity_update( + "node-2".into(), + IndexUid::for_test("test-index", 0), + "test-source".into(), + 6, + 2, + ); + assert_eq!(table.table.get(&key).unwrap().nodes.len(), 2); + + // Zero shards: node stays in table but becomes ineligible for routing. + table.apply_capacity_update( + "node-1".into(), + IndexUid::for_test("test-index", 0), + "test-source".into(), + 0, + 0, + ); + let entry = table.table.get(&key).unwrap(); + assert_eq!(entry.nodes.len(), 2); + assert_eq!(entry.nodes.get("node-1").unwrap().open_shard_count, 0); + assert_eq!(entry.nodes.get("node-1").unwrap().capacity_score, 0); } #[test] - fn test_routing_table_entry_has_open_shards() { - let index_uid = IndexUid::for_test("test-index", 0); - let source_id: SourceId = "test-source".into(); - let table_entry = RoutingTableEntry::empty(index_uid.clone(), source_id.clone()); - - let mut closed_shard_ids = Vec::new(); - let ingester_pool = IngesterPool::default(); - let mut unavailable_leaders = HashSet::new(); - - assert!(!table_entry.has_open_shards( - &ingester_pool, - &mut closed_shard_ids, - &mut unavailable_leaders - )); - assert!(closed_shard_ids.is_empty()); - assert!(unavailable_leaders.is_empty()); - - ingester_pool.insert( - "test-ingester-0".into(), - crate::IngesterPoolEntry { - client: IngesterServiceClient::mocked(), - availability_zone: None, - }, + fn test_has_open_nodes() { + let mut table = RoutingTable::default(); + let pool = IngesterPool::default(); + + // Empty table. + assert!(!table.has_open_nodes("test-index", "test-source", &pool, &HashSet::new())); + + // Node exists but is not in pool. + table.apply_capacity_update( + "node-1".into(), + IndexUid::for_test("test-index", 0), + "test-source".into(), + 8, + 3, ); - ingester_pool.insert( - "test-ingester-1".into(), - crate::IngesterPoolEntry { - client: IngesterServiceClient::mocked(), - availability_zone: None, - }, + assert!(!table.has_open_nodes("test-index", "test-source", &pool, &HashSet::new())); + + // Node is in pool → true. + pool.insert("node-1".into(), mocked_ingester(None)); + assert!(table.has_open_nodes("test-index", "test-source", &pool, &HashSet::new())); + + // Node is unavailable → false. + let unavailable: HashSet = HashSet::from(["node-1".into()]); + assert!(!table.has_open_nodes("test-index", "test-source", &pool, &unavailable)); + + // Second node available → true despite first being unavailable. + table.apply_capacity_update( + "node-2".into(), + IndexUid::for_test("test-index", 0), + "test-source".into(), + 6, + 2, ); - - let table_entry = RoutingTableEntry { - index_uid: index_uid.clone(), - source_id: source_id.clone(), - local_shards: vec![ - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(1), - shard_state: ShardState::Closed, - leader_id: "test-ingester-0".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(2), - shard_state: ShardState::Open, - leader_id: "test-ingester-0".into(), - }, - ], - local_round_robin_idx: AtomicUsize::default(), - remote_shards: Vec::new(), - remote_round_robin_idx: AtomicUsize::default(), - }; - assert!(table_entry.has_open_shards( - &ingester_pool, - &mut closed_shard_ids, - &mut unavailable_leaders - )); - assert_eq!(closed_shard_ids.len(), 1); - assert_eq!(closed_shard_ids[0], ShardId::from(1)); - assert!(unavailable_leaders.is_empty()); - - closed_shard_ids.clear(); - - let table_entry = RoutingTableEntry { - index_uid: index_uid.clone(), - source_id, - local_shards: Vec::new(), - local_round_robin_idx: AtomicUsize::default(), - remote_shards: vec![ - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(1), - shard_state: ShardState::Closed, - leader_id: "test-ingester-1".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(2), - shard_state: ShardState::Open, - leader_id: "test-ingester-2".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(3), - shard_state: ShardState::Open, - leader_id: "test-ingester-1".into(), - }, - ], - remote_round_robin_idx: AtomicUsize::default(), - }; - assert!(table_entry.has_open_shards( - &ingester_pool, - &mut closed_shard_ids, - &mut unavailable_leaders - )); - assert_eq!(closed_shard_ids.len(), 1); - assert_eq!(closed_shard_ids[0], ShardId::from(1)); - assert_eq!(unavailable_leaders.len(), 1); - assert!(unavailable_leaders.contains("test-ingester-2")); + pool.insert("node-2".into(), mocked_ingester(None)); + assert!(table.has_open_nodes("test-index", "test-source", &pool, &unavailable)); + + // Node with capacity_score=0 is not eligible. + table.apply_capacity_update( + "node-2".into(), + IndexUid::for_test("test-index", 0), + "test-source".into(), + 0, + 2, + ); + assert!(!table.has_open_nodes("test-index", "test-source", &pool, &unavailable)); } #[test] - fn test_routing_table_entry_next_open_shard_round_robin() { - let index_uid = IndexUid::for_test("test-index", 0); - let source_id: SourceId = "test-source".into(); - let table_entry = RoutingTableEntry::empty(index_uid.clone(), source_id.clone()); - let ingester_pool = IngesterPool::default(); - let mut rate_limited_shards = HashSet::new(); - - let error = table_entry - .next_open_shard_round_robin(&ingester_pool, &rate_limited_shards) - .unwrap_err(); - assert_eq!(error, NextOpenShardError::NoShardsAvailable); - - ingester_pool.insert( - "test-ingester-0".into(), - crate::IngesterPoolEntry { - client: IngesterServiceClient::mocked(), - availability_zone: None, - }, + fn test_pick_node_prefers_same_az() { + let mut table = RoutingTable::new(Some("az-1".to_string())); + let pool = IngesterPool::default(); + + table.apply_capacity_update( + "node-1".into(), + IndexUid::for_test("test-index", 0), + "test-source".into(), + 5, + 1, ); - ingester_pool.insert( - "test-ingester-1".into(), - crate::IngesterPoolEntry { - client: IngesterServiceClient::mocked(), - availability_zone: None, - }, + table.apply_capacity_update( + "node-2".into(), + IndexUid::for_test("test-index", 0), + "test-source".into(), + 5, + 1, ); + pool.insert("node-1".into(), mocked_ingester(Some("az-1"))); + pool.insert("node-2".into(), mocked_ingester(Some("az-2"))); - let table_entry = RoutingTableEntry { - index_uid: index_uid.clone(), - source_id: source_id.clone(), - local_shards: vec![ - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(1), - shard_state: ShardState::Closed, - leader_id: "test-ingester-0".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(2), - shard_state: ShardState::Open, - leader_id: "test-ingester-0".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(3), - shard_state: ShardState::Open, - leader_id: "test-ingester-0".into(), - }, - ], - local_round_robin_idx: AtomicUsize::default(), - remote_shards: Vec::new(), - remote_round_robin_idx: AtomicUsize::default(), - }; - let shard = table_entry - .next_open_shard_round_robin(&ingester_pool, &rate_limited_shards) - .unwrap(); - assert_eq!(shard.shard_id, ShardId::from(2)); - - let shard = table_entry - .next_open_shard_round_robin(&ingester_pool, &rate_limited_shards) - .unwrap(); - assert_eq!(shard.shard_id, ShardId::from(3)); - - let shard = table_entry - .next_open_shard_round_robin(&ingester_pool, &rate_limited_shards) - .unwrap(); - assert_eq!(shard.shard_id, ShardId::from(2)); - - let table_entry = RoutingTableEntry { - index_uid: index_uid.clone(), - source_id: source_id.clone(), - local_shards: vec![RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(1), - shard_state: ShardState::Closed, - leader_id: "test-ingester-0".into(), - }], - local_round_robin_idx: AtomicUsize::default(), - remote_shards: vec![ - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(2), - shard_state: ShardState::Open, - leader_id: "test-ingester-1".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(3), - shard_state: ShardState::Closed, - leader_id: "test-ingester-1".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(4), - shard_state: ShardState::Open, - leader_id: "test-ingester-2".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(5), - shard_state: ShardState::Open, - leader_id: "test-ingester-1".into(), - }, - ], - remote_round_robin_idx: AtomicUsize::default(), - }; - let shard = table_entry - .next_open_shard_round_robin(&ingester_pool, &rate_limited_shards) - .unwrap(); - assert_eq!(shard.shard_id, ShardId::from(2)); - - let shard = table_entry - .next_open_shard_round_robin(&ingester_pool, &rate_limited_shards) - .unwrap(); - assert_eq!(shard.shard_id, ShardId::from(5)); - - let shard = table_entry - .next_open_shard_round_robin(&ingester_pool, &rate_limited_shards) - .unwrap(); - assert_eq!(shard.shard_id, ShardId::from(2)); - - rate_limited_shards.insert(ShardId::from(5)); - - let shard = table_entry - .next_open_shard_round_robin(&ingester_pool, &rate_limited_shards) + let picked = table + .pick_node("test-index", "test-source", &pool, &HashSet::new()) .unwrap(); - assert_eq!(shard.shard_id, ShardId::from(2)); + assert_eq!(picked.node_id, NodeId::from("node-1")); } #[test] - fn test_routing_table_entry_next_open_shard_round_robin_rate_limited_error() { - let index_uid = IndexUid::for_test("test-index", 0); - let source_id: SourceId = "test-source".into(); - - let ingester_pool = IngesterPool::default(); - ingester_pool.insert( - "test-ingester-0".into(), - crate::IngesterPoolEntry { - client: IngesterServiceClient::mocked(), - availability_zone: None, - }, + fn test_pick_node_falls_back_to_cross_az() { + let mut table = RoutingTable::new(Some("az-1".to_string())); + let pool = IngesterPool::default(); + + table.apply_capacity_update( + "node-2".into(), + IndexUid::for_test("test-index", 0), + "test-source".into(), + 5, + 1, ); + pool.insert("node-2".into(), mocked_ingester(Some("az-2"))); - let rate_limited_shards = HashSet::from_iter([ShardId::from(1)]); - - let table_entry = RoutingTableEntry { - index_uid: index_uid.clone(), - source_id: source_id.clone(), - local_shards: vec![RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(1), - shard_state: ShardState::Open, - leader_id: "test-ingester-0".into(), - }], - local_round_robin_idx: AtomicUsize::default(), - remote_shards: Vec::new(), - remote_round_robin_idx: AtomicUsize::default(), - }; - let error = table_entry - .next_open_shard_round_robin(&ingester_pool, &rate_limited_shards) - .unwrap_err(); - assert_eq!(error, NextOpenShardError::RateLimited); + let picked = table + .pick_node("test-index", "test-source", &pool, &HashSet::new()) + .unwrap(); + assert_eq!(picked.node_id, NodeId::from("node-2")); } #[test] - fn test_routing_table_entry_insert_open_shards() { - let index_uid_0 = IndexUid::for_test("test-index", 0); - let source_id: SourceId = "test-source".into(); - let mut table_entry = RoutingTableEntry::empty(index_uid_0.clone(), source_id.clone()); - - let local_node_id: NodeId = "test-ingester-0".into(); - let remote_node_id: NodeId = "test-ingester-1".into(); - table_entry.insert_open_shards(&local_node_id, &local_node_id, &index_uid_0, &[]); - - assert_eq!(table_entry.local_shards.len(), 0); - assert_eq!(table_entry.remote_shards.len(), 0); - - table_entry.insert_open_shards( - &local_node_id, - &local_node_id, - &index_uid_0, - &[ShardId::from(2)], + fn test_pick_node_no_az_awareness() { + let mut table = RoutingTable::default(); + let pool = IngesterPool::default(); + + table.apply_capacity_update( + "node-1".into(), + IndexUid::for_test("test-index", 0), + "test-source".into(), + 5, + 1, ); + pool.insert("node-1".into(), mocked_ingester(Some("az-1"))); - assert_eq!(table_entry.local_shards.len(), 1); - assert_eq!(table_entry.remote_shards.len(), 0); - - assert_eq!(table_entry.local_shards[0].index_uid, index_uid_0); - assert_eq!(table_entry.local_shards[0].source_id, source_id); - assert_eq!(table_entry.local_shards[0].shard_id, ShardId::from(2)); - assert_eq!(table_entry.local_shards[0].shard_state, ShardState::Open); - assert_eq!(table_entry.local_shards[0].leader_id, local_node_id); + let picked = table + .pick_node("test-index", "test-source", &pool, &HashSet::new()) + .unwrap(); + assert_eq!(picked.node_id, NodeId::from("node-1")); + } - table_entry.local_shards[0].shard_state = ShardState::Closed; - table_entry.insert_open_shards( - &local_node_id, - &local_node_id, - &index_uid_0, - &[ShardId::from(1), ShardId::from(2)], + #[test] + fn test_pick_node_missing_entry() { + let table = RoutingTable::new(Some("az-1".to_string())); + let pool = IngesterPool::default(); + + assert!( + table + .pick_node("nonexistent", "source", &pool, &HashSet::new()) + .is_none() ); + } - assert_eq!(table_entry.local_shards.len(), 2); - assert_eq!(table_entry.remote_shards.len(), 0); - - assert_eq!(table_entry.local_shards[0].shard_id, ShardId::from(1)); - assert_eq!(table_entry.local_shards[0].shard_state, ShardState::Open); - assert_eq!(table_entry.local_shards[1].shard_id, ShardId::from(2)); - assert_eq!(table_entry.local_shards[1].shard_state, ShardState::Closed); - - table_entry.local_shards.clear(); - table_entry.insert_open_shards( - &local_node_id, - &remote_node_id, - &index_uid_0, - &[ShardId::from(2)], - ); + #[test] + fn test_power_of_two_choices() { + // 3 candidates: best appears in the random pair 2/3 of the time and always + // wins when it does, so it should win ~67% of 1000 runs. Asserting > 550 + // is ~7.5 standard deviations from the mean — effectively impossible to flake. + let high = IngesterNode { + node_id: "high".into(), + index_uid: IndexUid::for_test("idx", 0), + source_id: "src".into(), + capacity_score: 9, + open_shard_count: 2, + }; + let mid = IngesterNode { + node_id: "mid".into(), + index_uid: IndexUid::for_test("idx", 0), + source_id: "src".into(), + capacity_score: 5, + open_shard_count: 2, + }; + let low = IngesterNode { + node_id: "low".into(), + index_uid: IndexUid::for_test("idx", 0), + source_id: "src".into(), + capacity_score: 1, + open_shard_count: 2, + }; + let candidates: Vec<&IngesterNode> = vec![&high, &mid, &low]; - assert_eq!(table_entry.local_shards.len(), 0); - assert_eq!(table_entry.remote_shards.len(), 1); + let mut high_wins = 0; + for _ in 0..1000 { + if power_of_two_choices(&candidates).node_id == "high" { + high_wins += 1; + } + } + assert!(high_wins > 550, "high won only {high_wins}/1000 times"); + } - assert_eq!(table_entry.remote_shards[0].index_uid, index_uid_0); - assert_eq!(table_entry.remote_shards[0].source_id, source_id); - assert_eq!(table_entry.remote_shards[0].shard_id, ShardId::from(2)); - assert_eq!(table_entry.remote_shards[0].shard_state, ShardState::Open); - assert_eq!(table_entry.remote_shards[0].leader_id, remote_node_id); + #[test] + fn test_merge_from_shards() { + let mut table = RoutingTable::default(); + let index_uid = IndexUid::for_test("test-index", 0); + let key = ("test-index".to_string(), "test-source".to_string()); + + let make_shard = |id: u64, leader: &str, open: bool| Shard { + index_uid: Some(index_uid.clone()), + source_id: "test-source".to_string(), + shard_id: Some(ShardId::from(id)), + shard_state: if open { + ShardState::Open as i32 + } else { + ShardState::Closed as i32 + }, + leader_id: leader.to_string(), + ..Default::default() + }; - table_entry.remote_shards[0].shard_state = ShardState::Closed; - table_entry.insert_open_shards( - &local_node_id, - &remote_node_id, - &index_uid_0, - &[ShardId::from(1), ShardId::from(2)], - ); + // Two open shards on node-1, one open + one closed on node-2, only closed on node-3. + let shards = vec![ + make_shard(1, "node-1", true), + make_shard(2, "node-1", true), + make_shard(3, "node-2", true), + make_shard(4, "node-2", false), + make_shard(5, "node-3", false), + ]; + table.merge_from_shards(index_uid.clone(), "test-source".into(), shards); - assert_eq!(table_entry.local_shards.len(), 0); - assert_eq!(table_entry.remote_shards.len(), 2); + let entry = table.table.get(&key).unwrap(); + assert_eq!(entry.nodes.len(), 3); - assert_eq!(table_entry.remote_shards[0].shard_id, ShardId::from(1)); - assert_eq!(table_entry.remote_shards[0].shard_state, ShardState::Open); - assert_eq!(table_entry.remote_shards[1].shard_id, ShardId::from(2)); - assert_eq!(table_entry.remote_shards[1].shard_state, ShardState::Closed); + let n1 = entry.nodes.get("node-1").unwrap(); + assert_eq!(n1.open_shard_count, 2); + assert_eq!(n1.capacity_score, 5); - // Update index incarnation. - let index_uid_1 = IndexUid::for_test("test-index", 1); - table_entry.insert_open_shards( - &local_node_id, - &local_node_id, - &index_uid_1, - &[ShardId::from(1)], - ); + let n2 = entry.nodes.get("node-2").unwrap(); + assert_eq!(n2.open_shard_count, 1); - assert_eq!(table_entry.index_uid, index_uid_1); - assert_eq!(table_entry.local_shards.len(), 1); - assert_eq!(table_entry.remote_shards.len(), 0); + let n3 = entry.nodes.get("node-3").unwrap(); + assert_eq!(n3.open_shard_count, 0); - assert_eq!(table_entry.local_shards[0].index_uid, index_uid_1); - assert_eq!(table_entry.local_shards[0].source_id, source_id); - assert_eq!(table_entry.local_shards[0].shard_id, ShardId::from(1)); - assert_eq!(table_entry.local_shards[0].shard_state, ShardState::Open); - assert_eq!(table_entry.local_shards[0].leader_id, local_node_id); + // Merging again adds new nodes but preserves existing ones. + let shards = vec![make_shard(10, "node-4", true)]; + table.merge_from_shards(index_uid, "test-source".into(), shards); - // Ignore previous index incarnation. - table_entry.insert_open_shards( - &local_node_id, - &local_node_id, - &index_uid_0, - &[ShardId::from(12), ShardId::from(42), ShardId::from(1337)], - ); - assert_eq!(table_entry.index_uid, index_uid_1); - assert_eq!(table_entry.local_shards.len(), 1); - assert_eq!(table_entry.remote_shards.len(), 0); + let entry = table.table.get(&key).unwrap(); + assert_eq!(entry.nodes.len(), 4); + assert!(entry.nodes.contains_key("node-1")); + assert!(entry.nodes.contains_key("node-2")); + assert!(entry.nodes.contains_key("node-3")); + assert!(entry.nodes.contains_key("node-4")); } #[test] - fn test_routing_table_entry_close_shards() { - let index_uid = IndexUid::for_test("test-index", 0); - let source_id: SourceId = "test-source".into(); - - let mut table_entry = RoutingTableEntry::empty(index_uid.clone(), source_id.clone()); - table_entry.close_shards(&index_uid, &[]); - table_entry.close_shards(&index_uid, &[ShardId::from(1)]); - assert!(table_entry.local_shards.is_empty()); - assert!(table_entry.remote_shards.is_empty()); - - let mut table_entry = RoutingTableEntry { - index_uid: index_uid.clone(), - source_id: source_id.clone(), - local_shards: vec![ - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(1), - shard_state: ShardState::Open, - leader_id: "test-ingester-0".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(2), - shard_state: ShardState::Open, - leader_id: "test-ingester-0".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(3), - shard_state: ShardState::Open, - leader_id: "test-ingester-0".into(), - }, - ], - local_round_robin_idx: AtomicUsize::default(), - remote_shards: vec![ - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(5), - shard_state: ShardState::Open, - leader_id: "test-ingester-1".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(6), - shard_state: ShardState::Open, - leader_id: "test-ingester-1".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(7), - shard_state: ShardState::Open, - leader_id: "test-ingester-1".into(), - }, - ], - remote_round_robin_idx: AtomicUsize::default(), - }; - table_entry.close_shards( - &index_uid, - &[ - ShardId::from(1), - ShardId::from(3), - ShardId::from(4), - ShardId::from(6), - ShardId::from(8), - ], + fn test_classify_az_locality() { + let table = RoutingTable::new(Some("az-1".to_string())); + let pool = IngesterPool::default(); + pool.insert("node-local".into(), mocked_ingester(Some("az-1"))); + pool.insert("node-remote".into(), mocked_ingester(Some("az-2"))); + pool.insert("node-no-az".into(), mocked_ingester(None)); + + assert_eq!( + table.classify_az_locality(&"node-local".into(), &pool), + "same_az" + ); + assert_eq!( + table.classify_az_locality(&"node-remote".into(), &pool), + "cross_az" + ); + assert_eq!( + table.classify_az_locality(&"node-no-az".into(), &pool), + "az_unaware" ); - assert!(table_entry.local_shards[0].shard_state.is_closed()); - assert!(table_entry.local_shards[1].shard_state.is_open()); - assert!(table_entry.local_shards[2].shard_state.is_closed()); - assert!(table_entry.remote_shards[0].shard_state.is_open()); - assert!(table_entry.remote_shards[1].shard_state.is_closed()); - assert!(table_entry.remote_shards[2].shard_state.is_open()); - } - - #[test] - fn test_routing_table_entry_delete_shards() { - let index_uid = IndexUid::for_test("test-index", 0); - let source_id: SourceId = "test-source".into(); - - let mut table_entry = RoutingTableEntry::empty(index_uid.clone(), source_id.clone()); - table_entry.delete_shards(&index_uid, &[]); - table_entry.delete_shards(&index_uid, &[ShardId::from(1)]); - assert!(table_entry.local_shards.is_empty()); - assert!(table_entry.remote_shards.is_empty()); - let mut table_entry = RoutingTableEntry { - index_uid: index_uid.clone(), - source_id: source_id.clone(), - local_shards: vec![ - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(1), - shard_state: ShardState::Open, - leader_id: "test-ingester-0".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(2), - shard_state: ShardState::Open, - leader_id: "test-ingester-0".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(3), - shard_state: ShardState::Open, - leader_id: "test-ingester-0".into(), - }, - ], - local_round_robin_idx: AtomicUsize::default(), - remote_shards: vec![ - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(5), - shard_state: ShardState::Open, - leader_id: "test-ingester-1".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(6), - shard_state: ShardState::Open, - leader_id: "test-ingester-1".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(7), - shard_state: ShardState::Open, - leader_id: "test-ingester-1".into(), - }, - ], - remote_round_robin_idx: AtomicUsize::default(), - }; - table_entry.delete_shards( - &index_uid, - &[ - ShardId::from(1), - ShardId::from(3), - ShardId::from(4), - ShardId::from(6), - ShardId::from(8), - ], + let table_no_az = RoutingTable::default(); + assert_eq!( + table_no_az.classify_az_locality(&"node-local".into(), &pool), + "az_unaware" ); - assert_eq!(table_entry.local_shards.len(), 1); - assert_eq!(table_entry.local_shards[0].shard_id, ShardId::from(2)); - assert_eq!(table_entry.remote_shards.len(), 2); - assert_eq!(table_entry.remote_shards[0].shard_id, ShardId::from(5)); - assert_eq!(table_entry.remote_shards[1].shard_id, ShardId::from(7)); } } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/state.rs b/quickwit/quickwit-ingest/src/ingest_v2/state.rs index e158bce7c58..b77de8d608a 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/state.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/state.rs @@ -35,7 +35,7 @@ use tracing::{error, info}; use super::models::IngesterShard; use super::rate_meter::RateMeter; use super::replication::{ReplicationStreamTaskHandle, ReplicationTaskHandle}; -use super::wal_capacity_timeseries::WalDiskCapacityTimeSeries; +use super::wal_capacity_tracker::WalCapacityTracker; use crate::ingest_v2::mrecordlog_utils::{force_delete_queue, queue_position_range}; use crate::mrecordlog_async::MultiRecordLogAsync; use crate::{FollowerId, LeaderId, OpenShardCounts}; @@ -61,7 +61,7 @@ pub(super) struct InnerIngesterState { pub replication_streams: HashMap, // Replication tasks running for each replication stream opened with leaders. pub replication_tasks: HashMap, - pub wal_capacity_time_series: WalDiskCapacityTimeSeries, + pub wal_capacity_tracker: WalCapacityTracker, status: IngesterStatus, status_tx: watch::Sender, } @@ -130,7 +130,7 @@ impl InnerIngesterState { } impl IngesterState { - fn new(disk_capacity: ByteSize) -> Self { + fn new(disk_capacity: ByteSize, memory_capacity: ByteSize) -> Self { let status = IngesterStatus::Initializing; let (status_tx, status_rx) = watch::channel(status); let inner = InnerIngesterState { @@ -138,7 +138,7 @@ impl IngesterState { doc_mappers: Default::default(), replication_streams: Default::default(), replication_tasks: Default::default(), - wal_capacity_time_series: WalDiskCapacityTimeSeries::new(disk_capacity), + wal_capacity_tracker: WalCapacityTracker::new(disk_capacity, memory_capacity), status, status_tx, }; @@ -155,9 +155,10 @@ impl IngesterState { pub fn load( wal_dir_path: &Path, disk_capacity: ByteSize, + memory_capacity: ByteSize, rate_limiter_settings: RateLimiterSettings, ) -> Self { - let state = Self::new(disk_capacity); + let state = Self::new(disk_capacity, memory_capacity); let state_clone = state.clone(); let wal_dir_path = wal_dir_path.to_path_buf(); @@ -180,6 +181,7 @@ impl IngesterState { let mut state = IngesterState::load( temp_dir.path(), disk_capacity, + ByteSize::mb(256), RateLimiterSettings::default(), ); @@ -530,7 +532,7 @@ mod tests { #[tokio::test] async fn test_ingester_state_does_not_lock_while_initializing() { - let state = IngesterState::new(ByteSize::mb(256)); + let state = IngesterState::new(ByteSize::mb(256), ByteSize::mb(256)); let inner_guard = state.inner.lock().await; assert_eq!(inner_guard.status(), IngesterStatus::Initializing); @@ -545,7 +547,7 @@ mod tests { #[tokio::test] async fn test_ingester_state_failed() { - let state = IngesterState::new(ByteSize::mb(256)); + let state = IngesterState::new(ByteSize::mb(256), ByteSize::mb(256)); state.inner.lock().await.set_status(IngesterStatus::Failed); @@ -558,7 +560,7 @@ mod tests { #[tokio::test] async fn test_ingester_state_init() { - let mut state = IngesterState::new(ByteSize::mb(256)); + let mut state = IngesterState::new(ByteSize::mb(256), ByteSize::mb(256)); let temp_dir = tempfile::tempdir().unwrap(); state diff --git a/quickwit/quickwit-ingest/src/ingest_v2/wal_capacity_timeseries.rs b/quickwit/quickwit-ingest/src/ingest_v2/wal_capacity_tracker.rs similarity index 76% rename from quickwit/quickwit-ingest/src/ingest_v2/wal_capacity_timeseries.rs rename to quickwit/quickwit-ingest/src/ingest_v2/wal_capacity_tracker.rs index 58f030cbf74..f24e8254053 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/wal_capacity_timeseries.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/wal_capacity_tracker.rs @@ -25,38 +25,36 @@ const WAL_CAPACITY_LOOKBACK_WINDOW_LEN: usize = 6; /// reading would be discarded when the next reading is inserted. const WAL_CAPACITY_READINGS_LEN: usize = WAL_CAPACITY_LOOKBACK_WINDOW_LEN + 1; -pub struct WalDiskCapacityTimeSeries { - disk_capacity: ByteSize, +struct WalCapacityTimeSeries { + capacity: ByteSize, readings: RingBuffer, } -impl WalDiskCapacityTimeSeries { - pub fn new(disk_capacity: ByteSize) -> Self { +impl WalCapacityTimeSeries { + fn new(capacity: ByteSize) -> Self { #[cfg(not(test))] - assert!(disk_capacity.as_u64() > 0); + assert!(capacity.as_u64() > 0); Self { - disk_capacity, + capacity, readings: RingBuffer::default(), } } - /// Records a disk usage reading and returns the resulting capacity score. - pub fn record_and_score(&mut self, disk_used: ByteSize) -> usize { - self.record(disk_used); + fn record_and_score(&mut self, used: ByteSize) -> usize { + self.record(used); let remaining = self.current().unwrap_or(1.0); let delta = self.delta().unwrap_or(0.0); compute_capacity_score(remaining, delta) } - /// Computes a capacity score for the given disk usage without recording it. - pub fn score(&self, disk_used: ByteSize) -> usize { - let remaining = 1.0 - (disk_used.as_u64() as f64 / self.disk_capacity.as_u64() as f64); + fn score(&self, used: ByteSize) -> usize { + let remaining = 1.0 - (used.as_u64() as f64 / self.capacity.as_u64() as f64); let delta = self.delta().unwrap_or(0.0); compute_capacity_score(remaining, delta) } - fn record(&mut self, disk_used: ByteSize) { - let remaining = 1.0 - (disk_used.as_u64() as f64 / self.disk_capacity.as_u64() as f64); + fn record(&mut self, used: ByteSize) { + let remaining = 1.0 - (used.as_u64() as f64 / self.capacity.as_u64() as f64); self.readings.push_back(remaining.clamp(0.0, 1.0)); } @@ -64,8 +62,6 @@ impl WalDiskCapacityTimeSeries { self.readings.last() } - /// How much remaining capacity changed between the oldest and newest readings. - /// Positive = improving, negative = draining. fn delta(&self) -> Option { let current = self.readings.last()?; let oldest = self.readings.front()?; @@ -73,6 +69,35 @@ impl WalDiskCapacityTimeSeries { } } +pub struct WalCapacityTracker { + disk: WalCapacityTimeSeries, + memory: WalCapacityTimeSeries, +} + +impl WalCapacityTracker { + pub fn new(disk_capacity: ByteSize, memory_capacity: ByteSize) -> Self { + Self { + disk: WalCapacityTimeSeries::new(disk_capacity), + memory: WalCapacityTimeSeries::new(memory_capacity), + } + } + + /// Records disk and memory usage readings and returns the resulting capacity score. + /// The score is the minimum of the individual disk and memory scores. + pub fn record_and_score(&mut self, disk_used: ByteSize, memory_used: ByteSize) -> usize { + let disk_score = self.disk.record_and_score(disk_used); + let memory_score = self.memory.record_and_score(memory_used); + disk_score.min(memory_score) + } + + /// Computes a capacity score for the given usage without recording it. + pub fn score(&self, disk_used: ByteSize, memory_used: ByteSize) -> usize { + let disk_score = self.disk.score(disk_used); + let memory_score = self.memory.score(memory_used); + disk_score.min(memory_score) + } +} + /// Computes a capacity score from 0 to 10 using a PD controller. /// /// The score has two components: @@ -115,18 +140,18 @@ fn compute_capacity_score(remaining_capacity: f64, capacity_delta: f64) -> usize mod tests { use super::*; - fn ts() -> WalDiskCapacityTimeSeries { - WalDiskCapacityTimeSeries::new(ByteSize::b(100)) + fn ts() -> WalCapacityTimeSeries { + WalCapacityTimeSeries::new(ByteSize::b(100)) } /// Helper: record a reading with `used` bytes against the series' fixed capacity. - fn record(series: &mut WalDiskCapacityTimeSeries, used: u64) { + fn record(series: &mut WalCapacityTimeSeries, used: u64) { series.record(ByteSize::b(used)); } #[test] fn test_wal_disk_capacity_current_after_record() { - let mut series = WalDiskCapacityTimeSeries::new(ByteSize::b(256)); + let mut series = WalCapacityTimeSeries::new(ByteSize::b(256)); // 192 of 256 used => 25% remaining series.record(ByteSize::b(192)); assert_eq!(series.current(), Some(0.25)); @@ -211,4 +236,14 @@ mod tests { record(&mut series, 0); assert_eq!(series.delta(), Some(0.50)); } + + #[test] + fn test_wal_capacity_tracker_returns_min() { + let mut tracker = WalCapacityTracker::new(ByteSize::b(100), ByteSize::b(100)); + // Disk 10% used (score 9), memory 90% used (score 2) → returns 2. + assert_eq!( + tracker.record_and_score(ByteSize::b(10), ByteSize::b(90)), + 2 + ); + } }