From 2412bbdb6e284458a350cf4c92c82f9eb84b4e30 Mon Sep 17 00:00:00 2001 From: Greg Colombo Date: Tue, 4 Mar 2025 20:29:21 +0000 Subject: [PATCH 1/7] server: extend API request queue's memory of prior requests Today the server's API request queue decides whether to accept or reject incoming requests by storing a fixed disposition for each kind of request and adjusting those dispositions as new requests are queued or as a VM's state changes. This is not always enough information to maintain the proper dispositions. Consider the following example: 1. A VM begins to start. Reboot requests are denied in this state (since a VM that hasn't booted yet can't be rebooted). 2. A caller queues a request to stop the VM after it starts. All subsequent reboot requests should now be denied because the VM will halt before they can be serviced. 3. The VM successfully starts. The correct reboot disposition is still "deny," because there's a pending stop request, but there's no way to discern this from the prior reboot disposition (it was also "deny" before the stop request was queued!). To address this sort of problem, and to pave the way for better handling of Crucible VCR replacements during instance start, refactor the queue as follows: - Remember what kinds of outstanding requests have not been processed and dispose of requests based on this state and the queue's notion of the instance's overall state. - Break external API requests into "state change" and "component change" requests. This will allow them to be dequeued independently when the state driver wants to handle component changes without changing the order of state change requests. - Tweak the language the state driver uses to communicate with the queue: instead of saying "the VM is now in state X," the driver says "I handled (or failed to handle) a request of type Y." Tests: cargo test, PHD. 
--- bin/propolis-server/src/lib/vm/active.rs | 15 +- .../src/lib/vm/request_queue.rs | 836 +++++++++++------- .../src/lib/vm/state_driver.rs | 147 +-- 3 files changed, 608 insertions(+), 390 deletions(-) diff --git a/bin/propolis-server/src/lib/vm/active.rs b/bin/propolis-server/src/lib/vm/active.rs index 8a812bf27..1da8f7ea4 100644 --- a/bin/propolis-server/src/lib/vm/active.rs +++ b/bin/propolis-server/src/lib/vm/active.rs @@ -63,9 +63,9 @@ impl ActiveVm { self.state_driver_queue .queue_external_request(match requested { - InstanceStateRequested::Run => ExternalRequest::Start, - InstanceStateRequested::Stop => ExternalRequest::Stop, - InstanceStateRequested::Reboot => ExternalRequest::Reboot, + InstanceStateRequested::Run => ExternalRequest::start(), + InstanceStateRequested::Stop => ExternalRequest::stop(), + InstanceStateRequested::Reboot => ExternalRequest::reboot(), }) .map_err(Into::into) } @@ -79,10 +79,7 @@ impl ActiveVm { websock: dropshot::WebsocketConnection, ) -> Result<(), VmError> { Ok(self.state_driver_queue.queue_external_request( - ExternalRequest::MigrateAsSource { - migration_id, - websock: websock.into(), - }, + ExternalRequest::migrate_as_source(migration_id, websock), )?) } @@ -107,11 +104,11 @@ impl ActiveVm { ) -> Result<(), VmError> { self.state_driver_queue .queue_external_request( - ExternalRequest::ReconfigureCrucibleVolume { + ExternalRequest::reconfigure_crucible_volume( backend_id, new_vcr_json, result_tx, - }, + ), ) .map_err(Into::into) } diff --git a/bin/propolis-server/src/lib/vm/request_queue.rs b/bin/propolis-server/src/lib/vm/request_queue.rs index 30d26f59c..1d53e67d1 100644 --- a/bin/propolis-server/src/lib/vm/request_queue.rs +++ b/bin/propolis-server/src/lib/vm/request_queue.rs @@ -2,29 +2,22 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -//! Handles requests to change a Propolis server's state via the external API. +//! 
Handles requests to change a Propolis server's state or component +//! configuration via the external API. //! -//! An instance accepts or rejects requests to change state based on a -//! combination of its current state and its knowledge of the requests it has -//! previously queued but not processed yet. The latter knowledge is used to -//! reject requests that will never be fulfilled (because they're preceded by an -//! action that will forbid them; consider rebooting after stopping) or that may -//! need be to redirected to a migration target. +//! The queue accepts or rejects requests based on a combination of its current +//! state and its knowledge of requests that it has previously queued but that +//! have not yet been processed. The latter knowledge is used to reject requests +//! that will never be fulfilled (because a prior request preempts them) or that +//! may need to be redirected to a migration target. //! -//! The queue maintains a disposition for each kind of request that can be sent -//! to it, which allows that request to be enqueued, denied, or silently ignored -//! (for idempotency purposes). These dispositions can change as new requests -//! are queued. The queue also provides callbacks to the VM state driver that -//! allow the driver to advise the queue of state changes that further affect -//! what requests should be accepted. -//! -//! Users who want to share a queue must wrap it in the synchronization objects -//! of their choice. +//! The queue contains no synchronization of its own. Users who want to share a +//! queue between multiple threads must wrap it in a synchronization object. use std::collections::VecDeque; use propolis_api_types::instance_spec::SpecKey; -use slog::{debug, info, Logger}; +use slog::{info, Logger}; use thiserror::Error; use uuid::Uuid; @@ -53,9 +46,8 @@ impl WebsocketConnection { } } -/// An external request made of a VM controller via the server API. Handled by -/// the controller's state driver thread. 
-pub enum ExternalRequest { +/// A request to change a VM's runtime state. +pub(crate) enum StateChangeRequest { /// Asks the state worker to start a brand-new VM (i.e. not one initialized /// by live migration, which implicitly starts the VM). Start, @@ -71,7 +63,29 @@ pub enum ExternalRequest { /// Halts the VM. Note that this is not a graceful shutdown and does not /// coordinate with guest software. Stop, +} + +impl std::fmt::Debug for StateChangeRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Start => write!(f, "Start"), + Self::MigrateAsSource { migration_id, .. } => f + .debug_struct("MigrateAsSource") + .field("migration_id", migration_id) + .finish(), + Self::Reboot => write!(f, "Reboot"), + Self::Stop => write!(f, "Stop"), + } + } +} +/// A request to reconfigure a VM's components. +/// +/// NOTE: Successfully queuing a component change request does not guarantee +/// that the request will be processed, because it may be preempted by a VM +/// state change. If this happens the request will fail and notify the +/// submitter using whatever channel is appropriate for the request's type. +pub enum ComponentChangeRequest { /// Attempts to update the volume construction request for the supplied /// Crucible volume. /// @@ -91,16 +105,9 @@ pub enum ExternalRequest { }, } -impl std::fmt::Debug for ExternalRequest { +impl std::fmt::Debug for ComponentChangeRequest { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::Start => write!(f, "Start"), - Self::MigrateAsSource { migration_id, .. } => f - .debug_struct("MigrateAsSource") - .field("migration_id", migration_id) - .finish(), - Self::Reboot => write!(f, "Reboot"), - Self::Stop => write!(f, "Stop"), Self::ReconfigureCrucibleVolume { backend_id, .. 
} => f .debug_struct("ReconfigureCrucibleVolume") .field("backend_id", backend_id) @@ -109,10 +116,64 @@ impl std::fmt::Debug for ExternalRequest { } } +/// An external request made of a VM controller via the server API. Handled by +/// the controller's state driver thread. +#[derive(Debug)] +pub(crate) enum ExternalRequest { + /// A request to change the VM's runtime state. + State(StateChangeRequest), + + /// A request to reconfigure one of the VM's components. + Component(ComponentChangeRequest), +} + +impl ExternalRequest { + /// Constructs a VM start request. + pub fn start() -> Self { + Self::State(StateChangeRequest::Start) + } + + /// Constructs a VM stop request. + pub fn stop() -> Self { + Self::State(StateChangeRequest::Stop) + } + + /// Constructs a VM reboot request. + pub fn reboot() -> Self { + Self::State(StateChangeRequest::Reboot) + } + + /// Constructs a request to migrate a VM to another Propolis instance, using + /// `ws_conn` as the websocket connection to the migration target. + pub fn migrate_as_source( + migration_id: Uuid, + ws_conn: dropshot::WebsocketConnection, + ) -> Self { + Self::State(StateChangeRequest::MigrateAsSource { + migration_id, + websock: WebsocketConnection(Some(ws_conn)), + }) + } + + /// Constructs a request to update a Crucible volume's construction request. + /// The result of this request will be sent to the supplied `result_tx`. + pub fn reconfigure_crucible_volume( + backend_id: SpecKey, + new_vcr_json: String, + result_tx: super::CrucibleReplaceResultTx, + ) -> Self { + Self::Component(ComponentChangeRequest::ReconfigureCrucibleVolume { + backend_id, + new_vcr_json, + result_tx, + }) + } +} + /// A set of reasons why a request to queue an external state transition can /// fail. 
#[derive(Copy, Clone, Debug, Error)] -pub enum RequestDeniedReason { +pub(crate) enum RequestDeniedReason { #[error("Operation requires an active instance")] InstanceNotActive, @@ -123,35 +184,24 @@ pub enum RequestDeniedReason { AlreadyMigrationSource, #[error("Operation cannot be performed on a migration source")] - InvalidRequestForMigrationSource, + InvalidForMigrationSource, #[error("Instance is preparing to stop")] HaltPending, - #[error("Instance failed to start or halted due to a failure")] - InstanceFailed, -} + #[error("Instance has migrated out and is being torn down")] + MigratedOut, -/// The set of instance state changes that should change the dispositions of -/// future requests to the queue. -#[derive(Copy, Clone, Debug)] -pub enum InstanceStateChange { - StartedRunning, - Rebooted, - Stopped, - Failed, -} + #[error("Instance has already halted")] + Halted, -/// A reason for a change in the queue's request dispositions. -#[derive(Debug)] -enum DispositionChangeReason<'a> { - ApiRequest(&'a ExternalRequest), - StateChange(InstanceStateChange), + #[error("Instance failed to start or halted due to a failure")] + InstanceFailed, } /// The possible methods of handling a request to queue a state change. #[derive(Copy, Clone, Debug)] -enum RequestDisposition { +enum Disposition { /// Put the state change on the queue. Enqueue, @@ -164,27 +214,83 @@ enum RequestDisposition { Deny(RequestDeniedReason), } -/// The current disposition for each kind of incoming request. +/// A kind of request that can be popped from the queue and then completed. #[derive(Copy, Clone, Debug)] -struct AllowedRequests { - start: RequestDisposition, - migrate_as_source: RequestDisposition, - reboot: RequestDisposition, - mutate: RequestDisposition, - stop: RequestDisposition, +pub(super) enum CompletedRequest { + Start { succeeded: bool }, + Reboot, + MigrationOut { succeeded: bool }, + Stop, +} + +/// The queue's internal notion of the VM's runtime state. 
+#[derive(Copy, Clone, Debug)] +enum QueueState { + /// The instance has not started yet and no one has asked it to start. + NotStarted, + + /// The instance is not running yet, but the state driver will eventually + /// try to start it. + StartPending, + + /// The instance has successfully started. + Running, + + /// The instance has stopped due to a migration out. + MigratedOut, + + /// The instance has shut down. + Stopped, + + /// The instance failed to start. + Failed, +} + +impl QueueState { + /// If `self` is a state in which new change requests should be denied + /// unconditionally, returns a `Some` containing an appropriate + /// [`RequestDeniedReason`]; returns `None` otherwise. + fn deny_reason(&self) -> Option<RequestDeniedReason> { + match self { + Self::MigratedOut => Some(RequestDeniedReason::MigratedOut), + Self::Stopped => Some(RequestDeniedReason::Halted), + Self::Failed => Some(RequestDeniedReason::InstanceFailed), + _ => None, + } + } } -/// A queue for external requests to change an instance's state. #[derive(Debug)] -pub struct ExternalRequestQueue { - queue: VecDeque<ExternalRequest>, - allowed: AllowedRequests, +pub(super) struct ExternalRequestQueue { + /// The queue of unprocessed state change requests. + state_queue: VecDeque<StateChangeRequest>, + + /// The queue of unprocessed component change requests. + component_queue: VecDeque<ComponentChangeRequest>, + + /// The "effective" (for purposes of deciding how to dispose of requests) + /// state of the instance associated with this queue. + state: QueueState, + + /// True if this queue has enqueued a reboot request that has not been + /// completed by the state driver. + awaiting_reboot: bool, + + /// True if this queue has enqueued a request to migrate out that has not + /// been completed by the state driver. + awaiting_migration_out: bool, + + /// True if this queue has enqueued a stop request that has not been + /// completed by the state driver. + awaiting_stop: bool, + + /// The queue's logger. 
log: Logger, } /// Indicates whether this queue's creator will start the relevant instance /// without waiting for a Start request from the queue. -pub enum InstanceAutoStart { +pub(super) enum InstanceAutoStart { Yes, No, } @@ -192,64 +298,155 @@ pub enum InstanceAutoStart { impl ExternalRequestQueue { /// Creates a new queue that logs to the supplied logger. pub fn new(log: Logger, auto_start: InstanceAutoStart) -> Self { - // If the queue is being created for an instance that will start - // automatically (e.g. due to a migration in), set the request - // disposition for future start requests to Ignore for idempotency. - let start = match auto_start { - InstanceAutoStart::Yes => RequestDisposition::Ignore, - InstanceAutoStart::No => RequestDisposition::Enqueue, + let instance_state = match auto_start { + InstanceAutoStart::Yes => QueueState::StartPending, + InstanceAutoStart::No => QueueState::NotStarted, }; Self { - queue: VecDeque::new(), - allowed: AllowedRequests { - start, - migrate_as_source: RequestDisposition::Deny( - RequestDeniedReason::InstanceNotActive, - ), - reboot: RequestDisposition::Deny( - RequestDeniedReason::InstanceNotActive, - ), - mutate: RequestDisposition::Deny( - RequestDeniedReason::InstanceNotActive, - ), - stop: RequestDisposition::Enqueue, - }, + state_queue: Default::default(), + component_queue: Default::default(), + + state: instance_state, + awaiting_reboot: false, + awaiting_migration_out: false, + awaiting_stop: false, log, } } - /// Pops the request at the front of the queue. + /// Pops the next request off of the queue. If the queue contains both state + /// change and component change requests, the next state change request is + /// popped first (even if it arrived later in time than the next component + /// change request). 
pub fn pop_front(&mut self) -> Option<ExternalRequest> { - self.queue.pop_front() + if let Some(state_change) = self.state_queue.pop_front() { + Some(ExternalRequest::State(state_change)) + } else { + self.component_queue.pop_front().map(ExternalRequest::Component) + } } /// Indicates whether the queue is empty. #[cfg(test)] pub fn is_empty(&self) -> bool { - self.queue.is_empty() + self.state_queue.is_empty() && self.component_queue.is_empty() } - /// Asks to place the supplied request on the queue. If the request is - /// enqueued, updates the dispositions to use for future requests. + /// Attempts to place the supplied `request` on the queue, returning `Ok` + /// if the request was accepted and an `Err` otherwise. In the latter case, + /// the error contains a [`RequestDeniedReason`] that describes why the + /// request was rejected. pub fn try_queue( &mut self, request: ExternalRequest, ) -> Result<(), RequestDeniedReason> { - let disposition = match request { - ExternalRequest::Start => self.allowed.start, - ExternalRequest::MigrateAsSource { .. } => { - self.allowed.migrate_as_source - } - ExternalRequest::Reboot => self.allowed.reboot, - ExternalRequest::ReconfigureCrucibleVolume { .. } => { - self.allowed.mutate + // If the queue is in a terminal state, deny the request straightaway + // (unless it's a stop request, which can be ignored for idempotency). + let disposition = if let Some(reason) = self.state.deny_reason() { + if matches!( + request, + ExternalRequest::State(StateChangeRequest::Stop) + ) { + Disposition::Ignore + } else { + Disposition::Deny(reason) } + } else { + // The instance hasn't stopped yet, so consider this request in + // light of its current state and the other as-yet unprocessed + // requests from the queue. 
+ // + // In general, try to make state change requests idempotent by + // ignoring new requests when a request of the appropriate kind is + // already on the queue, and deny requests to reach a state that is + // precluded by an earlier state change request. + match request { + // Interpret start requests as requests to reach the Running + // state. + ExternalRequest::State(StateChangeRequest::Start) => { + if self.awaiting_stop { + Disposition::Deny(RequestDeniedReason::HaltPending) + } else if let QueueState::NotStarted = self.state { + Disposition::Enqueue + } else { + Disposition::Ignore + } + } + + // Only allow one attempt to migrate out at a time (if it works + // the VM can't migrate out again), and only allow migration out + // after an instance begins to run. + ExternalRequest::State( + StateChangeRequest::MigrateAsSource { .. }, + ) => { + if self.awaiting_migration_out { + Disposition::Deny( + RequestDeniedReason::AlreadyMigrationSource, + ) + } else if self.awaiting_stop { + Disposition::Deny(RequestDeniedReason::HaltPending) + } else if matches!(self.state, QueueState::NotStarted) { + Disposition::Deny( + RequestDeniedReason::InstanceNotActive, + ) + } else { + Disposition::Enqueue + } + } + + // Treat reboot requests as a request to take a VM that has + // already started, reset its state, and resume the VM. If the + // VM migrates out first, this request needs to be directed to + // the target, so reject it here to allow the caller to wait for + // the migration to resolve. 
+ ExternalRequest::State(StateChangeRequest::Reboot) => { + if self.awaiting_migration_out { + Disposition::Deny( + RequestDeniedReason::InvalidForMigrationSource, + ) + } else if self.awaiting_stop { + Disposition::Deny(RequestDeniedReason::HaltPending) + } else if matches!(self.state, QueueState::NotStarted) { + Disposition::Deny( + RequestDeniedReason::InstanceNotActive, + ) + } else if matches!(self.state, QueueState::StartPending) { + Disposition::Deny(RequestDeniedReason::StartInProgress) + } else if self.awaiting_reboot { + Disposition::Ignore + } else { + Disposition::Enqueue + } + } + + // As with reboots, deny requests to stop a VM that might + // migrate out first, since the request may need to be directed + // to the migration target. + ExternalRequest::State(StateChangeRequest::Stop) => { + if self.awaiting_migration_out { + Disposition::Deny( + RequestDeniedReason::InvalidForMigrationSource, + ) + } else if self.awaiting_stop { + Disposition::Ignore + } else { + Disposition::Enqueue + } + } - // Requests to stop always succeed. Note that a request to stop a VM - // that hasn't started should still be queued to the state worker so - // that the worker can exit and drop its references to the instance. - ExternalRequest::Stop => self.allowed.stop, + // Always enqueue component change requests, even if the VM has + // a pending request to stop or migrate out. This allows the + // state driver to process these requests during a state change, + // which may be necessary to complete that state change. If the + // change request is canceled by a later state transition, the + // queue can use the request data to notify the requestor. + ExternalRequest::Component( + ComponentChangeRequest::ReconfigureCrucibleVolume { + .. 
+ }, + ) => Disposition::Enqueue, + } }; info!(&self.log, "Queuing external request"; @@ -257,155 +454,90 @@ impl ExternalRequestQueue { "disposition" => ?disposition); match disposition { - RequestDisposition::Enqueue => {} - RequestDisposition::Ignore => return Ok(()), - RequestDisposition::Deny(reason) => return Err(reason), - }; - - self.allowed = self.get_new_dispositions( - DispositionChangeReason::ApiRequest(&request), - ); - self.queue.push_back(request); - Ok(()) - } - - /// Notifies the queue that the instance's state has changed and that its - /// disposition should be updated accordingly. - pub fn notify_instance_state_change(&mut self, state: InstanceStateChange) { - self.allowed = self - .get_new_dispositions(DispositionChangeReason::StateChange(state)); - } - - /// Computes a new set of queue dispositions given the current state of the - /// queue and the event that is changing those dispositions. - fn get_new_dispositions( - &self, - reason: DispositionChangeReason, - ) -> AllowedRequests { - debug!(self.log, "Computing new queue dispositions"; - "reason" => ?reason); - - use DispositionChangeReason as ChangeReason; - use RequestDeniedReason as DenyReason; - use RequestDisposition as Disposition; - match reason { - ChangeReason::ApiRequest(ExternalRequest::Start) => { - let reason = DenyReason::StartInProgress; - AllowedRequests { - start: Disposition::Ignore, - migrate_as_source: Disposition::Deny(reason), - reboot: Disposition::Deny(reason), - mutate: Disposition::Deny(reason), - stop: self.allowed.stop, + Disposition::Ignore => Ok(()), + Disposition::Deny(reason) => Err(reason), + Disposition::Enqueue => { + match request { + ExternalRequest::State(StateChangeRequest::Start) => { + assert!(matches!(self.state, QueueState::NotStarted)); + self.state = QueueState::StartPending; + } + ExternalRequest::State( + StateChangeRequest::MigrateAsSource { .. 
}, + ) => { + assert!(!self.awaiting_migration_out); + self.awaiting_migration_out = true; + } + ExternalRequest::State(StateChangeRequest::Reboot) => { + assert!(!self.awaiting_reboot); + self.awaiting_reboot = true; + } + ExternalRequest::State(StateChangeRequest::Stop) => { + assert!(!self.awaiting_stop); + self.awaiting_stop = true; + } + ExternalRequest::Component(_) => {} } - } - ChangeReason::ApiRequest(ExternalRequest::MigrateAsSource { - .. - }) => { - assert!( - matches!(self.allowed.start, Disposition::Ignore), - "{:?}", - self.allowed - ); - - AllowedRequests { - start: self.allowed.start, - migrate_as_source: Disposition::Deny( - DenyReason::AlreadyMigrationSource, - ), - reboot: Disposition::Deny( - DenyReason::InvalidRequestForMigrationSource, - ), - mutate: Disposition::Deny( - DenyReason::InvalidRequestForMigrationSource, - ), - stop: self.allowed.stop, + + match request { + ExternalRequest::State(s) => self.state_queue.push_back(s), + ExternalRequest::Component(c) => { + self.component_queue.push_back(c) + } } - } - // Requests to reboot prevent additional reboot requests from being - // queued, but do not affect other operations. - ChangeReason::ApiRequest(ExternalRequest::Reboot) => { - assert!( - matches!(self.allowed.start, Disposition::Ignore), - "{:?}", - self.allowed - ); - AllowedRequests { reboot: Disposition::Ignore, ..self.allowed } + Ok(()) } + } + } - // Requests to stop the instance block other requests from being - // queued. Additional requests to stop are ignored for idempotency. 
- ChangeReason::ApiRequest(ExternalRequest::Stop) => { - let reason = DenyReason::HaltPending; - AllowedRequests { - start: Disposition::Deny(reason), - migrate_as_source: Disposition::Deny(reason), - reboot: Disposition::Deny(reason), - mutate: Disposition::Deny(reason), - stop: Disposition::Ignore, - } - } + /// Notifies this queue that the caller has finished processing a + /// previously-dequeued request, allowing the queue to adjust its + /// dispositions in response. + pub(super) fn notify_request_completed(&mut self, req: CompletedRequest) { + info!( + &self.log, + "queue notified of request completion"; + "request" => ?req + ); - // Requests to mutate VM configuration don't move the VM state - // machine and don't change any request dispositions. - ChangeReason::ApiRequest( - ExternalRequest::ReconfigureCrucibleVolume { .. }, - ) => self.allowed, - - // When an instance begins running, requests to migrate out of it or - // to reboot it become valid. - ChangeReason::StateChange(InstanceStateChange::StartedRunning) => { - AllowedRequests { - start: self.allowed.start, - migrate_as_source: Disposition::Enqueue, - reboot: Disposition::Enqueue, - mutate: Disposition::Enqueue, - stop: self.allowed.stop, + match req { + CompletedRequest::Start { succeeded } => { + assert!(matches!(self.state, QueueState::StartPending)); + if succeeded { + self.state = QueueState::Running; + } else { + self.state = QueueState::Failed; } } - - // When an instance finishes rebooting, allow new reboot requests to - // be queued again, unless reboot requests began to be denied in the - // meantime. 
- ChangeReason::StateChange(InstanceStateChange::Rebooted) => { - let new_reboot = - if let Disposition::Ignore = self.allowed.reboot { - Disposition::Enqueue - } else { - self.allowed.reboot - }; - - AllowedRequests { reboot: new_reboot, ..self.allowed } + CompletedRequest::Reboot => { + assert!(matches!(self.state, QueueState::Running)); + assert!(self.awaiting_reboot); + self.awaiting_reboot = false; } - - // When an instance stops or fails, requests to do anything other - // than stop it are denied with an appropriate deny reason. Note - // that an instance may stop or fail due to guest activity, so the - // previous dispositions for migrate and reboot requests may not be - // "deny". - ChangeReason::StateChange(InstanceStateChange::Stopped) => { - let reason = DenyReason::InstanceNotActive; - AllowedRequests { - start: Disposition::Deny(reason), - migrate_as_source: Disposition::Deny(reason), - reboot: Disposition::Deny(reason), - mutate: Disposition::Deny(reason), - stop: Disposition::Ignore, + CompletedRequest::MigrationOut { succeeded } => { + assert!(matches!(self.state, QueueState::Running)); + assert!(self.awaiting_migration_out); + self.awaiting_migration_out = false; + if succeeded { + self.state = QueueState::MigratedOut; } } - ChangeReason::StateChange(InstanceStateChange::Failed) => { - let reason = DenyReason::InstanceFailed; - AllowedRequests { - start: Disposition::Deny(reason), - migrate_as_source: Disposition::Deny(reason), - reboot: Disposition::Deny(reason), - mutate: Disposition::Deny(reason), - stop: self.allowed.stop, - } + CompletedRequest::Stop => { + assert!(self.awaiting_stop); + self.awaiting_stop = false; + self.state = QueueState::Stopped; } } } + + /// Notifies this queue that the instance has stopped. This routine is meant + /// to be used in cases where an instance stops for reasons other than an + /// external request (e.g., a guest-requested chipset-driven shutdown). 
+ pub(super) fn notify_stopped(&mut self) { + info!(&self.log, "queue notified that VM has stopped"); + self.state = QueueState::Stopped; + } } // It's possible for an external request queue to be dropped with outstanding @@ -414,11 +546,22 @@ impl ExternalRequestQueue { // queue that the VM is gone. impl Drop for ExternalRequestQueue { fn drop(&mut self) { - for req in self.queue.drain(..) { + // No special handling is needed for the state change queue: + // + // - Requests to start, reboot, and stop are handled asynchronously + // (calls to change the instance's state return as soon as they're + // queued). + // - Requests to migrate out contain a connection to the migration + // target; dropping this connection tells the target the source is + // gone. + // + // Drain the component change request queue and send messages to + // requestors telling them that their requests have been canceled. + for req in self.component_queue.drain(..) { match req { // Crucible VCR change requestors wait for their requests to be // retired. - ExternalRequest::ReconfigureCrucibleVolume { + ComponentChangeRequest::ReconfigureCrucibleVolume { result_tx, .. } => { @@ -431,18 +574,6 @@ impl Drop for ExternalRequestQueue { hyper::StatusCode::GONE, ))); } - - // Requests to start, reboot, and stop are handled - // asynchronously (calls to change the instance's state return - // as soon as they're queued). - ExternalRequest::Start - | ExternalRequest::Reboot - | ExternalRequest::Stop => {} - - // Dropping a request to migrate out drops the embedded - // connection to the migration target, thus notifying it that - // the source is gone. - ExternalRequest::MigrateAsSource { .. 
} => {} } } } @@ -459,48 +590,89 @@ mod test { } fn make_migrate_as_source_request() -> ExternalRequest { - ExternalRequest::MigrateAsSource { + ExternalRequest::State(StateChangeRequest::MigrateAsSource { migration_id: Uuid::new_v4(), websock: WebsocketConnection(None), - } + }) } fn make_reconfigure_crucible_request() -> ExternalRequest { let (tx, _rx) = tokio::sync::oneshot::channel(); - ExternalRequest::ReconfigureCrucibleVolume { - backend_id: SpecKey::Uuid(Uuid::new_v4()), - new_vcr_json: "".to_string(), - result_tx: tx, + ExternalRequest::Component( + ComponentChangeRequest::ReconfigureCrucibleVolume { + backend_id: SpecKey::Uuid(Uuid::new_v4()), + new_vcr_json: "".to_string(), + result_tx: tx, + }, + ) + } + + impl ExternalRequest { + fn assert_start(&self) { + assert!( + matches!(self, Self::State(StateChangeRequest::Start)), + "expected start request, got {self:?}" + ); + } + + fn assert_stop(&self) { + assert!( + matches!(self, Self::State(StateChangeRequest::Stop)), + "expected stop request, got {self:?}" + ); + } + + fn assert_reboot(&self) { + assert!( + matches!(self, Self::State(StateChangeRequest::Reboot)), + "expected reboot request, got {self:?}" + ); + } + + fn assert_migrate_as_source(&self) { + assert!( + matches!( + self, + Self::State(StateChangeRequest::MigrateAsSource { .. }) + ), + "expected migrate as source request, got {self:?}" + ); } } - #[tokio::test] - async fn start_requests_become_idempotent_after_first_request() { + #[test] + fn start_requests_become_idempotent_after_first_request() { let mut queue = ExternalRequestQueue::new(test_logger(), InstanceAutoStart::No); // The first request to start should succeed. - assert!(queue.try_queue(ExternalRequest::Start).is_ok()); + assert!(queue.try_queue(ExternalRequest::start()).is_ok()); // The second one should too, but only for idempotency: the queue should // then have only one start request on it. 
- assert!(queue.try_queue(ExternalRequest::Start).is_ok()); - assert!(matches!(queue.pop_front(), Some(ExternalRequest::Start))); - assert!(queue.pop_front().is_none()); + assert!(queue.try_queue(ExternalRequest::start()).is_ok()); + queue.pop_front().unwrap().assert_start(); + assert!(queue.is_empty()); // Start requests continue to be ignored even after the instance starts // to run. - queue.notify_instance_state_change(InstanceStateChange::StartedRunning); - assert!(queue.try_queue(ExternalRequest::Start).is_ok()); - assert!(queue.pop_front().is_none()); + queue.notify_request_completed(CompletedRequest::Start { + succeeded: true, + }); + + assert!(queue.try_queue(ExternalRequest::start()).is_ok()); + assert!(queue.is_empty()); } - #[tokio::test] - async fn migrate_as_source_is_not_idempotent() { + #[test] + fn migrate_as_source_is_not_idempotent() { // Simulate a running instance. let mut queue = ExternalRequestQueue::new(test_logger(), InstanceAutoStart::Yes); - queue.notify_instance_state_change(InstanceStateChange::StartedRunning); + + queue.notify_request_completed(CompletedRequest::Start { + succeeded: true, + }); // Requests to migrate out should be allowed. assert!(queue.try_queue(make_migrate_as_source_request()).is_ok()); @@ -517,104 +689,122 @@ mod test { // If migration fails, the instance resumes running, and then another // request to migrate out should be allowed. - assert!(matches!( - queue.pop_front(), - Some(ExternalRequest::MigrateAsSource { .. }) - )); - queue.notify_instance_state_change(InstanceStateChange::StartedRunning); + queue.pop_front().unwrap().assert_migrate_as_source(); + queue.notify_request_completed(CompletedRequest::MigrationOut { + succeeded: false, + }); + assert!(queue.try_queue(make_migrate_as_source_request()).is_ok()); // A successful migration stops the instance, which forecloses on future // requests to migrate out. 
queue.pop_front(); - queue.notify_instance_state_change(InstanceStateChange::Stopped); + queue.notify_request_completed(CompletedRequest::MigrationOut { + succeeded: true, + }); + assert!(queue.try_queue(make_migrate_as_source_request()).is_err()); } - #[tokio::test] - async fn stop_requests_enqueue_after_vm_failure() { + #[test] + fn stop_requests_are_idempotent() { let mut queue = - ExternalRequestQueue::new(test_logger(), InstanceAutoStart::No); - queue.notify_instance_state_change(InstanceStateChange::Failed); + ExternalRequestQueue::new(test_logger(), InstanceAutoStart::Yes); + + queue.notify_request_completed(CompletedRequest::Start { + succeeded: true, + }); - assert!(queue.try_queue(ExternalRequest::Stop).is_ok()); - assert!(matches!(queue.pop_front(), Some(ExternalRequest::Stop))); + assert!(queue.try_queue(ExternalRequest::stop()).is_ok()); + assert!(queue.try_queue(ExternalRequest::stop()).is_ok()); + queue.pop_front().unwrap().assert_stop(); + assert!(queue.is_empty()); } - #[tokio::test] - async fn reboot_requests_are_idempotent_except_when_stopping() { + #[test] + fn stop_requests_ignored_after_vm_failure() { let mut queue = ExternalRequestQueue::new(test_logger(), InstanceAutoStart::Yes); - queue.notify_instance_state_change(InstanceStateChange::StartedRunning); + + queue.notify_request_completed(CompletedRequest::Start { + succeeded: false, + }); + + assert!(queue.try_queue(ExternalRequest::stop()).is_ok()); + assert!(queue.is_empty()); + } + + #[test] + fn reboot_requests_are_idempotent_except_when_stopping() { + let mut queue = + ExternalRequestQueue::new(test_logger(), InstanceAutoStart::Yes); + queue.notify_request_completed(CompletedRequest::Start { + succeeded: true, + }); // Once the instance is started, reboot requests should be allowed, but // after the first, subsequent requests should be dropped for // idempotency. 
assert!(queue.is_empty()); for _ in 0..5 { - assert!(queue.try_queue(ExternalRequest::Reboot).is_ok()); + assert!(queue.try_queue(ExternalRequest::reboot()).is_ok()); } - assert!(matches!(queue.pop_front(), Some(ExternalRequest::Reboot))); + queue.pop_front().unwrap().assert_reboot(); assert!(queue.is_empty()); // Once the instance has rebooted, new requests can be queued. - queue.notify_instance_state_change(InstanceStateChange::Rebooted); - assert!(queue.try_queue(ExternalRequest::Reboot).is_ok()); - assert!(!queue.is_empty()); - assert!(matches!(queue.pop_front(), Some(ExternalRequest::Reboot))); - queue.notify_instance_state_change(InstanceStateChange::Rebooted); + queue.notify_request_completed(CompletedRequest::Reboot); + assert!(queue.try_queue(ExternalRequest::reboot()).is_ok()); + queue.pop_front().unwrap().assert_reboot(); + queue.notify_request_completed(CompletedRequest::Reboot); // If a request to reboot is queued, and then a request to stop is // queued, new requests to reboot should always fail, even after the // instance finishes rebooting. 
- assert!(queue.try_queue(ExternalRequest::Reboot).is_ok()); + assert!(queue.try_queue(ExternalRequest::reboot()).is_ok()); assert!(!queue.is_empty()); - assert!(queue.try_queue(ExternalRequest::Stop).is_ok()); - assert!(queue.try_queue(ExternalRequest::Reboot).is_err()); - assert!(matches!(queue.pop_front(), Some(ExternalRequest::Reboot))); - queue.notify_instance_state_change(InstanceStateChange::Rebooted); - assert!(queue.try_queue(ExternalRequest::Reboot).is_err()); + assert!(queue.try_queue(ExternalRequest::stop()).is_ok()); + assert!(queue.try_queue(ExternalRequest::reboot()).is_err()); + queue.pop_front().unwrap().assert_reboot(); + queue.notify_request_completed(CompletedRequest::Reboot); + assert!(queue.try_queue(ExternalRequest::reboot()).is_err()); } - #[tokio::test] - async fn mutation_requires_running_and_not_migrating_out() { + #[test] + fn mutation_disallowed_after_stopped() { let mut queue = - ExternalRequestQueue::new(test_logger(), InstanceAutoStart::No); + ExternalRequestQueue::new(test_logger(), InstanceAutoStart::Yes); - // Mutating a VM before it has started is not allowed. - assert!(queue.try_queue(make_reconfigure_crucible_request()).is_err()); + queue.notify_request_completed(CompletedRequest::Start { + succeeded: true, + }); - // Merely dequeuing the start request doesn't allow mutation; the VM - // actually has to be running. - assert!(queue.try_queue(ExternalRequest::Start).is_ok()); - assert!(matches!(queue.pop_front(), Some(ExternalRequest::Start))); - assert!(queue.try_queue(make_reconfigure_crucible_request()).is_err()); - queue.notify_instance_state_change(InstanceStateChange::StartedRunning); - assert!(queue.try_queue(make_reconfigure_crucible_request()).is_ok()); - assert!(matches!( - queue.pop_front(), - Some(ExternalRequest::ReconfigureCrucibleVolume { .. }) - )); - - // Successfully requesting migration out should block new mutation - // requests (they should wait for the migration to resolve and then go - // to the target). 
- assert!(queue.try_queue(make_migrate_as_source_request()).is_ok()); + assert!(queue.try_queue(ExternalRequest::stop()).is_ok()); + queue.notify_request_completed(CompletedRequest::Stop); assert!(queue.try_queue(make_reconfigure_crucible_request()).is_err()); - - // But if the VM resumes (due to a failed migration out) these requests - // should succeed again. - assert!(queue.pop_front().is_some()); - queue.notify_instance_state_change(InstanceStateChange::StartedRunning); - assert!(queue.try_queue(make_reconfigure_crucible_request()).is_ok()); } #[tokio::test] - async fn mutation_disallowed_after_stop() { + async fn vcr_requests_canceled_when_queue_drops() { let mut queue = ExternalRequestQueue::new(test_logger(), InstanceAutoStart::Yes); - queue.notify_instance_state_change(InstanceStateChange::StartedRunning); - queue.notify_instance_state_change(InstanceStateChange::Stopped); - assert!(queue.try_queue(make_reconfigure_crucible_request()).is_err()); + + queue.notify_request_completed(CompletedRequest::Start { + succeeded: true, + }); + + let (tx, rx) = tokio::sync::oneshot::channel(); + let req = ExternalRequest::Component( + ComponentChangeRequest::ReconfigureCrucibleVolume { + backend_id: SpecKey::Uuid(Uuid::new_v4()), + new_vcr_json: "".to_string(), + result_tx: tx, + }, + ); + + assert!(queue.try_queue(req).is_ok()); + drop(queue); + let err = rx.await.unwrap().unwrap_err(); + assert_eq!(err.status_code, hyper::StatusCode::GONE); } } diff --git a/bin/propolis-server/src/lib/vm/state_driver.rs b/bin/propolis-server/src/lib/vm/state_driver.rs index e3ba3a4c8..a37967668 100644 --- a/bin/propolis-server/src/lib/vm/state_driver.rs +++ b/bin/propolis-server/src/lib/vm/state_driver.rs @@ -119,7 +119,10 @@ use super::{ }, guest_event::{self, GuestEvent}, objects::VmObjects, - request_queue::{self, ExternalRequest, InstanceAutoStart}, + request_queue::{ + self, CompletedRequest, ComponentChangeRequest, ExternalRequest, + InstanceAutoStart, StateChangeRequest, + }, 
state_publisher::{MigrationStateUpdate, StatePublisher}, InstanceEnsureResponseTx, }; @@ -227,15 +230,19 @@ impl InputQueue { } } - /// Notifies the external request queue that the instance's state has - /// changed so that it can change the dispositions for new state change - /// requests. - fn notify_instance_state_change( - &self, - state: request_queue::InstanceStateChange, - ) { + /// Notifies the external request queue that the state driver has completed + /// a request from that queue. + fn notify_request_completed(&self, state: CompletedRequest) { + let mut guard = self.inner.lock().unwrap(); + guard.external_requests.notify_request_completed(state); + } + + /// Notifies the external request queue that the instance has stopped. This + /// is used to stop the queue when the instance stops without a request from + /// the API (e.g. because the guest requested a chipset-driven shutdown). + fn notify_stopped(&self) { let mut guard = self.inner.lock().unwrap(); - guard.external_requests.notify_instance_state_change(state); + guard.external_requests.notify_stopped(); } /// Submits an external state change request to the queue. 
@@ -522,14 +529,23 @@ impl StateDriver { let start_result = self.objects.lock_exclusive().await.start(start_reason).await; + + self.input_queue.notify_request_completed(CompletedRequest::Start { + succeeded: start_result.is_ok(), + }); + match &start_result { Ok(()) => { - self.publish_steady_state(InstanceState::Running); + self.external_state.update(ExternalStateUpdate::Instance( + InstanceState::Running, + )); } Err(e) => { error!(&self.log, "failed to start devices"; "error" => ?e); - self.publish_steady_state(InstanceState::Failed); + self.external_state.update(ExternalStateUpdate::Instance( + InstanceState::Failed, + )); } } @@ -544,6 +560,11 @@ impl StateDriver { GuestEvent::VcpuSuspendHalt(_when) => { info!(self.log, "Halting due to VM suspend event",); self.do_halt().await; + self.external_state.update(ExternalStateUpdate::Instance( + InstanceState::Stopped, + )); + + self.input_queue.notify_stopped(); HandleEventOutcome::Exit { final_state: InstanceState::Destroyed, } @@ -564,6 +585,11 @@ impl StateDriver { GuestEvent::ChipsetHalt => { info!(self.log, "Halting due to chipset-driven halt"); self.do_halt().await; + self.external_state.update(ExternalStateUpdate::Instance( + InstanceState::Stopped, + )); + + self.input_queue.notify_stopped(); HandleEventOutcome::Exit { final_state: InstanceState::Destroyed, } @@ -581,7 +607,7 @@ impl StateDriver { request: ExternalRequest, ) -> HandleEventOutcome { match request { - ExternalRequest::Start => { + ExternalRequest::State(StateChangeRequest::Start) => { match self.start_vm(VmStartReason::ExplicitRequest).await { Ok(_) => HandleEventOutcome::Continue, Err(_) => HandleEventOutcome::Exit { @@ -589,31 +615,50 @@ impl StateDriver { }, } } - ExternalRequest::MigrateAsSource { migration_id, websock } => { - self.migrate_as_source(migration_id, websock.into_inner()) - .await; - - // The callee either queues its own stop request (on a - // successful migration out) or resumes the VM (on a failed - // migration out). 
Either way, the main loop can just proceed to - // process the queue as normal. - HandleEventOutcome::Continue + ExternalRequest::State(StateChangeRequest::MigrateAsSource { + migration_id, + websock, + }) => { + if self + .migrate_as_source(migration_id, websock.into_inner()) + .await + .is_ok() + { + self.do_halt().await; + HandleEventOutcome::Exit { + final_state: InstanceState::Destroyed, + } + } else { + HandleEventOutcome::Continue + } } - ExternalRequest::Reboot => { + ExternalRequest::State(StateChangeRequest::Reboot) => { self.do_reboot().await; + self.input_queue + .notify_request_completed(CompletedRequest::Reboot); + HandleEventOutcome::Continue } - ExternalRequest::Stop => { + ExternalRequest::State(StateChangeRequest::Stop) => { self.do_halt().await; + self.external_state.update(ExternalStateUpdate::Instance( + InstanceState::Stopped, + )); + + self.input_queue + .notify_request_completed(CompletedRequest::Stop); + HandleEventOutcome::Exit { final_state: InstanceState::Destroyed, } } - ExternalRequest::ReconfigureCrucibleVolume { - backend_id, - new_vcr_json, - result_tx, - } => { + ExternalRequest::Component( + ComponentChangeRequest::ReconfigureCrucibleVolume { + backend_id, + new_vcr_json, + result_tx, + }, + ) => { let _ = result_tx.send( self.reconfigure_crucible_volume(&backend_id, new_vcr_json) .await, @@ -633,9 +678,6 @@ impl StateDriver { // Notify other consumers that the instance successfully rebooted and is // now back to Running. 
- self.input_queue.notify_instance_state_change( - request_queue::InstanceStateChange::Rebooted, - ); self.external_state .update(ExternalStateUpdate::Instance(InstanceState::Running)); } @@ -658,34 +700,13 @@ impl StateDriver { guard.halt().await; } - - self.publish_steady_state(InstanceState::Stopped); - } - - fn publish_steady_state(&mut self, state: InstanceState) { - let change = match state { - InstanceState::Running => { - request_queue::InstanceStateChange::StartedRunning - } - InstanceState::Stopped => { - request_queue::InstanceStateChange::Stopped - } - InstanceState::Failed => request_queue::InstanceStateChange::Failed, - _ => panic!( - "Called publish_steady_state on non-terminal state {:?}", - state - ), - }; - - self.input_queue.notify_instance_state_change(change); - self.external_state.update(ExternalStateUpdate::Instance(state)); } async fn migrate_as_source( &mut self, migration_id: Uuid, websock: dropshot::WebsocketConnection, - ) { + ) -> Result<(), ()> { let conn = tokio_tungstenite::WebSocketStream::from_raw_socket( websock.into_inner(), tokio_tungstenite::tungstenite::protocol::Role::Server, @@ -712,7 +733,7 @@ impl StateDriver { }, )); - return; + return Err(()); } }; @@ -740,15 +761,25 @@ impl StateDriver { // On a successful migration out, the protocol promises to leave // the VM objects in a paused state, so don't pause them again. 
self.paused = true; - self.input_queue - .queue_external_request(ExternalRequest::Stop) - .expect("can always queue a request to stop"); + self.input_queue.notify_request_completed( + CompletedRequest::MigrationOut { succeeded: true }, + ); + + Ok(()) } Err(e) => { info!(self.log, "migration out failed, resuming"; "error" => ?e); - self.publish_steady_state(InstanceState::Running); + self.input_queue.notify_request_completed( + CompletedRequest::MigrationOut { succeeded: false }, + ); + + self.external_state.update(ExternalStateUpdate::Instance( + InstanceState::Running, + )); + + Err(()) } } } From 5be01dd24d40c3bc80c38f1ff94f31aca2aac5a4 Mon Sep 17 00:00:00 2001 From: Greg Colombo Date: Mon, 10 Mar 2025 23:31:05 +0000 Subject: [PATCH 2/7] server: minor PR feedback --- .../src/lib/vm/request_queue.rs | 57 ++++++++++--------- 1 file changed, 30 insertions(+), 27 deletions(-) diff --git a/bin/propolis-server/src/lib/vm/request_queue.rs b/bin/propolis-server/src/lib/vm/request_queue.rs index 1d53e67d1..86535c173 100644 --- a/bin/propolis-server/src/lib/vm/request_queue.rs +++ b/bin/propolis-server/src/lib/vm/request_queue.rs @@ -69,7 +69,7 @@ impl std::fmt::Debug for StateChangeRequest { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Start => write!(f, "Start"), - Self::MigrateAsSource { migration_id, .. } => f + Self::MigrateAsSource { migration_id, websock: _ } => f .debug_struct("MigrateAsSource") .field("migration_id", migration_id) .finish(), @@ -83,7 +83,7 @@ impl std::fmt::Debug for StateChangeRequest { /// /// NOTE: Successfully queuing a component change request does not guarantee /// that the request will be processed, because it may be preempted by a VM -/// state change. If this happens the request will fail and notify the +/// state change. If this happens, the request will fail and notify the /// submitter using whatever channel is appropriate for the request's type. 
pub enum ComponentChangeRequest { /// Attempts to update the volume construction request for the supplied @@ -129,17 +129,17 @@ pub(crate) enum ExternalRequest { impl ExternalRequest { /// Constructs a VM start request. - pub fn start() -> Self { + pub const fn start() -> Self { Self::State(StateChangeRequest::Start) } /// Constructs a VM stop request. - pub fn stop() -> Self { + pub const fn stop() -> Self { Self::State(StateChangeRequest::Stop) } /// Constructs a VM reboot request. - pub fn reboot() -> Self { + pub const fn reboot() -> Self { Self::State(StateChangeRequest::Reboot) } @@ -168,6 +168,10 @@ impl ExternalRequest { result_tx, }) } + + fn is_stop(&self) -> bool { + matches!(self, Self::State(StateChangeRequest::Stop)) + } } /// A set of reasons why a request to queue an external state transition can @@ -224,13 +228,14 @@ pub(super) enum CompletedRequest { } /// The queue's internal notion of the VM's runtime state. -#[derive(Copy, Clone, Debug)] +#[derive(Copy, Clone, Debug, PartialEq, Eq)] enum QueueState { /// The instance has not started yet and no one has asked it to start. NotStarted, - /// The instance is not running yet, but the state driver will eventually - /// try to start it. + /// The instance is not running yet, but it's on its way there: either the + /// state driver is actively trying to start it, or there is a request that, + /// when processed, will direct the driver to start the instance. StartPending, /// The instance has successfully started. @@ -276,12 +281,12 @@ pub(super) struct ExternalRequestQueue { /// completed by the state driver. awaiting_reboot: bool, - /// True if this queue has enqueued a stop request that has not been - /// completed by the state driver. - awaiting_migration_out: bool, - /// True if this queue has enqueued a request to migrate out that has not /// been completed by the state driver. 
+ awaiting_migration_out: bool, + + /// True if this queue has enqueued a stop request that has not been + /// completed by the state driver. awaiting_stop: bool, /// The queue's logger. @@ -344,10 +349,7 @@ impl ExternalRequestQueue { // If the queue is in a terminal state, deny the request straightaway // (unless it's a stop request, which can be ignored for idempotency). let disposition = if let Some(reason) = self.state.deny_reason() { - if matches!( - request, - ExternalRequest::State(StateChangeRequest::Stop) - ) { + if request.is_stop() { Disposition::Ignore } else { Disposition::Deny(reason) @@ -386,7 +388,7 @@ impl ExternalRequestQueue { ) } else if self.awaiting_stop { Disposition::Deny(RequestDeniedReason::HaltPending) - } else if matches!(self.state, QueueState::NotStarted) { + } else if self.state == QueueState::NotStarted { Disposition::Deny( RequestDeniedReason::InstanceNotActive, ) @@ -407,11 +409,11 @@ impl ExternalRequestQueue { ) } else if self.awaiting_stop { Disposition::Deny(RequestDeniedReason::HaltPending) - } else if matches!(self.state, QueueState::NotStarted) { + } else if self.state == QueueState::NotStarted { Disposition::Deny( RequestDeniedReason::InstanceNotActive, ) - } else if matches!(self.state, QueueState::StartPending) { + } else if self.state == QueueState::StartPending { Disposition::Deny(RequestDeniedReason::StartInProgress) } else if self.awaiting_reboot { Disposition::Ignore @@ -459,7 +461,7 @@ impl ExternalRequestQueue { Disposition::Enqueue => { match request { ExternalRequest::State(StateChangeRequest::Start) => { - assert!(matches!(self.state, QueueState::NotStarted)); + assert_eq!(self.state, QueueState::NotStarted); self.state = QueueState::StartPending; } ExternalRequest::State( @@ -503,7 +505,7 @@ impl ExternalRequestQueue { match req { CompletedRequest::Start { succeeded } => { - assert!(matches!(self.state, QueueState::StartPending)); + assert_eq!(self.state, QueueState::StartPending); if succeeded { self.state = 
QueueState::Running; } else { @@ -511,12 +513,12 @@ impl ExternalRequestQueue { } } CompletedRequest::Reboot => { - assert!(matches!(self.state, QueueState::Running)); + assert_eq!(self.state, QueueState::Running); assert!(self.awaiting_reboot); self.awaiting_reboot = false; } CompletedRequest::MigrationOut { succeeded } => { - assert!(matches!(self.state, QueueState::Running)); + assert_eq!(self.state, QueueState::Running); assert!(self.awaiting_migration_out); self.awaiting_migration_out = false; if succeeded { @@ -608,6 +610,7 @@ mod test { } impl ExternalRequest { + #[track_caller] fn assert_start(&self) { assert!( matches!(self, Self::State(StateChangeRequest::Start)), @@ -615,13 +618,12 @@ mod test { ); } + #[track_caller] fn assert_stop(&self) { - assert!( - matches!(self, Self::State(StateChangeRequest::Stop)), - "expected stop request, got {self:?}" - ); + assert!(self.is_stop(), "expected stop request, got {self:?}"); } + #[track_caller] fn assert_reboot(&self) { assert!( matches!(self, Self::State(StateChangeRequest::Reboot)), @@ -629,6 +631,7 @@ mod test { ); } + #[track_caller] fn assert_migrate_as_source(&self) { assert!( matches!( From 29ea5fc11d03b1d456bc7736d44c2b801d72efcb Mon Sep 17 00:00:00 2001 From: Greg Colombo Date: Mon, 10 Mar 2025 23:41:06 +0000 Subject: [PATCH 3/7] server: requiescat in pace, Disposition --- .../src/lib/vm/request_queue.rs | 170 +++++++++--------- 1 file changed, 85 insertions(+), 85 deletions(-) diff --git a/bin/propolis-server/src/lib/vm/request_queue.rs b/bin/propolis-server/src/lib/vm/request_queue.rs index 86535c173..8890d03f3 100644 --- a/bin/propolis-server/src/lib/vm/request_queue.rs +++ b/bin/propolis-server/src/lib/vm/request_queue.rs @@ -203,21 +203,6 @@ pub(crate) enum RequestDeniedReason { InstanceFailed, } -/// The possible methods of handling a request to queue a state change. -#[derive(Copy, Clone, Debug)] -enum Disposition { - /// Put the state change on the queue. 
- Enqueue, - - /// Drop the state change silently. This is used to make requests appear - /// idempotent to callers without making the state driver deal with the - /// consequences of queuing the same state change request twice. - Ignore, - - /// Deny the request to change state. - Deny(RequestDeniedReason), -} - /// A kind of request that can be popped from the queue and then completed. #[derive(Copy, Clone, Debug)] pub(super) enum CompletedRequest { @@ -346,13 +331,52 @@ impl ExternalRequestQueue { &mut self, request: ExternalRequest, ) -> Result<(), RequestDeniedReason> { + let request_str = format!("{request:?}"); + let res = self.try_queue_internal(request); + match res { + Ok(true) => { + info!( + &self.log, + "enqueued external request"; + "request" => request_str + ) + } + Ok(false) => { + info!( + &self.log, + "ignored external request"; + "request" => request_str + ) + } + Err(reason) => { + info!( + &self.log, + "denied external request"; + "request" => request_str, + "reason" => %reason + ) + } + } + + res.map(|_| ()) + } + + /// Attempts to place `request` on the queue, returning + /// + /// - `Ok(true)` if the request was enqueued, + /// - `Ok(false)` if the request was ignored, and + /// - `Err(reason)` if the request was denied. + fn try_queue_internal( + &mut self, + request: ExternalRequest, + ) -> Result<bool, RequestDeniedReason> { // If the queue is in a terminal state, deny the request straightaway // (unless it's a stop request, which can be ignored for idempotency). - let disposition = if let Some(reason) = self.state.deny_reason() { + if let Some(reason) = self.state.deny_reason() { if request.is_stop() { - Disposition::Ignore + return Ok(false); } else { - Disposition::Deny(reason) + return Err(reason); } } else { // The instance hasn't stopped yet, so consider this request in @@ -368,11 +392,9 @@ impl ExternalRequestQueue { // state. 
ExternalRequest::State(StateChangeRequest::Start) => { if self.awaiting_stop { - Disposition::Deny(RequestDeniedReason::HaltPending) - } else if let QueueState::NotStarted = self.state { - Disposition::Enqueue - } else { - Disposition::Ignore + return Err(RequestDeniedReason::HaltPending); + } else if self.state != QueueState::NotStarted { + return Ok(false); } } @@ -383,17 +405,13 @@ impl ExternalRequestQueue { StateChangeRequest::MigrateAsSource { .. }, ) => { if self.awaiting_migration_out { - Disposition::Deny( + return Err( RequestDeniedReason::AlreadyMigrationSource, - ) + ); } else if self.awaiting_stop { - Disposition::Deny(RequestDeniedReason::HaltPending) + return Err(RequestDeniedReason::HaltPending); } else if self.state == QueueState::NotStarted { - Disposition::Deny( - RequestDeniedReason::InstanceNotActive, - ) - } else { - Disposition::Enqueue + return Err(RequestDeniedReason::InstanceNotActive); } } @@ -404,21 +422,17 @@ impl ExternalRequestQueue { // the migration to resolve. ExternalRequest::State(StateChangeRequest::Reboot) => { if self.awaiting_migration_out { - Disposition::Deny( + return Err( RequestDeniedReason::InvalidForMigrationSource, - ) + ); } else if self.awaiting_stop { - Disposition::Deny(RequestDeniedReason::HaltPending) + return Err(RequestDeniedReason::HaltPending); } else if self.state == QueueState::NotStarted { - Disposition::Deny( - RequestDeniedReason::InstanceNotActive, - ) + return Err(RequestDeniedReason::InstanceNotActive); } else if self.state == QueueState::StartPending { - Disposition::Deny(RequestDeniedReason::StartInProgress) + return Err(RequestDeniedReason::StartInProgress); } else if self.awaiting_reboot { - Disposition::Ignore - } else { - Disposition::Enqueue + return Ok(false); } } @@ -427,13 +441,11 @@ impl ExternalRequestQueue { // to the migration target. 
ExternalRequest::State(StateChangeRequest::Stop) => { if self.awaiting_migration_out { - Disposition::Deny( + return Err( RequestDeniedReason::InvalidForMigrationSource, - ) + ); } else if self.awaiting_stop { - Disposition::Ignore - } else { - Disposition::Enqueue + return Ok(false); } } @@ -447,50 +459,38 @@ impl ExternalRequestQueue { ComponentChangeRequest::ReconfigureCrucibleVolume { .. }, - ) => Disposition::Enqueue, + ) => {} } }; - info!(&self.log, "Queuing external request"; - "request" => ?request, - "disposition" => ?disposition); - - match disposition { - Disposition::Ignore => Ok(()), - Disposition::Deny(reason) => Err(reason), - Disposition::Enqueue => { - match request { - ExternalRequest::State(StateChangeRequest::Start) => { - assert_eq!(self.state, QueueState::NotStarted); - self.state = QueueState::StartPending; - } - ExternalRequest::State( - StateChangeRequest::MigrateAsSource { .. }, - ) => { - assert!(!self.awaiting_migration_out); - self.awaiting_migration_out = true; - } - ExternalRequest::State(StateChangeRequest::Reboot) => { - assert!(!self.awaiting_reboot); - self.awaiting_reboot = true; - } - ExternalRequest::State(StateChangeRequest::Stop) => { - assert!(!self.awaiting_stop); - self.awaiting_stop = true; - } - ExternalRequest::Component(_) => {} - } - - match request { - ExternalRequest::State(s) => self.state_queue.push_back(s), - ExternalRequest::Component(c) => { - self.component_queue.push_back(c) - } - } - - Ok(()) + match request { + ExternalRequest::State(StateChangeRequest::Start) => { + assert_eq!(self.state, QueueState::NotStarted); + self.state = QueueState::StartPending; + } + ExternalRequest::State(StateChangeRequest::MigrateAsSource { + .. 
+ }) => { + assert!(!self.awaiting_migration_out); + self.awaiting_migration_out = true; + } + ExternalRequest::State(StateChangeRequest::Reboot) => { + assert!(!self.awaiting_reboot); + self.awaiting_reboot = true; + } + ExternalRequest::State(StateChangeRequest::Stop) => { + assert!(!self.awaiting_stop); + self.awaiting_stop = true; } + ExternalRequest::Component(_) => {} } + + match request { + ExternalRequest::State(s) => self.state_queue.push_back(s), + ExternalRequest::Component(c) => self.component_queue.push_back(c), + } + + Ok(true) } /// Notifies this queue that the caller has finished processing a From 58cb957014eea0ddf3d13292519ddc829d3d1d0d Mon Sep 17 00:00:00 2001 From: Greg Colombo Date: Tue, 11 Mar 2025 00:09:27 +0000 Subject: [PATCH 4/7] server: aww yiss let's proptest some stuff --- Cargo.lock | 1 + bin/propolis-server/Cargo.toml | 2 +- .../src/lib/vm/request_queue.rs | 239 ++++++++++++++++++ .../proptest-regressions/vm/request_queue.txt | 11 + 4 files changed, 252 insertions(+), 1 deletion(-) create mode 100644 bin/propolis-server/src/proptest-regressions/vm/request_queue.txt diff --git a/Cargo.lock b/Cargo.lock index b2203c19d..63cab2dbb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4609,6 +4609,7 @@ dependencies = [ "propolis", "propolis_api_types", "propolis_types", + "proptest", "reqwest 0.12.7", "rfb", "rgb_frame", diff --git a/bin/propolis-server/Cargo.toml b/bin/propolis-server/Cargo.toml index 98add8228..e9e7b1ced 100644 --- a/bin/propolis-server/Cargo.toml +++ b/bin/propolis-server/Cargo.toml @@ -73,6 +73,7 @@ ring.workspace = true slog = { workspace = true, features = [ "max_level_trace", "release_max_level_debug" ] } expectorate.workspace = true mockall.workspace = true +proptest.workspace = true [features] default = [] @@ -83,7 +84,6 @@ omicron-build = ["propolis/omicron-build"] # Falcon builds require corresponding bits turned on in the dependency libs falcon = ["propolis/falcon"] - # Testing necessitates injecting failures which 
should hopefully be rare or even # never occur on real otherwise-unperturbed systems. We conditionally compile # code supporting failure injection to avoid the risk of somehow injecting diff --git a/bin/propolis-server/src/lib/vm/request_queue.rs b/bin/propolis-server/src/lib/vm/request_queue.rs index 8890d03f3..20ba520ce 100644 --- a/bin/propolis-server/src/lib/vm/request_queue.rs +++ b/bin/propolis-server/src/lib/vm/request_queue.rs @@ -585,6 +585,7 @@ impl Drop for ExternalRequestQueue { mod test { use super::*; + use proptest::prelude::*; use uuid::Uuid; fn test_logger() -> slog::Logger { @@ -641,6 +642,19 @@ mod test { "expected migrate as source request, got {self:?}" ); } + + #[track_caller] + fn assert_reconfigure_crucible(&self) { + assert!( + matches!( + self, + Self::Component( + ComponentChangeRequest::ReconfigureCrucibleVolume { .. } + ) + ), + "expected Crucible reconfiguration request, got {self:?}" + ); + } } #[test] @@ -810,4 +824,229 @@ mod test { let err = rx.await.unwrap().unwrap_err(); assert_eq!(err.status_code, hyper::StatusCode::GONE); } + + /// A helper for generating requests as part of a property testing strategy. + /// `proptest` requires values that are the output of a `Strategy` to be + /// `Clone`, which `ExternalRequest` is not. To get around this, create a + /// strategy that returns variants of this enum and have a `From` impl that + /// then creates requests of the appropriate kind. 
+ #[derive(Clone, Copy, Debug, PartialEq, Eq)] + enum RequestKind { + Start { will_succeed: bool }, + Stop, + Reboot, + Migrate { will_succeed: bool }, + ReconfigureCrucible, + } + + impl From<RequestKind> for ExternalRequest { + fn from(value: RequestKind) -> Self { + match value { + RequestKind::Start { will_succeed: _ } => { + ExternalRequest::start() + } + RequestKind::Stop => ExternalRequest::stop(), + RequestKind::Reboot => ExternalRequest::reboot(), + RequestKind::Migrate { will_succeed: _ } => { + make_migrate_as_source_request() + } + RequestKind::ReconfigureCrucible => { + make_reconfigure_crucible_request() + } + } + } + } + + fn request_strategy() -> impl Strategy<Value = RequestKind> { + prop_oneof![ + Just(RequestKind::Start { will_succeed: true }), + Just(RequestKind::Start { will_succeed: false }), + Just(RequestKind::Stop), + Just(RequestKind::Reboot), + Just(RequestKind::Migrate { will_succeed: true }), + Just(RequestKind::Migrate { will_succeed: false }), + Just(RequestKind::ReconfigureCrucible), + ] + } + + proptest! { + // Tests the behavior of the request queue in circumstances where start + // requests are queued, but never actually acknowledged. + #[test] + fn request_queuing_before_start_acknowledged( + reqs in prop::collection::vec(request_strategy(), 0..100) + ) { + let mut queue = + ExternalRequestQueue::new(test_logger(), InstanceAutoStart::No); + + let mut started = false; + let mut stop_requested = false; + let mut migrating_out = false; + for req in reqs { + let result = queue.try_queue(req.into()); + match req { + RequestKind::Start { .. } => { + if !stop_requested { + assert!(result.is_ok()); + started = true; + } else { + assert!(result.is_err()); + } + } + + RequestKind::Stop => { + if !migrating_out { + assert!(result.is_ok()); + stop_requested = true; + } else { + assert!(result.is_err()); + } + } + + RequestKind::Reboot => { + assert!(result.is_err()); + } + + RequestKind::Migrate { .. 
} => { + if started && !stop_requested && !migrating_out { + assert!(result.is_ok()); + migrating_out = true; + } else { + assert!(result.is_err()); + } + } + + RequestKind::ReconfigureCrucible => { + assert!(result.is_ok()); + } + } + } + } + + // Tests the behavior of the request queue in circumstances where every + // request made of the state driver completes immediately. + #[test] + fn request_queuing_with_immediate_dequeueing( + reqs in prop::collection::vec(request_strategy(), 0..100) + ) { + let mut queue = + ExternalRequestQueue::new(test_logger(), InstanceAutoStart::No); + + // True once a start request has been queued. + let mut start_requested = false; + + // True once the VM reaches a terminal state (stopped, failed, + // migrated out). + let mut halted = false; + for req in reqs { + let result = queue.try_queue(req.into()); + match req { + // Start requests always succeed (though they may be + // ignored and not queued) on a non-halted VM. + RequestKind::Start { will_succeed } => { + if !halted { + assert!(result.is_ok()); + + // This request is only enqueued if it is the first + // request to start. + if !start_requested { + start_requested = true; + queue.pop_front().unwrap().assert_start(); + let completed = CompletedRequest::Start { + succeeded: will_succeed + }; + + queue.notify_request_completed(completed); + + // Telling the queue that a start attempt failed + // moves the queue to a terminal state. + if !will_succeed { + halted = true; + } + } else { + assert!(queue.is_empty()); + } + } else { + assert!(result.is_err()); + assert!(queue.is_empty()); + } + } + + // Stop requests always succeed (they are never denied), but + // they are ignored for VMs that have already halted. 
+ RequestKind::Stop => { + assert!(result.is_ok()); + if !halted { + queue.pop_front().unwrap().assert_stop(); + queue.notify_request_completed( + CompletedRequest::Stop + ); + + halted = true; + } else { + assert!(queue.is_empty()); + } + } + + // Reboot requests are always enqueued if the VM is active. + // They are denied if there's a pending migration out, but + // in this test there is never a *pending* migration out, + // since all requests are dequeued and processed + // immediately. + RequestKind::Reboot => { + if start_requested && !halted { + assert!(result.is_ok()); + queue.pop_front().unwrap().assert_reboot(); + queue.notify_request_completed( + CompletedRequest::Reboot + ); + } else { + assert!(result.is_err()); + assert!(queue.is_empty()); + } + } + + // Migration requests have the same disposition as reboot + // requests. + RequestKind::Migrate { will_succeed } => { + if start_requested && !halted { + assert!(result.is_ok()); + queue + .pop_front() + .unwrap() + .assert_migrate_as_source(); + + + let completed = CompletedRequest::MigrationOut { + succeeded: will_succeed + }; + + queue.notify_request_completed(completed); + if will_succeed { + halted = true; + } + } else { + assert!(result.is_err()); + assert!(queue.is_empty()); + } + } + + // Crucible reconfiguration requests are always queued for + // unhalted VMs. + RequestKind::ReconfigureCrucible => { + if !halted { + assert!(result.is_ok()); + queue + .pop_front() + .unwrap() + .assert_reconfigure_crucible(); + } else { + assert!(result.is_err()); + assert!(queue.is_empty()); + } + } + } + } + } + } } diff --git a/bin/propolis-server/src/proptest-regressions/vm/request_queue.txt b/bin/propolis-server/src/proptest-regressions/vm/request_queue.txt new file mode 100644 index 000000000..7eb5819d9 --- /dev/null +++ b/bin/propolis-server/src/proptest-regressions/vm/request_queue.txt @@ -0,0 +1,11 @@ +# Seeds for failure cases proptest has generated in the past. 
It is +# automatically read and these particular cases re-run before any +# novel cases are generated. +# +# It is recommended to check this file in to source control so that +# everyone who runs the test benefits from these saved cases. +cc 467749978aea2988f7790844904751ed5f0797f700949e702db74ae430a659e0 # shrinks to reqs = [Migrate, Stop] +cc 03ba07e9b5a99141bddd9b878bff86845a6da9eb1aa015b3afc7b3ebfed7a6d1 # shrinks to reqs = [Start, Stop, Migrate] +cc 67a067444d475068e86b43528884319ff178d6d9038a3d9223c32789f871baa3 # shrinks to reqs = [Start, Migrate] +cc b3df4b82bdb87e3533f4bd47f0a3ee8be21893c0afc15b472281b2a79006aadf # shrinks to reqs = [Migrate] +cc 3430b43ba860946e5feb7b3b0246623708efb1465dd4fe7a604ddf479d4dc3ae # shrinks to reqs = [Start { will_succeed: true }, Migrate { will_succeed: false }, Reboot] From 81dde6126b6af4b3568ad90380faff71f1a3aae5 Mon Sep 17 00:00:00 2001 From: Greg Colombo Date: Tue, 11 Mar 2025 03:52:27 +0000 Subject: [PATCH 5/7] server: even more proptesting --- .../src/lib/vm/request_queue.rs | 252 ++++++++++++++++++ .../proptest-regressions/vm/request_queue.txt | 1 + 2 files changed, 253 insertions(+) diff --git a/bin/propolis-server/src/lib/vm/request_queue.rs b/bin/propolis-server/src/lib/vm/request_queue.rs index 20ba520ce..4299fb6b5 100644 --- a/bin/propolis-server/src/lib/vm/request_queue.rs +++ b/bin/propolis-server/src/lib/vm/request_queue.rs @@ -1049,4 +1049,256 @@ mod test { } } } + + /// An operation that can be performed during a [`QueueDequeueTest`]. + #[derive(Clone, Copy, Debug)] + enum QueueOp { + Enqueue(RequestKind), + Dequeue, + } + + fn queue_op_strategy() -> impl Strategy { + prop_oneof![ + request_strategy().prop_map(QueueOp::Enqueue), + Just(QueueOp::Dequeue) + ] + } + + /// A helper that queues and dequeues requests in a proptest-generated + /// order and that sends fake completion notifications back to the request + /// queue. + struct QueueDequeueTest { + /// The external request queue under test. 
+ queue: ExternalRequestQueue, + + /// The set of state change requests that the helper expects to see from + /// the external queue. + expected_state: VecDeque, + + /// The set of component change requests that the helper expects to see + /// from the external queue. + expected_component: VecDeque, + + /// True if the helper has queued a request to start its fake VM. + start_requested: bool, + + /// True if the helper has successfully started its fake VM. + started: bool, + + /// True if the helper has queued a request to stop its fake VM. + stop_requested: bool, + + /// True if the helper has an outstanding request to reboot its fake VM. + reboot_requested: bool, + + /// True if the helper has an outstanding request to migrate its fake + /// VM. + migrate_out_requested: bool, + + /// True if the fake VM is halted (for any reason). + halted: bool, + } + + impl QueueDequeueTest { + fn new() -> Self { + Self { + queue: ExternalRequestQueue::new( + test_logger(), + InstanceAutoStart::No, + ), + expected_state: Default::default(), + expected_component: Default::default(), + start_requested: false, + started: false, + stop_requested: false, + reboot_requested: false, + migrate_out_requested: false, + halted: false, + } + } + + fn run(&mut self, ops: Vec) { + for op in ops { + match op { + QueueOp::Enqueue(request) => self.queue_request(request), + QueueOp::Dequeue => { + self.dequeue_request(); + if self.halted { + return; + } + } + } + } + } + + /// Submits the supplied `request` to the external request queue, + /// determines the expected result of that submission based on the + /// helper's current flags, and asserts that the result matches the + /// helper's expectation. If the helper expects the request to be + /// queued, it pushes an entry to its internal expected-change queues. + fn queue_request(&mut self, request: RequestKind) { + let result = self.queue.try_queue(request.into()); + match request { + RequestKind::Start { .. 
} => { + if self.halted || self.stop_requested { + assert!(result.is_err()); + return; + } + + assert!(result.is_ok()); + if !self.start_requested { + self.start_requested = true; + self.expected_state.push_back(request); + } + } + RequestKind::Stop => { + if self.halted || self.stop_requested { + assert!(result.is_ok()); + return; + } + + if self.migrate_out_requested { + assert!(result.is_err()); + return; + } + + self.stop_requested = true; + self.expected_state.push_back(request); + } + RequestKind::Reboot => { + if !self.started + || self.halted + || self.stop_requested + || self.migrate_out_requested + { + assert!(result.is_err()); + return; + } + + assert!(result.is_ok()); + if !self.reboot_requested { + self.reboot_requested = true; + self.expected_state.push_back(request); + } + } + RequestKind::Migrate { .. } => { + if (!self.started && !self.start_requested) + || self.halted + || self.stop_requested + || self.migrate_out_requested + { + assert!(result.is_err()); + return; + } + + assert!(result.is_ok()); + self.expected_state.push_back(request); + self.migrate_out_requested = true; + } + RequestKind::ReconfigureCrucible => { + if self.halted { + assert!(result.is_err()); + return; + } + + assert!(result.is_ok()); + self.expected_component.push_back(request); + } + } + } + + /// Pops a request from the helper's external queue and verifies that it + /// matches the first request on the helper's expected-change queue. If + /// the requests do match, sends a completion notification to the + /// external queue. 
+ fn dequeue_request(&mut self) { + let (dequeued, expected) = match ( + self.queue.pop_front(), + self.expected_state + .pop_front() + .or_else(|| self.expected_component.pop_front()), + ) { + (None, None) => return, + (Some(d), None) => { + panic!("dequeued request {d:?} but expected nothing") + } + (None, Some(e)) => { + panic!("expected request {e:?} but dequeued nothing") + } + (Some(d), Some(e)) => (d, e), + }; + + match (dequeued, expected) { + ( + ExternalRequest::State(StateChangeRequest::Start), + RequestKind::Start { will_succeed }, + ) => { + self.queue.notify_request_completed( + CompletedRequest::Start { succeeded: will_succeed }, + ); + if will_succeed { + self.started = true; + } else { + self.halted = true; + } + } + ( + ExternalRequest::State(StateChangeRequest::Stop), + RequestKind::Stop, + ) => { + self.queue.notify_request_completed(CompletedRequest::Stop); + self.halted = true; + } + ( + ExternalRequest::State(StateChangeRequest::Reboot), + RequestKind::Reboot, + ) => { + self.queue + .notify_request_completed(CompletedRequest::Reboot); + self.reboot_requested = false; + } + ( + ExternalRequest::State( + StateChangeRequest::MigrateAsSource { .. }, + ), + RequestKind::Migrate { will_succeed }, + ) => { + self.queue.notify_request_completed( + CompletedRequest::MigrationOut { + succeeded: will_succeed, + }, + ); + self.migrate_out_requested = false; + if will_succeed { + self.halted = true; + } + } + ( + ExternalRequest::Component( + ComponentChangeRequest::ReconfigureCrucibleVolume { + .. + }, + ), + RequestKind::ReconfigureCrucible, + ) => {} + (d, e) => panic!( + "dequeued request {d:?} but expected to dequeue {e:?}\n\ + remaining queue: {:#?}\n\ + remaining expected (state): {:#?}\n\ + remaining expected (components): {:#?}", + self.queue, self.expected_state, self.expected_component + ), + } + } + } + + proptest! 
{ + #[test] + fn request_queue_dequeue( + ops in prop::collection::vec(queue_op_strategy(), 0..100) + ) { + let mut test = QueueDequeueTest::new(); + test.run(ops); + } + } } diff --git a/bin/propolis-server/src/proptest-regressions/vm/request_queue.txt b/bin/propolis-server/src/proptest-regressions/vm/request_queue.txt index 7eb5819d9..123702a47 100644 --- a/bin/propolis-server/src/proptest-regressions/vm/request_queue.txt +++ b/bin/propolis-server/src/proptest-regressions/vm/request_queue.txt @@ -9,3 +9,4 @@ cc 03ba07e9b5a99141bddd9b878bff86845a6da9eb1aa015b3afc7b3ebfed7a6d1 # shrinks to cc 67a067444d475068e86b43528884319ff178d6d9038a3d9223c32789f871baa3 # shrinks to reqs = [Start, Migrate] cc b3df4b82bdb87e3533f4bd47f0a3ee8be21893c0afc15b472281b2a79006aadf # shrinks to reqs = [Migrate] cc 3430b43ba860946e5feb7b3b0246623708efb1465dd4fe7a604ddf479d4dc3ae # shrinks to reqs = [Start { will_succeed: true }, Migrate { will_succeed: false }, Reboot] +cc 2e8b284223a88421aaed16749309839818c16efda4bc4d8d930a35cbdce018cd # shrinks to ops = [Enqueue(ReconfigureCrucible), Enqueue(Start { will_succeed: true }), Dequeue] From 0178609bedee3811262dd9504efbe91d137b3454 Mon Sep 17 00:00:00 2001 From: Greg Colombo Date: Tue, 11 Mar 2025 16:24:38 +0000 Subject: [PATCH 6/7] server: fix arrangement of queuing routines --- .../src/lib/vm/request_queue.rs | 83 ++++++++++--------- 1 file changed, 43 insertions(+), 40 deletions(-) diff --git a/bin/propolis-server/src/lib/vm/request_queue.rs b/bin/propolis-server/src/lib/vm/request_queue.rs index 4299fb6b5..a403bdc57 100644 --- a/bin/propolis-server/src/lib/vm/request_queue.rs +++ b/bin/propolis-server/src/lib/vm/request_queue.rs @@ -331,44 +331,74 @@ impl ExternalRequestQueue { &mut self, request: ExternalRequest, ) -> Result<(), RequestDeniedReason> { - let request_str = format!("{request:?}"); - let res = self.try_queue_internal(request); - match res { + let should_queue = self.should_queue(&request); + match should_queue { Ok(true) => 
{ info!( &self.log, "enqueued external request"; - "request" => request_str - ) + "request" => ?request + ); } Ok(false) => { info!( &self.log, "ignored external request"; - "request" => request_str - ) + "request" => ?request + ); + + return Ok(()); } Err(reason) => { info!( &self.log, "denied external request"; - "request" => request_str, + "request" => ?request, "reason" => %reason - ) + ); + + return Err(reason); } } - res.map(|_| ()) + match request { + ExternalRequest::State(StateChangeRequest::Start) => { + assert_eq!(self.state, QueueState::NotStarted); + self.state = QueueState::StartPending; + } + ExternalRequest::State(StateChangeRequest::MigrateAsSource { + .. + }) => { + assert!(!self.awaiting_migration_out); + self.awaiting_migration_out = true; + } + ExternalRequest::State(StateChangeRequest::Reboot) => { + assert!(!self.awaiting_reboot); + self.awaiting_reboot = true; + } + ExternalRequest::State(StateChangeRequest::Stop) => { + assert!(!self.awaiting_stop); + self.awaiting_stop = true; + } + ExternalRequest::Component(_) => {} + } + + match request { + ExternalRequest::State(s) => self.state_queue.push_back(s), + ExternalRequest::Component(c) => self.component_queue.push_back(c), + } + + Ok(()) } - /// Attempts to place `request` on the queue, returning + /// Determines whether the supplied `request` should be queued, returning: /// /// - `Ok(true)` if the request was enqueued, /// - `Ok(false)` if the request was ignored, and /// - `Err(reason)` if the request was denied. - fn try_queue_internal( + fn should_queue( &mut self, - request: ExternalRequest, + request: &ExternalRequest, ) -> Result { // If the queue is in a terminal state, deny the request straightaway // (unless it's a stop request, which can be ignored for idempotency). 
@@ -463,33 +493,6 @@ impl ExternalRequestQueue { } }; - match request { - ExternalRequest::State(StateChangeRequest::Start) => { - assert_eq!(self.state, QueueState::NotStarted); - self.state = QueueState::StartPending; - } - ExternalRequest::State(StateChangeRequest::MigrateAsSource { - .. - }) => { - assert!(!self.awaiting_migration_out); - self.awaiting_migration_out = true; - } - ExternalRequest::State(StateChangeRequest::Reboot) => { - assert!(!self.awaiting_reboot); - self.awaiting_reboot = true; - } - ExternalRequest::State(StateChangeRequest::Stop) => { - assert!(!self.awaiting_stop); - self.awaiting_stop = true; - } - ExternalRequest::Component(_) => {} - } - - match request { - ExternalRequest::State(s) => self.state_queue.push_back(s), - ExternalRequest::Component(c) => self.component_queue.push_back(c), - } - Ok(true) } From 152287d987df166e41c3c227e15460908732458a Mon Sep 17 00:00:00 2001 From: Greg Colombo Date: Tue, 11 Mar 2025 16:39:12 +0000 Subject: [PATCH 7/7] server: allow stop requests with a pending migration out This was the request queue's previous behavior. It ensures that a request to stop drives the current Propolis to a halted state, but does not guarantee that the "logical" VM isn't running somewhere else (i.e. in a migration target). --- .../src/lib/vm/request_queue.rs | 32 +++++++------------ 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/bin/propolis-server/src/lib/vm/request_queue.rs b/bin/propolis-server/src/lib/vm/request_queue.rs index a403bdc57..9a2b58c9c 100644 --- a/bin/propolis-server/src/lib/vm/request_queue.rs +++ b/bin/propolis-server/src/lib/vm/request_queue.rs @@ -466,15 +466,16 @@ impl ExternalRequestQueue { } } - // As with reboots, deny requests to stop a VM that might - // migrate out first, since the request may need to be directed - // to the migration target. + // Always queue requests to stop a VM unless one is already + // present. 
+ // + // Note that if the VM migrates out before this request is + // processed, then the "logical" VM is still running (in another + // Propolis). The client is responsible for tracking any + // outstanding migrations and directing its stop requests + // accordingly. ExternalRequest::State(StateChangeRequest::Stop) => { - if self.awaiting_migration_out { - return Err( - RequestDeniedReason::InvalidForMigrationSource, - ); - } else if self.awaiting_stop { + if self.awaiting_stop { return Ok(false); } } @@ -898,12 +899,8 @@ mod test { } RequestKind::Stop => { - if !migrating_out { - assert!(result.is_ok()); - stop_requested = true; - } else { - assert!(result.is_err()); - } + assert!(result.is_ok()); + stop_requested = true; } RequestKind::Reboot => { @@ -1155,13 +1152,8 @@ mod test { } } RequestKind::Stop => { + assert!(result.is_ok()); if self.halted || self.stop_requested { - assert!(result.is_ok()); - return; - } - - if self.migrate_out_requested { - assert!(result.is_err()); return; }