Skip to content
Merged
73 changes: 73 additions & 0 deletions nexus/db-queries/src/db/datastore/db_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ use nexus_db_model::SchemaUpgradeStep;
use nexus_db_model::SchemaVersion;
use nexus_types::deployment::BlueprintZoneDisposition;
use omicron_common::api::external::Error;
use omicron_uuid_kinds::BlueprintUuid;
use omicron_uuid_kinds::GenericUuid;
use omicron_uuid_kinds::OmicronZoneUuid;
use semver::Version;
use slog::{Logger, error, info, o};
use std::collections::BTreeSet;
use std::ops::Bound;
use std::str::FromStr;

Expand Down Expand Up @@ -686,6 +688,77 @@ impl DataStore {
Ok(())
}

/// Returns information about access for all of the given Nexus ids
///
/// This set is assumed to be pretty small.
pub async fn database_nexus_access_all(
&self,
opctx: &OpContext,
nexus_ids: &BTreeSet<OmicronZoneUuid>,
) -> Result<Vec<DbMetadataNexus>, Error> {
use nexus_db_schema::schema::db_metadata_nexus::dsl;

let db_nexus_ids: BTreeSet<_> = nexus_ids
.iter()
.copied()
.map(nexus_db_model::to_db_typed_uuid)
.collect();
dsl::db_metadata_nexus
.filter(dsl::nexus_id.eq_any(db_nexus_ids))
.load_async(&*self.pool_connection_authorized(opctx).await?)
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
}

/// Updates the "last_drained_blueprint_id" for the given Nexus id

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit, should we update this to return a bool? It's always going to be 0 or 1 rows updated, right?

Alternatively - we could just return an error if the "count == 0", right?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want to return an error for the 0 case because that could just mean we've already updated it to this blueprint id and that's fine. The caller needs to retry errors, but not that case.

I can see the appeal of the bool. I want to log the value either way, which feels slightly more idiomatic at the caller level, so I'm going to leave it.

pub async fn database_nexus_access_update_blueprint(
&self,
opctx: &OpContext,
nexus_id: OmicronZoneUuid,
blueprint_id: Option<BlueprintUuid>,
) -> Result<usize, Error> {
use nexus_db_schema::schema::db_metadata_nexus::dsl;

let nexus_id = nexus_db_model::to_db_typed_uuid(nexus_id);
let blueprint_id = blueprint_id.map(nexus_db_model::to_db_typed_uuid);

let conn = self.pool_connection_authorized(opctx).await?;
let count = diesel::update(dsl::db_metadata_nexus)
.filter(dsl::nexus_id.eq(nexus_id))
// To be conservative, we'll only update this value if the record is
// currently active. There's no reason it should ever not be active
// if we're calling this function and if there were an easy way to
// return an error in that case, we'd just do that.
.filter(dsl::state.eq(DbMetadataNexusState::Active))
.set(dsl::last_drained_blueprint_id.eq(blueprint_id))
.execute_async(&*conn)
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?;
Ok(count)
}

/// Updates the state for the given Nexus id to "quiesced"
pub async fn database_nexus_access_update_quiesced(
&self,
nexus_id: OmicronZoneUuid,
) -> Result<usize, Error> {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment here about returning a usize - since we're indexing on nexus_id already, seems like we'll either perform the update successfully or not, and can return that more idiomatically than a usize.

// A traditional authz check is not possible here because we've quiesced
// the DataStore, so no further connections are ordinarily available.
// (We use the lower-level pool interface to bypass that.)
let conn = self.pool.claim_quiesced().await?;

use nexus_db_schema::schema::db_metadata_nexus::dsl;

let nexus_id = nexus_db_model::to_db_typed_uuid(nexus_id);
let count = diesel::update(dsl::db_metadata_nexus)
.filter(dsl::nexus_id.eq(nexus_id))
.set(dsl::state.eq(DbMetadataNexusState::Quiesced))
.execute_async(&*conn)
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?;
Ok(count)
}

