Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions bin/mock-server/src/lib/api_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,16 @@ pub struct InstanceSerialHistoryParams {
/// `max_bytes`.
pub max_bytes: Option<u64>,
}

#[derive(
Copy, Clone, Debug, PartialEq, Eq, JsonSchema, Serialize, Deserialize,
)]
pub enum MockMode {
/// The mock server should run freely, advancing the state every time the
/// instance_state_monitor endpoint is requested while new state
/// transitions are queued.
Run,
/// The mock server should only advance the current state when the
/// /mock/step endpoint is requested.
SingleStep,
}
178 changes: 157 additions & 21 deletions bin/mock-server/src/lib/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use tokio_tungstenite::WebSocketStream;

mod api_types;
use api_types::types::{self as api, InstanceEnsureRequest};
pub use api_types::MockMode;

#[derive(Debug, Eq, PartialEq, Error)]
pub enum Error {
Expand All @@ -41,22 +42,29 @@ pub struct InstanceContext {
/// The instance's current generation last observed by the
/// `instance-state-monitor` endpoint.
curr_gen: u64,
/// The next generation to use when inserting new state(s) into the queue.
next_queue_gen: u64,
pub properties: api::InstanceProperties,
serial: Arc<serial::Serial>,
serial_task: serial::SerialTask,
state_watcher_rx:
watch::Receiver<BTreeMap<u64, api::InstanceStateMonitorResponse>>,
state_watcher_tx:
watch::Sender<BTreeMap<u64, api::InstanceStateMonitorResponse>>,
state_watcher_rx: watch::Receiver<MockState>,
state_watcher_tx: watch::Sender<MockState>,
}

struct MockState {
queue: BTreeMap<u64, api::InstanceStateMonitorResponse>,
/// The next generation to use when inserting new state(s) into the queue.
next_queue_gen: u64,
/// Current generation when single-stepping.
///
/// This is set when setting the single-step mock mode, and unset if not in
/// that mode.
single_step_gen: Option<u64>,
}

impl InstanceContext {
pub fn new(properties: api::InstanceProperties, _log: &Logger) -> Self {
let (state_watcher_tx, state_watcher_rx) = {
let mut states = BTreeMap::new();
states.insert(
let mut queue = BTreeMap::new();
queue.insert(
0,
api::InstanceStateMonitorResponse {
gen: 0,
Expand All @@ -67,15 +75,18 @@ impl InstanceContext {
},
},
);
watch::channel(states)
watch::channel(MockState {
queue,
single_step_gen: None,
next_queue_gen: 1,
})
};
let serial = serial::Serial::new(&properties.name);

let serial_task = serial::SerialTask::spawn();

Self {
curr_gen: 0,
next_queue_gen: 1,
properties,
serial,
serial_task,
Expand Down Expand Up @@ -157,8 +168,7 @@ impl InstanceContext {
}

fn current_state(&self) -> api::InstanceState {
self.state_watcher_rx
.borrow()
self.state_watcher_rx.borrow().queue
.get(&self.curr_gen)
.expect("current generation must be in the queue, this is weird 'n' bad")
.state
Expand All @@ -169,11 +179,11 @@ impl InstanceContext {
log: &Logger,
states: &[api::InstanceState],
) {
self.state_watcher_tx.send_modify(|queue| {
self.state_watcher_tx.send_modify(|mock_state| {
for &state in states {
let generation = self.next_queue_gen;
self.next_queue_gen += 1;
queue.insert(
let generation = mock_state.next_queue_gen;
mock_state.next_queue_gen += 1;
mock_state.queue.insert(
generation,
api::InstanceStateMonitorResponse {
gen: generation,
Expand Down Expand Up @@ -275,11 +285,42 @@ async fn instance_state_monitor(
(state_watcher, gen)
};

slog::debug!(
rqctx.log,
"instance state monitor request";
"request_gen" => gen,
);
loop {
let next_gen = gen + 1;
let state = state_watcher.borrow().get(&next_gen).cloned();
let state = {
let mock_state = state_watcher.borrow_and_update();
match mock_state.single_step_gen {
// We are single-stepping, and have not yet reached the
// requested generation. Keep waiting until single-stepped to
// where we need to be.
Some(g) if gen > g => {
slog::info!(
rqctx.log,
"instance state monitor: wait for single step...";
"request_gen" => gen,
"current_gen" => g,
);
None
}
// Otherwise, if we have stepped to the requested generation, or
// if we are not in single-step mode, just return the current

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is subtly different from the real server's behavior, I think--should the generation number "floor" still apply in automatic mode?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I've misread this--if the requested gen hasn't been published yet, then the get on line 312 will return None and we'll end up getting the expected behavior.

I think there is still a (different) subtle difference here: if you ask this API for generation J, and the state machine has advanced to generation K > J, then the mock will return the state as it was at generation J, but the "real" state machine will return whatever data is latest. But I think this kind of difference is OK to have if it makes the test double more useful when writing sled-agent tests. (If/when we refactor so that the mock state machine is based on propolis-server's state driver, we might want to put this behavior into a different API instead of changing the semantics of instance-state-monitor, but that's a problem for another day.)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think we can make this behavior more realistic, but it's a bit annoying. I'd like to do that in a follow up.

// thing.
_ => mock_state.queue.get(&gen).cloned(),
}
};

if let Some(state) = state {
// Advance to the state with the generation we showed to the
slog::info!(
rqctx.log,
"instance state monitor";
"request_gen" => gen,
"state" => ?state.state,
);
// Advance to the state with the generation we showed to the
// watcher, for use in `instance_get` and when determining what
// state transitions are valid.
rqctx
Expand All @@ -289,8 +330,8 @@ async fn instance_state_monitor(
.await
.as_mut()
.expect("if we didn't have an instance, we shouldn't have gotten here")
.curr_gen = next_gen;
return Ok(HttpResponseOk(state.clone()));
.curr_gen = gen;
return Ok(HttpResponseOk(state));
}

state_watcher.changed().await.unwrap();
Expand Down Expand Up @@ -424,6 +465,98 @@ async fn instance_serial_history_get(
}))
}

#[endpoint {
method = GET,
path = "/mock/mode"
}]
async fn mock_mode_get(
rqctx: RequestContext<Arc<Context>>,
) -> Result<HttpResponseOk<MockMode>, HttpError> {
let instance = rqctx.context().instance.lock().await;
let instance = instance.as_ref().ok_or_else(|| {
HttpError::for_internal_error(
"Server not initialized (no instance)".to_string(),
)
})?;
let mode = if instance.state_watcher_rx.borrow().single_step_gen.is_some() {
MockMode::SingleStep
} else {
MockMode::Run
};
Ok(HttpResponseOk(mode))
}

#[endpoint {
method = PUT,
path = "/mock/mode"
}]
async fn mock_mode_set(
rqctx: RequestContext<Arc<Context>>,
request: TypedBody<MockMode>,
) -> Result<HttpResponseUpdatedNoContent, HttpError> {
let instance = rqctx.context().instance.lock().await;
let instance = instance.as_ref().ok_or_else(|| {
HttpError::for_internal_error(
"Server not initialized (no instance)".to_string(),
)
})?;
let mode = request.into_inner();
instance.state_watcher_tx.send_if_modified(|mock_state| {
match mode {
MockMode::Run => {
mock_state.single_step_gen = None;
true
}
// If we're already in single-step mode, don't clobber the existing
// single-step generation.
MockMode::SingleStep if mock_state.single_step_gen.is_some() => {
false
}
// Otherwise, start single-stepping from the current generation.
MockMode::SingleStep => {
mock_state.single_step_gen = Some(instance.curr_gen);
true
}
}
});
Ok(HttpResponseUpdatedNoContent())
}

#[endpoint {
method = PUT,
path = "/mock/step"
}]
async fn mock_step(
rqctx: RequestContext<Arc<Context>>,
) -> Result<HttpResponseUpdatedNoContent, HttpError> {
let instance = rqctx.context().instance.lock().await;
let instance = instance.as_ref().ok_or_else(|| {
HttpError::for_internal_error(
"Server not initialized (no instance)".to_string(),
)
})?;
if instance.state_watcher_rx.borrow().single_step_gen.is_none() {
return Err(HttpError::for_bad_request(
None,
"not in single-step mode".to_string(),
));
}

instance.state_watcher_tx.send_modify(|state| {
let g = state
.single_step_gen
.as_mut()
.expect("we just checked that it's set");
*g += 1;
slog::info!(
rqctx.log,
"instance state stepped to generation {g}";
"gen" => *g,
);
});
Ok(HttpResponseUpdatedNoContent())
}

mod serial {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
Expand Down Expand Up @@ -699,6 +832,9 @@ pub fn api() -> ApiDescription<Arc<Context>> {
api.register(instance_state_put).unwrap();
api.register(instance_serial).unwrap();
api.register(instance_serial_history_get).unwrap();
api.register(mock_mode_get).unwrap();
api.register(mock_mode_set).unwrap();
api.register(mock_step).unwrap();
api
}

Expand Down