diff --git a/nexus/db-queries/src/db/datastore/db_metadata.rs b/nexus/db-queries/src/db/datastore/db_metadata.rs index 6dd529cd40a..2fc349f2d83 100644 --- a/nexus/db-queries/src/db/datastore/db_metadata.rs +++ b/nexus/db-queries/src/db/datastore/db_metadata.rs @@ -25,10 +25,12 @@ use nexus_db_model::SchemaUpgradeStep; use nexus_db_model::SchemaVersion; use nexus_types::deployment::BlueprintZoneDisposition; use omicron_common::api::external::Error; +use omicron_uuid_kinds::BlueprintUuid; use omicron_uuid_kinds::GenericUuid; use omicron_uuid_kinds::OmicronZoneUuid; use semver::Version; use slog::{Logger, error, info, o}; +use std::collections::BTreeSet; use std::ops::Bound; use std::str::FromStr; @@ -686,6 +688,77 @@ impl DataStore { Ok(()) } + /// Returns information about access for all of the given Nexus ids + /// + /// This set is assumed to be pretty small. + pub async fn database_nexus_access_all( + &self, + opctx: &OpContext, + nexus_ids: &BTreeSet, + ) -> Result, Error> { + use nexus_db_schema::schema::db_metadata_nexus::dsl; + + let db_nexus_ids: BTreeSet<_> = nexus_ids + .iter() + .copied() + .map(nexus_db_model::to_db_typed_uuid) + .collect(); + dsl::db_metadata_nexus + .filter(dsl::nexus_id.eq_any(db_nexus_ids)) + .load_async(&*self.pool_connection_authorized(opctx).await?) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } + + /// Updates the "last_drained_blueprint_id" for the given Nexus id + pub async fn database_nexus_access_update_blueprint( + &self, + opctx: &OpContext, + nexus_id: OmicronZoneUuid, + blueprint_id: Option, + ) -> Result { + use nexus_db_schema::schema::db_metadata_nexus::dsl; + + let nexus_id = nexus_db_model::to_db_typed_uuid(nexus_id); + let blueprint_id = blueprint_id.map(nexus_db_model::to_db_typed_uuid); + + let conn = self.pool_connection_authorized(opctx).await?; + let count = diesel::update(dsl::db_metadata_nexus) + .filter(dsl::nexus_id.eq(nexus_id)) + // To be conservative, we'll only update this value if the record is + // currently active. There's no reason it should ever not be active + // if we're calling this function and if there were an easy way to + // return an error in that case, we'd just do that. + .filter(dsl::state.eq(DbMetadataNexusState::Active)) + .set(dsl::last_drained_blueprint_id.eq(blueprint_id)) + .execute_async(&*conn) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?; + Ok(count) + } + + /// Updates the state for the given Nexus id to "quiesced" + pub async fn database_nexus_access_update_quiesced( + &self, + nexus_id: OmicronZoneUuid, + ) -> Result { + // A traditional authz check is not possible here because we've quiesced + // the DataStore, so no further connections are ordinarily available. + // (We use the lower-level pool interface to bypass that.) + let conn = self.pool.claim_quiesced().await?; + + use nexus_db_schema::schema::db_metadata_nexus::dsl; + + let nexus_id = nexus_db_model::to_db_typed_uuid(nexus_id); + let count = diesel::update(dsl::db_metadata_nexus) + .filter(dsl::nexus_id.eq(nexus_id)) + .set(dsl::state.eq(DbMetadataNexusState::Quiesced)) + .execute_async(&*conn) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?; + Ok(count) + } + // Returns the access this Nexus has to the database async fn database_nexus_access( &self, diff --git a/nexus/db-queries/src/db/pool.rs b/nexus/db-queries/src/db/pool.rs index 28a76a14965..65473aad14e 100644 --- a/nexus/db-queries/src/db/pool.rs +++ b/nexus/db-queries/src/db/pool.rs @@ -232,6 +232,18 @@ impl Pool { }) } + /// Returns a connection from the pool, bypassing the quiesce check + /// + /// This is only intended for use *during* quiesce to update our final + /// database record. + pub async fn claim_quiesced( + &self, + ) -> Result, Error> { + self.inner.claim().await.map_err(|err| { + Error::unavail(&format!("Failed to access DB connection: {err}")) + }) + } + /// Disables creation of all new database claims /// /// This is currently a one-way trip. The pool cannot be un-quiesced. diff --git a/nexus/db-queries/src/db/pub_test_utils/mod.rs b/nexus/db-queries/src/db/pub_test_utils/mod.rs index 420f96ee6ab..6662fe8cc06 100644 --- a/nexus/db-queries/src/db/pub_test_utils/mod.rs +++ b/nexus/db-queries/src/db/pub_test_utils/mod.rs @@ -247,6 +247,21 @@ impl TestDatabase { } } + /// Returns a new independent datastore atop a new pool atop the same + /// database + /// + /// This is normally not necessary. You can clone the `Arc` + /// returned by `datastore()`. However, this is important for tests that + /// need separate datastores to test their separate quiesce behaviors. + pub async fn extra_datastore(&self, log: &Logger) -> Arc { + let pool = new_pool(log, &self.db); + Arc::new( + DataStore::new(&log, pool, None, IdentityCheckPolicy::DontCare) + .await + .unwrap(), + ) + } + pub fn opctx(&self) -> &OpContext { match &self.kind { TestKind::NoPool diff --git a/nexus/src/app/background/init.rs b/nexus/src/app/background/init.rs index 60ddc534463..14283341354 100644 --- a/nexus/src/app/background/init.rs +++ b/nexus/src/app/background/init.rs @@ -140,6 +140,8 @@ use nexus_config::DnsTasksConfig; use nexus_db_model::DnsGroup; use nexus_db_queries::context::OpContext; use nexus_db_queries::db::DataStore; +use nexus_types::deployment::Blueprint; +use nexus_types::deployment::BlueprintTarget; use nexus_types::deployment::PendingMgsUpdates; use omicron_uuid_kinds::OmicronZoneUuid; use oximeter::types::ProducerRegistry; @@ -426,8 +428,10 @@ impl BackgroundTasksInitializer { // Background task: blueprint loader // // Registration is below so that it can watch the planner. - let blueprint_loader = - blueprint_load::TargetBlueprintLoader::new(datastore.clone()); + let blueprint_loader = blueprint_load::TargetBlueprintLoader::new( + datastore.clone(), + args.blueprint_load_tx, + ); let rx_blueprint = blueprint_loader.watcher(); // Background task: blueprint executor @@ -1023,6 +1027,9 @@ pub struct BackgroundTasksData { pub saga_recovery: saga_recovery::SagaRecoveryHelpers>, /// Channel for TUF repository artifacts to be replicated out to sleds pub tuf_artifact_replication_rx: mpsc::Receiver, + /// Channel for exposing the latest loaded blueprint + pub blueprint_load_tx: + watch::Sender>>, /// `reqwest::Client` for webhook delivery requests. /// /// This is shared with the external API as it's also used when sending diff --git a/nexus/src/app/background/tasks/blueprint_execution.rs b/nexus/src/app/background/tasks/blueprint_execution.rs index ad3e4f18204..a5d59d31d08 100644 --- a/nexus/src/app/background/tasks/blueprint_execution.rs +++ b/nexus/src/app/background/tasks/blueprint_execution.rs @@ -428,11 +428,16 @@ mod test { let mut task = BlueprintExecutor::new( datastore.clone(), resolver.clone(), - blueprint_rx, + blueprint_rx.clone(), OmicronZoneUuid::new_v4(), Activator::new(), dummy_tx, - NexusQuiesceHandle::new(&opctx.log, datastore.clone()), + NexusQuiesceHandle::new( + datastore.clone(), + OmicronZoneUuid::new_v4(), + blueprint_rx, + opctx.child(BTreeMap::new()), + ), ); // Now we're ready. diff --git a/nexus/src/app/background/tasks/blueprint_load.rs b/nexus/src/app/background/tasks/blueprint_load.rs index 7b7f546388d..469be135fb6 100644 --- a/nexus/src/app/background/tasks/blueprint_load.rs +++ b/nexus/src/app/background/tasks/blueprint_load.rs @@ -24,8 +24,10 @@ pub struct TargetBlueprintLoader { } impl TargetBlueprintLoader { - pub fn new(datastore: Arc) -> TargetBlueprintLoader { - let (tx, _) = watch::channel(None); + pub fn new( + datastore: Arc, + tx: watch::Sender>>, + ) -> TargetBlueprintLoader { TargetBlueprintLoader { datastore, last: None, tx } } @@ -256,7 +258,8 @@ mod test { datastore.clone(), ); - let mut task = TargetBlueprintLoader::new(datastore.clone()); + let (tx, _) = watch::channel(None); + let mut task = TargetBlueprintLoader::new(datastore.clone(), tx); let mut rx = task.watcher(); // We expect to see the initial blueprint set up by nexus-test-utils diff --git a/nexus/src/app/background/tasks/blueprint_planner.rs b/nexus/src/app/background/tasks/blueprint_planner.rs index 691706b189e..0f316f136bb 100644 --- a/nexus/src/app/background/tasks/blueprint_planner.rs +++ b/nexus/src/app/background/tasks/blueprint_planner.rs @@ -290,6 +290,7 @@ mod test { ReconfiguratorConfigView, }; use omicron_uuid_kinds::OmicronZoneUuid; + use std::collections::BTreeMap; type ControlPlaneTestContext = nexus_test_utils::ControlPlaneTestContext; @@ -305,7 +306,9 @@ mod test { ); // Spin up the blueprint loader background task. - let mut loader = TargetBlueprintLoader::new(datastore.clone()); + let (tx_loader, _) = watch::channel(None); + let mut loader = + TargetBlueprintLoader::new(datastore.clone(), tx_loader); let mut rx_loader = loader.watcher(); loader.activate(&opctx).await; let (_initial_target, initial_blueprint) = &*rx_loader @@ -427,7 +430,12 @@ mod test { OmicronZoneUuid::new_v4(), Activator::new(), dummy_tx, - NexusQuiesceHandle::new(&opctx.log, datastore.clone()), + NexusQuiesceHandle::new( + datastore.clone(), + OmicronZoneUuid::new_v4(), + rx_loader.clone(), + opctx.child(BTreeMap::new()), + ), ); let value = executor.activate(&opctx).await; let value = value.as_object().expect("response is not a JSON object"); diff --git a/nexus/src/app/mod.rs b/nexus/src/app/mod.rs index b2a36de09bd..04b09c5934a 100644 --- a/nexus/src/app/mod.rs +++ b/nexus/src/app/mod.rs @@ -337,7 +337,20 @@ impl Nexus { sec_store, )); - let quiesce = NexusQuiesceHandle::new(&log, db_datastore.clone()); + let (blueprint_load_tx, blueprint_load_rx) = watch::channel(None); + let quiesce_log = log.new(o!("component" => "NexusQuiesceHandle")); + let quiesce_opctx = OpContext::for_background( + quiesce_log, + Arc::clone(&authz), + authn::Context::internal_api(), + Arc::clone(&db_datastore) as Arc, + ); + let quiesce = NexusQuiesceHandle::new( + db_datastore.clone(), + config.deployment.id, + blueprint_load_rx, + quiesce_opctx, + ); // It's a bit of a red flag to use an unbounded channel. // @@ -601,6 +614,7 @@ impl Nexus { }, tuf_artifact_replication_rx, mgs_updates_tx, + blueprint_load_tx, }, ); diff --git a/nexus/src/app/quiesce.rs b/nexus/src/app/quiesce.rs index 674366d8ecc..b5c97354f1c 100644 --- a/nexus/src/app/quiesce.rs +++ b/nexus/src/app/quiesce.rs @@ -4,18 +4,29 @@ //! Manage Nexus quiesce state +use anyhow::{Context, anyhow, bail}; use assert_matches::assert_matches; use chrono::Utc; +use nexus_db_model::DbMetadataNexusState; use nexus_db_queries::authz; use nexus_db_queries::context::OpContext; use nexus_db_queries::db::DataStore; +use nexus_types::deployment::Blueprint; +use nexus_types::deployment::BlueprintTarget; +use nexus_types::deployment::BlueprintZoneDisposition; +use nexus_types::deployment::BlueprintZoneType; use nexus_types::internal_api::views::QuiesceState; use nexus_types::internal_api::views::QuiesceStatus; use nexus_types::quiesce::SagaQuiesceHandle; use omicron_common::api::external::LookupResult; use omicron_common::api::external::UpdateResult; -use slog::Logger; +use omicron_uuid_kinds::BlueprintUuid; +use omicron_uuid_kinds::OmicronZoneUuid; +use slog::{error, info}; +use slog_error_chain::InlineErrorChain; +use std::collections::BTreeSet; use std::sync::Arc; +use std::time::Duration; use std::time::Instant; use tokio::sync::watch; @@ -41,19 +52,36 @@ impl super::Nexus { /// Describes the configuration and state around quiescing Nexus #[derive(Clone)] pub struct NexusQuiesceHandle { - log: Logger, datastore: Arc, + my_nexus_id: OmicronZoneUuid, sagas: SagaQuiesceHandle, + quiesce_opctx: Arc, + latest_blueprint: + watch::Receiver>>, state: watch::Sender, } impl NexusQuiesceHandle { - pub fn new(log: &Logger, datastore: Arc) -> NexusQuiesceHandle { - let my_log = log.new(o!("component" => "NexusQuiesceHandle")); - let saga_quiesce_log = log.new(o!("component" => "SagaQuiesceHandle")); + pub fn new( + datastore: Arc, + my_nexus_id: OmicronZoneUuid, + latest_blueprint: watch::Receiver< + Option>, + >, + quiesce_opctx: OpContext, + ) -> NexusQuiesceHandle { + let saga_quiesce_log = + quiesce_opctx.log.new(o!("component" => "SagaQuiesceHandle")); let sagas = SagaQuiesceHandle::new(saga_quiesce_log); let (state, _) = watch::channel(QuiesceState::Undetermined); - NexusQuiesceHandle { log: my_log, datastore, sagas, state } + NexusQuiesceHandle { + datastore, + my_nexus_id, + sagas, + quiesce_opctx: Arc::new(quiesce_opctx), + latest_blueprint, + state, + } } pub fn sagas(&self) -> SagaQuiesceHandle { @@ -73,16 +101,17 @@ impl NexusQuiesceHandle { QuiesceState::Running }; + let log = &self.quiesce_opctx.log; let changed = self.state.send_if_modified(|q| { match q { QuiesceState::Undetermined => { - info!(&self.log, "initial state"; "state" => ?new_state); + info!(log, "initial state"; "state" => ?new_state); *q = new_state; true } QuiesceState::Running => { if quiescing { - info!(&self.log, "quiesce starting"); + info!(log, "quiesce starting"); *q = new_state; true } else { @@ -112,31 +141,76 @@ impl NexusQuiesceHandle { } } -async fn do_quiesce(quiesce: NexusQuiesceHandle) { - let saga_quiesce = quiesce.sagas.clone(); +async fn do_quiesce(mut quiesce: NexusQuiesceHandle) { let datastore = quiesce.datastore.clone(); + let my_nexus_id = quiesce.my_nexus_id; + + /// minimal timeout used just to avoid tight loops when encountering + /// transient errors + const PAUSE_TIMEOUT: Duration = Duration::from_secs(5); - // NOTE: This sequence will change as we implement RFD 588. - // We will need to use the datastore to report our saga drain status and - // also to see when other Nexus instances have finished draining their - // sagas. For now, this implementation begins quiescing its database as - // soon as its sagas are locally drained. assert_matches!( *quiesce.state.borrow(), QuiesceState::DrainingSagas { .. } ); - // TODO per RFD 588, this is where we will enter a loop, pausing either on - // timeout or when our local quiesce state changes. At each pause: if we - // need to update our db_metadata_nexus record, do so. Then load the - // current blueprint and check the records for all nexus instances. This - // work is covered by a combination of oxidecomputer/omicron#8859, - // oxidecomputer/omicron#8857, and oxidecomputer/omicron#8796. + // We've recorded that we should be quiescing. This will prevent new sagas + // from starting. However, we can still wind up running new sagas if a + // blueprint execution re-assigns us saga from an expunged Nexus. + // + // So here's the plan: we keep track of the blueprint id as of which we have + // fully drained. That means that we've processed all Nexus expungements up + // to that blueprint, _and_ we've recovered all sagas assigned to us, _and_ + // finished running them all. When this blueprint id changes (because we've + // processed re-assignments for a new blueprint), we'll do two things: + // + // (1) update our `db_metadata_nexus` record to reflect the change + // + // (2) check the `db_metadata_nexus` records of the other active Nexus + // instances to see if they've drained as of the same blueprint. If so, + // then all of the active Nexus instances have finished all sagas in the + // system and we can proceed to quiesce the database. + // + // If the other Nexus instances aren't drained up through the same + // blueprint, we'll check again shortly. // - // For now, we skip the cross-Nexus coordination and simply wait for our own - // Nexus to finish what it's doing. - saga_quiesce.wait_for_drained().await; + // One obvious case that's not handled here is that it's possible that other + // Nexus instances are drained as of a *subsequent* blueprint than the one + // we're looking for. That's no problem. We'll check again with the latest + // blueprint on the next lap. For this to converge, we have to assume + // blueprints aren't changing faster than we're checking. That's a pretty + // safe assumption here. They shouldn't be changing at all at this point + // unless reacting to something unrelated to the upgrade (e.g., a sled + // expungement), and we are checking awfully frequently, too. + let mut last_recorded_blueprint_id: Option = None; + loop { + match check_all_sagas_drained( + &mut quiesce, + &mut last_recorded_blueprint_id, + PAUSE_TIMEOUT, + ) + .await + { + Err(error) => { + // Log the error, sleep a bit to avoid spinning rapidly when + // conditions haven't changed, then take another lap. + let log = &quiesce.quiesce_opctx.log; + warn!(log, "not yet quiesced"; InlineErrorChain::new(&*error)); + tokio::time::sleep(PAUSE_TIMEOUT).await; + continue; + } + Ok(_) => { + // We're done with this stage. + let log = &quiesce.quiesce_opctx.log; + info!(log, "sagas quiesced -- moving on"); + break; + } + } + } + // We're ready to hand off. + let quiesce_opctx = &quiesce.quiesce_opctx; + let log = &quiesce_opctx.log; quiesce.state.send_modify(|q| { let QuiesceState::DrainingSagas { time_requested, @@ -176,8 +250,27 @@ async fn do_quiesce(quiesce: NexusQuiesceHandle) { }; }); - // TODO per RFD 588, this is where we will enter a loop trying to update our - // database record for the last time. See oxidecomputer/omicron#8971. + loop { + match datastore.database_nexus_access_update_quiesced(my_nexus_id).await + { + Ok(count) => { + info!( + log, + "updated Nexus record to 'quiesced'"; + "count" => count, + ); + break; + } + Err(error) => { + warn!( + log, + "failed to update Nexus record to 'quiesced'"; + InlineErrorChain::new(&error) + ); + tokio::time::sleep(PAUSE_TIMEOUT).await; + } + } + } quiesce.state.send_modify(|q| { let QuiesceState::RecordingQuiesce { @@ -201,10 +294,222 @@ async fn do_quiesce(quiesce: NexusQuiesceHandle) { duration_total: finished - time_draining_sagas, }; }); + + // We're done! Thank you for service, Nexus `$nexus_id`. + // + // At this point, although most of the rest of Nexus is still running (e.g., + // background tasks) and it can even receive HTTP requests, everything that + // tries to use the database will fail. That's just about everything. + // + // This process will hang around until the new Nexus instances expunge this + // one. This is useful, since debugging tools and tests can still query + // this Nexus for its quiesce state. +} + +/// Determines whether the fleet of currently-active Nexus instances have +/// all become drained as of the same blueprint. If some, returns `Ok(())`. +/// Otherwise, returns an error describing why we're not drained. +/// +/// Invoked by the caller in a loop until this function returns that we're +/// quiesced. +async fn check_all_sagas_drained( + quiesce: &mut NexusQuiesceHandle, + last_recorded_blueprint_id: &mut Option, + pause_timeout: Duration, +) -> Result<(), anyhow::Error> { + // See the comment in our caller for an explanation of the big picture here. + // This is factored into a function so that we can more easily bail out with + // an error and handle those uniformly (in the caller). + + let saga_quiesce = quiesce.sagas.clone(); + let datastore = quiesce.datastore.clone(); + let my_nexus_id = quiesce.my_nexus_id; + let quiesce_opctx = &quiesce.quiesce_opctx; + let log = &quiesce_opctx.log; + debug!(log, "try_saga_quiesce(): enter"); + + // Grab the most recently loaded blueprint. + let current_blueprint = quiesce + .latest_blueprint + // Wait for a blueprint to be loaded, if necessary. + .wait_for(|value| value.is_some()) + .await + // This should be impossible + .context("latest_blueprint rx channel closed")? + .as_deref() + // unwrap(): wait_for() returns a value for which the closure + // returns true, and we checked that this is `Some` + .unwrap() + // extract just the blueprint part + .1 + // As usual, we clone to avoid locking the watch channel for the + // lifetime of this value. + .clone(); + debug!( + log, + "try_saga_quiesce(): blueprint"; + "blueprint_id" => %current_blueprint.id + ); + + // Determine our own Nexus generation number. + // + // This doesn't ever change once we've determined it once. But we don't + // know what the value is until we see our first blueprint. + let Some(my_generation) = current_blueprint + .all_omicron_zones(BlueprintZoneDisposition::is_in_service) + .find_map(|(_sled_id, zone)| { + if let BlueprintZoneType::Nexus(nexus) = &zone.zone_type { + (zone.id == my_nexus_id).then_some(nexus.nexus_generation) + } else { + None + } + }) + else { + // This case should generally be impossible. We *are* a working Nexus + // zone, so how do we not exist in the blueprint? Anyway, there's + // really not much we can do here. Sleep a little bit just to avoid + // spinning rapidly, then try again in hopes that this was some + // transient thing. + // + // (Panicking would not be better. That would likely put us into a + // restart loop until SMF gives up. Then nothing would clear that + // condition.) + let error = anyhow!( + "could not find self ({my_nexus_id}) in blueprint {}", + current_blueprint.id + ); + error!( + log, + "try_saga_quiesce(): impossible condition"; + InlineErrorChain::new(&*error) + ); + return Err(error); + }; + + // Wait a bounded amount of time for sagas to become drained as of this + // blueprint. + let Ok(_) = tokio::time::timeout( + pause_timeout, + saga_quiesce.wait_for_drained_blueprint(current_blueprint.id), + ) + .await + else { + // We're still not drained as of this blueprint (or we're drained as of + // a newer one). It's possible that the current target blueprint could + // have changed, so bail out and let the caller invoke us again. + bail!("not locally drained as of blueprint {}", current_blueprint.id); + }; + + // We're drained up through this blueprint. First, update our record to + // reflect that, if we haven't already. + match last_recorded_blueprint_id { + Some(blueprint_id) if *blueprint_id == current_blueprint.id => (), + _ => { + info!( + log, + "locally drained as of blueprint"; + "blueprint_id" => %current_blueprint.id + ); + let try_update = datastore + .database_nexus_access_update_blueprint( + quiesce_opctx, + my_nexus_id, + Some(current_blueprint.id), + ) + .await; + match try_update { + Err(error) => { + return Err(anyhow!(error).context( + "updating our db_metadata_nexus record's drained \ + blueprint", + )); + } + Ok(0) => (), + Ok(count) => { + info!( + log, + "updated our db_metadata_nexus record blueprint id"; + "nexus_id" => %my_nexus_id, + "blueprint_id" => %current_blueprint.id, + "count" => count, + ); + } + } + + *last_recorded_blueprint_id = Some(current_blueprint.id); + } + }; + + // Now, see if everybody else is drained up to the same point, or if any + // of them is already quiesced. That would mean *they* already saw that we + // were all drained up to the same point, which is good enough for us to + // proceed, too. + let our_gen_nexus_ids: BTreeSet = current_blueprint + .all_omicron_zones(BlueprintZoneDisposition::is_in_service) + .filter_map(|(_sled_id, zone)| { + if let BlueprintZoneType::Nexus(nexus) = &zone.zone_type { + (nexus.nexus_generation == my_generation).then_some(zone.id) + } else { + None + } + }) + .collect(); + assert!(!our_gen_nexus_ids.is_empty()); + let other_records = datastore + .database_nexus_access_all(&quiesce_opctx, &our_gen_nexus_ids) + .await + .context( + "loading db_metadata_nexus records for other Nexus instances", + )?; + + if let Some(other) = other_records + .iter() + .find(|r| r.state() == DbMetadataNexusState::Quiesced) + { + info!( + log, + "found other Nexus instance with 'quiesced' record"; + "other_nexus" => %other.nexus_id(), + ); + return Ok(()); + } + + if other_records.len() < our_gen_nexus_ids.len() { + // This shouldn't ever happen. But if it does, we don't want to + // proceed because we don't know if the missing Nexus zone really is + // drained. + bail!( + "found too few other Nexus records (expected {:?}, found {:?})", + our_gen_nexus_ids, + other_records, + ); + } + + match other_records + .iter() + .find(|r| r.last_drained_blueprint_id() != *last_recorded_blueprint_id) + { + Some(undrained) => Err(anyhow!( + "at least one Nexus instance is not drained as of blueprint \ + {:?}: Nexus {}", + last_recorded_blueprint_id, + undrained.nexus_id() + )), + None => { + info!( + log, + "all Nexus instances are drained as of blueprint {:?}", + last_recorded_blueprint_id + ); + Ok(()) + } + } } #[cfg(test)] mod test { + use crate::app::quiesce::NexusQuiesceHandle; + use crate::app::sagas::test_helpers::test_opctx; use assert_matches::assert_matches; use async_bb8_diesel::AsyncRunQueryDsl; use chrono::DateTime; @@ -214,12 +519,24 @@ mod test { use http::StatusCode; use nexus_client::types::QuiesceState; use nexus_client::types::QuiesceStatus; + use nexus_db_model::DbMetadataNexusState; use nexus_test_interface::NexusServer; + use nexus_test_utils::db::TestDatabase; use nexus_test_utils_macros::nexus_test; + use nexus_types::deployment::BlueprintTarget; + use nexus_types::deployment::BlueprintTargetSet; + use nexus_types::deployment::BlueprintZoneDisposition; + use nexus_types::quiesce::SagaReassignmentDone; use omicron_test_utils::dev::poll::CondCheckError; use omicron_test_utils::dev::poll::wait_for_condition; + use omicron_test_utils::dev::test_setup_log; use slog::Logger; + use std::collections::BTreeMap; + use std::collections::BTreeSet; + use std::sync::Arc; use std::time::Duration; + use tokio::sync::watch; + use uuid::Uuid; type ControlPlaneTestContext = nexus_test_utils::ControlPlaneTestContext; @@ -287,6 +604,27 @@ mod test { assert!(status.db_claims.is_empty()); } + async fn enable_blueprint_execution(cptestctx: &ControlPlaneTestContext) { + let opctx = test_opctx(&cptestctx); + let nexus = &cptestctx.server.server_context().nexus; + nexus + .blueprint_target_set_enabled( + &opctx, + BlueprintTargetSet { + target_id: nexus + .blueprint_target_view(&opctx) + .await + .expect("current blueprint target") + .target_id, + enabled: true, + }, + ) + .await + .expect("enable blueprint execution"); + } + + /// Exercise trivial case of app-level quiesce in an environment with just + /// one Nexus #[nexus_test(server = crate::Server)] async fn test_quiesce_easy(cptestctx: &ControlPlaneTestContext) { let log = &cptestctx.logctx.log; @@ -297,8 +635,12 @@ mod test { let nexus_client = nexus_client::Client::new(&nexus_internal_url, log.clone()); - // If we quiesce Nexus while it's not doing anything, that should - // complete quickly. + // We need to enable blueprint execution in order to complete a saga + // assignment pass, which is required for quiescing to work. + enable_blueprint_execution(&cptestctx).await; + + // If we quiesce the only Nexus while it's not doing anything, that + // should complete quickly. let before = Utc::now(); let _ = nexus_client .quiesce_start() @@ -310,6 +652,8 @@ mod test { verify_quiesced(before, after, rv); } + /// Exercise non-trivial app-level quiesce in an environment with just one + /// Nexus #[nexus_test(server = crate::Server)] async fn test_quiesce_full(cptestctx: &ControlPlaneTestContext) { let log = &cptestctx.logctx.log; @@ -320,6 +664,10 @@ mod test { let nexus_client = nexus_client::Client::new(&nexus_internal_url, log.clone()); + // We need to enable blueprint execution in order to complete a saga + // assignment pass, which is required for quiescing to work. + enable_blueprint_execution(&cptestctx).await; + // Kick off a demo saga that will block quiescing. let demo_saga = nexus_client .saga_demo_create() @@ -471,4 +819,208 @@ mod test { StatusCode::SERVICE_UNAVAILABLE ); } + + /// Test Nexus quiesce with multiple different Nexus instances + /// + /// Unlike the tests above, this is not an "app-level" test. There's not a + /// full Nexus here. We're testing with just a `NexusQuiesceHandle` and the + /// few things that it requires (e.g., the datastore). + #[tokio::test] + async fn test_quiesce_multi() { + use nexus_types::internal_api::views::QuiesceState; + + let logctx = test_setup_log("test_quiesce_multi"); + let log = &logctx.log; + let testdb = TestDatabase::new_with_datastore(log).await; + let opctx = testdb.opctx(); + let datastore = testdb.datastore(); + + // Set up. We need: + // + // - a datastore + // - a blueprint that includes several Nexus instances + // - a watch Receiver to provide this blueprint to the + // NexusQuiesceHandle + // - the datastore needs to be initialized with the Nexus access records + // for the Nexus instances in the blueprint + + let (_collection, _input, blueprint) = + nexus_reconfigurator_planning::example::example( + log, + "test_quiesce_multi", + ); + let nexus_ids = blueprint + .all_omicron_zones(BlueprintZoneDisposition::is_in_service) + .filter_map(|(_sled_id, z)| z.zone_type.is_nexus().then_some(z.id)) + .collect::>(); + // The example system creates three sleds. Each sled gets a Nexus zone. + assert_eq!( + nexus_ids.len(), + 3, + "Example system configuration has been changed. \ + Please update this test." + ); + + let mut nexus_id_iter = nexus_ids.iter(); + let nexus_id1 = *nexus_id_iter.next().expect("at least one Nexus id"); + let nexus_id2 = *nexus_id_iter.next().expect("at least two Nexus ids"); + let nexus_id3 = + *nexus_id_iter.next().expect("at least three Nexus ids"); + + // Fake up enough of what the blueprint loader produces. + let bp_target = BlueprintTarget { + target_id: blueprint.id, + enabled: false, + time_made_target: Utc::now(), + }; + let blueprint_id = blueprint.id; + let (_, blueprint_rx) = + watch::channel(Some(Arc::new((bp_target, blueprint)))); + + // Insert active records for the Nexus instances. + let conn = + datastore.pool_connection_for_tests().await.expect("db conn"); + datastore + .initialize_nexus_access_from_blueprint_on_connection( + &conn, + nexus_ids.into_iter().collect(), + ) + .await + .expect("initialize Nexus database access records"); + drop(conn); + + // Initialize our quiesce handles. + // + // Each needs its own DataStore, backed by its own Pool, in order to + // behave like three different Nexus instances. + let nexus_qq1 = NexusQuiesceHandle::new( + testdb.extra_datastore(log).await, + nexus_id1, + blueprint_rx.clone(), + opctx.child(BTreeMap::new()), + ); + let nexus_qq2 = NexusQuiesceHandle::new( + testdb.extra_datastore(log).await, + nexus_id2, + blueprint_rx.clone(), + opctx.child(BTreeMap::new()), + ); + let nexus_qq3 = NexusQuiesceHandle::new( + testdb.extra_datastore(log).await, + nexus_id3, + blueprint_rx.clone(), + opctx.child(BTreeMap::new()), + ); + + let handles = [&nexus_qq1, &nexus_qq2, &nexus_qq3]; + + // Verify that at start, each handle's quiesce state is undetermined. + // This is tested more exhaustively elsewhere (in the SagaQuiesceHandle + // tests). We're just checking our assumption. + for qq in &handles { + let state = qq.state(); + assert_matches!(state, QuiesceState::Undetermined); + } + + // Mark each handle as not-quiescing. + for qq in &handles { + qq.set_quiescing(false); + let state = qq.state(); + assert_matches!(state, QuiesceState::Running); + } + + // In order to quiesce, each handle will need to have completed a + // re-assignment pass for our blueprint and also one saga recovery pass. + // Do both of those now. + for qq in &handles { + let sagas = qq.sagas(); + sagas + .reassign_sagas(async || { + ( + (), + SagaReassignmentDone::ReassignedAllAsOf( + blueprint_id, + false, + ), + ) + }) + .await; + sagas.recover(async |_| ((), true)).await; + } + + // Before we actually start quiescing, create a saga in one of these + // handles. + let saga_ref = nexus_qq1 + .sagas() + .saga_create( + steno::SagaId(Uuid::new_v4()), + &steno::SagaName::new("test-saga"), + ) + .expect("create saga while not quiesced"); + + // Now, start quiescing them all. + for qq in &handles { + qq.set_quiescing(true); + let state = qq.state(); + assert_matches!(state, QuiesceState::DrainingSagas { .. }); + } + + // Importantly, *none* of these handles should quiesce while there's a + // saga running in any of them. It's hard to verify a negative. We'll + // wait a little while and make sure the state hasn't changed. + let _ = tokio::time::sleep(Duration::from_secs(10)).await; + for qq in &handles { + assert_matches!(qq.state(), QuiesceState::DrainingSagas { .. }); + } + let records = datastore + .database_nexus_access_all( + &opctx, + &BTreeSet::from([nexus_id1, nexus_id2, nexus_id3]), + ) + .await + .expect("reading access records"); + assert_eq!(records.len(), 3); + assert!( + records.iter().all(|r| r.state() == DbMetadataNexusState::Active) + ); + + // Now finish that saga. All three handles should quiesce. + drop(saga_ref); + wait_for_condition( + || async { + if !handles + .iter() + .all(|q| matches!(q.state(), QuiesceState::Quiesced { .. })) + { + return Err(CondCheckError::<()>::NotYet); + } + + Ok(()) + }, + &Duration::from_millis(250), + &Duration::from_secs(15), + ) + .await + .expect("did not quiesce within timeout"); + + // Each "Nexus" record should say that it's quiesced. + // + // Note that we're using a different datastore (backed by the same + // CockroachDB instance) than the quiesce handles are. That's important + // since their datastores are all quiesced! + let records = datastore + .database_nexus_access_all( + &opctx, + &BTreeSet::from([nexus_id1, nexus_id2, nexus_id3]), + ) + .await + .expect("reading access records"); + assert_eq!(records.len(), 3); + assert!( + records.iter().all(|r| r.state() == DbMetadataNexusState::Quiesced) + ); + + testdb.terminate().await; + logctx.cleanup_successful(); + } } diff --git a/nexus/types/src/quiesce.rs b/nexus/types/src/quiesce.rs index b336cd037ac..94b98a38dad 100644 --- a/nexus/types/src/quiesce.rs +++ b/nexus/types/src/quiesce.rs @@ -285,10 +285,36 @@ impl SagaQuiesceHandle { /// /// Note that new sagas can still be assigned to this Nexus, resulting in it /// no longer being fully drained. - pub async fn wait_for_drained(&self) { + #[cfg(test)] + async fn wait_for_drained(&self) { let _ = self.inner.subscribe().wait_for(|q| q.is_fully_drained()).await; } + /// Wait for sagas to become drained as of this blueprint id + /// + /// This is cancel-safe. + /// + /// It's possible this will never happen. This is generally to be invoked + /// with a timeout or in a select on some other conditions. + /// + /// Note that when this returns, `self.fully_drained_blueprint()` could + /// already be a different blueprint. + pub async fn wait_for_drained_blueprint( + &self, + blueprint_id: BlueprintUuid, + ) { + let _ = self + .inner + .subscribe() + .wait_for(|q| { + matches!( + q.drained_blueprint_id, + Some(id) if id == blueprint_id + ) + }) + .await; + } + /// Wait for the initial determination to be made about whether sagas are /// allowed or not. pub async fn wait_for_determination(&self) { @@ -761,6 +787,7 @@ impl Drop for NewlyPendingSagaRef { not recorded, and the saga is not still pending" ) }); + q.latch_blueprint_if_drained(); }); } }