// Returns the access this Nexus has to the database
async fn database_nexus_access(
&self,
Expand Down
12 changes: 12 additions & 0 deletions nexus/db-queries/src/db/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,18 @@ impl Pool {
})
}

/// Returns a connection from the pool, bypassing the quiesce check
///
/// This is only intended for use *during* quiesce to update our final
/// database record.
pub async fn claim_quiesced(
&self,
) -> Result<qorb::claim::Handle<AsyncConnection>, Error> {
self.inner.claim().await.map_err(|err| {
Error::unavail(&format!("Failed to access DB connection: {err}"))
})
}

/// Disables creation of all new database claims
///
/// This is currently a one-way trip. The pool cannot be un-quiesced.
Expand Down
15 changes: 15 additions & 0 deletions nexus/db-queries/src/db/pub_test_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,21 @@ impl TestDatabase {
}
}

/// Returns a new independent datastore atop a new pool atop the same
/// database
///
/// This is normally not necessary. You can clone the `Arc<DataStore>`
/// returned by `datastore()`. However, this is important for tests that
/// need separate datastores to test their separate quiesce behaviors.
pub async fn extra_datastore(&self, log: &Logger) -> Arc<DataStore> {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this, as opposed to cloning the return value of datastore()?

@davepacheco davepacheco Sep 16, 2025

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The datastore (really, the underlying pool) has in-memory state associated with being quiesced. In our test, we want independent instances of this so that they can be quiesced independently.

edit: I'll add a comment about this.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, fair enough. Probably worth at least a doc comment noting how this is different? I'm tempted to suggest a more descriptive name too, but I'm not sure what. independent_datastore() maybe?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment added in e8eb204.

let pool = new_pool(log, &self.db);
Arc::new(
DataStore::new(&log, pool, None, IdentityCheckPolicy::DontCare)
.await
.unwrap(),
)
}

