From c9129602907516d2307b0e75779ef088a2b0c684 Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Tue, 16 Jun 2026 11:28:12 -0400 Subject: [PATCH 1/3] Give indexing plan a time-based ID; use it to tiebreak publish tokens --- .../src/indexing_scheduler/mod.rs | 11 ++- .../src/actors/indexing_pipeline.rs | 7 ++ .../src/actors/indexing_service.rs | 22 +++-- .../src/actors/parquet_pipeline/pipeline.rs | 7 ++ .../src/source/ingest/mod.rs | 94 ++++++++++++++----- quickwit/quickwit-indexing/src/source/mod.rs | 8 +- .../file_backed/file_backed_index/shards.rs | 43 ++++++++- .../postgres/queries/shards/acquire.sql | 8 ++ .../quickwit-metastore/src/tests/shard.rs | 86 +++++++++++++++++ .../protos/quickwit/indexing.proto | 5 + .../src/codegen/quickwit/quickwit.indexing.rs | 6 ++ 11 files changed, 264 insertions(+), 33 deletions(-) diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs index c929f82f725..a704517df83 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs @@ -35,6 +35,7 @@ use quickwit_proto::types::NodeId; use scheduling::{SourceToSchedule, SourceToScheduleType}; use serde::Serialize; use tracing::{debug, info, warn}; +use ulid::Ulid; use crate::indexing_plan::PhysicalIndexingPlan; use crate::indexing_scheduler::change_tracker::{NotifyChangeOnDrop, RebuildNotifier}; @@ -414,11 +415,16 @@ impl IndexingScheduler { ) { debug!(new_physical_plan=?new_physical_plan, "apply physical indexing plan"); APPLY_PLAN_TOTAL.inc(); + // ULIDs are time-ordered (ms precision), so a plan applied later gets a greater id as + // long as the clock does not go backwards. Indexers use it as the publish token for the + // shards they acquire, so a stale plan cannot steal a shard from a more recent one.
+ let indexing_plan_id = Ulid::new().to_string(); for (node_id, indexing_tasks) in new_physical_plan.indexing_tasks_per_indexer() { // We don't want to block on a slow indexer so we apply this change asynchronously // TODO not blocking is cool, but we need to make sure there is not accumulation // possible here. let notify_on_drop = notify_on_drop.clone(); + let indexing_plan_id = indexing_plan_id.clone(); tokio::spawn({ let indexer = indexers .iter() @@ -430,7 +436,10 @@ impl IndexingScheduler { if let Err(error) = indexer .client .clone() - .apply_indexing_plan(ApplyIndexingPlanRequest { indexing_tasks }) + .apply_indexing_plan(ApplyIndexingPlanRequest { + indexing_tasks, + indexing_plan_id, + }) .await { warn!( diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index 9d6287245ac..a73ab904b69 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -89,6 +89,9 @@ pub struct IndexingPipeline { // requiring a respawn of the pipeline. // We keep the list of shards here however, to reassign them after a respawn. shard_ids: BTreeSet, + // Id of the last indexing plan assigned to this pipeline. Kept here, like `shard_ids`, so it + // can be re-sent to the source on respawn; the source adopts it as its publish token. 
+ indexing_plan_id: String, _indexing_pipelines_gauge_guard: GaugeGuard, } @@ -137,6 +140,7 @@ impl IndexingPipeline { ..Default::default() }, shard_ids: Default::default(), + indexing_plan_id: String::new(), _indexing_pipelines_gauge_guard: indexing_pipelines_gauge_guard, } } @@ -402,6 +406,7 @@ impl IndexingPipeline { .spawn(actor_source); let assign_shards_message = AssignShards(Assignment { shard_ids: self.shard_ids.clone(), + indexing_plan_id: self.indexing_plan_id.clone(), }); source_mailbox.send_message(assign_shards_message).await?; @@ -496,6 +501,8 @@ impl Handler for IndexingPipeline { ) -> Result<(), ActorExitStatus> { self.shard_ids .clone_from(&assign_shards_message.0.shard_ids); + self.indexing_plan_id + .clone_from(&assign_shards_message.0.indexing_plan_id); // If the pipeline is running, we forward the message to its source. // If it is not, it will be respawned soon, and the shards will be assigned afterward. if let Some(handles) = &self.handles_opt { diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 4137ac0c556..a82442c918d 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -760,8 +760,8 @@ impl IndexingService { /// or not. /// /// If a pipeline actor has failed, this function just logs an error. 
- async fn assign_shards_to_pipelines(&mut self, tasks: &[IndexingTask]) { - for task in tasks { + async fn assign_shards_to_pipelines(&mut self, plan_request: &ApplyIndexingPlanRequest) { + for task in &plan_request.indexing_tasks { if task.shard_ids.is_empty() { continue; } @@ -771,6 +771,7 @@ impl IndexingService { }; let assignment = Assignment { shard_ids: task.shard_ids.iter().cloned().collect(), + indexing_plan_id: plan_request.indexing_plan_id.clone(), }; let message = AssignShards(assignment); @@ -785,9 +786,10 @@ impl IndexingService { /// - Starting the pipelines that are not running. async fn apply_indexing_plan( &mut self, - tasks: &[IndexingTask], + plan_request: ApplyIndexingPlanRequest, ctx: &ActorContext, ) -> Result<(), IndexingError> { + let tasks = &plan_request.indexing_tasks; let pipeline_diff = self.compute_pipeline_diff(tasks); if !pipeline_diff.pipelines_to_shutdown.is_empty() { @@ -801,7 +803,7 @@ impl IndexingService { .spawn_pipelines(&pipeline_diff.pipelines_to_spawn, ctx) .await?; } - self.assign_shards_to_pipelines(tasks).await; + self.assign_shards_to_pipelines(&plan_request).await; self.update_chitchat_running_plan().await; if !spawn_pipeline_failures.is_empty() { @@ -1135,7 +1137,7 @@ impl Handler for IndexingService { ctx: &ActorContext, ) -> Result { Ok(self - .apply_indexing_plan(&plan_request.indexing_tasks, ctx) + .apply_indexing_plan(plan_request, ctx) .await .map(|_| ApplyIndexingPlanResponse {})) } @@ -1465,7 +1467,10 @@ mod tests { }, ]; indexing_service - .ask_for_res(ApplyIndexingPlanRequest { indexing_tasks }) + .ask_for_res(ApplyIndexingPlanRequest { + indexing_tasks, + indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5FAV".to_string(), + }) .await .unwrap(); assert_eq!( @@ -1531,6 +1536,7 @@ mod tests { indexing_service .ask_for_res(ApplyIndexingPlanRequest { indexing_tasks: indexing_tasks.clone(), + indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5FAV".to_string(), }) .await .unwrap(); @@ -1587,6 +1593,7 @@ mod tests { 
indexing_service .ask_for_res(ApplyIndexingPlanRequest { indexing_tasks: indexing_tasks.clone(), + indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5FAV".to_string(), }) .await .unwrap(); @@ -1646,6 +1653,7 @@ mod tests { indexing_service .ask_for_res(ApplyIndexingPlanRequest { indexing_tasks: indexing_tasks.clone(), + indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5FAV".to_string(), }) .await .unwrap(); @@ -1665,6 +1673,7 @@ mod tests { indexing_service .ask_for_res(ApplyIndexingPlanRequest { indexing_tasks: Vec::new(), + indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5FAV".to_string(), }) .await .unwrap(); @@ -2072,6 +2081,7 @@ mod tests { params_fingerprint: 0, }, ], + indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5FAV".to_string(), }) .await .unwrap(); diff --git a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/pipeline.rs b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/pipeline.rs index b0b7f869d40..26d3b23fea9 100644 --- a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/pipeline.rs @@ -114,6 +114,9 @@ pub struct MetricsPipeline { handles_opt: Option, kill_switch: KillSwitch, shard_ids: BTreeSet, + // Id of the last indexing plan assigned to this pipeline. Kept here, like `shard_ids`, so it + // can be re-sent to the source on respawn; the source adopts it as its publish token. 
+ indexing_plan_id: String, _indexing_pipelines_gauge_guard: GaugeGuard, } @@ -163,6 +166,7 @@ impl MetricsPipeline { ..Default::default() }, shard_ids: Default::default(), + indexing_plan_id: String::new(), _indexing_pipelines_gauge_guard: indexing_pipelines_gauge_guard, } } @@ -447,6 +451,7 @@ impl MetricsPipeline { .spawn(actor_source); let assign_shards_message = AssignShards(Assignment { shard_ids: self.shard_ids.clone(), + indexing_plan_id: self.indexing_plan_id.clone(), }); source_mailbox.send_message(assign_shards_message).await?; @@ -539,6 +544,8 @@ impl Handler for MetricsPipeline { ) -> Result<(), ActorExitStatus> { self.shard_ids .clone_from(&assign_shards_message.0.shard_ids); + self.indexing_plan_id + .clone_from(&assign_shards_message.0.indexing_plan_id); if let Some(handles) = &self.handles_opt { info!( shard_ids=?assign_shards_message.0.shard_ids, diff --git a/quickwit/quickwit-indexing/src/source/ingest/mod.rs b/quickwit/quickwit-indexing/src/source/ingest/mod.rs index b6703198b6b..5502332e8d2 100644 --- a/quickwit/quickwit-indexing/src/source/ingest/mod.rs +++ b/quickwit/quickwit-indexing/src/source/ingest/mod.rs @@ -43,10 +43,9 @@ use serde::Serialize; use serde_json::json; use tokio::time; use tracing::{debug, error, info, warn}; -use ulid::Ulid; use super::{ - BATCH_NUM_BYTES_LIMIT, BatchBuilder, EMIT_BATCHES_TIMEOUT, Source, SourceContext, + Assignment, BATCH_NUM_BYTES_LIMIT, BatchBuilder, EMIT_BATCHES_TIMEOUT, Source, SourceContext, SourceRuntime, SourceSink, TypedSourceFactory, }; use crate::models::{LocalShardPositionsUpdate, NewPublishLock, NewPublishToken, PublishLock}; @@ -101,11 +100,6 @@ impl ClientId { pipeline_uid, } } - - fn new_publish_token(&self) -> String { - let ulid = if cfg!(test) { Ulid::nil() } else { Ulid::new() }; - format!("{self}/{ulid}") - } } #[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize)] @@ -176,9 +170,10 @@ impl IngestSource { retry_params, ); // We start as dead. 
The first reset with a non-empty list of shards will create an alive - // publish lock. + // publish lock. The publish token is left empty until then: the first reset adopts the + // indexing plan id carried by the assignment. let publish_lock = PublishLock::dead(); - let publish_token = client_id.new_publish_token(); + let publish_token = PublishToken::new(); Ok(IngestSource { client_id, @@ -393,6 +388,7 @@ impl IngestSource { async fn reset_if_needed( &mut self, new_assigned_shard_ids: &BTreeSet, + indexing_plan_id: &str, source_sink: &SourceSink, ctx: &SourceContext, ) -> anyhow::Result<()> { @@ -439,7 +435,7 @@ impl IngestSource { self.fetch_stream.reset(); self.publish_lock.kill().await; self.publish_lock = PublishLock::default(); - self.publish_token = self.client_id.new_publish_token(); + self.publish_token = indexing_plan_id.to_string(); source_sink .send_publish_lock(NewPublishLock(self.publish_lock.clone()), ctx) .await?; @@ -504,11 +500,15 @@ impl Source for IngestSource { async fn assign_shards( &mut self, - new_assigned_shard_ids: BTreeSet, + assignment: Assignment, source_sink: &SourceSink, ctx: &SourceContext, ) -> anyhow::Result<()> { - self.reset_if_needed(&new_assigned_shard_ids, source_sink, ctx) + let Assignment { + shard_ids: new_assigned_shard_ids, + indexing_plan_id, + } = assignment; + self.reset_if_needed(&new_assigned_shard_ids, &indexing_plan_id, source_sink, ctx) .await?; // As enforced by `reset_if_needed`, at this point, all currently assigned shards should be @@ -963,21 +963,36 @@ mod tests { let shard_ids: BTreeSet = once(0).map(ShardId::from).collect(); let publish_lock = source.publish_lock.clone(); source - .assign_shards(shard_ids, &source_sink, &ctx) + .assign_shards( + Assignment { + shard_ids, + indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5FAV".to_string(), + }, + &source_sink, + &ctx, + ) .await .unwrap(); assert_eq!(sequence_rx.recv().await.unwrap(), 1); assert!(!publish_lock.is_alive()); 
assert!(source.publish_lock.is_alive()); - assert!(!source.publish_token.is_empty()); + // The first reset adopts the assignment's indexing plan id as the publish token. + assert_eq!(source.publish_token, "01ARZ3NDEKTSV4RRFFQ69G5FAV"); // We assign [0,1] (previously [0]). This should just add the shard 1. // The stream does not need to be reset. let shard_ids: BTreeSet = (0..2).map(ShardId::from).collect(); let publish_lock = source.publish_lock.clone(); source - .assign_shards(shard_ids, &source_sink, &ctx) + .assign_shards( + Assignment { + shard_ids, + indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5FAV".to_string(), + }, + &source_sink, + &ctx, + ) .await .unwrap(); assert_eq!(sequence_rx.recv().await.unwrap(), 2); @@ -990,7 +1005,14 @@ mod tests { let shard_ids: BTreeSet = (1..3).map(ShardId::from).collect(); let publish_lock = source.publish_lock.clone(); source - .assign_shards(shard_ids, &source_sink, &ctx) + .assign_shards( + Assignment { + shard_ids, + indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5FAV".to_string(), + }, + &source_sink, + &ctx, + ) .await .unwrap(); @@ -1006,12 +1028,12 @@ mod tests { .unwrap(); assert_ne!(&source.publish_lock, &publish_lock); - // assert!(publish_token != source.publish_token); - let NewPublishToken(publish_token) = doc_processor_inbox .recv_typed_message::() .await .unwrap(); + // On reset the source re-adopts the assignment's plan id and forwards it downstream. + assert_eq!(publish_token, "01ARZ3NDEKTSV4RRFFQ69G5FAV"); assert_eq!(source.publish_token, publish_token); assert_eq!(source.assigned_shards.len(), 2); @@ -1169,7 +1191,14 @@ mod tests { BTreeSet::from_iter([ShardId::from(1), ShardId::from(2)]); source - .assign_shards(shard_ids, &source_sink, &ctx) + .assign_shards( + Assignment { + shard_ids, + indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5FAV".to_string(), + }, + &source_sink, + &ctx, + ) .await .unwrap(); @@ -1340,7 +1369,14 @@ mod tests { // In this scenario, the indexer will only be able to acquire shard 1. 
source - .assign_shards(shard_ids, &source_sink, &ctx) + .assign_shards( + Assignment { + shard_ids, + indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5FAV".to_string(), + }, + &source_sink, + &ctx, + ) .await .unwrap(); @@ -1613,7 +1649,14 @@ mod tests { let shard_ids: BTreeSet = BTreeSet::from_iter([ShardId::from(1)]); source - .assign_shards(shard_ids, &source_sink, &ctx) + .assign_shards( + Assignment { + shard_ids, + indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5FAV".to_string(), + }, + &source_sink, + &ctx, + ) .await .unwrap(); @@ -1908,7 +1951,14 @@ mod tests { }); source - .assign_shards(shard_ids, &source_sink, &ctx) + .assign_shards( + Assignment { + shard_ids, + indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5FAV".to_string(), + }, + &source_sink, + &ctx, + ) .await .unwrap(); diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs index 8b4db0844ad..83bd315a098 100644 --- a/quickwit/quickwit-indexing/src/source/mod.rs +++ b/quickwit/quickwit-indexing/src/source/mod.rs @@ -266,7 +266,7 @@ pub trait Source: Send + 'static { /// plane. async fn assign_shards( &mut self, - _shard_ids: BTreeSet, + _assignment: Assignment, _source_sink: &SourceSink, _ctx: &SourceContext, ) -> anyhow::Result<()> { @@ -337,6 +337,8 @@ struct Loop; #[derive(Debug)] pub struct Assignment { pub shard_ids: BTreeSet, + /// ULID of the originating indexing plan, used as the publish token when (re)acquiring shards. 
+ pub indexing_plan_id: String, } #[derive(Debug)] @@ -402,9 +404,9 @@ impl Handler for SourceActor { assign_shards_message: AssignShards, ctx: &SourceContext, ) -> Result<(), ActorExitStatus> { - let AssignShards(Assignment { shard_ids }) = assign_shards_message; + let AssignShards(assignment) = assign_shards_message; self.source - .assign_shards(shard_ids, &self.source_sink, ctx) + .assign_shards(assignment, &self.source_sink, ctx) .await?; Ok(()) } diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs index c30a27ea101..2dac6524e51 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs @@ -25,7 +25,7 @@ use quickwit_proto::metastore::{ }; use quickwit_proto::types::{IndexUid, Position, PublishToken, ShardId, SourceId, queue_id}; use time::OffsetDateTime; -use tracing::{info, warn}; +use tracing::{error, info, warn}; use crate::checkpoint::{PartitionId, SourceCheckpoint, SourceCheckpointDelta}; use crate::file_backed::MutationOccurred; @@ -51,6 +51,16 @@ impl fmt::Debug for Shards { } } +/// Whether a shard recording `existing_token` can be acquired by a pipeline presenting +/// `presented_token`. Acquisition is monotonic; a missing or legacy (`'/'`-containing) token ranks +/// below any ULID. 
+fn can_acquire_shard(existing_token: &str, presented_token: &str) -> bool { + if existing_token.is_empty() || existing_token.contains('/') { + return true; + } + !presented_token.contains('/') && presented_token >= existing_token +} + impl Shards { pub(super) fn empty(index_uid: IndexUid, source_id: SourceId) -> Self { Self { @@ -164,6 +174,17 @@ impl Shards { for shard_id in &request.shard_ids { if let Some(shard) = self.shards.get_mut(shard_id) { + if !can_acquire_shard(shard.publish_token(), &request.publish_token) { + error!( + index_uid=%self.index_uid, + source_id=%self.source_id, + %shard_id, + existing_publish_token=%shard.publish_token(), + publish_token=%request.publish_token, + "cannot acquire shard held by a more recent publish token" + ); + continue; + } if shard.publish_token() != request.publish_token { shard.publish_token = Some(request.publish_token.clone()); mutation_occurred = true; @@ -535,6 +556,26 @@ mod tests { ); } + #[test] + fn test_can_acquire_shard() { + const OLDER: &str = "01000000000000000000000000"; + const NEWER: &str = "02000000000000000000000000"; + const LEGACY: &str = "indexer/node/index:0/source/01000000000000000000000000"; + + // No token recorded yet: free to acquire. + assert!(can_acquire_shard("", NEWER)); + // A legacy (pre-ULID) recorded token is always superseded by a ULID. + assert!(can_acquire_shard(LEGACY, NEWER)); + // A newer ULID supersedes an older one. + assert!(can_acquire_shard(OLDER, NEWER)); + // The same ULID re-acquires (e.g. after a local respawn). + assert!(can_acquire_shard(NEWER, NEWER)); + // An older ULID cannot steal a shard owned by a newer one. + assert!(!can_acquire_shard(NEWER, OLDER)); + // A legacy presented token cannot supersede a recorded ULID. 
+ assert!(!can_acquire_shard(NEWER, LEGACY)); + } + #[test] fn test_delete_shards() { let index_uid = IndexUid::for_test("test-index", 0); diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/acquire.sql b/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/acquire.sql index 740235a3851..4ab98021321 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/acquire.sql +++ b/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/acquire.sql @@ -6,5 +6,13 @@ WHERE index_uid = $1 AND source_id = $2 AND shard_id = ANY ($3) + -- Monotonic acquisition: the presented token ($4) must rank >= the recorded one. + AND ( + publish_token IS NULL -- never acquired: free to take + OR publish_token = '' -- empty placeholder: free to take + OR publish_token LIKE '%/%' -- legacy pre-ULID token: ranks below any ULID, so superseded + -- both are ULIDs: take it only if ours is newer-or-equal (and ours is itself a ULID) + OR ($4 NOT LIKE '%/%' AND $4 >= publish_token) + ) RETURNING * diff --git a/quickwit/quickwit-metastore/src/tests/shard.rs b/quickwit/quickwit-metastore/src/tests/shard.rs index f781a2e24ab..a6c8c99e845 100644 --- a/quickwit/quickwit-metastore/src/tests/shard.rs +++ b/quickwit/quickwit-metastore/src/tests/shard.rs @@ -334,6 +334,92 @@ pub async fn test_metastore_acquire_shards< assert_eq!(shard.publish_position_inclusive(), Position::Beginning); assert_eq!(shard.publish_token(), "test-publish-token-foo"); + // A token ranking below the recorded one is refused: the shard is left untouched and absent + // from the response. 
+ let acquire_shards_response = metastore + .acquire_shards(AcquireShardsRequest { + index_uid: Some(test_index.index_uid.clone()), + source_id: test_index.source_id.clone(), + shard_ids: vec![ShardId::from(1)], + publish_token: "test-publish-token-aaa".to_string(), + }) + .await + .unwrap(); + assert!(acquire_shards_response.acquired_shards.is_empty()); + + // The same token re-acquires successfully (idempotent, e.g. after a local respawn). + let acquire_shards_response = metastore + .acquire_shards(AcquireShardsRequest { + index_uid: Some(test_index.index_uid.clone()), + source_id: test_index.source_id.clone(), + shard_ids: vec![ShardId::from(1)], + publish_token: "test-publish-token-foo".to_string(), + }) + .await + .unwrap(); + assert_eq!(acquire_shards_response.acquired_shards.len(), 1); + assert_eq!( + acquire_shards_response.acquired_shards[0].publish_token(), + "test-publish-token-foo" + ); + + // A strictly greater token takes the shard over. + let acquire_shards_response = metastore + .acquire_shards(AcquireShardsRequest { + index_uid: Some(test_index.index_uid.clone()), + source_id: test_index.source_id.clone(), + shard_ids: vec![ShardId::from(1)], + publish_token: "test-publish-token-zzz".to_string(), + }) + .await + .unwrap(); + assert_eq!(acquire_shards_response.acquired_shards.len(), 1); + assert_eq!( + acquire_shards_response.acquired_shards[0].publish_token(), + "test-publish-token-zzz" + ); + + // Legacy (pre-ULID, '/'-containing) tokens rank below any ULID. Shard 4 starts unowned: a + // legacy token can claim it, but is then superseded by a ULID and can no longer take it back. 
+ let legacy_token = "indexer/test-node/test-index:0/test-source/01000000000000000000000000"; + let ulid_token = "02000000000000000000000000"; + let acquire_shards_response = metastore + .acquire_shards(AcquireShardsRequest { + index_uid: Some(test_index.index_uid.clone()), + source_id: test_index.source_id.clone(), + shard_ids: vec![ShardId::from(4)], + publish_token: legacy_token.to_string(), + }) + .await + .unwrap(); + assert_eq!(acquire_shards_response.acquired_shards.len(), 1); + + let acquire_shards_response = metastore + .acquire_shards(AcquireShardsRequest { + index_uid: Some(test_index.index_uid.clone()), + source_id: test_index.source_id.clone(), + shard_ids: vec![ShardId::from(4)], + publish_token: ulid_token.to_string(), + }) + .await + .unwrap(); + assert_eq!(acquire_shards_response.acquired_shards.len(), 1); + assert_eq!( + acquire_shards_response.acquired_shards[0].publish_token(), + ulid_token + ); + + let acquire_shards_response = metastore + .acquire_shards(AcquireShardsRequest { + index_uid: Some(test_index.index_uid.clone()), + source_id: test_index.source_id.clone(), + shard_ids: vec![ShardId::from(4)], + publish_token: legacy_token.to_string(), + }) + .await + .unwrap(); + assert!(acquire_shards_response.acquired_shards.is_empty()); + cleanup_index(&mut metastore, test_index.index_uid).await; } diff --git a/quickwit/quickwit-proto/protos/quickwit/indexing.proto b/quickwit/quickwit-proto/protos/quickwit/indexing.proto index a4c28f46829..00a2e299add 100644 --- a/quickwit/quickwit-proto/protos/quickwit/indexing.proto +++ b/quickwit/quickwit-proto/protos/quickwit/indexing.proto @@ -26,6 +26,11 @@ service IndexingService { message ApplyIndexingPlanRequest { repeated IndexingTask indexing_tasks = 1; + // Identifier of the indexing plan, minted by the control plane as a ULID when the plan is + // applied. 
Indexers use it as the publish token for the shards they acquire: since ULIDs are + // time-ordered, `AcquireShards` only succeeds for a token greater than or equal to the one already + // recorded, so a stale plan can never steal a shard from a more recent one. + string indexing_plan_id = 2; } message PipelineUid { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs index dc89720854a..986d1f4c156 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs @@ -4,6 +4,12 @@ pub struct ApplyIndexingPlanRequest { #[prost(message, repeated, tag = "1")] pub indexing_tasks: ::prost::alloc::vec::Vec<IndexingTask>, + /// Identifier of the indexing plan, minted by the control plane as a ULID when the plan is + /// applied. Indexers use it as the publish token for the shards they acquire: since ULIDs are + /// time-ordered, `AcquireShards` only succeeds for a token greater than or equal to the one already + /// recorded, so a stale plan can never steal a shard from a more recent one.
+ #[prost(string, tag = "2")] + pub indexing_plan_id: ::prost::alloc::string::String, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[derive(Clone, PartialEq, ::prost::Message)] From 894fad03e4073585307b38d4ef40e1fa61591701 Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Tue, 16 Jun 2026 11:53:11 -0400 Subject: [PATCH 2/3] tweaks --- .../src/actors/indexing_service.rs | 3 +-- .../src/metastore/postgres/metastore.rs | 27 +++++++++++++++++++ 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index a82442c918d..0d4242d8528 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -789,8 +789,7 @@ impl IndexingService { plan_request: ApplyIndexingPlanRequest, ctx: &ActorContext, ) -> Result<(), IndexingError> { - let tasks = &plan_request.indexing_tasks; - let pipeline_diff = self.compute_pipeline_diff(tasks); + let pipeline_diff = self.compute_pipeline_diff(&plan_request.indexing_tasks); if !pipeline_diff.pipelines_to_shutdown.is_empty() { self.shutdown_pipelines(&pipeline_diff.pipelines_to_shutdown) diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 43bd995e1ba..c5c6c6f0992 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -1436,6 +1436,11 @@ impl MetastoreService for PostgresqlMetastore { .bind(&request.publish_token) .fetch_all(&self.connection_pool) .await?; + + if pg_shards.len() != request.shard_ids.len() { + warn_on_unacquired_shards(&request, &pg_shards); + } + let acquired_shards = pg_shards .into_iter() .map(|pg_shard| pg_shard.into()) @@ -2953,6 +2958,28 @@ impl PostgresqlMetastore { } } +/// Best-effort diagnostics for the acquire 
error path: logs the shards from `request` that were not +/// acquired — those absent from `acquired_pg_shards` because a more recent publish token owns them, +/// or because they no longer exist. Does not touch the database. +fn warn_on_unacquired_shards(request: &AcquireShardsRequest, acquired_pg_shards: &[PgShard]) { + let not_acquired_shard_ids: Vec<&ShardId> = request + .shard_ids + .iter() + .filter(|shard_id| { + !acquired_pg_shards + .iter() + .any(|pg_shard| &pg_shard.shard_id == *shard_id) + }) + .collect(); + warn!( + index_uid=%request.index_uid(), + source_id=%request.source_id, + shard_ids=?not_acquired_shard_ids, + publish_token=%request.publish_token, + "could not acquire shards: held by a more recent publish token, or no longer present" + ); +} + async fn open_or_fetch_shard<'e>( executor: impl Executor<'e, Database = Postgres> + Clone, subrequest: &OpenShardSubrequest, From 78374bb73bfac444f0cf2cc91c0cdf77623ae3fb Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Tue, 16 Jun 2026 15:12:17 -0400 Subject: [PATCH 3/3] Fixing deployment plan --- .../file_backed/file_backed_index/shards.rs | 18 ++- .../src/metastore/postgres/metastore.rs | 4 +- .../postgres/queries/shards/acquire.sql | 10 +- .../quickwit-metastore/src/tests/shard.rs | 133 +++++++----------- 4 files changed, 74 insertions(+), 91 deletions(-) diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs index 2dac6524e51..51c405ab368 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs @@ -52,13 +52,20 @@ impl fmt::Debug for Shards { } /// Whether a shard recording `existing_token` can be acquired by a pipeline presenting -/// `presented_token`. Acquisition is monotonic; a missing or legacy (`'/'`-containing) token ranks -/// below any ULID. 
+/// `presented_token`. Acquisition between ULIDs is monotonic (newer-or-equal wins). A legacy +/// (`'/'`-containing, pre-ULID) presented token always wins, so a rolling upgrade can still hand a +/// shard to an old indexer; a missing or legacy recorded token loses to any ULID. fn can_acquire_shard(existing_token: &str, presented_token: &str) -> bool { + // An old indexer presenting a legacy token keeps the pre-upgrade overwrite behavior. + if presented_token.contains('/') { + return true; + } + // A missing or legacy recorded token loses to any ULID. if existing_token.is_empty() || existing_token.contains('/') { return true; } - !presented_token.contains('/') && presented_token >= existing_token + // Both are ULIDs: acquire only if ours is newer-or-equal. + presented_token >= existing_token } impl Shards { @@ -572,8 +579,9 @@ mod tests { assert!(can_acquire_shard(NEWER, NEWER)); // An older ULID cannot steal a shard owned by a newer one. assert!(!can_acquire_shard(NEWER, OLDER)); - // A legacy presented token cannot supersede a recorded ULID. - assert!(!can_acquire_shard(NEWER, LEGACY)); + // A legacy presented token always wins, so a rolling upgrade can move a shard from a new + // indexer back to an old one. 
+ assert!(can_acquire_shard(NEWER, LEGACY)); } #[test] diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index c5c6c6f0992..2c64b4794b8 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -1436,11 +1436,11 @@ impl MetastoreService for PostgresqlMetastore { .bind(&request.publish_token) .fetch_all(&self.connection_pool) .await?; - + if pg_shards.len() != request.shard_ids.len() { warn_on_unacquired_shards(&request, &pg_shards); } - + let acquired_shards = pg_shards .into_iter() .map(|pg_shard| pg_shard.into()) diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/acquire.sql b/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/acquire.sql index 4ab98021321..e23a0b21a0a 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/acquire.sql +++ b/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/acquire.sql @@ -6,13 +6,13 @@ WHERE index_uid = $1 AND source_id = $2 AND shard_id = ANY ($3) - -- Monotonic acquisition: the presented token ($4) must rank >= the recorded one. + -- Acquisition is monotonic between ULIDs; a legacy presented token keeps pre-upgrade behavior. 
AND ( - publish_token IS NULL -- never acquired: free to take + $4 LIKE '%/%' -- presented token is legacy (pre-ULID): always takes, for rolling upgrades + OR publish_token IS NULL -- never acquired: free to take OR publish_token = '' -- empty placeholder: free to take - OR publish_token LIKE '%/%' -- legacy pre-ULID token: ranks below any ULID, so superseded - -- both are ULIDs: take it only if ours is newer-or-equal (and ours is itself a ULID) - OR ($4 NOT LIKE '%/%' AND $4 >= publish_token) + OR publish_token LIKE '%/%' -- recorded token is legacy: superseded by any ULID + OR $4 >= publish_token -- both are ULIDs: take only if ours is newer-or-equal ) RETURNING * diff --git a/quickwit/quickwit-metastore/src/tests/shard.rs b/quickwit/quickwit-metastore/src/tests/shard.rs index a6c8c99e845..bf32042d6ea 100644 --- a/quickwit/quickwit-metastore/src/tests/shard.rs +++ b/quickwit/quickwit-metastore/src/tests/shard.rs @@ -229,6 +229,18 @@ pub async fn test_metastore_acquire_shards< ) .await; + // Publish tokens are the ULID of the indexing plan that minted them; ULIDs are time-ordered, so + // lexicographic order is chronological order. A token containing '/' is the legacy pre-ULID + // format: it loses to a ULID when recorded, but always wins when presented, so a rolling + // upgrade can still hand a shard back to an old indexer. + const OLDER_TOKEN: &str = "01000000000000000000000000"; + const TOKEN: &str = "02000000000000000000000000"; + const NEWER_TOKEN: &str = "03000000000000000000000000"; + const LEGACY_TOKEN: &str = + "indexer/test-node/test-index:0/test-source/01000000000000000000000000"; + + // Shard 1 owned by `TOKEN`, shard 2 unowned, shard 3 owned by a legacy token, shard 4 owned by + // `TOKEN`. 
let shards = vec![ Shard { index_uid: Some(test_index.index_uid.clone()), @@ -239,7 +251,7 @@ pub async fn test_metastore_acquire_shards< follower_id: Some("test-ingester-bar".to_string()), doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning), - publish_token: Some("test-publish-token-foo".to_string()), + publish_token: Some(TOKEN.to_string()), update_timestamp: 1724158996, }, Shard { @@ -251,7 +263,7 @@ pub async fn test_metastore_acquire_shards< follower_id: Some("test-ingester-qux".to_string()), doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning), - publish_token: Some("test-publish-token-bar".to_string()), + publish_token: None, update_timestamp: 1724158996, }, Shard { @@ -263,7 +275,7 @@ pub async fn test_metastore_acquire_shards< follower_id: Some("test-ingester-baz".to_string()), doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning), - publish_token: None, + publish_token: Some(LEGACY_TOKEN.to_string()), update_timestamp: 1724158996, }, Shard { @@ -275,7 +287,7 @@ pub async fn test_metastore_acquire_shards< follower_id: Some("test-ingester-tux".to_string()), doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning), - publish_token: None, + publish_token: Some(TOKEN.to_string()), update_timestamp: 1724158996, }, ]; @@ -283,142 +295,105 @@ pub async fn test_metastore_acquire_shards< .insert_shards(&test_index.index_uid, &test_index.source_id, shards) .await; - // Test acquire shards. 
- let acquire_shards_request = AcquireShardsRequest { - index_uid: Some(test_index.index_uid.clone()), - source_id: test_index.source_id.clone(), - shard_ids: vec![ - ShardId::from(1), - ShardId::from(2), - ShardId::from(3), - ShardId::from(666), - ], // shard 666 does not exist - publish_token: "test-publish-token-foo".to_string(), - }; - let mut acquire_shards_response = metastore - .acquire_shards(acquire_shards_request) - .await - .unwrap(); - - acquire_shards_response - .acquired_shards - .sort_unstable_by(|left, right| left.shard_id().cmp(right.shard_id())); - - let shard = &acquire_shards_response.acquired_shards[0]; - assert_eq!(shard.index_uid(), &test_index.index_uid); - assert_eq!(shard.source_id, test_index.source_id); - assert_eq!(shard.shard_id(), ShardId::from(1)); - assert_eq!(shard.shard_state(), ShardState::Closed); - assert_eq!(shard.leader_id, "test-ingester-foo"); - assert_eq!(shard.follower_id(), "test-ingester-bar"); - assert_eq!(shard.publish_position_inclusive(), Position::Beginning); - assert_eq!(shard.publish_token(), "test-publish-token-foo"); - - let shard = &acquire_shards_response.acquired_shards[1]; - assert_eq!(shard.index_uid(), &test_index.index_uid); - assert_eq!(shard.source_id, test_index.source_id); - assert_eq!(shard.shard_id(), ShardId::from(2)); - assert_eq!(shard.shard_state(), ShardState::Open); - assert_eq!(shard.leader_id, "test-ingester-bar"); - assert_eq!(shard.follower_id(), "test-ingester-qux"); - assert_eq!(shard.publish_position_inclusive(), Position::Beginning); - assert_eq!(shard.publish_token(), "test-publish-token-foo"); - - let shard = &acquire_shards_response.acquired_shards[2]; - assert_eq!(shard.index_uid(), &test_index.index_uid); - assert_eq!(shard.source_id, test_index.source_id); - assert_eq!(shard.shard_id(), ShardId::from(3)); - assert_eq!(shard.shard_state(), ShardState::Open); - assert_eq!(shard.leader_id, "test-ingester-qux"); - assert_eq!(shard.follower_id(), "test-ingester-baz"); - 
assert_eq!(shard.publish_position_inclusive(), Position::Beginning); - assert_eq!(shard.publish_token(), "test-publish-token-foo"); - - // A token ranking below the recorded one is refused: the shard is left untouched and absent - // from the response. + // A token ranking below the recorded one — and a non-existent shard — are refused: both are + // omitted from the response and the recorded token is left untouched. let acquire_shards_response = metastore .acquire_shards(AcquireShardsRequest { index_uid: Some(test_index.index_uid.clone()), source_id: test_index.source_id.clone(), - shard_ids: vec![ShardId::from(1)], - publish_token: "test-publish-token-aaa".to_string(), + shard_ids: vec![ShardId::from(1), ShardId::from(666)], + publish_token: OLDER_TOKEN.to_string(), }) .await .unwrap(); assert!(acquire_shards_response.acquired_shards.is_empty()); - // The same token re-acquires successfully (idempotent, e.g. after a local respawn). + // The same token re-acquires successfully (idempotent, e.g. after a local respawn); the full + // shard is returned. 
let acquire_shards_response = metastore .acquire_shards(AcquireShardsRequest { index_uid: Some(test_index.index_uid.clone()), source_id: test_index.source_id.clone(), shard_ids: vec![ShardId::from(1)], - publish_token: "test-publish-token-foo".to_string(), + publish_token: TOKEN.to_string(), }) .await .unwrap(); assert_eq!(acquire_shards_response.acquired_shards.len(), 1); - assert_eq!( - acquire_shards_response.acquired_shards[0].publish_token(), - "test-publish-token-foo" - ); + let shard = &acquire_shards_response.acquired_shards[0]; + assert_eq!(shard.index_uid(), &test_index.index_uid); + assert_eq!(shard.source_id, test_index.source_id); + assert_eq!(shard.shard_id(), ShardId::from(1)); + assert_eq!(shard.shard_state(), ShardState::Closed); + assert_eq!(shard.leader_id, "test-ingester-foo"); + assert_eq!(shard.follower_id(), "test-ingester-bar"); + assert_eq!(shard.publish_position_inclusive(), Position::Beginning); + assert_eq!(shard.publish_token(), TOKEN); - // A strictly greater token takes the shard over. + // A strictly newer ULID takes the shard over. let acquire_shards_response = metastore .acquire_shards(AcquireShardsRequest { index_uid: Some(test_index.index_uid.clone()), source_id: test_index.source_id.clone(), shard_ids: vec![ShardId::from(1)], - publish_token: "test-publish-token-zzz".to_string(), + publish_token: NEWER_TOKEN.to_string(), }) .await .unwrap(); assert_eq!(acquire_shards_response.acquired_shards.len(), 1); assert_eq!( acquire_shards_response.acquired_shards[0].publish_token(), - "test-publish-token-zzz" + NEWER_TOKEN ); - // Legacy (pre-ULID, '/'-containing) tokens rank below any ULID. Shard 4 starts unowned: a - // legacy token can claim it, but is then superseded by a ULID and can no longer take it back. - let legacy_token = "indexer/test-node/test-index:0/test-source/01000000000000000000000000"; - let ulid_token = "02000000000000000000000000"; + // An unowned shard can be acquired by any ULID. 
let acquire_shards_response = metastore .acquire_shards(AcquireShardsRequest { index_uid: Some(test_index.index_uid.clone()), source_id: test_index.source_id.clone(), - shard_ids: vec![ShardId::from(4)], - publish_token: legacy_token.to_string(), + shard_ids: vec![ShardId::from(2)], + publish_token: TOKEN.to_string(), }) .await .unwrap(); assert_eq!(acquire_shards_response.acquired_shards.len(), 1); + assert_eq!( + acquire_shards_response.acquired_shards[0].publish_token(), + TOKEN + ); + // A ULID supersedes a recorded legacy token. let acquire_shards_response = metastore .acquire_shards(AcquireShardsRequest { index_uid: Some(test_index.index_uid.clone()), source_id: test_index.source_id.clone(), - shard_ids: vec![ShardId::from(4)], - publish_token: ulid_token.to_string(), + shard_ids: vec![ShardId::from(3)], + publish_token: TOKEN.to_string(), }) .await .unwrap(); assert_eq!(acquire_shards_response.acquired_shards.len(), 1); assert_eq!( acquire_shards_response.acquired_shards[0].publish_token(), - ulid_token + TOKEN ); + // Shard 4 is owned by a ULID, yet a legacy token reclaims it: the rolling-upgrade hand-back, + // where the control plane moves a shard from a new indexer back to an old one. let acquire_shards_response = metastore .acquire_shards(AcquireShardsRequest { index_uid: Some(test_index.index_uid.clone()), source_id: test_index.source_id.clone(), shard_ids: vec![ShardId::from(4)], - publish_token: legacy_token.to_string(), + publish_token: LEGACY_TOKEN.to_string(), }) .await .unwrap(); - assert!(acquire_shards_response.acquired_shards.is_empty()); + assert_eq!(acquire_shards_response.acquired_shards.len(), 1); + assert_eq!( + acquire_shards_response.acquired_shards[0].publish_token(), + LEGACY_TOKEN + ); cleanup_index(&mut metastore, test_index.index_uid).await; }