From 873acf104ab49d9c0183506860ac4afbb1d88eb8 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 20 Feb 2025 12:05:10 -0800 Subject: [PATCH 1/4] mock-server: add single-step API --- bin/mock-server/src/lib/api_types.rs | 13 +++ bin/mock-server/src/lib/lib.rs | 152 +++++++++++++++++++++++---- 2 files changed, 146 insertions(+), 19 deletions(-) diff --git a/bin/mock-server/src/lib/api_types.rs b/bin/mock-server/src/lib/api_types.rs index c39b8ea33..42b778ebe 100644 --- a/bin/mock-server/src/lib/api_types.rs +++ b/bin/mock-server/src/lib/api_types.rs @@ -51,3 +51,16 @@ pub struct InstanceSerialHistoryParams { /// `max_bytes`. pub max_bytes: Option, } + +#[derive( + Copy, Clone, Debug, PartialEq, Eq, JsonSchema, Serialize, Deserialize, +)] +pub enum MockMode { + /// The mock server should run freely, advancing the state every time the + /// instance_state_monitor endpoint is requested while new state + /// transitions are queued. + Run, + /// The mock server should only advance the current state when the + /// /mock/step endpoint is requested. + SingleStep, +} diff --git a/bin/mock-server/src/lib/lib.rs b/bin/mock-server/src/lib/lib.rs index 66da7a496..453d0e841 100644 --- a/bin/mock-server/src/lib/lib.rs +++ b/bin/mock-server/src/lib/lib.rs @@ -41,22 +41,29 @@ pub struct InstanceContext { /// The instance's current generation last observed by the /// `instance-state-monitor` endpoint. curr_gen: u64, - /// The next generation to use when inserting new state(s) into the queue. - next_queue_gen: u64, pub properties: api::InstanceProperties, serial: Arc, serial_task: serial::SerialTask, - state_watcher_rx: - watch::Receiver>, - state_watcher_tx: - watch::Sender>, + state_watcher_rx: watch::Receiver, + state_watcher_tx: watch::Sender, +} + +struct MockState { + queue: BTreeMap, + /// The next generation to use when inserting new state(s) into the queue. + next_queue_gen: u64, + /// Current generation when single-stepping. + /// + /// This is set when setting the single-step mock mode, and unset if not in + /// that mode. + single_step_gen: Option, } impl InstanceContext { pub fn new(properties: api::InstanceProperties, _log: &Logger) -> Self { let (state_watcher_tx, state_watcher_rx) = { - let mut states = BTreeMap::new(); - states.insert( + let mut queue = BTreeMap::new(); + queue.insert( 0, api::InstanceStateMonitorResponse { gen: 0, @@ -67,7 +74,11 @@ impl InstanceContext { }, }, ); - watch::channel(states) + watch::channel(MockState { + queue, + single_step_gen: None, + next_queue_gen: 1, + }) }; let serial = serial::Serial::new(&properties.name); @@ -75,7 +86,6 @@ impl InstanceContext { Self { curr_gen: 0, - next_queue_gen: 1, properties, serial, serial_task, @@ -157,8 +167,7 @@ impl InstanceContext { } fn current_state(&self) -> api::InstanceState { - self.state_watcher_rx - .borrow() + self.state_watcher_rx.borrow().queue .get(&self.curr_gen) .expect("current generation must be in the queue, this is weird 'n' bad") .state @@ -169,11 +178,11 @@ impl InstanceContext { log: &Logger, states: &[api::InstanceState], ) { - self.state_watcher_tx.send_modify(|queue| { + self.state_watcher_tx.send_modify(|mock_state| { for &state in states { - let generation = self.next_queue_gen; - self.next_queue_gen += 1; - queue.insert( + let generation = mock_state.next_queue_gen; + mock_state.next_queue_gen += 1; + mock_state.queue.insert( generation, api::InstanceStateMonitorResponse { gen: generation, @@ -277,9 +286,22 @@ async fn instance_state_monitor( loop { let next_gen = gen + 1; - let state = state_watcher.borrow().get(&next_gen).cloned(); + let state = { + let mock_state = state_watcher.borrow(); + match mock_state.single_step_gen { + // We are single-stepping, and have not yet reached the + // requested generation. Keep waiting until single-stepped to + // where we need to be. + Some(g) if next_gen > g => None, + // Otherwise, if we have stepped to the requested generation, or + // if we are not in single-step mode, just return the current + // thing. + _ => mock_state.queue.get(&next_gen).cloned(), + } + }; + if let Some(state) = state { - // Advance to the state with the generation we showed to the + // Advance to the state with the generation we showed to the // watcher, for use in `instance_get` and when determining what // state transitions are valid. rqctx @@ -290,7 +312,7 @@ async fn instance_state_monitor( .as_mut() .expect("if we didn't have an instance, we shouldn't have gotten here") .curr_gen = next_gen; - return Ok(HttpResponseOk(state.clone())); + return Ok(HttpResponseOk(state)); } state_watcher.changed().await.unwrap(); @@ -424,6 +446,95 @@ async fn instance_serial_history_get( })) } +#[endpoint { + method = GET, + path = "/mock/mode" +}] +async fn mock_mode_get( + rqctx: RequestContext>, +) -> Result, HttpError> { + let instance = rqctx.context().instance.lock().await; + let instance = instance.as_ref().ok_or_else(|| { + HttpError::for_internal_error( + "Server not initialized (no instance)".to_string(), + ) + })?; + let mode = if instance.state_watcher_rx.borrow().single_step_gen.is_some() { + api_types::MockMode::SingleStep + } else { + api_types::MockMode::Run + }; + Ok(HttpResponseOk(mode)) +} + +#[endpoint { + method = PUT, + path = "/mock/mode" +}] +async fn mock_mode_set( + rqctx: RequestContext>, + request: TypedBody, +) -> Result { + let instance = rqctx.context().instance.lock().await; + let instance = instance.as_ref().ok_or_else(|| { + HttpError::for_internal_error( + "Server not initialized (no instance)".to_string(), + ) + })?; + let mode = request.into_inner(); + instance.state_watcher_tx.send_if_modified(|mock_state| { + match mode { + api_types::MockMode::Run => { + mock_state.single_step_gen = None; + true + } + // If we're already in single-step mode, don't clobber the existing + // single-step generation. + api_types::MockMode::SingleStep + if mock_state.single_step_gen.is_some() => + { + false + } + // Otherwise, start single-stepping from the current generation. + api_types::MockMode::SingleStep => { + mock_state.single_step_gen = Some(instance.curr_gen); + true + } + } + }); + Ok(HttpResponseUpdatedNoContent()) +} + +#[endpoint { + method = PUT, + path = "/mock/step" +}] +async fn mock_step( + rqctx: RequestContext>, +) -> Result { + let instance = rqctx.context().instance.lock().await; + let instance = instance.as_ref().ok_or_else(|| { + HttpError::for_internal_error( + "Server not initialized (no instance)".to_string(), + ) + })?; + if instance.state_watcher_rx.borrow().single_step_gen.is_none() { + return Err(HttpError::for_bad_request( + None, + "not in single-step mode".to_string(), + )); + } + + instance.state_watcher_tx.send_modify(|state| { + let g = state + .single_step_gen + .as_mut() + .expect("we just checked that it's set"); + *g += 1; + }); + Ok(HttpResponseUpdatedNoContent()) +} + mod serial { use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -699,6 +810,9 @@ pub fn api() -> ApiDescription> { api.register(instance_state_put).unwrap(); api.register(instance_serial).unwrap(); api.register(instance_serial_history_get).unwrap(); + api.register(mock_mode_get).unwrap(); + api.register(mock_mode_set).unwrap(); + api.register(mock_step).unwrap(); api } From 30e1fe25d093da04e2aeda397b27103743dd55fd Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 20 Feb 2025 14:59:19 -0800 Subject: [PATCH 2/4] publicly export `MockMode` --- bin/mock-server/src/lib/lib.rs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/bin/mock-server/src/lib/lib.rs b/bin/mock-server/src/lib/lib.rs index 453d0e841..bf3c862f5 100644 --- a/bin/mock-server/src/lib/lib.rs +++ b/bin/mock-server/src/lib/lib.rs @@ -22,6 +22,7 @@ use tokio_tungstenite::WebSocketStream; mod api_types; use api_types::types::{self as api, InstanceEnsureRequest}; +pub use api_types::MockMode; #[derive(Debug, Eq, PartialEq, Error)] pub enum Error { @@ -452,7 +453,7 @@ async fn instance_serial_history_get( }] async fn mock_mode_get( rqctx: RequestContext>, -) -> Result, HttpError> { +) -> Result, HttpError> { let instance = rqctx.context().instance.lock().await; let instance = instance.as_ref().ok_or_else(|| { HttpError::for_internal_error( @@ -460,9 +461,9 @@ async fn mock_mode_get( ) })?; let mode = if instance.state_watcher_rx.borrow().single_step_gen.is_some() { - api_types::MockMode::SingleStep + MockMode::SingleStep } else { - api_types::MockMode::Run + MockMode::Run }; Ok(HttpResponseOk(mode)) } @@ -473,7 +474,7 @@ async fn mock_mode_get( }] async fn mock_mode_set( rqctx: RequestContext>, - request: TypedBody, + request: TypedBody, ) -> Result { let instance = rqctx.context().instance.lock().await; let instance = instance.as_ref().ok_or_else(|| { @@ -484,19 +485,17 @@ async fn mock_mode_set( let mode = request.into_inner(); instance.state_watcher_tx.send_if_modified(|mock_state| { match mode { - api_types::MockMode::Run => { + MockMode::Run => { mock_state.single_step_gen = None; true } // If we're already in single-step mode, don't clobber the existing // single-step generation. - api_types::MockMode::SingleStep - if mock_state.single_step_gen.is_some() => - { + MockMode::SingleStep if mock_state.single_step_gen.is_some() => { false } // Otherwise, start single-stepping from the current generation. - api_types::MockMode::SingleStep => { + MockMode::SingleStep => { mock_state.single_step_gen = Some(instance.curr_gen); true } From efbd3e4a87bc2e6c4e4f8cb506285ff60d7e6431 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 20 Feb 2025 15:42:03 -0800 Subject: [PATCH 3/4] add logging --- bin/mock-server/src/lib/lib.rs | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/bin/mock-server/src/lib/lib.rs b/bin/mock-server/src/lib/lib.rs index bf3c862f5..8b77c4121 100644 --- a/bin/mock-server/src/lib/lib.rs +++ b/bin/mock-server/src/lib/lib.rs @@ -285,15 +285,28 @@ async fn instance_state_monitor( (state_watcher, gen) }; + slog::debug!( + rqctx.log, + "instance state monitor request"; + "request_gen" => gen, + ); + let next_gen = gen + 1; loop { - let next_gen = gen + 1; let state = { - let mock_state = state_watcher.borrow(); + let mock_state = state_watcher.borrow_and_update(); match mock_state.single_step_gen { // We are single-stepping, and have not yet reached the // requested generation. Keep waiting until single-stepped to // where we need to be. - Some(g) if next_gen > g => None, + Some(g) if next_gen > g => { + slog::info!( + rqctx.log, + "instance state monitor: wait for single step..."; + "request_gen" => gen, + "current_gen" => g, + ); + None + } // Otherwise, if we have stepped to the requested generation, or // if we are not in single-step mode, just return the current // thing. @@ -302,6 +315,13 @@ async fn instance_state_monitor( }; if let Some(state) = state { + slog::info!( + rqctx.log, + "instance state monitor"; + "request_gen" => gen, + "current_gen" => next_gen, + "state" => ?state.state, + ); // Advance to the state with the generation we showed to the // watcher, for use in `instance_get` and when determining what // state transitions are valid. @@ -530,6 +550,11 @@ async fn mock_step( .as_mut() .expect("we just checked that it's set"); *g += 1; + slog::info!( + rqctx.log, + "instance state stepped to generation {g}"; + "gen" => *g, + ); }); Ok(HttpResponseUpdatedNoContent()) } From 4f80f53b6fd1858e812bb81613dccf4e981726a2 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 20 Feb 2025 15:58:37 -0800 Subject: [PATCH 4/4] agh --- bin/mock-server/src/lib/lib.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/bin/mock-server/src/lib/lib.rs b/bin/mock-server/src/lib/lib.rs index 8b77c4121..ce12e2c0f 100644 --- a/bin/mock-server/src/lib/lib.rs +++ b/bin/mock-server/src/lib/lib.rs @@ -290,7 +290,6 @@ async fn instance_state_monitor( "instance state monitor request"; "request_gen" => gen, ); - let next_gen = gen + 1; loop { let state = { let mock_state = state_watcher.borrow_and_update(); @@ -298,7 +297,7 @@ async fn instance_state_monitor( // We are single-stepping, and have not yet reached the // requested generation. Keep waiting until single-stepped to // where we need to be. - Some(g) if next_gen > g => { + Some(g) if gen > g => { slog::info!( rqctx.log, "instance state monitor: wait for single step..."; @@ -310,7 +309,7 @@ async fn instance_state_monitor( // Otherwise, if we have stepped to the requested generation, or // if we are not in single-step mode, just return the current // thing. - _ => mock_state.queue.get(&next_gen).cloned(), + _ => mock_state.queue.get(&gen).cloned(), } }; @@ -319,7 +318,6 @@ async fn instance_state_monitor( rqctx.log, "instance state monitor"; "request_gen" => gen, - "current_gen" => next_gen, "state" => ?state.state, ); // Advance to the state with the generation we showed to the @@ -332,7 +330,7 @@ async fn instance_state_monitor( .await .as_mut() .expect("if we didn't have an instance, we shouldn't have gotten here") - .curr_gen = next_gen; + .curr_gen = gen; return Ok(HttpResponseOk(state)); }