diff --git a/bin/mock-server/src/lib/api_types.rs b/bin/mock-server/src/lib/api_types.rs index c39b8ea33..42b778ebe 100644 --- a/bin/mock-server/src/lib/api_types.rs +++ b/bin/mock-server/src/lib/api_types.rs @@ -51,3 +51,16 @@ pub struct InstanceSerialHistoryParams { /// `max_bytes`. pub max_bytes: Option, } + +#[derive( + Copy, Clone, Debug, PartialEq, Eq, JsonSchema, Serialize, Deserialize, +)] +pub enum MockMode { + /// The mock server should run freely, advancing the state every time the + /// instance_state_monitor endpoint is requested while new state + /// transitions are queued. + Run, + /// The mock server should only advance the current state when the + /// /mock/step endpoint is requested. + SingleStep, +} diff --git a/bin/mock-server/src/lib/lib.rs b/bin/mock-server/src/lib/lib.rs index 66da7a496..ce12e2c0f 100644 --- a/bin/mock-server/src/lib/lib.rs +++ b/bin/mock-server/src/lib/lib.rs @@ -22,6 +22,7 @@ use tokio_tungstenite::WebSocketStream; mod api_types; use api_types::types::{self as api, InstanceEnsureRequest}; +pub use api_types::MockMode; #[derive(Debug, Eq, PartialEq, Error)] pub enum Error { @@ -41,22 +42,29 @@ pub struct InstanceContext { /// The instance's current generation last observed by the /// `instance-state-monitor` endpoint. curr_gen: u64, - /// The next generation to use when inserting new state(s) into the queue. - next_queue_gen: u64, pub properties: api::InstanceProperties, serial: Arc, serial_task: serial::SerialTask, - state_watcher_rx: - watch::Receiver>, - state_watcher_tx: - watch::Sender>, + state_watcher_rx: watch::Receiver, + state_watcher_tx: watch::Sender, +} + +struct MockState { + queue: BTreeMap, + /// The next generation to use when inserting new state(s) into the queue. + next_queue_gen: u64, + /// Current generation when single-stepping. + /// + /// This is set when setting the single-step mock mode, and unset if not in + /// that mode. + single_step_gen: Option, } impl InstanceContext { pub fn new(properties: api::InstanceProperties, _log: &Logger) -> Self { let (state_watcher_tx, state_watcher_rx) = { - let mut states = BTreeMap::new(); - states.insert( + let mut queue = BTreeMap::new(); + queue.insert( 0, api::InstanceStateMonitorResponse { gen: 0, @@ -67,7 +75,11 @@ impl InstanceContext { }, }, ); - watch::channel(states) + watch::channel(MockState { + queue, + single_step_gen: None, + next_queue_gen: 1, + }) }; let serial = serial::Serial::new(&properties.name); @@ -75,7 +87,6 @@ impl InstanceContext { Self { curr_gen: 0, - next_queue_gen: 1, properties, serial, serial_task, @@ -157,8 +168,7 @@ impl InstanceContext { } fn current_state(&self) -> api::InstanceState { - self.state_watcher_rx - .borrow() + self.state_watcher_rx.borrow().queue .get(&self.curr_gen) .expect("current generation must be in the queue, this is weird 'n' bad") .state @@ -169,11 +179,11 @@ impl InstanceContext { log: &Logger, states: &[api::InstanceState], ) { - self.state_watcher_tx.send_modify(|queue| { + self.state_watcher_tx.send_modify(|mock_state| { for &state in states { - let generation = self.next_queue_gen; - self.next_queue_gen += 1; - queue.insert( + let generation = mock_state.next_queue_gen; + mock_state.next_queue_gen += 1; + mock_state.queue.insert( generation, api::InstanceStateMonitorResponse { gen: generation, @@ -275,11 +285,42 @@ async fn instance_state_monitor( (state_watcher, gen) }; + slog::debug!( + rqctx.log, + "instance state monitor request"; + "request_gen" => gen, + ); loop { - let next_gen = gen + 1; - let state = state_watcher.borrow().get(&next_gen).cloned(); + let state = { + let mock_state = state_watcher.borrow_and_update(); + match mock_state.single_step_gen { + // We are single-stepping, and have not yet reached the + // requested generation. Keep waiting until single-stepped to + // where we need to be. + Some(g) if gen > g => { + slog::info!( + rqctx.log, + "instance state monitor: wait for single step..."; + "request_gen" => gen, + "current_gen" => g, + ); + None + } + // Otherwise, if we have stepped to the requested generation, or + // if we are not in single-step mode, just return the current + // thing. + _ => mock_state.queue.get(&gen).cloned(), + } + }; + if let Some(state) = state { - // Advance to the state with the generation we showed to the + slog::info!( + rqctx.log, + "instance state monitor"; + "request_gen" => gen, + "state" => ?state.state, + ); + // Advance to the state with the generation we showed to the // watcher, for use in `instance_get` and when determining what // state transitions are valid. rqctx @@ -289,8 +330,8 @@ async fn instance_state_monitor( .await .as_mut() .expect("if we didn't have an instance, we shouldn't have gotten here") - .curr_gen = next_gen; - return Ok(HttpResponseOk(state.clone())); + .curr_gen = gen; + return Ok(HttpResponseOk(state)); } state_watcher.changed().await.unwrap(); @@ -424,6 +465,98 @@ async fn instance_serial_history_get( })) } +#[endpoint { + method = GET, + path = "/mock/mode" +}] +async fn mock_mode_get( + rqctx: RequestContext>, +) -> Result, HttpError> { + let instance = rqctx.context().instance.lock().await; + let instance = instance.as_ref().ok_or_else(|| { + HttpError::for_internal_error( + "Server not initialized (no instance)".to_string(), + ) + })?; + let mode = if instance.state_watcher_rx.borrow().single_step_gen.is_some() { + MockMode::SingleStep + } else { + MockMode::Run + }; + Ok(HttpResponseOk(mode)) +} + +#[endpoint { + method = PUT, + path = "/mock/mode" +}] +async fn mock_mode_set( + rqctx: RequestContext>, + request: TypedBody, +) -> Result { + let instance = rqctx.context().instance.lock().await; + let instance = instance.as_ref().ok_or_else(|| { + HttpError::for_internal_error( + "Server not initialized (no instance)".to_string(), + ) + })?; + let mode = request.into_inner(); + instance.state_watcher_tx.send_if_modified(|mock_state| { + match mode { + MockMode::Run => { + mock_state.single_step_gen = None; + true + } + // If we're already in single-step mode, don't clobber the existing + // single-step generation. + MockMode::SingleStep if mock_state.single_step_gen.is_some() => { + false + } + // Otherwise, start single-stepping from the current generation. + MockMode::SingleStep => { + mock_state.single_step_gen = Some(instance.curr_gen); + true + } + } + }); + Ok(HttpResponseUpdatedNoContent()) +} + +#[endpoint { + method = PUT, + path = "/mock/step" +}] +async fn mock_step( + rqctx: RequestContext>, +) -> Result { + let instance = rqctx.context().instance.lock().await; + let instance = instance.as_ref().ok_or_else(|| { + HttpError::for_internal_error( + "Server not initialized (no instance)".to_string(), + ) + })?; + if instance.state_watcher_rx.borrow().single_step_gen.is_none() { + return Err(HttpError::for_bad_request( + None, + "not in single-step mode".to_string(), + )); + } + + instance.state_watcher_tx.send_modify(|state| { + let g = state + .single_step_gen + .as_mut() + .expect("we just checked that it's set"); + *g += 1; + slog::info!( + rqctx.log, + "instance state stepped to generation {g}"; + "gen" => *g, + ); + }); + Ok(HttpResponseUpdatedNoContent()) +} + mod serial { use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -699,6 +832,9 @@ pub fn api() -> ApiDescription> { api.register(instance_state_put).unwrap(); api.register(instance_serial).unwrap(); api.register(instance_serial_history_get).unwrap(); + api.register(mock_mode_get).unwrap(); + api.register(mock_mode_set).unwrap(); + api.register(mock_step).unwrap(); api }