pub fn opctx(&self) -> &OpContext {
match &self.kind {
TestKind::NoPool
Expand Down
11 changes: 9 additions & 2 deletions nexus/src/app/background/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ use nexus_config::DnsTasksConfig;
use nexus_db_model::DnsGroup;
use nexus_db_queries::context::OpContext;
use nexus_db_queries::db::DataStore;
use nexus_types::deployment::Blueprint;
use nexus_types::deployment::BlueprintTarget;
use nexus_types::deployment::PendingMgsUpdates;
use omicron_uuid_kinds::OmicronZoneUuid;
use oximeter::types::ProducerRegistry;
Expand Down Expand Up @@ -426,8 +428,10 @@ impl BackgroundTasksInitializer {
// Background task: blueprint loader
//
// Registration is below so that it can watch the planner.
let blueprint_loader =
blueprint_load::TargetBlueprintLoader::new(datastore.clone());
let blueprint_loader = blueprint_load::TargetBlueprintLoader::new(
datastore.clone(),
args.blueprint_load_tx,
);
let rx_blueprint = blueprint_loader.watcher();

// Background task: blueprint executor
Expand Down Expand Up @@ -1023,6 +1027,9 @@ pub struct BackgroundTasksData {
pub saga_recovery: saga_recovery::SagaRecoveryHelpers<Arc<Nexus>>,
/// Channel for TUF repository artifacts to be replicated out to sleds
pub tuf_artifact_replication_rx: mpsc::Receiver<ArtifactsWithPlan>,
/// Channel for exposing the latest loaded blueprint
pub blueprint_load_tx:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just making sure I understand: we have this field now because in nexus/app/mod.rs, we need to construct this channel to get a handle to the receiver before we've set up the background task system?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup.

watch::Sender<Option<Arc<(BlueprintTarget, Blueprint)>>>,
/// `reqwest::Client` for webhook delivery requests.
///
/// This is shared with the external API as it's also used when sending
Expand Down
9 changes: 7 additions & 2 deletions nexus/src/app/background/tasks/blueprint_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,11 +428,16 @@ mod test {
let mut task = BlueprintExecutor::new(
datastore.clone(),
resolver.clone(),
blueprint_rx,
blueprint_rx.clone(),
OmicronZoneUuid::new_v4(),
Activator::new(),
dummy_tx,
NexusQuiesceHandle::new(&opctx.log, datastore.clone()),
NexusQuiesceHandle::new(
datastore.clone(),
OmicronZoneUuid::new_v4(),
blueprint_rx,
opctx.child(BTreeMap::new()),
),
);

// Now we're ready.
Expand Down
9 changes: 6 additions & 3 deletions nexus/src/app/background/tasks/blueprint_load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ pub struct TargetBlueprintLoader {
}

impl TargetBlueprintLoader {
pub fn new(datastore: Arc<DataStore>) -> TargetBlueprintLoader {
let (tx, _) = watch::channel(None);
pub fn new(
datastore: Arc<DataStore>,
tx: watch::Sender<Option<Arc<(BlueprintTarget, Blueprint)>>>,
) -> TargetBlueprintLoader {
TargetBlueprintLoader { datastore, last: None, tx }
}

Expand Down Expand Up @@ -256,7 +258,8 @@ mod test {
datastore.clone(),
);

let mut task = TargetBlueprintLoader::new(datastore.clone());
let (tx, _) = watch::channel(None);
let mut task = TargetBlueprintLoader::new(datastore.clone(), tx);
let mut rx = task.watcher();

// We expect to see the initial blueprint set up by nexus-test-utils
Expand Down
12 changes: 10 additions & 2 deletions nexus/src/app/background/tasks/blueprint_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ mod test {
ReconfiguratorConfigView,
};
use omicron_uuid_kinds::OmicronZoneUuid;
use std::collections::BTreeMap;

type ControlPlaneTestContext =
nexus_test_utils::ControlPlaneTestContext<crate::Server>;
Expand All @@ -305,7 +306,9 @@ mod test {
);

// Spin up the blueprint loader background task.
let mut loader = TargetBlueprintLoader::new(datastore.clone());
let (tx_loader, _) = watch::channel(None);
let mut loader =
TargetBlueprintLoader::new(datastore.clone(), tx_loader);
let mut rx_loader = loader.watcher();
loader.activate(&opctx).await;
let (_initial_target, initial_blueprint) = &*rx_loader
Expand Down Expand Up @@ -427,7 +430,12 @@ mod test {
OmicronZoneUuid::new_v4(),
Activator::new(),
dummy_tx,
NexusQuiesceHandle::new(&opctx.log, datastore.clone()),
NexusQuiesceHandle::new(
datastore.clone(),
OmicronZoneUuid::new_v4(),
rx_loader.clone(),
opctx.child(BTreeMap::new()),
),
);
let value = executor.activate(&opctx).await;
let value = value.as_object().expect("response is not a JSON object");
Expand Down
16 changes: 15 additions & 1 deletion nexus/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,20 @@ impl Nexus {
sec_store,
));

let quiesce = NexusQuiesceHandle::new(&log, db_datastore.clone());
let (blueprint_load_tx, blueprint_load_rx) = watch::channel(None);
let quiesce_log = log.new(o!("component" => "NexusQuiesceHandle"));
let quiesce_opctx = OpContext::for_background(
quiesce_log,
Arc::clone(&authz),
authn::Context::internal_api(),
Arc::clone(&db_datastore) as Arc<dyn nexus_auth::storage::Storage>,
);
let quiesce = NexusQuiesceHandle::new(
db_datastore.clone(),
config.deployment.id,
blueprint_load_rx,
quiesce_opctx,
);

// It's a bit of a red flag to use an unbounded channel.
//
Expand Down Expand Up @@ -601,6 +614,7 @@ impl Nexus {
},
tuf_artifact_replication_rx,
mgs_updates_tx,
blueprint_load_tx,
},
);

Expand Down
Loading
Loading