diff --git a/Cargo.lock b/Cargo.lock index b2203c19d..63cab2dbb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4609,6 +4609,7 @@ dependencies = [ "propolis", "propolis_api_types", "propolis_types", + "proptest", "reqwest 0.12.7", "rfb", "rgb_frame", diff --git a/bin/propolis-server/Cargo.toml b/bin/propolis-server/Cargo.toml index 98add8228..e9e7b1ced 100644 --- a/bin/propolis-server/Cargo.toml +++ b/bin/propolis-server/Cargo.toml @@ -73,6 +73,7 @@ ring.workspace = true slog = { workspace = true, features = [ "max_level_trace", "release_max_level_debug" ] } expectorate.workspace = true mockall.workspace = true +proptest.workspace = true [features] default = [] @@ -83,7 +84,6 @@ omicron-build = ["propolis/omicron-build"] # Falcon builds require corresponding bits turned on in the dependency libs falcon = ["propolis/falcon"] - # Testing necessitates injecting failures which should hopefully be rare or even # never occur on real otherwise-unperturbed systems. We conditionally compile # code supporting failure injection to avoid the risk of somehow injecting diff --git a/bin/propolis-server/src/lib/vm/active.rs b/bin/propolis-server/src/lib/vm/active.rs index 8a812bf27..1da8f7ea4 100644 --- a/bin/propolis-server/src/lib/vm/active.rs +++ b/bin/propolis-server/src/lib/vm/active.rs @@ -63,9 +63,9 @@ impl ActiveVm { self.state_driver_queue .queue_external_request(match requested { - InstanceStateRequested::Run => ExternalRequest::Start, - InstanceStateRequested::Stop => ExternalRequest::Stop, - InstanceStateRequested::Reboot => ExternalRequest::Reboot, + InstanceStateRequested::Run => ExternalRequest::start(), + InstanceStateRequested::Stop => ExternalRequest::stop(), + InstanceStateRequested::Reboot => ExternalRequest::reboot(), }) .map_err(Into::into) } @@ -79,10 +79,7 @@ impl ActiveVm { websock: dropshot::WebsocketConnection, ) -> Result<(), VmError> { Ok(self.state_driver_queue.queue_external_request( - ExternalRequest::MigrateAsSource { - migration_id, - websock: 
websock.into(), - }, + ExternalRequest::migrate_as_source(migration_id, websock), )?) } @@ -107,11 +104,11 @@ impl ActiveVm { ) -> Result<(), VmError> { self.state_driver_queue .queue_external_request( - ExternalRequest::ReconfigureCrucibleVolume { + ExternalRequest::reconfigure_crucible_volume( backend_id, new_vcr_json, result_tx, - }, + ), ) .map_err(Into::into) } diff --git a/bin/propolis-server/src/lib/vm/request_queue.rs b/bin/propolis-server/src/lib/vm/request_queue.rs index 30d26f59c..9a2b58c9c 100644 --- a/bin/propolis-server/src/lib/vm/request_queue.rs +++ b/bin/propolis-server/src/lib/vm/request_queue.rs @@ -2,29 +2,22 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -//! Handles requests to change a Propolis server's state via the external API. +//! Handles requests to change a Propolis server's state or component +//! configuration via the external API. //! -//! An instance accepts or rejects requests to change state based on a -//! combination of its current state and its knowledge of the requests it has -//! previously queued but not processed yet. The latter knowledge is used to -//! reject requests that will never be fulfilled (because they're preceded by an -//! action that will forbid them; consider rebooting after stopping) or that may -//! need be to redirected to a migration target. +//! The queue accepts or rejects requests based on a combination of its current +//! state and its knowledge of requests that it has previously queued but that +//! have not yet been processed. The latter knowledge is used to reject requests +//! that will never be fulfilled (because a prior request preempts them) or that +//! may need to be redirected to a migration target. //! -//! The queue maintains a disposition for each kind of request that can be sent -//! to it, which allows that request to be enqueued, denied, or silently ignored -//! (for idempotency purposes). 
These dispositions can change as new requests -//! are queued. The queue also provides callbacks to the VM state driver that -//! allow the driver to advise the queue of state changes that further affect -//! what requests should be accepted. -//! -//! Users who want to share a queue must wrap it in the synchronization objects -//! of their choice. +//! The queue contains no synchronization of its own. Users who want to share a +//! queue between multiple threads must wrap it in a synchronization object. use std::collections::VecDeque; use propolis_api_types::instance_spec::SpecKey; -use slog::{debug, info, Logger}; +use slog::{info, Logger}; use thiserror::Error; use uuid::Uuid; @@ -53,9 +46,8 @@ impl WebsocketConnection { } } -/// An external request made of a VM controller via the server API. Handled by -/// the controller's state driver thread. -pub enum ExternalRequest { +/// A request to change a VM's runtime state. +pub(crate) enum StateChangeRequest { /// Asks the state worker to start a brand-new VM (i.e. not one initialized /// by live migration, which implicitly starts the VM). Start, @@ -71,7 +63,29 @@ pub enum ExternalRequest { /// Halts the VM. Note that this is not a graceful shutdown and does not /// coordinate with guest software. Stop, +} + +impl std::fmt::Debug for StateChangeRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Start => write!(f, "Start"), + Self::MigrateAsSource { migration_id, websock: _ } => f + .debug_struct("MigrateAsSource") + .field("migration_id", migration_id) + .finish(), + Self::Reboot => write!(f, "Reboot"), + Self::Stop => write!(f, "Stop"), + } + } +} +/// A request to reconfigure a VM's components. +/// +/// NOTE: Successfully queuing a component change request does not guarantee +/// that the request will be processed, because it may be preempted by a VM +/// state change. 
If this happens, the request will fail and notify the +/// submitter using whatever channel is appropriate for the request's type. +pub enum ComponentChangeRequest { /// Attempts to update the volume construction request for the supplied /// Crucible volume. /// @@ -91,16 +105,9 @@ pub enum ExternalRequest { }, } -impl std::fmt::Debug for ExternalRequest { +impl std::fmt::Debug for ComponentChangeRequest { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::Start => write!(f, "Start"), - Self::MigrateAsSource { migration_id, .. } => f - .debug_struct("MigrateAsSource") - .field("migration_id", migration_id) - .finish(), - Self::Reboot => write!(f, "Reboot"), - Self::Stop => write!(f, "Stop"), Self::ReconfigureCrucibleVolume { backend_id, .. } => f .debug_struct("ReconfigureCrucibleVolume") .field("backend_id", backend_id) @@ -109,10 +116,68 @@ impl std::fmt::Debug for ExternalRequest { } } +/// An external request made of a VM controller via the server API. Handled by +/// the controller's state driver thread. +#[derive(Debug)] +pub(crate) enum ExternalRequest { + /// A request to change the VM's runtime state. + State(StateChangeRequest), + + /// A request to reconfigure one of the VM's components. + Component(ComponentChangeRequest), +} + +impl ExternalRequest { + /// Constructs a VM start request. + pub const fn start() -> Self { + Self::State(StateChangeRequest::Start) + } + + /// Constructs a VM stop request. + pub const fn stop() -> Self { + Self::State(StateChangeRequest::Stop) + } + + /// Constructs a VM reboot request. + pub const fn reboot() -> Self { + Self::State(StateChangeRequest::Reboot) + } + + /// Constructs a request to migrate a VM to another Propolis instance, using + /// `ws_conn` as the websocket connection to the migration target. 
+ pub fn migrate_as_source( + migration_id: Uuid, + ws_conn: dropshot::WebsocketConnection, + ) -> Self { + Self::State(StateChangeRequest::MigrateAsSource { + migration_id, + websock: WebsocketConnection(Some(ws_conn)), + }) + } + + /// Constructs a request to update a Crucible volume's construction request. + /// The result of this request will be sent to the supplied `result_tx`. + pub fn reconfigure_crucible_volume( + backend_id: SpecKey, + new_vcr_json: String, + result_tx: super::CrucibleReplaceResultTx, + ) -> Self { + Self::Component(ComponentChangeRequest::ReconfigureCrucibleVolume { + backend_id, + new_vcr_json, + result_tx, + }) + } + + fn is_stop(&self) -> bool { + matches!(self, Self::State(StateChangeRequest::Stop)) + } +} + /// A set of reasons why a request to queue an external state transition can /// fail. #[derive(Copy, Clone, Debug, Error)] -pub enum RequestDeniedReason { +pub(crate) enum RequestDeniedReason { #[error("Operation requires an active instance")] InstanceNotActive, @@ -123,68 +188,99 @@ pub enum RequestDeniedReason { AlreadyMigrationSource, #[error("Operation cannot be performed on a migration source")] - InvalidRequestForMigrationSource, + InvalidForMigrationSource, #[error("Instance is preparing to stop")] HaltPending, + #[error("Instance has migrated out and is being torn down")] + MigratedOut, + + #[error("Instance has already halted")] + Halted, + #[error("Instance failed to start or halted due to a failure")] InstanceFailed, } -/// The set of instance state changes that should change the dispositions of -/// future requests to the queue. +/// A kind of request that can be popped from the queue and then completed. #[derive(Copy, Clone, Debug)] -pub enum InstanceStateChange { - StartedRunning, - Rebooted, - Stopped, - Failed, +pub(super) enum CompletedRequest { + Start { succeeded: bool }, + Reboot, + MigrationOut { succeeded: bool }, + Stop, } -/// A reason for a change in the queue's request dispositions. 
-#[derive(Debug)] -enum DispositionChangeReason<'a> { - ApiRequest(&'a ExternalRequest), - StateChange(InstanceStateChange), -} +/// The queue's internal notion of the VM's runtime state. +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +enum QueueState { + /// The instance has not started yet and no one has asked it to start. + NotStarted, -/// The possible methods of handling a request to queue a state change. -#[derive(Copy, Clone, Debug)] -enum RequestDisposition { - /// Put the state change on the queue. - Enqueue, + /// The instance is not running yet, but it's on its way there: either the + /// state driver is actively trying to start it, or there is a request that, + /// when processed, will direct the driver to start the instance. + StartPending, + + /// The instance has successfully started. + Running, - /// Drop the state change silently. This is used to make requests appear - /// idempotent to callers without making the state driver deal with the - /// consequences of queuing the same state change request twice. - Ignore, + /// The instance has stopped due to a migration out. + MigratedOut, + + /// The instance has shut down. + Stopped, - /// Deny the request to change state. - Deny(RequestDeniedReason), + /// The instance failed to start. + Failed, } -/// The current disposition for each kind of incoming request. -#[derive(Copy, Clone, Debug)] -struct AllowedRequests { - start: RequestDisposition, - migrate_as_source: RequestDisposition, - reboot: RequestDisposition, - mutate: RequestDisposition, - stop: RequestDisposition, +impl QueueState { + /// If `self` is a state in which new change requests should be denied + /// unconditionally, returns a `Some` containing an appropriate + /// [`RequestDeniedReason`]; returns `None` otherwise. 
+ fn deny_reason(&self) -> Option<RequestDeniedReason> { + match self { + Self::MigratedOut => Some(RequestDeniedReason::MigratedOut), + Self::Stopped => Some(RequestDeniedReason::Halted), + Self::Failed => Some(RequestDeniedReason::InstanceFailed), + _ => None, + } + } } -/// A queue for external requests to change an instance's state. #[derive(Debug)] -pub struct ExternalRequestQueue { - queue: VecDeque<ExternalRequest>, - allowed: AllowedRequests, +pub(super) struct ExternalRequestQueue { + /// The queue of unprocessed state change requests. + state_queue: VecDeque<StateChangeRequest>, + + /// The queue of unprocessed component change requests. + component_queue: VecDeque<ComponentChangeRequest>, + + /// The "effective" (for purposes of deciding how to dispose of requests) + /// state of the instance associated with this queue. + state: QueueState, + + /// True if this queue has enqueued a reboot request that has not been + /// completed by the state driver. + awaiting_reboot: bool, + + /// True if this queue has enqueued a request to migrate out that has not + /// been completed by the state driver. + awaiting_migration_out: bool, + + /// True if this queue has enqueued a stop request that has not been + /// completed by the state driver. + awaiting_stop: bool, + + /// The queue's logger. log: Logger, } /// Indicates whether this queue's creator will start the relevant instance /// without waiting for a Start request from the queue. -pub enum InstanceAutoStart { +pub(super) enum InstanceAutoStart { Yes, No, } @@ -192,220 +288,262 @@ impl ExternalRequestQueue { /// Creates a new queue that logs to the supplied logger. pub fn new(log: Logger, auto_start: InstanceAutoStart) -> Self { - // If the queue is being created for an instance that will start - // automatically (e.g. due to a migration in), set the request - // disposition for future start requests to Ignore for idempotency.
- let start = match auto_start { - InstanceAutoStart::Yes => RequestDisposition::Ignore, - InstanceAutoStart::No => RequestDisposition::Enqueue, + let instance_state = match auto_start { + InstanceAutoStart::Yes => QueueState::StartPending, + InstanceAutoStart::No => QueueState::NotStarted, }; Self { - queue: VecDeque::new(), - allowed: AllowedRequests { - start, - migrate_as_source: RequestDisposition::Deny( - RequestDeniedReason::InstanceNotActive, - ), - reboot: RequestDisposition::Deny( - RequestDeniedReason::InstanceNotActive, - ), - mutate: RequestDisposition::Deny( - RequestDeniedReason::InstanceNotActive, - ), - stop: RequestDisposition::Enqueue, - }, + state_queue: Default::default(), + component_queue: Default::default(), + + state: instance_state, + awaiting_reboot: false, + awaiting_migration_out: false, + awaiting_stop: false, log, } } - /// Pops the request at the front of the queue. + /// Pops the next request off of the queue. If the queue contains both state + /// change and component change requests, the next state change request is + /// popped first (even if it arrived later in time than the next component + /// change request). pub fn pop_front(&mut self) -> Option<ExternalRequest> { - self.queue.pop_front() + if let Some(state_change) = self.state_queue.pop_front() { + Some(ExternalRequest::State(state_change)) + } else { + self.component_queue.pop_front().map(ExternalRequest::Component) + } } /// Indicates whether the queue is empty. #[cfg(test)] pub fn is_empty(&self) -> bool { - self.queue.is_empty() + self.state_queue.is_empty() && self.component_queue.is_empty() } - /// Asks to place the supplied request on the queue. If the request is - /// enqueued, updates the dispositions to use for future requests. + /// Attempts to place the supplied `request` on the queue, returning `Ok` + /// if the request was accepted and an `Err` otherwise.
In the latter case, + /// the error contains a [`RequestDeniedReason`] that describes why the + /// request was rejected. pub fn try_queue( &mut self, request: ExternalRequest, ) -> Result<(), RequestDeniedReason> { - let disposition = match request { - ExternalRequest::Start => self.allowed.start, - ExternalRequest::MigrateAsSource { .. } => { - self.allowed.migrate_as_source + let should_queue = self.should_queue(&request); + match should_queue { + Ok(true) => { + info!( + &self.log, + "enqueued external request"; + "request" => ?request + ); } - ExternalRequest::Reboot => self.allowed.reboot, - ExternalRequest::ReconfigureCrucibleVolume { .. } => { - self.allowed.mutate + Ok(false) => { + info!( + &self.log, + "ignored external request"; + "request" => ?request + ); + + return Ok(()); } + Err(reason) => { + info!( + &self.log, + "denied external request"; + "request" => ?request, + "reason" => %reason + ); - // Requests to stop always succeed. Note that a request to stop a VM - // that hasn't started should still be queued to the state worker so - // that the worker can exit and drop its references to the instance. - ExternalRequest::Stop => self.allowed.stop, - }; + return Err(reason); + } + } - info!(&self.log, "Queuing external request"; - "request" => ?request, - "disposition" => ?disposition); + match request { + ExternalRequest::State(StateChangeRequest::Start) => { + assert_eq!(self.state, QueueState::NotStarted); + self.state = QueueState::StartPending; + } + ExternalRequest::State(StateChangeRequest::MigrateAsSource { + .. 
+ }) => { + assert!(!self.awaiting_migration_out); + self.awaiting_migration_out = true; + } + ExternalRequest::State(StateChangeRequest::Reboot) => { + assert!(!self.awaiting_reboot); + self.awaiting_reboot = true; + } + ExternalRequest::State(StateChangeRequest::Stop) => { + assert!(!self.awaiting_stop); + self.awaiting_stop = true; + } + ExternalRequest::Component(_) => {} + } - match disposition { - RequestDisposition::Enqueue => {} - RequestDisposition::Ignore => return Ok(()), - RequestDisposition::Deny(reason) => return Err(reason), - }; + match request { + ExternalRequest::State(s) => self.state_queue.push_back(s), + ExternalRequest::Component(c) => self.component_queue.push_back(c), + } - self.allowed = self.get_new_dispositions( - DispositionChangeReason::ApiRequest(&request), - ); - self.queue.push_back(request); Ok(()) } - /// Notifies the queue that the instance's state has changed and that its - /// disposition should be updated accordingly. - pub fn notify_instance_state_change(&mut self, state: InstanceStateChange) { - self.allowed = self - .get_new_dispositions(DispositionChangeReason::StateChange(state)); - } - - /// Computes a new set of queue dispositions given the current state of the - /// queue and the event that is changing those dispositions. 
- fn get_new_dispositions( - &self, - reason: DispositionChangeReason, - ) -> AllowedRequests { - debug!(self.log, "Computing new queue dispositions"; - "reason" => ?reason); - - use DispositionChangeReason as ChangeReason; - use RequestDeniedReason as DenyReason; - use RequestDisposition as Disposition; - match reason { - ChangeReason::ApiRequest(ExternalRequest::Start) => { - let reason = DenyReason::StartInProgress; - AllowedRequests { - start: Disposition::Ignore, - migrate_as_source: Disposition::Deny(reason), - reboot: Disposition::Deny(reason), - mutate: Disposition::Deny(reason), - stop: self.allowed.stop, - } + /// Determines whether the supplied `request` should be queued, returning: + /// + /// - `Ok(true)` if the request should be enqueued, + /// - `Ok(false)` if the request should be ignored, and + /// - `Err(reason)` if the request should be denied. + fn should_queue( + &mut self, + request: &ExternalRequest, + ) -> Result<bool, RequestDeniedReason> { + // If the queue is in a terminal state, deny the request straightaway + // (unless it's a stop request, which can be ignored for idempotency). + if let Some(reason) = self.state.deny_reason() { + if request.is_stop() { + return Ok(false); + } else { + return Err(reason); } - ChangeReason::ApiRequest(ExternalRequest::MigrateAsSource { - .. - }) => { - assert!( - matches!(self.allowed.start, Disposition::Ignore), - "{:?}", - self.allowed - ); - - AllowedRequests { - start: self.allowed.start, - migrate_as_source: Disposition::Deny( - DenyReason::AlreadyMigrationSource, - ), - reboot: Disposition::Deny( - DenyReason::InvalidRequestForMigrationSource, - ), - mutate: Disposition::Deny( - DenyReason::InvalidRequestForMigrationSource, - ), - stop: self.allowed.stop, + } else { + // The instance hasn't stopped yet, so consider this request in + // light of its current state and the other as-yet unprocessed + // requests from the queue.
+ // + // In general, try to make state change requests idempotent by + // ignoring new requests when a request of the appropriate kind is + // already on the queue, and deny requests to reach a state that is + // precluded by an earlier state change request. + match request { + // Interpret start requests as requests to reach the Running + // state. + ExternalRequest::State(StateChangeRequest::Start) => { + if self.awaiting_stop { + return Err(RequestDeniedReason::HaltPending); + } else if self.state != QueueState::NotStarted { + return Ok(false); + } } - } - // Requests to reboot prevent additional reboot requests from being - // queued, but do not affect other operations. - ChangeReason::ApiRequest(ExternalRequest::Reboot) => { - assert!( - matches!(self.allowed.start, Disposition::Ignore), - "{:?}", - self.allowed - ); - AllowedRequests { reboot: Disposition::Ignore, ..self.allowed } - } + // Only allow one attempt to migrate out at a time (if it works + // the VM can't migrate out again), and only allow migration out + // after an instance begins to run. + ExternalRequest::State( + StateChangeRequest::MigrateAsSource { .. }, + ) => { + if self.awaiting_migration_out { + return Err( + RequestDeniedReason::AlreadyMigrationSource, + ); + } else if self.awaiting_stop { + return Err(RequestDeniedReason::HaltPending); + } else if self.state == QueueState::NotStarted { + return Err(RequestDeniedReason::InstanceNotActive); + } + } - // Requests to stop the instance block other requests from being - // queued. Additional requests to stop are ignored for idempotency. 
- ChangeReason::ApiRequest(ExternalRequest::Stop) => { - let reason = DenyReason::HaltPending; - AllowedRequests { - start: Disposition::Deny(reason), - migrate_as_source: Disposition::Deny(reason), - reboot: Disposition::Deny(reason), - mutate: Disposition::Deny(reason), - stop: Disposition::Ignore, + // Treat reboot requests as a request to take a VM that has + // already started, reset its state, and resume the VM. If the + // VM migrates out first, this request needs to be directed to + // the target, so reject it here to allow the caller to wait for + // the migration to resolve. + ExternalRequest::State(StateChangeRequest::Reboot) => { + if self.awaiting_migration_out { + return Err( + RequestDeniedReason::InvalidForMigrationSource, + ); + } else if self.awaiting_stop { + return Err(RequestDeniedReason::HaltPending); + } else if self.state == QueueState::NotStarted { + return Err(RequestDeniedReason::InstanceNotActive); + } else if self.state == QueueState::StartPending { + return Err(RequestDeniedReason::StartInProgress); + } else if self.awaiting_reboot { + return Ok(false); + } } - } - // Requests to mutate VM configuration don't move the VM state - // machine and don't change any request dispositions. - ChangeReason::ApiRequest( - ExternalRequest::ReconfigureCrucibleVolume { .. }, - ) => self.allowed, - - // When an instance begins running, requests to migrate out of it or - // to reboot it become valid. - ChangeReason::StateChange(InstanceStateChange::StartedRunning) => { - AllowedRequests { - start: self.allowed.start, - migrate_as_source: Disposition::Enqueue, - reboot: Disposition::Enqueue, - mutate: Disposition::Enqueue, - stop: self.allowed.stop, + // Always queue requests to stop a VM unless one is already + // present. + // + // Note that if the VM migrates out before this request is + // processed, then the "logical" VM is still running (in another + // Propolis). 
The client is responsible for tracking any + // outstanding migrations and directing its stop requests + // accordingly. + ExternalRequest::State(StateChangeRequest::Stop) => { + if self.awaiting_stop { + return Ok(false); + } } + + // Always enqueue component change requests, even if the VM has + // a pending request to stop or migrate out. This allows the + // state driver to process these requests during a state change, + // which may be necessary to complete that state change. If the + // change request is canceled by a later state transition, the + // queue can use the request data to notify the requestor. + ExternalRequest::Component( + ComponentChangeRequest::ReconfigureCrucibleVolume { + .. + }, + ) => {} } + }; - // When an instance finishes rebooting, allow new reboot requests to - // be queued again, unless reboot requests began to be denied in the - // meantime. - ChangeReason::StateChange(InstanceStateChange::Rebooted) => { - let new_reboot = - if let Disposition::Ignore = self.allowed.reboot { - Disposition::Enqueue - } else { - self.allowed.reboot - }; + Ok(true) + } - AllowedRequests { reboot: new_reboot, ..self.allowed } - } + /// Notifies this queue that the caller has finished processing a + /// previously-dequeued request, allowing the queue to adjust its + /// dispositions in response. + pub(super) fn notify_request_completed(&mut self, req: CompletedRequest) { + info!( + &self.log, + "queue notified of request completion"; + "request" => ?req + ); - // When an instance stops or fails, requests to do anything other - // than stop it are denied with an appropriate deny reason. Note - // that an instance may stop or fail due to guest activity, so the - // previous dispositions for migrate and reboot requests may not be - // "deny". 
- ChangeReason::StateChange(InstanceStateChange::Stopped) => { - let reason = DenyReason::InstanceNotActive; - AllowedRequests { - start: Disposition::Deny(reason), - migrate_as_source: Disposition::Deny(reason), - reboot: Disposition::Deny(reason), - mutate: Disposition::Deny(reason), - stop: Disposition::Ignore, + match req { + CompletedRequest::Start { succeeded } => { + assert_eq!(self.state, QueueState::StartPending); + if succeeded { + self.state = QueueState::Running; + } else { + self.state = QueueState::Failed; } } - ChangeReason::StateChange(InstanceStateChange::Failed) => { - let reason = DenyReason::InstanceFailed; - AllowedRequests { - start: Disposition::Deny(reason), - migrate_as_source: Disposition::Deny(reason), - reboot: Disposition::Deny(reason), - mutate: Disposition::Deny(reason), - stop: self.allowed.stop, + CompletedRequest::Reboot => { + assert_eq!(self.state, QueueState::Running); + assert!(self.awaiting_reboot); + self.awaiting_reboot = false; + } + CompletedRequest::MigrationOut { succeeded } => { + assert_eq!(self.state, QueueState::Running); + assert!(self.awaiting_migration_out); + self.awaiting_migration_out = false; + if succeeded { + self.state = QueueState::MigratedOut; } } + CompletedRequest::Stop => { + assert!(self.awaiting_stop); + self.awaiting_stop = false; + self.state = QueueState::Stopped; + } } } + + /// Notifies this queue that the instance has stopped. This routine is meant + /// to be used in cases where an instance stops for reasons other than an + /// external request (e.g., a guest-requested chipset-driven shutdown). + pub(super) fn notify_stopped(&mut self) { + info!(&self.log, "queue notified that VM has stopped"); + self.state = QueueState::Stopped; + } } // It's possible for an external request queue to be dropped with outstanding @@ -414,11 +552,22 @@ impl ExternalRequestQueue { // queue that the VM is gone. impl Drop for ExternalRequestQueue { fn drop(&mut self) { - for req in self.queue.drain(..) 
{ + // No special handling is needed for the state change queue: + // + // - Requests to start, reboot, and stop are handled asynchronously + // (calls to change the instance's state return as soon as they're + // queued). + // - Requests to migrate out contain a connection to the migration + // target; dropping this connection tells the target the source is + // gone. + // + // Drain the component change request queue and send messages to + // requestors telling them that their requests have been canceled. + for req in self.component_queue.drain(..) { match req { // Crucible VCR change requestors wait for their requests to be // retired. - ExternalRequest::ReconfigureCrucibleVolume { + ComponentChangeRequest::ReconfigureCrucibleVolume { result_tx, .. } => { @@ -431,18 +580,6 @@ impl Drop for ExternalRequestQueue { hyper::StatusCode::GONE, ))); } - - // Requests to start, reboot, and stop are handled - // asynchronously (calls to change the instance's state return - // as soon as they're queued). - ExternalRequest::Start - | ExternalRequest::Reboot - | ExternalRequest::Stop => {} - - // Dropping a request to migrate out drops the embedded - // connection to the migration target, thus notifying it that - // the source is gone. - ExternalRequest::MigrateAsSource { .. 
} => {} } } } @@ -452,6 +589,7 @@ impl Drop for ExternalRequestQueue { mod test { use super::*; + use proptest::prelude::*; use uuid::Uuid; fn test_logger() -> slog::Logger { @@ -459,48 +597,103 @@ mod test { } fn make_migrate_as_source_request() -> ExternalRequest { - ExternalRequest::MigrateAsSource { + ExternalRequest::State(StateChangeRequest::MigrateAsSource { migration_id: Uuid::new_v4(), websock: WebsocketConnection(None), - } + }) } fn make_reconfigure_crucible_request() -> ExternalRequest { let (tx, _rx) = tokio::sync::oneshot::channel(); - ExternalRequest::ReconfigureCrucibleVolume { - backend_id: SpecKey::Uuid(Uuid::new_v4()), - new_vcr_json: "".to_string(), - result_tx: tx, + ExternalRequest::Component( + ComponentChangeRequest::ReconfigureCrucibleVolume { + backend_id: SpecKey::Uuid(Uuid::new_v4()), + new_vcr_json: "".to_string(), + result_tx: tx, + }, + ) + } + + impl ExternalRequest { + #[track_caller] + fn assert_start(&self) { + assert!( + matches!(self, Self::State(StateChangeRequest::Start)), + "expected start request, got {self:?}" + ); + } + + #[track_caller] + fn assert_stop(&self) { + assert!(self.is_stop(), "expected stop request, got {self:?}"); + } + + #[track_caller] + fn assert_reboot(&self) { + assert!( + matches!(self, Self::State(StateChangeRequest::Reboot)), + "expected reboot request, got {self:?}" + ); + } + + #[track_caller] + fn assert_migrate_as_source(&self) { + assert!( + matches!( + self, + Self::State(StateChangeRequest::MigrateAsSource { .. }) + ), + "expected migrate as source request, got {self:?}" + ); + } + + #[track_caller] + fn assert_reconfigure_crucible(&self) { + assert!( + matches!( + self, + Self::Component( + ComponentChangeRequest::ReconfigureCrucibleVolume { .. 
} + ) + ), + "expected Crucible reconfiguration request, got {self:?}" + ); } } - #[tokio::test] - async fn start_requests_become_idempotent_after_first_request() { + #[test] + fn start_requests_become_idempotent_after_first_request() { let mut queue = ExternalRequestQueue::new(test_logger(), InstanceAutoStart::No); // The first request to start should succeed. - assert!(queue.try_queue(ExternalRequest::Start).is_ok()); + assert!(queue.try_queue(ExternalRequest::start()).is_ok()); // The second one should too, but only for idempotency: the queue should // then have only one start request on it. - assert!(queue.try_queue(ExternalRequest::Start).is_ok()); - assert!(matches!(queue.pop_front(), Some(ExternalRequest::Start))); - assert!(queue.pop_front().is_none()); + assert!(queue.try_queue(ExternalRequest::start()).is_ok()); + queue.pop_front().unwrap().assert_start(); + assert!(queue.is_empty()); // Start requests continue to be ignored even after the instance starts // to run. - queue.notify_instance_state_change(InstanceStateChange::StartedRunning); - assert!(queue.try_queue(ExternalRequest::Start).is_ok()); - assert!(queue.pop_front().is_none()); + queue.notify_request_completed(CompletedRequest::Start { + succeeded: true, + }); + + assert!(queue.try_queue(ExternalRequest::start()).is_ok()); + assert!(queue.is_empty()); } - #[tokio::test] - async fn migrate_as_source_is_not_idempotent() { + #[test] + fn migrate_as_source_is_not_idempotent() { // Simulate a running instance. let mut queue = ExternalRequestQueue::new(test_logger(), InstanceAutoStart::Yes); - queue.notify_instance_state_change(InstanceStateChange::StartedRunning); + + queue.notify_request_completed(CompletedRequest::Start { + succeeded: true, + }); // Requests to migrate out should be allowed. 
assert!(queue.try_queue(make_migrate_as_source_request()).is_ok()); @@ -517,104 +710,590 @@ mod test { // If migration fails, the instance resumes running, and then another // request to migrate out should be allowed. - assert!(matches!( - queue.pop_front(), - Some(ExternalRequest::MigrateAsSource { .. }) - )); - queue.notify_instance_state_change(InstanceStateChange::StartedRunning); + queue.pop_front().unwrap().assert_migrate_as_source(); + queue.notify_request_completed(CompletedRequest::MigrationOut { + succeeded: false, + }); + assert!(queue.try_queue(make_migrate_as_source_request()).is_ok()); // A successful migration stops the instance, which forecloses on future // requests to migrate out. queue.pop_front(); - queue.notify_instance_state_change(InstanceStateChange::Stopped); + queue.notify_request_completed(CompletedRequest::MigrationOut { + succeeded: true, + }); + assert!(queue.try_queue(make_migrate_as_source_request()).is_err()); } - #[tokio::test] - async fn stop_requests_enqueue_after_vm_failure() { + #[test] + fn stop_requests_are_idempotent() { let mut queue = - ExternalRequestQueue::new(test_logger(), InstanceAutoStart::No); - queue.notify_instance_state_change(InstanceStateChange::Failed); + ExternalRequestQueue::new(test_logger(), InstanceAutoStart::Yes); + + queue.notify_request_completed(CompletedRequest::Start { + succeeded: true, + }); - assert!(queue.try_queue(ExternalRequest::Stop).is_ok()); - assert!(matches!(queue.pop_front(), Some(ExternalRequest::Stop))); + assert!(queue.try_queue(ExternalRequest::stop()).is_ok()); + assert!(queue.try_queue(ExternalRequest::stop()).is_ok()); + queue.pop_front().unwrap().assert_stop(); + assert!(queue.is_empty()); } - #[tokio::test] - async fn reboot_requests_are_idempotent_except_when_stopping() { + #[test] + fn stop_requests_ignored_after_vm_failure() { let mut queue = ExternalRequestQueue::new(test_logger(), InstanceAutoStart::Yes); - 
queue.notify_instance_state_change(InstanceStateChange::StartedRunning); + + queue.notify_request_completed(CompletedRequest::Start { + succeeded: false, + }); + + assert!(queue.try_queue(ExternalRequest::stop()).is_ok()); + assert!(queue.is_empty()); + } + + #[test] + fn reboot_requests_are_idempotent_except_when_stopping() { + let mut queue = + ExternalRequestQueue::new(test_logger(), InstanceAutoStart::Yes); + queue.notify_request_completed(CompletedRequest::Start { + succeeded: true, + }); // Once the instance is started, reboot requests should be allowed, but // after the first, subsequent requests should be dropped for // idempotency. assert!(queue.is_empty()); for _ in 0..5 { - assert!(queue.try_queue(ExternalRequest::Reboot).is_ok()); + assert!(queue.try_queue(ExternalRequest::reboot()).is_ok()); } - assert!(matches!(queue.pop_front(), Some(ExternalRequest::Reboot))); + queue.pop_front().unwrap().assert_reboot(); assert!(queue.is_empty()); // Once the instance has rebooted, new requests can be queued. - queue.notify_instance_state_change(InstanceStateChange::Rebooted); - assert!(queue.try_queue(ExternalRequest::Reboot).is_ok()); - assert!(!queue.is_empty()); - assert!(matches!(queue.pop_front(), Some(ExternalRequest::Reboot))); - queue.notify_instance_state_change(InstanceStateChange::Rebooted); + queue.notify_request_completed(CompletedRequest::Reboot); + assert!(queue.try_queue(ExternalRequest::reboot()).is_ok()); + queue.pop_front().unwrap().assert_reboot(); + queue.notify_request_completed(CompletedRequest::Reboot); // If a request to reboot is queued, and then a request to stop is // queued, new requests to reboot should always fail, even after the // instance finishes rebooting. 
- assert!(queue.try_queue(ExternalRequest::Reboot).is_ok()); + assert!(queue.try_queue(ExternalRequest::reboot()).is_ok()); assert!(!queue.is_empty()); - assert!(queue.try_queue(ExternalRequest::Stop).is_ok()); - assert!(queue.try_queue(ExternalRequest::Reboot).is_err()); - assert!(matches!(queue.pop_front(), Some(ExternalRequest::Reboot))); - queue.notify_instance_state_change(InstanceStateChange::Rebooted); - assert!(queue.try_queue(ExternalRequest::Reboot).is_err()); + assert!(queue.try_queue(ExternalRequest::stop()).is_ok()); + assert!(queue.try_queue(ExternalRequest::reboot()).is_err()); + queue.pop_front().unwrap().assert_reboot(); + queue.notify_request_completed(CompletedRequest::Reboot); + assert!(queue.try_queue(ExternalRequest::reboot()).is_err()); } - #[tokio::test] - async fn mutation_requires_running_and_not_migrating_out() { + #[test] + fn mutation_disallowed_after_stopped() { let mut queue = - ExternalRequestQueue::new(test_logger(), InstanceAutoStart::No); + ExternalRequestQueue::new(test_logger(), InstanceAutoStart::Yes); - // Mutating a VM before it has started is not allowed. - assert!(queue.try_queue(make_reconfigure_crucible_request()).is_err()); + queue.notify_request_completed(CompletedRequest::Start { + succeeded: true, + }); - // Merely dequeuing the start request doesn't allow mutation; the VM - // actually has to be running. - assert!(queue.try_queue(ExternalRequest::Start).is_ok()); - assert!(matches!(queue.pop_front(), Some(ExternalRequest::Start))); + assert!(queue.try_queue(ExternalRequest::stop()).is_ok()); + queue.notify_request_completed(CompletedRequest::Stop); assert!(queue.try_queue(make_reconfigure_crucible_request()).is_err()); - queue.notify_instance_state_change(InstanceStateChange::StartedRunning); - assert!(queue.try_queue(make_reconfigure_crucible_request()).is_ok()); - assert!(matches!( - queue.pop_front(), - Some(ExternalRequest::ReconfigureCrucibleVolume { .. 
}) - )); - - // Successfully requesting migration out should block new mutation - // requests (they should wait for the migration to resolve and then go - // to the target). - assert!(queue.try_queue(make_migrate_as_source_request()).is_ok()); - assert!(queue.try_queue(make_reconfigure_crucible_request()).is_err()); - - // But if the VM resumes (due to a failed migration out) these requests - // should succeed again. - assert!(queue.pop_front().is_some()); - queue.notify_instance_state_change(InstanceStateChange::StartedRunning); - assert!(queue.try_queue(make_reconfigure_crucible_request()).is_ok()); } #[tokio::test] - async fn mutation_disallowed_after_stop() { + async fn vcr_requests_canceled_when_queue_drops() { let mut queue = ExternalRequestQueue::new(test_logger(), InstanceAutoStart::Yes); - queue.notify_instance_state_change(InstanceStateChange::StartedRunning); - queue.notify_instance_state_change(InstanceStateChange::Stopped); - assert!(queue.try_queue(make_reconfigure_crucible_request()).is_err()); + + queue.notify_request_completed(CompletedRequest::Start { + succeeded: true, + }); + + let (tx, rx) = tokio::sync::oneshot::channel(); + let req = ExternalRequest::Component( + ComponentChangeRequest::ReconfigureCrucibleVolume { + backend_id: SpecKey::Uuid(Uuid::new_v4()), + new_vcr_json: "".to_string(), + result_tx: tx, + }, + ); + + assert!(queue.try_queue(req).is_ok()); + drop(queue); + let err = rx.await.unwrap().unwrap_err(); + assert_eq!(err.status_code, hyper::StatusCode::GONE); + } + + /// A helper for generating requests as part of a property testing strategy. + /// `proptest` requires values that are the output of a `Strategy` to be + /// `Clone`, which `ExternalRequest` is not. To get around this, create a + /// strategy that returns variants of this enum and have a `From` impl that + /// then creates requests of the appropriate kind. 
+ #[derive(Clone, Copy, Debug, PartialEq, Eq)] + enum RequestKind { + Start { will_succeed: bool }, + Stop, + Reboot, + Migrate { will_succeed: bool }, + ReconfigureCrucible, + } + + impl From for ExternalRequest { + fn from(value: RequestKind) -> Self { + match value { + RequestKind::Start { will_succeed: _ } => { + ExternalRequest::start() + } + RequestKind::Stop => ExternalRequest::stop(), + RequestKind::Reboot => ExternalRequest::reboot(), + RequestKind::Migrate { will_succeed: _ } => { + make_migrate_as_source_request() + } + RequestKind::ReconfigureCrucible => { + make_reconfigure_crucible_request() + } + } + } + } + + fn request_strategy() -> impl Strategy { + prop_oneof![ + Just(RequestKind::Start { will_succeed: true }), + Just(RequestKind::Start { will_succeed: false }), + Just(RequestKind::Stop), + Just(RequestKind::Reboot), + Just(RequestKind::Migrate { will_succeed: true }), + Just(RequestKind::Migrate { will_succeed: false }), + Just(RequestKind::ReconfigureCrucible), + ] + } + + proptest! { + // Tests the behavior of the request queue in circumstances where start + // requests are queued, but never actually acknowledged. + #[test] + fn request_queuing_before_start_acknowledged( + reqs in prop::collection::vec(request_strategy(), 0..100) + ) { + let mut queue = + ExternalRequestQueue::new(test_logger(), InstanceAutoStart::No); + + let mut started = false; + let mut stop_requested = false; + let mut migrating_out = false; + for req in reqs { + let result = queue.try_queue(req.into()); + match req { + RequestKind::Start { .. } => { + if !stop_requested { + assert!(result.is_ok()); + started = true; + } else { + assert!(result.is_err()); + } + } + + RequestKind::Stop => { + assert!(result.is_ok()); + stop_requested = true; + } + + RequestKind::Reboot => { + assert!(result.is_err()); + } + + RequestKind::Migrate { .. 
} => { + if started && !stop_requested && !migrating_out { + assert!(result.is_ok()); + migrating_out = true; + } else { + assert!(result.is_err()); + } + } + + RequestKind::ReconfigureCrucible => { + assert!(result.is_ok()); + } + } + } + } + + // Tests the behavior of the request queue in circumstances where every + // request made of the state driver completes immediately. + #[test] + fn request_queuing_with_immediate_dequeueing( + reqs in prop::collection::vec(request_strategy(), 0..100) + ) { + let mut queue = + ExternalRequestQueue::new(test_logger(), InstanceAutoStart::No); + + // True once a start request has been queued. + let mut start_requested = false; + + // True once the VM reaches a terminal state (stopped, failed, + // migrated out). + let mut halted = false; + for req in reqs { + let result = queue.try_queue(req.into()); + match req { + // Start requests always succeed (though they may be + // ignored and not queued) on a non-halted VM. + RequestKind::Start { will_succeed } => { + if !halted { + assert!(result.is_ok()); + + // This request is only enqueued if it is the first + // request to start. + if !start_requested { + start_requested = true; + queue.pop_front().unwrap().assert_start(); + let completed = CompletedRequest::Start { + succeeded: will_succeed + }; + + queue.notify_request_completed(completed); + + // Telling the queue that a start attempt failed + // moves the queue to a terminal state. + if !will_succeed { + halted = true; + } + } else { + assert!(queue.is_empty()); + } + } else { + assert!(result.is_err()); + assert!(queue.is_empty()); + } + } + + // Stop requests always succeed (they are never denied), but + // they are ignored for VMs that have already halted. 
+ RequestKind::Stop => { + assert!(result.is_ok()); + if !halted { + queue.pop_front().unwrap().assert_stop(); + queue.notify_request_completed( + CompletedRequest::Stop + ); + + halted = true; + } else { + assert!(queue.is_empty()); + } + } + + // Reboot requests are always enqueued if the VM is active. + // They are ignored if there's a pending migration out, but + // in this test there is never a *pending* migration out, + // since all requests are dequeued and processed + // immediately. + RequestKind::Reboot => { + if start_requested && !halted { + assert!(result.is_ok()); + queue.pop_front().unwrap().assert_reboot(); + queue.notify_request_completed( + CompletedRequest::Reboot + ); + } else { + assert!(result.is_err()); + assert!(queue.is_empty()); + } + } + + // Migration requests have the same disposition as reboot + // requests. + RequestKind::Migrate { will_succeed } => { + if start_requested && !halted { + assert!(result.is_ok()); + queue + .pop_front() + .unwrap() + .assert_migrate_as_source(); + + + let completed = CompletedRequest::MigrationOut { + succeeded: will_succeed + }; + + queue.notify_request_completed(completed); + if will_succeed { + halted = true; + } + } else { + assert!(result.is_err()); + assert!(queue.is_empty()); + } + } + + // Crucible reconfiguration requests are always queued for + // unhalted VMs. + RequestKind::ReconfigureCrucible => { + if !halted { + assert!(result.is_ok()); + queue + .pop_front() + .unwrap() + .assert_reconfigure_crucible(); + } else { + assert!(result.is_err()); + assert!(queue.is_empty()); + } + } + } + } + } + } + + /// An operation that can be performed during a [`QueueDequeueTest`]. 
+ #[derive(Clone, Copy, Debug)] + enum QueueOp { + Enqueue(RequestKind), + Dequeue, + } + + fn queue_op_strategy() -> impl Strategy { + prop_oneof![ + request_strategy().prop_map(QueueOp::Enqueue), + Just(QueueOp::Dequeue) + ] + } + + /// A helper that queues and dequeues requests in a proptest-generated + /// order and that sends fake completion notifications back to the request + /// queue. + struct QueueDequeueTest { + /// The external request queue under test. + queue: ExternalRequestQueue, + + /// The set of state change requests that the helper expects to see from + /// the external queue. + expected_state: VecDeque, + + /// The set of component change requests that the helper expects to see + /// from the external queue. + expected_component: VecDeque, + + /// True if the helper has queued a request to start its fake VM. + start_requested: bool, + + /// True if the helper has successfully started its fake VM. + started: bool, + + /// True if the helper has queued a request to stop its fake VM. + stop_requested: bool, + + /// True if the helper has an outstanding request to reboot its fake VM. + reboot_requested: bool, + + /// True if the helper has an outstanding request to migrate its fake + /// VM. + migrate_out_requested: bool, + + /// True if the fake VM is halted (for any reason). 
+ halted: bool, + } + + impl QueueDequeueTest { + fn new() -> Self { + Self { + queue: ExternalRequestQueue::new( + test_logger(), + InstanceAutoStart::No, + ), + expected_state: Default::default(), + expected_component: Default::default(), + start_requested: false, + started: false, + stop_requested: false, + reboot_requested: false, + migrate_out_requested: false, + halted: false, + } + } + + fn run(&mut self, ops: Vec) { + for op in ops { + match op { + QueueOp::Enqueue(request) => self.queue_request(request), + QueueOp::Dequeue => { + self.dequeue_request(); + if self.halted { + return; + } + } + } + } + } + + /// Submits the supplied `request` to the external request queue, + /// determines the expected result of that submission based on the + /// helper's current flags, and asserts that the result matches the + /// helper's expectation. If the helper expects the request to be + /// queued, it pushes an entry to its internal expected-change queues. + fn queue_request(&mut self, request: RequestKind) { + let result = self.queue.try_queue(request.into()); + match request { + RequestKind::Start { .. } => { + if self.halted || self.stop_requested { + assert!(result.is_err()); + return; + } + + assert!(result.is_ok()); + if !self.start_requested { + self.start_requested = true; + self.expected_state.push_back(request); + } + } + RequestKind::Stop => { + assert!(result.is_ok()); + if self.halted || self.stop_requested { + return; + } + + self.stop_requested = true; + self.expected_state.push_back(request); + } + RequestKind::Reboot => { + if !self.started + || self.halted + || self.stop_requested + || self.migrate_out_requested + { + assert!(result.is_err()); + return; + } + + assert!(result.is_ok()); + if !self.reboot_requested { + self.reboot_requested = true; + self.expected_state.push_back(request); + } + } + RequestKind::Migrate { .. 
} => { + if (!self.started && !self.start_requested) + || self.halted + || self.stop_requested + || self.migrate_out_requested + { + assert!(result.is_err()); + return; + } + + assert!(result.is_ok()); + self.expected_state.push_back(request); + self.migrate_out_requested = true; + } + RequestKind::ReconfigureCrucible => { + if self.halted { + assert!(result.is_err()); + return; + } + + assert!(result.is_ok()); + self.expected_component.push_back(request); + } + } + } + + /// Pops a request from the helper's external queue and verifies that it + /// matches the first request on the helper's expected-change queue. If + /// the requests do match, sends a completion notification to the + /// external queue. + fn dequeue_request(&mut self) { + let (dequeued, expected) = match ( + self.queue.pop_front(), + self.expected_state + .pop_front() + .or_else(|| self.expected_component.pop_front()), + ) { + (None, None) => return, + (Some(d), None) => { + panic!("dequeued request {d:?} but expected nothing") + } + (None, Some(e)) => { + panic!("expected request {e:?} but dequeued nothing") + } + (Some(d), Some(e)) => (d, e), + }; + + match (dequeued, expected) { + ( + ExternalRequest::State(StateChangeRequest::Start), + RequestKind::Start { will_succeed }, + ) => { + self.queue.notify_request_completed( + CompletedRequest::Start { succeeded: will_succeed }, + ); + if will_succeed { + self.started = true; + } else { + self.halted = true; + } + } + ( + ExternalRequest::State(StateChangeRequest::Stop), + RequestKind::Stop, + ) => { + self.queue.notify_request_completed(CompletedRequest::Stop); + self.halted = true; + } + ( + ExternalRequest::State(StateChangeRequest::Reboot), + RequestKind::Reboot, + ) => { + self.queue + .notify_request_completed(CompletedRequest::Reboot); + self.reboot_requested = false; + } + ( + ExternalRequest::State( + StateChangeRequest::MigrateAsSource { .. 
}, + ), + RequestKind::Migrate { will_succeed }, + ) => { + self.queue.notify_request_completed( + CompletedRequest::MigrationOut { + succeeded: will_succeed, + }, + ); + self.migrate_out_requested = false; + if will_succeed { + self.halted = true; + } + } + ( + ExternalRequest::Component( + ComponentChangeRequest::ReconfigureCrucibleVolume { + .. + }, + ), + RequestKind::ReconfigureCrucible, + ) => {} + (d, e) => panic!( + "dequeued request {d:?} but expected to dequeue {e:?}\n\ + remaining queue: {:#?}\n\ + remaining expected (state): {:#?}\n\ + remaining expected (components): {:#?}", + self.queue, self.expected_state, self.expected_component + ), + } + } + } + + proptest! { + #[test] + fn request_queue_dequeue( + ops in prop::collection::vec(queue_op_strategy(), 0..100) + ) { + let mut test = QueueDequeueTest::new(); + test.run(ops); + } } } diff --git a/bin/propolis-server/src/lib/vm/state_driver.rs b/bin/propolis-server/src/lib/vm/state_driver.rs index e3ba3a4c8..a37967668 100644 --- a/bin/propolis-server/src/lib/vm/state_driver.rs +++ b/bin/propolis-server/src/lib/vm/state_driver.rs @@ -119,7 +119,10 @@ use super::{ }, guest_event::{self, GuestEvent}, objects::VmObjects, - request_queue::{self, ExternalRequest, InstanceAutoStart}, + request_queue::{ + self, CompletedRequest, ComponentChangeRequest, ExternalRequest, + InstanceAutoStart, StateChangeRequest, + }, state_publisher::{MigrationStateUpdate, StatePublisher}, InstanceEnsureResponseTx, }; @@ -227,15 +230,19 @@ impl InputQueue { } } - /// Notifies the external request queue that the instance's state has - /// changed so that it can change the dispositions for new state change - /// requests. - fn notify_instance_state_change( - &self, - state: request_queue::InstanceStateChange, - ) { + /// Notifies the external request queue that the state driver has completed + /// a request from that queue. 
+ fn notify_request_completed(&self, state: CompletedRequest) { + let mut guard = self.inner.lock().unwrap(); + guard.external_requests.notify_request_completed(state); + } + + /// Notifies the external request queue that the instance has stopped. This + /// is used to stop the queue when the instance stops without a request from + /// the API (e.g. because the guest requested a chipset-driven shutdown). + fn notify_stopped(&self) { let mut guard = self.inner.lock().unwrap(); - guard.external_requests.notify_instance_state_change(state); + guard.external_requests.notify_stopped(); } /// Submits an external state change request to the queue. @@ -522,14 +529,23 @@ impl StateDriver { let start_result = self.objects.lock_exclusive().await.start(start_reason).await; + + self.input_queue.notify_request_completed(CompletedRequest::Start { + succeeded: start_result.is_ok(), + }); + match &start_result { Ok(()) => { - self.publish_steady_state(InstanceState::Running); + self.external_state.update(ExternalStateUpdate::Instance( + InstanceState::Running, + )); } Err(e) => { error!(&self.log, "failed to start devices"; "error" => ?e); - self.publish_steady_state(InstanceState::Failed); + self.external_state.update(ExternalStateUpdate::Instance( + InstanceState::Failed, + )); } } @@ -544,6 +560,11 @@ impl StateDriver { GuestEvent::VcpuSuspendHalt(_when) => { info!(self.log, "Halting due to VM suspend event",); self.do_halt().await; + self.external_state.update(ExternalStateUpdate::Instance( + InstanceState::Stopped, + )); + + self.input_queue.notify_stopped(); HandleEventOutcome::Exit { final_state: InstanceState::Destroyed, } @@ -564,6 +585,11 @@ impl StateDriver { GuestEvent::ChipsetHalt => { info!(self.log, "Halting due to chipset-driven halt"); self.do_halt().await; + self.external_state.update(ExternalStateUpdate::Instance( + InstanceState::Stopped, + )); + + self.input_queue.notify_stopped(); HandleEventOutcome::Exit { final_state: InstanceState::Destroyed, } @@ -581,7 
+607,7 @@ impl StateDriver { request: ExternalRequest, ) -> HandleEventOutcome { match request { - ExternalRequest::Start => { + ExternalRequest::State(StateChangeRequest::Start) => { match self.start_vm(VmStartReason::ExplicitRequest).await { Ok(_) => HandleEventOutcome::Continue, Err(_) => HandleEventOutcome::Exit { @@ -589,31 +615,50 @@ impl StateDriver { }, } } - ExternalRequest::MigrateAsSource { migration_id, websock } => { - self.migrate_as_source(migration_id, websock.into_inner()) - .await; - - // The callee either queues its own stop request (on a - // successful migration out) or resumes the VM (on a failed - // migration out). Either way, the main loop can just proceed to - // process the queue as normal. - HandleEventOutcome::Continue + ExternalRequest::State(StateChangeRequest::MigrateAsSource { + migration_id, + websock, + }) => { + if self + .migrate_as_source(migration_id, websock.into_inner()) + .await + .is_ok() + { + self.do_halt().await; + HandleEventOutcome::Exit { + final_state: InstanceState::Destroyed, + } + } else { + HandleEventOutcome::Continue + } } - ExternalRequest::Reboot => { + ExternalRequest::State(StateChangeRequest::Reboot) => { self.do_reboot().await; + self.input_queue + .notify_request_completed(CompletedRequest::Reboot); + HandleEventOutcome::Continue } - ExternalRequest::Stop => { + ExternalRequest::State(StateChangeRequest::Stop) => { self.do_halt().await; + self.external_state.update(ExternalStateUpdate::Instance( + InstanceState::Stopped, + )); + + self.input_queue + .notify_request_completed(CompletedRequest::Stop); + HandleEventOutcome::Exit { final_state: InstanceState::Destroyed, } } - ExternalRequest::ReconfigureCrucibleVolume { - backend_id, - new_vcr_json, - result_tx, - } => { + ExternalRequest::Component( + ComponentChangeRequest::ReconfigureCrucibleVolume { + backend_id, + new_vcr_json, + result_tx, + }, + ) => { let _ = result_tx.send( self.reconfigure_crucible_volume(&backend_id, new_vcr_json) .await, @@ 
-633,9 +678,6 @@ impl StateDriver { // Notify other consumers that the instance successfully rebooted and is // now back to Running. - self.input_queue.notify_instance_state_change( - request_queue::InstanceStateChange::Rebooted, - ); self.external_state .update(ExternalStateUpdate::Instance(InstanceState::Running)); } @@ -658,34 +700,13 @@ impl StateDriver { guard.halt().await; } - - self.publish_steady_state(InstanceState::Stopped); - } - - fn publish_steady_state(&mut self, state: InstanceState) { - let change = match state { - InstanceState::Running => { - request_queue::InstanceStateChange::StartedRunning - } - InstanceState::Stopped => { - request_queue::InstanceStateChange::Stopped - } - InstanceState::Failed => request_queue::InstanceStateChange::Failed, - _ => panic!( - "Called publish_steady_state on non-terminal state {:?}", - state - ), - }; - - self.input_queue.notify_instance_state_change(change); - self.external_state.update(ExternalStateUpdate::Instance(state)); } async fn migrate_as_source( &mut self, migration_id: Uuid, websock: dropshot::WebsocketConnection, - ) { + ) -> Result<(), ()> { let conn = tokio_tungstenite::WebSocketStream::from_raw_socket( websock.into_inner(), tokio_tungstenite::tungstenite::protocol::Role::Server, @@ -712,7 +733,7 @@ impl StateDriver { }, )); - return; + return Err(()); } }; @@ -740,15 +761,25 @@ impl StateDriver { // On a successful migration out, the protocol promises to leave // the VM objects in a paused state, so don't pause them again. 
self.paused = true; - self.input_queue - .queue_external_request(ExternalRequest::Stop) - .expect("can always queue a request to stop"); + self.input_queue.notify_request_completed( + CompletedRequest::MigrationOut { succeeded: true }, + ); + + Ok(()) } Err(e) => { info!(self.log, "migration out failed, resuming"; "error" => ?e); - self.publish_steady_state(InstanceState::Running); + self.input_queue.notify_request_completed( + CompletedRequest::MigrationOut { succeeded: false }, + ); + + self.external_state.update(ExternalStateUpdate::Instance( + InstanceState::Running, + )); + + Err(()) } } } diff --git a/bin/propolis-server/src/proptest-regressions/vm/request_queue.txt b/bin/propolis-server/src/proptest-regressions/vm/request_queue.txt new file mode 100644 index 000000000..123702a47 --- /dev/null +++ b/bin/propolis-server/src/proptest-regressions/vm/request_queue.txt @@ -0,0 +1,12 @@ +# Seeds for failure cases proptest has generated in the past. It is +# automatically read and these particular cases re-run before any +# novel cases are generated. +# +# It is recommended to check this file in to source control so that +# everyone who runs the test benefits from these saved cases. +cc 467749978aea2988f7790844904751ed5f0797f700949e702db74ae430a659e0 # shrinks to reqs = [Migrate, Stop] +cc 03ba07e9b5a99141bddd9b878bff86845a6da9eb1aa015b3afc7b3ebfed7a6d1 # shrinks to reqs = [Start, Stop, Migrate] +cc 67a067444d475068e86b43528884319ff178d6d9038a3d9223c32789f871baa3 # shrinks to reqs = [Start, Migrate] +cc b3df4b82bdb87e3533f4bd47f0a3ee8be21893c0afc15b472281b2a79006aadf # shrinks to reqs = [Migrate] +cc 3430b43ba860946e5feb7b3b0246623708efb1465dd4fe7a604ddf479d4dc3ae # shrinks to reqs = [Start { will_succeed: true }, Migrate { will_succeed: false }, Reboot] +cc 2e8b284223a88421aaed16749309839818c16efda4bc4d8d930a35cbdce018cd # shrinks to ops = [Enqueue(ReconfigureCrucible), Enqueue(Start { will_succeed: true }), Dequeue]