diff --git a/Cargo.lock b/Cargo.lock index a81c2c078e39..3f5a82f5e030 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3791,13 +3791,10 @@ version = "0.0.0" dependencies = [ "anyhow", "base64", - "env_logger 0.11.5", "flate2", "futures", "libc", - "log", "once_cell", - "pin-project-lite", "regex", "sha2", "url", @@ -4961,7 +4958,6 @@ dependencies = [ "cranelift-bitset", "env_logger 0.11.5", "futures", - "indexmap 2.14.0", "log", "mutatis", "rand 0.10.1", @@ -4971,10 +4967,8 @@ dependencies = [ "smallvec 1.15.1", "target-lexicon", "tempfile", - "test-programs-artifacts", "tokio", "v8", - "wasm-compose", "wasm-encoder 0.247.0", "wasm-mutate", "wasm-smith", @@ -4987,7 +4981,6 @@ dependencies = [ "wasmtime-environ", "wasmtime-internal-core", "wasmtime-test-util", - "wasmtime-wasi", "wasmtime-wast", "wat", ] diff --git a/crates/fuzzing/Cargo.toml b/crates/fuzzing/Cargo.toml index cb6f135a3150..e26d58f7506e 100644 --- a/crates/fuzzing/Cargo.toml +++ b/crates/fuzzing/Cargo.toml @@ -33,16 +33,12 @@ wasmtime-core = { workspace = true, features = ['backtrace', 'serde'] } wasm-encoder = { workspace = true } wasm-smith = { workspace = true, features = ['serde'] } wasm-mutate = { workspace = true } -wasm-compose = { workspace = true } wasm-spec-interpreter = { path = "./wasm-spec-interpreter", optional = true } wasmi = { version = "1.0.8", default-features = false, features = ["std", "simd"] } -futures = { workspace = true, features = ['async-await'] } +futures = { workspace = true } wasmtime-test-util = { workspace = true, features = ['wast', 'component-fuzz', 'component'] } serde_json = { workspace = true } serde = { workspace = true } -test-programs-artifacts = { workspace = true } -wasmtime-wasi = { workspace = true } -indexmap = { workspace = true } [dependencies.wasmtime-cli-flags] workspace = true diff --git a/crates/fuzzing/src/generators.rs b/crates/fuzzing/src/generators.rs index a96a582f836c..2b163a7ae5bb 100644 --- a/crates/fuzzing/src/generators.rs +++ b/crates/fuzzing/src/generators.rs @@ -11,7 +11,6 @@ pub mod api; mod async_config; mod codegen_settings; -pub mod component_async; mod config; pub mod exception_ops; pub mod gc_ops; diff --git a/crates/fuzzing/src/generators/component_async.rs b/crates/fuzzing/src/generators/component_async.rs deleted file mode 100644 index cc4d542ffbe6..000000000000 --- a/crates/fuzzing/src/generators/component_async.rs +++ /dev/null @@ -1,1020 +0,0 @@ -//! For a high-level overview of this fuzz target see `fuzz_async.rs` - -#![expect(missing_docs, reason = "macro-generated code")] - -use arbitrary::{Arbitrary, Unstructured}; -use indexmap::{IndexMap, IndexSet}; - -wasmtime::component::bindgen!({ - world: "fuzz-async", - imports: { - "wasmtime-fuzz:fuzz/types.get-commands": store, - }, - exports: { default: async | store }, -}); - -use wasmtime_fuzz::fuzz::types::{ - Command, FuturePayload, StreamReadPayload, StreamReadyPayload, StreamWritePayload, -}; - -const SOFT_MAX_COMMANDS: usize = 100; -const MAX_STREAM_COUNT: u32 = 10; - -/// Structure used for the "component async" fuzzer. -/// -/// This encapsulates a list of commands for the fuzzer to run. Note that the -/// commands are not 100% arbitrary but instead they're generated similar to -/// wasm instructions where only some sequences of instructions are valid. The -/// rest of this module is dedicated to the generation of these commands. -#[derive(Debug)] -pub struct ComponentAsync { - /// A sequence of commands to run, tagged with a scope that they're run - /// within. - pub commands: Vec<(Scope, Command)>, -} - -/// The possible "scopes" that async commands run within. -#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] -pub enum Scope { - /// The outermost layer of the host, which controls invocations of the - /// guests. - HostCaller, - - /// The first layer of the guest, or the raw exports from the root of the - /// component. - /// - /// This imports functions from the `GuestCallee`. - GuestCaller, - - /// The second layer of the guest which imports the host functions directly. - /// - /// This is then in turn imported by the `GuestCaller`. - GuestCallee, - - /// The innermost layer of the host which provides imported functions to the - /// `GuestCallee`. - HostCallee, -} - -impl Scope { - const ALL: &[Scope; 4] = &[ - Scope::HostCaller, - Scope::GuestCaller, - Scope::GuestCallee, - Scope::HostCallee, - ]; - const CALLERS: &[Scope; 3] = &[Scope::HostCaller, Scope::GuestCaller, Scope::GuestCallee]; - - fn callee(&self) -> Option { - match self { - Scope::HostCaller => Some(Scope::GuestCaller), - Scope::GuestCaller => Some(Scope::GuestCallee), - Scope::GuestCallee => Some(Scope::HostCallee), - Scope::HostCallee => None, - } - } - - fn caller(&self) -> Option { - match self { - Scope::HostCaller => None, - Scope::GuestCaller => Some(Scope::HostCaller), - Scope::GuestCallee => Some(Scope::GuestCaller), - Scope::HostCallee => Some(Scope::GuestCallee), - } - } - - fn is_host(&self) -> bool { - match self { - Scope::HostCaller | Scope::HostCallee => true, - Scope::GuestCaller | Scope::GuestCallee => false, - } - } -} - -impl Arbitrary<'_> for ComponentAsync { - fn arbitrary(u: &mut Unstructured<'_>) -> arbitrary::Result { - let mut state = State::default(); - let mut ret = Vec::new(); - - // While there's more unstructured data, and our list of commands isn't - // too long, generate some new commands per-component. - while !u.is_empty() && ret.len() < SOFT_MAX_COMMANDS { - state.generate(u, false, &mut ret)?; - } - - // Optionally, if specified, finish up all async operations. - if u.arbitrary()? { - while !state.is_empty() { - state.generate(u, true, &mut ret)?; - } - } - - Ok(ComponentAsync { commands: ret }) - } -} - -#[derive(Default)] -struct State { - next_id: u32, - - /// List of scopes that have an active and pending call to the - /// `async-pending` function. - async_pending: Vec<(Scope, u32)>, - - /// Deferred work that can happen at any time, for example asserting the - /// result of some previous operation. - deferred: Vec<(Scope, Command)>, - - /// State associated with futures/streams and their handles within. - futures: HandleStates<(), u32>, - streams: HandleStates, -} - -#[derive(Default)] -struct HandleStates { - readers: HalfStates, - writers: HalfStates, -} - -impl HandleStates { - fn is_empty(&self) -> bool { - self.readers.is_empty() && self.writers.is_empty() - } -} - -/// State management for "half" of a future/stream read/write pair. -/// -/// This tracks all the various states of all handles in the system to be able -/// to select amongst them an arbitrary operation to perform. This structure's -/// sets are primarily manipulated through helper methods to ensure that the set -/// metadata all stays in sync. -#[derive(Default)] -struct HalfStates { - /// All known handles of this type, where they're located, etc. - handles: IndexMap, - - /// All handles which can currently be dropped. Handles can't be dropped if - /// they're in use, for example. - droppable: IndexSet, - - /// All handles which can be read/written from (depending on handle type). - /// Handles where both pairs are in the same component can't be - /// read/written to for example. - ready: IndexSet, - - /// All handles which can be transferred somewhere else. - /// - /// Some examples of non-transferable handles are: - /// - /// * writers - /// * handles with an outstanding read - /// * host-based handles that have been used at least once (FIXME #12090) - transferable: IndexSet, - - /// Handles currently being read/written to. - /// - /// Also includes state about the operation, such as whether it's been - /// dropped on the other side. - in_use: IndexMap, - - /// Handles with a pending operation which can be cancelled. - cancellable: IndexSet, -} - -enum HalfState { - Idle, - InUse, -} - -#[derive(Copy, Clone, PartialEq, Debug)] -enum Transferable { - Yes, - No, -} - -#[derive(Copy, Clone, PartialEq, Debug)] -enum Cancellable { - Yes, - No, -} - -#[derive(Copy, Clone, PartialEq, Debug)] -enum OpState { - Pending, - Dropped, -} - -#[derive(Default, Copy, Clone)] -struct StreamRead { - count: u32, -} - -#[derive(Default, Copy, Clone)] -struct StreamWrite { - item: u32, - count: u32, -} - -impl HalfStates { - fn is_empty(&self) -> bool { - self.handles.is_empty() - } - - /// Adds a new handle `id` to this set. - fn insert(&mut self, id: u32, scope: Scope, transferable: Transferable) { - let prev = self - .handles - .insert(id, (scope, HalfState::Idle, transferable)); - assert!(prev.is_none()); - assert!(self.droppable.insert(id)); - if transferable == Transferable::Yes { - self.transferable.insert(id); - } - } - - /// Removes the handle `id` for closing. - fn remove(&mut self, id: u32) -> Scope { - let (scope, state, transferable) = self.handles.swap_remove(&id).unwrap(); - assert!(matches!(state, HalfState::Idle)); - self.droppable.swap_remove(&id); - self.ready.swap_remove(&id); - if transferable == Transferable::Yes { - assert!(self.transferable.swap_remove(&id)); - } - scope - } - - /// Locks `id` in whatever scope it's currently in for the rest of its - /// lifetime, preventing its transfer. This is used as a workaround for - /// #12090. - fn lock_in_place(&mut self, id: u32) { - let (_scope, state, transferable) = self.handles.get_mut(&id).unwrap(); - assert!(matches!(state, HalfState::Idle)); - if matches!(transferable, Transferable::Yes) { - assert!(self.transferable.swap_remove(&id)); - *transferable = Transferable::No; - } - } - - /// Starts an operation on the handle `id`. - fn start(&mut self, id: u32, cancellable: Cancellable, payload: T) { - let (_scope, state, transferable) = self.handles.get_mut(&id).unwrap(); - assert!(matches!(state, HalfState::Idle)); - assert!(self.ready.swap_remove(&id)); - self.droppable.swap_remove(&id); - *state = HalfState::InUse; - let prev = self.in_use.insert(id, (payload, OpState::Pending)); - assert!(prev.is_none()); - if *transferable == Transferable::Yes { - assert!(self.transferable.swap_remove(&id)); - } - if cancellable == Cancellable::Yes { - assert!(self.cancellable.insert(id)); - } - } - - /// Completes an operation on `id`, returning the state it was started with - /// along with whether it was dropped. - fn stop(&mut self, id: u32) -> (T, OpState) { - let (_scope, state, transferable) = self.handles.get_mut(&id).unwrap(); - assert!(matches!(state, HalfState::InUse)); - *state = HalfState::Idle; - let dropped = self.in_use.swap_remove(&id).unwrap(); - self.cancellable.swap_remove(&id); - if *transferable == Transferable::Yes { - assert!(self.transferable.insert(id)); - } - assert!(self.droppable.insert(id)); - if dropped.1 != OpState::Dropped { - assert!(self.ready.insert(id)); - } else { - self.lock_in_place(id); - } - dropped - } - - /// Updates to `OpState::Dropped` for an operation-in-progress. - fn set_in_use_state_dropped(&mut self, id: u32) { - let (_, prev) = self.in_use.get_mut(&id).unwrap(); - assert_eq!(*prev, OpState::Pending); - *prev = OpState::Dropped; - - // This operation is now "cancellable" meaning that at any point in the - // future it can be resolved since the other end was dropped. - self.cancellable.insert(id); - } -} - -impl State { - fn is_empty(&self) -> bool { - let State { - next_id: _, - async_pending, - deferred, - futures, - streams, - } = self; - async_pending.is_empty() && deferred.is_empty() && futures.is_empty() && streams.is_empty() - } - - fn generate( - &mut self, - u: &mut Unstructured<'_>, - finish: bool, - commands: &mut Vec<(Scope, Command)>, - ) -> arbitrary::Result<()> { - let mut choices = Vec::new(); - - // If we're not finishing up then have the possibility of - // immediately-ready sync/async calls and such sort of miscellaneous - // work. - if !finish { - choices.push(Choice::SyncReadyCall); - choices.push(Choice::AsyncReadyCall); - choices.push(Choice::FutureNew); - choices.push(Choice::StreamNew); - } - - // If we're not finishing up, and if we don't have too much pending - // work, then possibly make some more pending work. - if !finish && self.async_pending.len() < 20 { - choices.push(Choice::AsyncPendingCall); - } - - // If there's pending work, possibly resolve something. - if self.async_pending.len() > 0 { - choices.push(Choice::AsyncPendingResolve); - } - - // If something has been deferred to later, possibly add that command - // into the stream. - if self.deferred.len() > 0 { - choices.push(Choice::Deferred); - } - - // Wrap up work with futures by dropping handles, writing, cancelling, - // etc. - if self.futures.readers.droppable.len() > 0 { - choices.push(Choice::FutureDropReadable); - } - if self.futures.writers.droppable.len() > 0 { - choices.push(Choice::FutureDropWritable); - } - if self.futures.writers.cancellable.len() > 0 { - choices.push(Choice::FutureCancelWrite); - } - if self.futures.readers.cancellable.len() > 0 { - choices.push(Choice::FutureCancelRead); - } - // If more work is allowed kick of reads/transfers. - if !finish { - if self.futures.writers.ready.len() > 0 { - choices.push(Choice::FutureWrite); - } - if self.futures.readers.ready.len() > 0 { - choices.push(Choice::FutureRead); - } - if self.futures.readers.transferable.len() > 0 { - choices.push(Choice::FutureReaderTransfer); - } - } - - // Streams can be dropped at any time and their pending operations can - // be ceased at any time. - if self.streams.readers.droppable.len() > 0 { - choices.push(Choice::StreamDropReadable); - } - if self.streams.writers.droppable.len() > 0 { - choices.push(Choice::StreamDropWritable); - } - if self.streams.readers.cancellable.len() > 0 { - choices.push(Choice::StreamEndRead); - } - if self.streams.writers.cancellable.len() > 0 { - choices.push(Choice::StreamEndWrite); - } - // If more work is allowed then streams can be moved around and new - // reads/writes may be started. - if !finish { - if self.streams.readers.transferable.len() > 0 { - choices.push(Choice::StreamReaderTransfer); - } - if self.streams.readers.ready.len() > 0 { - choices.push(Choice::StreamRead); - } - if self.streams.writers.ready.len() > 0 { - choices.push(Choice::StreamWrite); - } - } - - #[derive(Debug)] - enum Choice { - SyncReadyCall, - AsyncReadyCall, - AsyncPendingCall, - AsyncPendingResolve, - Deferred, - - FutureNew, - FutureReaderTransfer, - FutureRead, - FutureWrite, - FutureCancelRead, - FutureCancelWrite, - FutureDropReadable, - FutureDropWritable, - - StreamNew, - StreamReaderTransfer, - StreamDropReadable, - StreamDropWritable, - StreamRead, - StreamWrite, - StreamEndRead, - StreamEndWrite, - } - - match u.choose(&choices)? { - Choice::SyncReadyCall => { - let caller = *u.choose(Scope::CALLERS)?; - commands.push((caller, Command::SyncReadyCall)); - } - Choice::AsyncReadyCall => { - let caller = *u.choose(Scope::CALLERS)?; - commands.push((caller, Command::AsyncReadyCall)); - } - - Choice::AsyncPendingCall => { - let caller = *u.choose(Scope::CALLERS)?; - let id = self.next_id(); - self.async_pending.push((caller, id)); - commands.push((caller, Command::AsyncPendingImportCall(id))); - } - - Choice::AsyncPendingResolve => { - let index = u.int_in_range(0..=self.async_pending.len() - 1)?; - let (caller, id) = self.async_pending.swap_remove(index); - let callee = caller.callee().unwrap(); - - // FIXME(#11833) the host can't cancel calls at this time, so - // they can only be completed. Everything else though is - // guest-initiated which means that the call can be either - // completed or cancelled. - let complete = caller == Scope::HostCaller || u.arbitrary()?; - - if complete { - commands.push((callee, Command::AsyncPendingExportComplete(id))); - self.deferred - .push((caller, Command::AsyncPendingImportAssertReady(id))); - } else { - commands.push((caller, Command::AsyncPendingImportCancel(id))); - self.deferred - .push((callee, Command::AsyncPendingExportAssertCancelled(id))); - } - } - - Choice::Deferred => { - let index = u.int_in_range(0..=self.deferred.len() - 1)?; - let (scope, cmd) = self.deferred.swap_remove(index); - commands.push((scope, cmd)); - } - - Choice::FutureNew => { - let scope = *u.choose(Scope::ALL)?; - let id = self.next_id(); - commands.push((scope, Command::FutureNew(id))); - self.futures.readers.insert(id, scope, Transferable::Yes); - self.futures.writers.insert(id, scope, Transferable::No); - - // Future writers cannot be dropped without writing. - assert!(self.futures.writers.droppable.swap_remove(&id)); - } - Choice::FutureReaderTransfer => { - let set = &mut self.futures.readers.transferable; - let i = u.int_in_range(0..=set.len() - 1)?; - let id = *set.get_index(i).unwrap(); - let scope = &mut self.futures.readers.handles[&id].0; - - enum Action { - CallerTake(Scope), - GiveCallee(Scope), - } - - let action = match (scope.caller(), scope.callee()) { - (Some(caller), None) => Action::CallerTake(caller), - (None, Some(callee)) => Action::GiveCallee(callee), - (Some(caller), Some(callee)) => { - if u.arbitrary()? { - Action::CallerTake(caller) - } else { - Action::GiveCallee(callee) - } - } - (None, None) => unreachable!(), - }; - match action { - Action::CallerTake(caller) => { - commands.push((caller, Command::FutureTake(id))); - *scope = caller; - } - Action::GiveCallee(callee) => { - commands.push((*scope, Command::FutureGive(id))); - *scope = callee; - } - } - - // See what scope the reader/writer half are in. Allow - // operations if they're in different scopes, but disallow - // operations if they're in the same scope. - let reader_scope = Some(*scope); - let writer_scope = self.futures.writers.handles.get(&id).map(|p| p.0); - if reader_scope == writer_scope { - self.futures.readers.ready.swap_remove(&id); - self.futures.writers.ready.swap_remove(&id); - } else { - self.futures.readers.ready.insert(id); - if writer_scope.is_some() && !self.futures.writers.in_use.contains_key(&id) { - self.futures.writers.ready.insert(id); - } - } - } - Choice::FutureRead => { - let set = &self.futures.readers.ready; - let i = u.int_in_range(0..=set.len() - 1)?; - let id = *set.get_index(i).unwrap(); - let scope = self.futures.readers.handles[&id].0; - - if let Some((item, _)) = self.futures.writers.in_use.get(&id) { - // If the future has an active write, then this should - // complete with that write. The write is then resolved and - // the future reader/writer are both gone. - let item = *item; - commands.push(( - scope, - Command::FutureReadReady(FuturePayload { future: id, item }), - )); - let write_scope = self.futures.writers.handles[&id].0; - commands.push((write_scope, Command::FutureWriteAssertComplete(id))); - - self.futures.writers.stop(id); - self.futures.readers.remove(id); - self.futures.writers.remove(id); - } else { - // If the write-end is idle, then this should be a pending - // future read. - // - // FIXME(#12090) host reads cannot be cancelled - let cancellable = if scope.is_host() { - Cancellable::No - } else { - Cancellable::Yes - }; - self.futures.readers.start(id, cancellable, ()); - commands.push((scope, Command::FutureReadPending(id))); - } - } - Choice::FutureWrite => { - let set = &self.futures.writers.ready; - let i = u.int_in_range(0..=set.len() - 1)?; - let id = *set.get_index(i).unwrap(); - let scope = self.futures.writers.handles[&id].0; - let item = self.next_id(); - let payload = FuturePayload { future: id, item }; - - if !self.futures.readers.handles.contains_key(&id) { - // If the reader is gone then this write should complete - // immediately with "dropped" and furthermore the writer - // should now be removed. - commands.push((scope, Command::FutureWriteDropped(id))); - self.futures.writers.remove(id); - } else if self.futures.readers.in_use.contains_key(&id) { - // If the reader is in-progress then this should complete - // the read/write pair. The reader/writer are both removed - // as a result. - commands.push((scope, Command::FutureWriteReady(payload))); - let read_scope = self.futures.readers.handles[&id].0; - commands.push((read_scope, Command::FutureReadAssertComplete(payload))); - self.futures.readers.stop(id); - self.futures.readers.remove(id); - self.futures.writers.remove(id); - } else { - // If the read-end is idle, then this should be a pending - // future read. - self.futures.writers.start(id, Cancellable::Yes, item); - commands.push((scope, Command::FutureWritePending(payload))); - } - } - Choice::FutureCancelWrite => { - let set = &self.futures.writers.cancellable; - let i = u.int_in_range(0..=set.len() - 1)?; - let id = *set.get_index(i).unwrap(); - let scope = self.futures.writers.handles[&id].0; - - let (_write, state) = self.futures.writers.stop(id); - match state { - OpState::Pending => { - commands.push((scope, Command::FutureCancelWrite(id))); - assert!(self.futures.writers.droppable.swap_remove(&id)); - } - OpState::Dropped => { - commands.push((scope, Command::FutureWriteAssertDropped(id))); - self.futures.writers.remove(id); - } - } - } - Choice::FutureCancelRead => { - let set = &self.futures.readers.cancellable; - let i = u.int_in_range(0..=set.len() - 1)?; - let id = *set.get_index(i).unwrap(); - let scope = self.futures.readers.handles[&id].0; - - let (_read, state) = self.futures.readers.stop(id); - match state { - OpState::Pending => { - commands.push((scope, Command::FutureCancelRead(id))); - } - // Writers cannot be dropped with futures, so this is not - // reachable. - OpState::Dropped => unreachable!(), - } - } - Choice::FutureDropReadable => { - let set = &self.futures.readers.droppable; - let i = u.int_in_range(0..=set.len() - 1)?; - let id = *set.get_index(i).unwrap(); - let scope = self.futures.readers.remove(id); - commands.push((scope, Command::FutureDropReadable(id))); - - // If the writer is active then its write is now destined to - // finish with "dropped", and otherwise the writer is also now - // droppable since the reader handle is gone. - if self.futures.writers.in_use.contains_key(&id) { - self.futures.writers.set_in_use_state_dropped(id); - } else { - assert!(self.futures.writers.droppable.insert(id)); - } - } - Choice::FutureDropWritable => { - let set = &self.futures.writers.droppable; - let i = u.int_in_range(0..=set.len() - 1)?; - let id = *set.get_index(i).unwrap(); - let scope = self.futures.writers.remove(id); - - // Writers can't actually be dropped prior to writing so fake - // a write by writing a value and asserting that the result is - // "dropped". - commands.push((scope, Command::FutureWriteDropped(id))); - - assert!(!self.futures.readers.handles.contains_key(&id)); - } - - Choice::StreamNew => { - let scope = *u.choose(Scope::ALL)?; - let id = self.next_id(); - commands.push((scope, Command::StreamNew(id))); - self.streams.readers.insert(id, scope, Transferable::Yes); - self.streams.writers.insert(id, scope, Transferable::No); - } - Choice::StreamReaderTransfer => { - let set = &mut self.streams.readers.transferable; - let i = u.int_in_range(0..=set.len() - 1)?; - let id = *set.get_index(i).unwrap(); - let scope = &mut self.streams.readers.handles[&id].0; - - enum Action { - CallerTake(Scope), - GiveCallee(Scope), - } - - let action = match (scope.caller(), scope.callee()) { - (Some(caller), None) => Action::CallerTake(caller), - (None, Some(callee)) => Action::GiveCallee(callee), - (Some(caller), Some(callee)) => { - if u.arbitrary()? { - Action::CallerTake(caller) - } else { - Action::GiveCallee(callee) - } - } - (None, None) => unreachable!(), - }; - match action { - Action::CallerTake(caller) => { - commands.push((caller, Command::StreamTake(id))); - *scope = caller; - } - Action::GiveCallee(callee) => { - commands.push((*scope, Command::StreamGive(id))); - *scope = callee; - } - } - - // See what scope the reader/writer half are in. Allow - // operations if they're in different scopes, but disallow - // operations if they're in the same scope. - // - // Note that host<->host reads/writes for streams aren't fuzzed - // at this time so that's also explicitly disallowed. - let reader_scope = Some(*scope); - let writer_scope = self.streams.writers.handles.get(&id).map(|p| p.0); - if reader_scope == writer_scope - || reader_scope.is_some_and(|s| s.is_host()) - == writer_scope.is_some_and(|s| s.is_host()) - { - self.streams.readers.ready.swap_remove(&id); - self.streams.writers.ready.swap_remove(&id); - } else { - self.streams.readers.ready.insert(id); - if writer_scope.is_some() && !self.streams.writers.in_use.contains_key(&id) { - self.streams.writers.ready.insert(id); - } - } - } - Choice::StreamDropReadable => { - let set = &self.streams.readers.droppable; - let i = u.int_in_range(0..=set.len() - 1)?; - let id = *set.get_index(i).unwrap(); - let scope = self.streams.readers.remove(id); - commands.push((scope, Command::StreamDropReadable(id))); - - if self.streams.writers.in_use.contains_key(&id) { - self.streams.writers.set_in_use_state_dropped(id); - } - } - Choice::StreamDropWritable => { - let set = &self.streams.writers.droppable; - let i = u.int_in_range(0..=set.len() - 1)?; - let id = *set.get_index(i).unwrap(); - let scope = self.streams.writers.remove(id); - commands.push((scope, Command::StreamDropWritable(id))); - - if self.streams.readers.in_use.contains_key(&id) { - self.streams.readers.set_in_use_state_dropped(id); - } - } - Choice::StreamRead => { - let set = &self.streams.readers.ready; - let i = u.int_in_range(0..=set.len() - 1)?; - let id = *set.get_index(i).unwrap(); - let scope = self.streams.readers.handles[&id].0; - let count = u.int_in_range(0..=MAX_STREAM_COUNT)?; - - // FIXME(#12090) - if scope.is_host() { - self.streams.readers.lock_in_place(id); - } - - if !self.streams.writers.handles.contains_key(&id) { - // If the write handle is dropped, then this should - // immediately report as such. - commands.push(( - scope, - Command::StreamReadDropped(StreamReadPayload { stream: id, count }), - )); - // Can't read from this stream again, so it's not ready, - // and then we also can't lift/lower it any more so it's - // locked in place. - assert!(self.streams.readers.ready.swap_remove(&id)); - self.streams.readers.lock_in_place(id); - } else if self.streams.writers.in_use.contains_key(&id) { - // If the write handle is active then this read should - // complete immediately. - let write_count = self.streams.writers.in_use[&id].0.count; - let write_scope = self.streams.writers.handles[&id].0; - let min = count.min(write_count); - - match (count, write_count) { - // Two zero-length operations rendezvousing will leave - // the reader blocked but the writer should wake up. A - // nonzero-length read and a 0-length write performs - // the same way too. - (0, 0) | (1.., 0) => { - self.streams - .readers - .start(id, Cancellable::Yes, StreamRead { count }); - commands.push(( - scope, - Command::StreamReadPending(StreamReadPayload { stream: id, count }), - )); - self.streams.writers.stop(id); - commands.push(( - write_scope, - Command::StreamWriteAssertComplete(StreamReadPayload { - stream: id, - count: min, - }), - )); - } - - // A zero-length read with a nonzero-length-write - // should wake up just the reader and do nothing to the - // writer. - (0, 1..) => { - commands.push(( - scope, - Command::StreamReadReady(StreamReadyPayload { - stream: id, - item: 0, - ready_count: min, - op_count: count, - }), - )); - } - - // With two nonzero lengths both operations should complete. - (1.., 1..) => { - let (write, _) = self.streams.writers.stop(id); - commands.push(( - scope, - Command::StreamReadReady(StreamReadyPayload { - stream: id, - item: write.item, - ready_count: min, - op_count: count, - }), - )); - commands.push(( - write_scope, - Command::StreamWriteAssertComplete(StreamReadPayload { - stream: id, - count: min, - }), - )); - } - } - } else { - // If the write handle is not active then this should be in - // a pending state now. - self.streams - .readers - .start(id, Cancellable::Yes, StreamRead { count }); - commands.push(( - scope, - Command::StreamReadPending(StreamReadPayload { stream: id, count }), - )); - } - } - Choice::StreamWrite => { - let set = &self.streams.writers.ready; - let i = u.int_in_range(0..=set.len() - 1)?; - let id = *set.get_index(i).unwrap(); - let scope = self.streams.writers.handles[&id].0; - let item = self.next_id(); - let count = u.int_in_range(0..=MAX_STREAM_COUNT)?; - - // FIXME(#12090) - if scope.is_host() { - self.streams.writers.lock_in_place(id); - } - - if !self.streams.readers.handles.contains_key(&id) { - // If the read handle is dropped, then this should - // immediately report as such. - commands.push(( - scope, - Command::StreamWriteDropped(StreamWritePayload { - stream: id, - item, - count, - }), - )); - // Cannot write ever again to this handle so remove it from - // the writable set. - assert!(self.streams.writers.ready.swap_remove(&id)); - } else if self.streams.readers.in_use.contains_key(&id) { - // If the read handle is active then this write should - // complete immediately. - let read_count = self.streams.readers.in_use[&id].0.count; - let read_scope = self.streams.readers.handles[&id].0; - let min = count.min(read_count); - - match (read_count, count) { - // A zero-length write, no matter what the read half is - // pending as, is always ready and doesn't affect the - // reader. - (_, 0) => { - commands.push(( - scope, - Command::StreamWriteReady(StreamReadyPayload { - stream: id, - item, - op_count: count, - ready_count: min, - }), - )); - } - - // With a zero-length read and a nonzero-length write - // the writer is blocked but the reader is unblocked. - (0, 1..) => { - self.streams.writers.start( - id, - Cancellable::Yes, - StreamWrite { item, count }, - ); - commands.push(( - scope, - Command::StreamWritePending(StreamWritePayload { - stream: id, - item, - count, - }), - )); - self.streams.readers.stop(id); - commands.push(( - read_scope, - Command::StreamReadAssertComplete(StreamWritePayload { - stream: id, - item, - count: min, - }), - )); - } - - // Nonzero sizes means that the write immediately - // finishes and the read is also now ready to complete. - (1.., 1..) => { - commands.push(( - scope, - Command::StreamWriteReady(StreamReadyPayload { - stream: id, - item, - op_count: count, - ready_count: min, - }), - )); - self.streams.readers.stop(id); - commands.push(( - read_scope, - Command::StreamReadAssertComplete(StreamWritePayload { - stream: id, - item, - count: min, - }), - )); - } - } - } else { - // If the read handle is not active then this should be in - // a pending state now. - self.streams - .writers - .start(id, Cancellable::Yes, StreamWrite { item, count }); - commands.push(( - scope, - Command::StreamWritePending(StreamWritePayload { - stream: id, - item, - count, - }), - )); - } - } - Choice::StreamEndRead => { - let set = &self.streams.readers.cancellable; - let i = u.int_in_range(0..=set.len() - 1)?; - let id = *set.get_index(i).unwrap(); - let scope = self.streams.readers.handles[&id].0; - - let (_read, state) = self.streams.readers.stop(id); - match state { - OpState::Pending => { - commands.push((scope, Command::StreamCancelRead(id))); - } - OpState::Dropped => { - commands.push((scope, Command::StreamReadAssertDropped(id))); - } - } - } - Choice::StreamEndWrite => { - let set = &self.streams.writers.cancellable; - let i = u.int_in_range(0..=set.len() - 1)?; - let id = *set.get_index(i).unwrap(); - let scope = self.streams.writers.handles[&id].0; - - let (_write, state) = self.streams.writers.stop(id); - match state { - OpState::Pending => { - commands.push((scope, Command::StreamCancelWrite(id))); - } - OpState::Dropped => { - commands.push(( - scope, - Command::StreamWriteAssertDropped(StreamReadPayload { - stream: id, - count: 0, - }), - )); - } - } - } - } - Ok(()) - } - - fn next_id(&mut self) -> u32 { - let id = self.next_id; - self.next_id += 1; - id - } -} diff --git a/crates/fuzzing/src/lib.rs b/crates/fuzzing/src/lib.rs index dcc69d1ab8a3..8721c814903c 100644 --- a/crates/fuzzing/src/lib.rs +++ b/crates/fuzzing/src/lib.rs @@ -49,7 +49,6 @@ pub fn init_fuzzing() { /// before the fuzz target itself. pub fn misc_init() { init_fuzzing(); - oracles::component_async::init(); } fn block_on(future: F) -> F::Output { diff --git a/crates/fuzzing/src/oracles.rs b/crates/fuzzing/src/oracles.rs index 03d68d408c16..96dfe31be1dd 100644 --- a/crates/fuzzing/src/oracles.rs +++ b/crates/fuzzing/src/oracles.rs @@ -11,7 +11,6 @@ //! panicking. pub mod component_api; -pub mod component_async; #[cfg(feature = "fuzz-spec-interpreter")] pub mod diff_spec; pub mod diff_wasmi; diff --git a/crates/fuzzing/src/oracles/component_async.rs b/crates/fuzzing/src/oracles/component_async.rs deleted file mode 100644 index b57708814bc8..000000000000 --- a/crates/fuzzing/src/oracles/component_async.rs +++ /dev/null @@ -1,1480 +0,0 @@ -//! For a high-level overview of this fuzz target see `fuzz_async.rs` - -use crate::block_on; -use crate::generators::component_async::exports::wasmtime_fuzz::fuzz::async_test::Guest; -use crate::generators::component_async::wasmtime_fuzz::fuzz::async_test::{self, Command}; -use crate::generators::component_async::wasmtime_fuzz::fuzz::types; -use crate::generators::component_async::{ComponentAsync, FuzzAsyncPre, Scope}; -use futures::channel::oneshot; -use std::collections::{HashMap, HashSet}; -use std::mem; -use std::pin::Pin; -use std::sync::{Arc, OnceLock, Weak}; -use std::task::{Context, Poll, Waker}; -use std::time::Instant; -use wasmtime::component::{ - Access, Accessor, AccessorTask, Component, Destination, FutureConsumer, FutureProducer, - FutureReader, HasSelf, Linker, ResourceTable, Source, StreamConsumer, StreamProducer, - StreamReader, StreamResult, VecBuffer, -}; -use wasmtime::{AsContextMut, Config, Engine, Result, Store, StoreContextMut}; -use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView}; - -static STATE: OnceLock<(Engine, FuzzAsyncPre)> = OnceLock::new(); - -/// Initializes state for future fuzz runs. -/// -/// This will create an `Engine` to run this fuzzer within and it will -/// additionally precompile the component that will be used for fuzzing. -/// -/// There are a few points of note about this: -/// -/// * The `misc` fuzzer is manually instrumented with this function as the init -/// hook to ensure this runs before any other fuzzing. -/// -/// * Compilation of the component takes quite some time with -/// fuzzing-instrumented Cranelift. To assist with local development this -/// implements a cache which is serialized/deserialized via an env var. -pub fn init() { - crate::init_fuzzing(); - - STATE.get_or_init(|| { - let mut config = Config::new(); - config.wasm_component_model_async(true); - let engine = Engine::new(&config).unwrap(); - let component = compile(&engine); - let mut linker = Linker::new(&engine); - wasmtime_wasi::p2::add_to_linker_async(&mut linker).unwrap(); - async_test::add_to_linker::<_, HasSelf>(&mut linker, |d| d).unwrap(); - types::add_to_linker::<_, HasSelf>(&mut linker, |d| d).unwrap(); - - let pre = linker.instantiate_pre(&component).unwrap(); - let pre = FuzzAsyncPre::new(pre).unwrap(); - - (engine, pre) - }); - - fn compile(engine: &Engine) -> Component { - let wasm_path = test_programs_artifacts::FUZZ_ASYNC_COMPONENT; - let wasm = test_programs_artifacts::fuzz_async_component_bytes!(); - let wasm = &wasm[..]; - let cwasm_cache = std::env::var("COMPONENT_ASYNC_CWASM_CACHE").ok(); - if let Some(path) = &cwasm_cache - && let Ok(cwasm_mtime) = std::fs::metadata(&path).and_then(|m| m.modified()) - && let Ok(wasm_mtime) = std::fs::metadata(wasm_path).and_then(|m| m.modified()) - && cwasm_mtime > wasm_mtime - { - log::debug!("Using cached component async cwasm at {path}"); - unsafe { - return Component::deserialize_file(engine, path).unwrap(); - } - } - - let composition = { - let mut config = wasm_compose::config::Config::default(); - let tempdir = tempfile::TempDir::new().unwrap(); - let path = tempdir.path().join("fuzz-async.wasm"); - std::fs::write(&path, wasm).unwrap(); - config.definitions.push(path.clone()); - - wasm_compose::composer::ComponentComposer::new(&path, &config) - .compose() - .unwrap() - }; - let start = Instant::now(); - let component = Component::new(&engine, &composition).unwrap(); - if let Some(path) = cwasm_cache { - log::debug!("Caching component async cwasm to {path}"); - std::fs::write(path, &component.serialize().unwrap()).unwrap(); - } else if start.elapsed() > std::time::Duration::from_secs(1) { - eprintln!( - " -!!!!!!!!!!!!!!!!!!!!!!!!!! - -Component compilation is slow, try setting `COMPONENT_ASYNC_CWASM_CACHE=path` to -cache compilation results - -!!!!!!!!!!!!!!!!!!!!!!!!!! -" - ); - } - return component; - } -} - -#[derive(Default)] -struct Data { - ctx: WasiCtx, - table: ResourceTable, - wakers: HashMap, - commands: Vec<(Scope, Command)>, - - guest_caller_stream: Option>, - guest_callee_stream: Option>, - - host_pending_async_calls: HashMap>, - host_pending_async_calls_cancelled: HashSet, - guest_pending_async_calls_ready: HashSet, - - // State of futures/streams. Note that while #12091 is unresolved an - // `Arc`/`Weak` combo is used to detect when wasmtime drops futures/streams - // and the various halves we're interacting with using traits. - host_futures: HashMap>, - host_future_producers: HashMap)>, - host_future_consumers: HashMap)>, - host_streams: HashMap>, - host_stream_producers: HashMap)>, - host_stream_consumers: HashMap)>, -} - -impl WasiView for Data { - fn ctx(&mut self) -> WasiCtxView<'_> { - WasiCtxView { - ctx: &mut self.ctx, - table: &mut self.table, - } - } -} - -impl async_test::HostWithStore for HasSelf { - async fn async_ready(_store: &Accessor) {} - - async fn async_pending(store: &Accessor, id: u32) { - let (tx, rx) = oneshot::channel(); - store.with(|mut s| s.get().host_pending_async_calls.insert(id, tx)); - let record = RecordCancelOnDrop { store, id }; - rx.await.unwrap(); - mem::forget(record); - - struct RecordCancelOnDrop<'a, T: 'static> { - store: &'a Accessor>, - id: u32, - } - - impl Drop for RecordCancelOnDrop<'_, T> { - fn drop(&mut self) { - self.store.with(|mut s| { - s.get().host_pending_async_calls_cancelled.insert(self.id); - }); - } - } - } - - async fn init(_store: &Accessor, _scope: types::Scope) {} -} - -impl async_test::Host for Data { - fn sync_ready(&mut self) {} - - fn future_take(&mut self, id: u32) -> FutureReader { - self.host_futures.remove(&id).unwrap() - } - - fn future_receive(&mut self, id: u32, future: FutureReader) { - let prev = self.host_futures.insert(id, future); - assert!(prev.is_none()); - } - - fn stream_take(&mut self, id: u32) -> StreamReader { - self.host_streams.remove(&id).unwrap() - } - - fn stream_receive(&mut self, id: u32, stream: StreamReader) { - let prev = self.host_streams.insert(id, stream); - assert!(prev.is_none()); - } -} - -impl types::HostWithStore for HasSelf { - fn get_commands( - mut store: Access<'_, T, Self>, - scope: types::Scope, - ) -> StreamReader { - let data = store.get(); - match scope { - types::Scope::Caller => data.guest_caller_stream.take().unwrap(), - types::Scope::Callee => data.guest_callee_stream.take().unwrap(), - } - } -} - -impl types::Host for Data {} - -/// Executes the `input` provided, assuming that `init` has been previously -/// executed. -pub fn run(mut input: ComponentAsync) { - log::debug!("Running component async fuzz test with\n{input:?}"); - - // Commands are executed in the order that they're listed in the input, but - // to make it easier on the `StreamProducer` implementation below they're - // popped off the back. To ensure that they're all delivered in the right - // order reverse the list to ensure the correct order is maintained. - input.commands.reverse(); - - let (engine, pre) = STATE.get().unwrap(); - let mut store = Store::new( - engine, - Data { - ctx: WasiCtx::builder().inherit_stdio().inherit_env().build(), - commands: input.commands, - ..Data::default() - }, - ); - - let guest_caller_stream = - StreamReader::new(&mut store, SharedStream(Scope::GuestCaller)).unwrap(); - let guest_callee_stream = - StreamReader::new(&mut store, SharedStream(Scope::GuestCallee)).unwrap(); - store.data_mut().guest_caller_stream = Some(guest_caller_stream); - store.data_mut().guest_callee_stream = Some(guest_callee_stream); - block_on(async { - let instance = pre.instantiate_async(&mut store).await.unwrap(); - let test = instance.wasmtime_fuzz_fuzz_async_test(); - - let mut host_caller = SharedStream(Scope::HostCaller); - let mut host_callee = SharedStream(Scope::HostCallee); - store - .run_concurrent(async |store| { - // Kick off stream reads in the guest. This function will return - // but the tasks in the guest will keep running after they - // return to process stream items. - test.call_init(store, types::Scope::Caller).await.unwrap(); - - // Simultaneously process commands from both host streams. These - // will return once the entire command queue is exhausted. - futures::join!( - async { - while let Some(cmd) = host_caller.next(store).await { - host_caller_cmd(&test, store, cmd).await; - } - }, - async { - while let Some(cmd) = host_callee.next(store).await { - host_callee_cmd(store, cmd).await; - } - }, - ); - - // Note that there may still be pending async work in the guest - // (or host). It's intentional that it's not cleaned up here to - // help test situations where async work is all abruptly - // cancelled by just being dropped in the host. - }) - .await - .unwrap(); - }); -} - -/// See documentation in `fuzz_async.rs` for what's going on here. -async fn test_property(store: &Accessor, mut f: F) -> bool -where - F: FnMut(&mut Data) -> bool, -{ - for _ in 0..1000 { - let ready = store.with(|mut s| f(s.get())); - if ready { - return true; - } - - crate::YieldN(1).await; - } - - return false; -} - -async fn await_property(store: &Accessor, desc: &str, f: F) -where - F: FnMut(&mut Data) -> bool, -{ - assert!( - test_property(store, f).await, - "timed out waiting for {desc}", - ); -} - -async fn host_caller_cmd(test: &Guest, store: &Accessor, cmd: Command) { - match cmd { - Command::Ack => {} - Command::SyncReadyCall => test.call_sync_ready(store).await.unwrap(), - Command::AsyncReadyCall => test.call_async_ready(store).await.unwrap(), - Command::AsyncPendingExportComplete(_i) => todo!(), - Command::AsyncPendingExportAssertCancelled(_i) => todo!(), - Command::AsyncPendingImportCall(i) => { - struct RunPendingImport { - test: Guest, - i: u32, - } - - store.spawn(RunPendingImport { - test: test.clone(), - i, - }); - - impl AccessorTask for RunPendingImport { - async fn run(self, store: &Accessor) -> Result<()> { - self.test.call_async_pending(store, self.i).await?; - store.with(|mut s| { - s.get().guest_pending_async_calls_ready.insert(self.i); - }); - Ok(()) - } - } - } - Command::AsyncPendingImportCancel(_i) => todo!(), - Command::AsyncPendingImportAssertReady(i) => { - assert!( - test_property(store, |s| s.guest_pending_async_calls_ready.remove(&i)).await, - "expected async_pending import {i} to be ready", - ); - } - - Command::FutureTake(i) => { - let future = test.call_future_take(store, i).await.unwrap(); - store.with(|mut s| { - let prev = s.get().host_futures.insert(i, future); - assert!(prev.is_none()); - }); - } - Command::FutureGive(i) => { - let future = store.with(|mut s| s.get().host_futures.remove(&i).unwrap()); - test.call_future_receive(store, i, future).await.unwrap(); - } - Command::StreamTake(i) => { - let stream = test.call_stream_take(store, i).await.unwrap(); - store.with(|mut s| { - let prev = s.get().host_streams.insert(i, stream); - assert!(prev.is_none()); - }); - } - Command::StreamGive(i) => { - let stream = store.with(|mut s| s.get().host_streams.remove(&i).unwrap()); - test.call_stream_receive(store, i, stream).await.unwrap(); - } - - other => future_or_stream_cmd(store, other).await, - } -} - -async fn host_callee_cmd(store: &Accessor, cmd: Command) { - match cmd { - Command::Ack => {} - Command::SyncReadyCall => todo!(), - Command::AsyncReadyCall => todo!(), - Command::AsyncPendingExportComplete(i) => store.with(|mut s| { - s.get() - .host_pending_async_calls - .remove(&i) - .unwrap() - .send(()) - .unwrap(); - }), - Command::AsyncPendingExportAssertCancelled(i) => { - assert!( - test_property(store, |s| s.host_pending_async_calls_cancelled.remove(&i)).await, - "expected async_pending export {i} to be cancelled", - ); - } - Command::AsyncPendingImportCall(_i) => todo!(), - Command::AsyncPendingImportCancel(_i) => todo!(), - Command::AsyncPendingImportAssertReady(_i) => todo!(), - - other => future_or_stream_cmd(store, other).await, - } -} - -async fn future_or_stream_cmd(store: &Accessor, cmd: Command) { - match cmd { - // These commands should be handled above - Command::Ack - | Command::SyncReadyCall - | Command::AsyncReadyCall - | Command::AsyncPendingExportComplete(_) - | Command::AsyncPendingExportAssertCancelled(_) - | Command::AsyncPendingImportCall(_) - | Command::AsyncPendingImportCancel(_) - | Command::FutureTake(_) - | Command::FutureGive(_) - | Command::StreamTake(_) - | Command::StreamGive(_) - | Command::AsyncPendingImportAssertReady(_) => unreachable!(), - - Command::FutureNew(id) => { - store.with(|mut s| { - let arc = Arc::new(()); - let weak = Arc::downgrade(&arc); - let future = FutureReader::new(&mut s, HostFutureProducer(id, arc)).unwrap(); - let data = s.get(); - let prev = data.host_futures.insert(id, future); - assert!(prev.is_none()); - let prev = data - .host_future_producers - .insert(id, (HostFutureProducerState::Idle, weak)); - assert!(prev.is_none()); - }); - } - Command::FutureDropReadable(id) => { - store.with(|mut s| match s.get().host_futures.remove(&id) { - Some(mut future) => future.close(&mut s).unwrap(), - None => { - let (mut state, _weak) = s.get().host_future_consumers.remove(&id).unwrap(); - state.wake_by_ref(); - } - }) - } - Command::FutureWriteReady(payload) => { - await_property(store, "future write should be waiting", |s| { - matches!( - s.host_future_producers.get(&payload.future), - Some((HostFutureProducerState::Waiting(_), _)) - ) - }) - .await; - store.with(|mut s| { - let state = s - .get() - .host_future_producers - .get_mut(&payload.future) - .unwrap(); - match state { - (HostFutureProducerState::Waiting(waker), _) => { - waker.wake_by_ref(); - state.0 = HostFutureProducerState::Writing(payload.item); - } - (state, _) => panic!("future not waiting: {state:?}"), - } - }) - } - Command::FutureWritePending(payload) => store.with(|mut s| { - let state = s - .get() - .host_future_producers - .get_mut(&payload.future) - .unwrap(); - match state { - (HostFutureProducerState::Idle, _) => { - state.0 = HostFutureProducerState::Writing(payload.item); - } - _ => panic!("future not idle"), - } - }), - Command::FutureWriteDropped(id) => store.with(|mut s| { - let (state, weak) = s.get().host_future_producers.remove(&id).unwrap(); - assert!(matches!(state, HostFutureProducerState::Idle)); - assert!(weak.upgrade().is_none()); - }), - Command::FutureReadReady(payload) => { - let id = payload.future; - store.with(|mut s| { - let arc = Arc::new(()); - let weak = Arc::downgrade(&arc); - let data = s.get(); - let future = data.host_futures.remove(&id).unwrap(); - let prev = data - .host_future_consumers - .insert(id, (HostFutureConsumerState::Consuming, weak)); - assert!(prev.is_none()); - future.pipe(&mut s, HostFutureConsumer(id, arc)).unwrap(); - }); - - await_property(store, "future should be present", |s| { - matches!( - s.host_future_consumers[&id], - (HostFutureConsumerState::Complete(_), _) - ) - }) - .await; - - store.with(|mut s| { - let (state, _) = s.get().host_future_consumers.remove(&id).unwrap(); - match state { - HostFutureConsumerState::Complete(i) => assert_eq!(i, payload.item), - _ => panic!("future not complete"), - } - }); - } - Command::FutureReadPending(id) => { - ensure_future_reading(store, id); - store.with(|mut s| { - let (state, _) = s.get().host_future_consumers.get_mut(&id).unwrap(); - state.wake_by_ref(); - assert!( - matches!(state, HostFutureConsumerState::Idle), - "bad state: {state:?}", - ); - *state = HostFutureConsumerState::Consuming; - }) - } - Command::FutureCancelWrite(id) => store.with(|mut s| { - let (state, _) = s.get().host_future_producers.get_mut(&id).unwrap(); - assert!(matches!(state, HostFutureProducerState::Writing(_))); - *state = HostFutureProducerState::Idle; - }), - Command::FutureCancelRead(id) => store.with(|mut s| { - let (state, _) = s.get().host_future_consumers.get_mut(&id).unwrap(); - assert!(matches!(state, HostFutureConsumerState::Consuming)); - *state = HostFutureConsumerState::Idle; - }), - Command::FutureReadAssertComplete(payload) => { - await_property(store, "future read should be complete", |s| { - matches!( - s.host_future_consumers.get(&payload.future), - Some((HostFutureConsumerState::Complete(_), _)) - ) - }) - .await; - store.with(|mut s| { - let (state, _) = s - .get() - .host_future_consumers - .remove(&payload.future) - .unwrap(); - match state { - HostFutureConsumerState::Complete(i) => assert_eq!(i, payload.item), - _ => panic!("future not complete"), - } - }) - } - Command::FutureWriteAssertComplete(id) => store.with(|mut s| { - let (state, weak) = s.get().host_future_producers.remove(&id).unwrap(); - assert!(matches!(state, HostFutureProducerState::Complete)); - assert!(weak.upgrade().is_none()); - }), - Command::FutureWriteAssertDropped(id) => store.with(|mut s| { - let (state, weak) = s.get().host_future_producers.remove(&id).unwrap(); - assert!(matches!(state, HostFutureProducerState::Writing(_))); - assert!(weak.upgrade().is_none()); - }), - - Command::StreamNew(id) => { - store.with(|mut s| { - let arc = Arc::new(()); - let weak = Arc::downgrade(&arc); - let stream = StreamReader::new(&mut s, HostStreamProducer(id, arc)).unwrap(); - let data = s.get(); - let prev = data.host_streams.insert(id, stream); - assert!(prev.is_none()); - let prev = data - .host_stream_producers - .insert(id, (HostStreamProducerState::idle(), weak)); - assert!(prev.is_none()); - }); - } - Command::StreamDropReadable(id) => { - store.with(|mut s| match s.get().host_streams.remove(&id) { - Some(mut stream) => stream.close(&mut s).unwrap(), - None => { - let (mut state, _weak) = s.get().host_stream_consumers.remove(&id).unwrap(); - state.wake_by_ref(); - } - }) - } - Command::StreamDropWritable(id) => store.with(|mut s| { - let (mut state, _weak) = s.get().host_stream_producers.remove(&id).unwrap(); - state.wake_by_ref(); - }), - Command::StreamWriteReady(payload) => { - let id = payload.stream; - store.with(|mut s| { - let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap(); - state.wake_by_ref(); - match state.kind { - HostStreamProducerStateKind::Idle => { - state.kind = HostStreamProducerStateKind::Writing(stream_payload( - payload.item, - payload.op_count, - )); - } - _ => panic!("stream not idle: {state:?}"), - } - }); - await_property(store, "stream should complete a write", |s| { - matches!( - s.host_stream_producers[&id].0.kind, - HostStreamProducerStateKind::Wrote(_), - ) - }) - .await; - store.with(|mut s| { - let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap(); - match state.kind { - HostStreamProducerStateKind::Wrote(amt) => { - assert_eq!(amt, payload.ready_count); - state.kind = HostStreamProducerStateKind::Idle; - } - _ => panic!("stream not idle: {state:?}"), - } - }); - } - Command::StreamReadReady(payload) => { - let id = payload.stream; - ensure_stream_reading(store, id); - store.with(|mut s| { - let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap(); - state.wake_by_ref(); - state.kind = HostStreamConsumerStateKind::Consuming(payload.op_count); - }); - await_property(store, "stream should complete a read", |s| { - matches!( - s.host_stream_consumers[&id].0.kind, - HostStreamConsumerStateKind::Consumed(_), - ) - }) - .await; - - store.with(|mut s| { - let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap(); - match &state.kind { - HostStreamConsumerStateKind::Consumed(last_read) => { - assert_eq!( - *last_read, - stream_payload(payload.item, payload.ready_count) - ); - state.kind = HostStreamConsumerStateKind::Idle; - } - _ => panic!("future not complete"), - } - }); - } - Command::StreamWritePending(payload) => store.with(|mut s| { - let (state, _) = s - .get() - .host_stream_producers - .get_mut(&payload.stream) - .unwrap(); - state.wake_by_ref(); - match state.kind { - HostStreamProducerStateKind::Idle => { - state.kind = HostStreamProducerStateKind::Writing(stream_payload( - payload.item, - payload.count, - )); - } - _ => panic!("stream not idle {:?}", state.kind), - } - }), - Command::StreamReadPending(payload) => { - ensure_stream_reading(store, payload.stream); - store.with(|mut s| { - let (state, _) = s - .get() - .host_stream_consumers - .get_mut(&payload.stream) - .unwrap(); - state.wake_by_ref(); - assert!(matches!(state.kind, HostStreamConsumerStateKind::Idle)); - state.kind = HostStreamConsumerStateKind::Consuming(payload.count); - }) - } - Command::StreamWriteDropped(payload) => store.with(|mut s| { - let (state, weak) = s - .get() - .host_stream_producers - .get_mut(&payload.stream) - .unwrap(); - assert!(matches!(state.kind, HostStreamProducerStateKind::Idle)); - assert!(weak.upgrade().is_none()); - }), - Command::StreamReadDropped(payload) => { - ensure_stream_reading(store, payload.stream); - await_property(store, "stream read should get dropped", |s| { - let weak = &s.host_stream_consumers[&payload.stream].1; - weak.upgrade().is_none() - }) - .await; - store.with(|mut s| { - let (state, weak) = s - .get() - .host_stream_consumers - .get_mut(&payload.stream) - .unwrap(); - assert!(matches!(state.kind, HostStreamConsumerStateKind::Idle)); - assert!(weak.upgrade().is_none()); - }) - } - Command::StreamCancelWrite(id) => store.with(|mut s| { - let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap(); - assert!( - matches!(state.kind, HostStreamProducerStateKind::Writing(_)), - "invalid state {state:?}", - ); - state.kind = HostStreamProducerStateKind::Idle; - state.wake_by_ref(); - }), - Command::StreamCancelRead(id) => store.with(|mut s| { - let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap(); - assert!(matches!( - state.kind, - HostStreamConsumerStateKind::Consuming(_) - )); - state.kind = HostStreamConsumerStateKind::Idle; - }), - Command::StreamReadAssertComplete(payload) => store.with(|mut s| { - let (state, _) = s - .get() - .host_stream_consumers - .get_mut(&payload.stream) - .unwrap(); - match &state.kind { - HostStreamConsumerStateKind::Consumed(last_read) => { - assert_eq!(*last_read, stream_payload(payload.item, payload.count)); - state.kind = HostStreamConsumerStateKind::Idle; - } - _ => panic!("stream not complete"), - } - }), - Command::StreamWriteAssertComplete(payload) => store.with(|mut s| { - let (state, _) = s - .get() - .host_stream_producers - .get_mut(&payload.stream) - .unwrap(); - match state.kind { - HostStreamProducerStateKind::Wrote(amt) => { - assert_eq!(amt, payload.count); - state.kind = HostStreamProducerStateKind::Idle; - } - _ => panic!("stream not complete: {:?}", state.kind), - } - }), - Command::StreamWriteAssertDropped(payload) => { - await_property(store, "stream write should be dropped", |s| { - let weak = &s.host_stream_producers[&payload.stream].1; - weak.upgrade().is_none() - }) - .await; - store.with(|mut s| { - let (state, weak) = s - .get() - .host_stream_producers - .get_mut(&payload.stream) - .unwrap(); - assert!(matches!( - state.kind, - HostStreamProducerStateKind::Writing(_) - )); - assert!(weak.upgrade().is_none()); - }) - } - Command::StreamReadAssertDropped(id) => { - await_property(store, "stream read should be dropped", |s| { - let weak = &s.host_stream_consumers[&id].1; - weak.upgrade().is_none() - }) - .await; - store.with(|mut s| { - let (state, weak) = s.get().host_stream_consumers.get_mut(&id).unwrap(); - assert!(matches!( - state.kind, - HostStreamConsumerStateKind::Consuming(_), - )); - assert!(weak.upgrade().is_none()); - }) - } - } -} - -fn stream_payload(item: u32, count: u32) -> Vec { - (item..item + count).collect() -} - -fn ensure_future_reading(store: &Accessor, id: u32) { - store.with(|mut s| { - let data = s.get(); - if !data.host_futures.contains_key(&id) { - return; - } - log::debug!("future consume: start {id}"); - let arc = Arc::new(()); - let weak = Arc::downgrade(&arc); - let data = s.get(); - let future = data.host_futures.remove(&id).unwrap(); - let prev = data - .host_future_consumers - .insert(id, (HostFutureConsumerState::Idle, weak)); - assert!(prev.is_none()); - future.pipe(&mut s, HostFutureConsumer(id, arc)).unwrap(); - }); -} - -fn ensure_stream_reading(store: &Accessor, id: u32) { - store.with(|mut s| { - let data = s.get(); - if !data.host_streams.contains_key(&id) { - return; - } - log::debug!("stream consume: start {id}"); - let arc = Arc::new(()); - let weak = Arc::downgrade(&arc); - let prev = data.host_stream_consumers.insert( - id, - ( - HostStreamConsumerState { - kind: HostStreamConsumerStateKind::Idle, - waker: None, - }, - weak, - ), - ); - assert!(prev.is_none()); - let stream = data.host_streams.remove(&id).unwrap(); - stream.pipe(&mut s, HostStreamConsumer(id, arc)).unwrap(); - }); -} - -struct HostFutureConsumer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>); - -/// Note that this is only created once a read is actually initiated on a -/// future. It's also not possible to cancel a host-based read on a future, -/// hence why this is simpler than the `HostFutureProducerState` state below. -#[derive(Debug)] -enum HostFutureConsumerState { - Idle, - Waiting(Waker), - Consuming, - Complete(u32), -} - -impl HostFutureConsumerState { - fn wake_by_ref(&mut self) { - if let HostFutureConsumerState::Waiting(waker) = &self { - waker.wake_by_ref(); - *self = HostFutureConsumerState::Idle; - } - } -} - -impl FutureConsumer for HostFutureConsumer { - type Item = u32; - - fn poll_consume( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - mut store: StoreContextMut<'_, Data>, - mut source: Source<'_, Self::Item>, - finish: bool, - ) -> Poll> { - let state = match store.data_mut().host_future_consumers.get_mut(&self.0) { - Some(state) => state, - None => { - log::debug!("consume: closed {}", self.0); - return Poll::Ready(Ok(())); - } - }; - match state.0 { - HostFutureConsumerState::Idle | HostFutureConsumerState::Waiting(_) => { - if finish { - log::debug!("consume: cancel {}", self.0); - state.0 = HostFutureConsumerState::Idle; - Poll::Ready(Ok(())) - } else { - log::debug!("consume: wait {}", self.0); - state.0 = HostFutureConsumerState::Waiting(cx.waker().clone()); - Poll::Pending - } - } - HostFutureConsumerState::Consuming => { - log::debug!("consume: done {}", self.0); - let mut item = None; - source.read(&mut store, &mut item).unwrap(); - store - .data_mut() - .host_future_consumers - .get_mut(&self.0) - .unwrap() - .0 = HostFutureConsumerState::Complete(item.unwrap()); - Poll::Ready(Ok(())) - } - HostFutureConsumerState::Complete(_) => unreachable!(), - } - } -} - -struct HostFutureProducer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>); - -#[derive(Debug)] -enum HostFutureProducerState { - Idle, - Waiting(Waker), - Writing(u32), - Complete, -} - -impl FutureProducer for HostFutureProducer { - type Item = u32; - - fn poll_produce( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - mut store: StoreContextMut<'_, Data>, - finish: bool, - ) -> Poll>> { - let state = store - .data_mut() - .host_future_producers - .get_mut(&self.0) - .unwrap(); - match state.0 { - HostFutureProducerState::Idle | HostFutureProducerState::Waiting(_) => { - if finish { - log::debug!("produce: cancel {}", self.0); - state.0 = HostFutureProducerState::Idle; - Poll::Ready(Ok(None)) - } else { - log::debug!("produce: wait {}", self.0); - state.0 = HostFutureProducerState::Waiting(cx.waker().clone()); - Poll::Pending - } - } - HostFutureProducerState::Writing(item) => { - log::debug!("produce: done {}", self.0); - state.0 = HostFutureProducerState::Complete; - Poll::Ready(Ok(Some(item))) - } - HostFutureProducerState::Complete => unreachable!(), - } - } -} - -struct HostStreamConsumer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>); - -#[derive(Debug)] -struct HostStreamConsumerState { - waker: Option, - kind: HostStreamConsumerStateKind, -} - -#[derive(Debug)] -enum HostStreamConsumerStateKind { - Idle, - Consuming(u32), - Consumed(Vec), -} - -impl HostStreamConsumerState { - fn wake_by_ref(&mut self) { - if let Some(waker) = self.waker.take() { - waker.wake(); - } - } -} - -impl StreamConsumer for HostStreamConsumer { - type Item = u32; - - fn poll_consume( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - mut store: StoreContextMut<'_, Data>, - mut source: Source<'_, Self::Item>, - finish: bool, - ) -> Poll> { - let remaining = source.remaining(&mut store); - let state = match store.data_mut().host_stream_consumers.get_mut(&self.0) { - Some((state, _)) => state, - None => { - log::debug!("stream consume: dropped {}", self.0); - return Poll::Ready(Ok(StreamResult::Dropped)); - } - }; - match state.kind { - HostStreamConsumerStateKind::Idle | HostStreamConsumerStateKind::Consumed(_) => { - if finish { - log::debug!("stream consume: cancel {}", self.0); - state.waker = None; - Poll::Ready(Ok(StreamResult::Cancelled)) - } else { - log::debug!("stream consume: wait {}", self.0); - state.waker = Some(cx.waker().clone()); - Poll::Pending - } - } - HostStreamConsumerStateKind::Consuming(amt) => { - // The writer is performing a zero-length write. We always - // complete that without updating our own state. - if remaining == 0 { - log::debug!("stream consume: completing zero-length write {}", self.0); - return Poll::Ready(Ok(StreamResult::Completed)); - } - - // If this is a zero-length read then block the writer but update our own state. - if amt == 0 { - log::debug!("stream consume: finishing zero-length read {}", self.0); - state.kind = HostStreamConsumerStateKind::Consumed(Vec::new()); - state.waker = Some(cx.waker().clone()); - return Poll::Pending; - } - - // For non-zero sizes perform the read/copy. - log::debug!("stream consume: done {}", self.0); - let mut dst = Vec::with_capacity(amt as usize); - source.read(&mut store, &mut dst).unwrap(); - let state = &mut store - .data_mut() - .host_stream_consumers - .get_mut(&self.0) - .unwrap() - .0; - state.kind = HostStreamConsumerStateKind::Consumed(dst); - state.waker = None; - Poll::Ready(Ok(StreamResult::Completed)) - } - } - } -} - -impl Drop for HostStreamConsumer { - fn drop(&mut self) { - log::debug!("stream consume: drop {}", self.0); - } -} - -struct HostStreamProducer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>); - -#[derive(Debug)] -struct HostStreamProducerState { - kind: HostStreamProducerStateKind, - waker: Option, -} - -#[derive(Debug)] -enum HostStreamProducerStateKind { - Idle, - Writing(Vec), - Wrote(u32), -} - -impl HostStreamProducerState { - fn idle() -> Self { - HostStreamProducerState { - kind: HostStreamProducerStateKind::Idle, - waker: None, - } - } - - fn wake_by_ref(&mut self) { - if let Some(waker) = self.waker.take() { - waker.wake(); - } - } -} - -impl StreamProducer for HostStreamProducer { - type Item = u32; - type Buffer = VecBuffer; - - fn poll_produce( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - mut store: StoreContextMut<'_, Data>, - mut dst: Destination<'_, Self::Item, Self::Buffer>, - finish: bool, - ) -> Poll> { - let remaining = dst.remaining(&mut store); - let data = store.data_mut(); - let state = match data.host_stream_producers.get_mut(&self.0) { - Some((state, _)) => state, - None => { - log::debug!("stream produce: dropped {}", self.0); - return Poll::Ready(Ok(StreamResult::Dropped)); - } - }; - match &mut state.kind { - HostStreamProducerStateKind::Idle | HostStreamProducerStateKind::Wrote(_) => { - if finish { - log::debug!("stream produce: cancel {}", self.0); - state.waker = None; - Poll::Ready(Ok(StreamResult::Cancelled)) - } else { - log::debug!("stream produce: wait {}", self.0); - state.waker = Some(cx.waker().clone()); - Poll::Pending - } - } - HostStreamProducerStateKind::Writing(buf) => { - // Keep the other side blocked for a zero-length write - // originated from the host. - if buf.len() == 0 { - log::debug!("stream produce: zero-length write {}", self.0); - state.kind = HostStreamProducerStateKind::Wrote(0); - state.waker = Some(cx.waker().clone()); - return Poll::Pending; - } - log::debug!("stream produce: write {}", self.0); - match remaining { - Some(amt) => { - // If the guest is doing a zero-length read then we've - // got some data for them. Complete the read but leave - // ourselves in the same `Writing` state as before. - if amt == 0 { - state.waker = None; - return Poll::Ready(Ok(StreamResult::Completed)); - } - - // Don't let wasmtime buffer up data for us, so truncate - // the buffer we're sending over to the amount that the - // reader is requesting. - if amt < buf.len() { - buf.truncate(amt); - } - } - - // At this time host<->host stream reads/writes aren't - // fuzzed since that brings up a bunch of weird edge cases - // which aren't fun to deal with and aren't interesting - // either. - None => unreachable!(), - } - let count = buf.len() as u32; - dst.set_buffer(mem::take(buf).into()); - state.kind = HostStreamProducerStateKind::Wrote(count); - state.waker = None; - Poll::Ready(Ok(StreamResult::Completed)) - } - } - } -} - -impl Drop for HostStreamProducer { - fn drop(&mut self) { - log::debug!("stream produce: drop {}", self.0); - } -} - -struct SharedStream(Scope); - -impl SharedStream { - async fn next(&mut self, accessor: &Accessor) -> Option { - std::future::poll_fn(|cx| { - accessor.with(|mut store| { - self.poll(cx, store.as_context_mut(), false) - .map(|pair| match pair { - (None, StreamResult::Dropped) => None, - (Some(item), StreamResult::Completed) => Some(item), - _ => unreachable!(), - }) - }) - }) - .await - } - - fn poll( - &mut self, - cx: &mut Context<'_>, - mut store: StoreContextMut<'_, Data>, - finish: bool, - ) -> Poll<(Option, StreamResult)> { - let data = store.data_mut(); - - // If no more commands remain then this is a closed and dropped stream. - let Some((scope, command)) = data.commands.last_mut() else { - log::debug!("Stream closed: {:?}", self.0); - return Poll::Ready((None, StreamResult::Dropped)); - }; - - // If the next queued up command is for the scope that this stream is - // attached to then send off the command. - if *scope == self.0 { - let ret = Some(*command); - - // All commands are followed up with an "ack", and after the "ack" - // is delivered then the command is popped to move on to the next - // command. The reason for this is to guarantee that a command has - // been processed before moving on to the next command. This helps - // make the fuzzing easier to work with by being able to implicitly - // assume that a command has been processed by the time something - // else is. Otherwise it might be possible that wasmtime has a set - // of commands/callbacks that are all delivered at the same time and - // the component model doesn't specify what order they happen - // within. By forcing an "ack" it ensures a more expected ordering - // of execution to assist with fuzzing without losing really all - // that much coverage. - if matches!(command, Command::Ack) { - data.commands.pop(); - } else { - *command = Command::Ack; - } - - // After a command was popped other streams may be able to make - // progress so wake them all up. - for (_, waker) in data.wakers.drain() { - waker.wake(); - } - log::debug!("Delivering command {ret:?} for {:?}", self.0); - return Poll::Ready((ret, StreamResult::Completed)); - } - - // The command queue is non-empty and the next command isn't meant for - // us, so someone else needs to drain the queue. Enqueue our waker. - if finish { - Poll::Ready((None, StreamResult::Cancelled)) - } else { - data.wakers.insert(self.0, cx.waker().clone()); - Poll::Pending - } - } -} - -impl StreamProducer for SharedStream { - type Item = Command; - type Buffer = Option; - - fn poll_produce<'a>( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - store: StoreContextMut<'a, Data>, - mut destination: Destination<'a, Self::Item, Self::Buffer>, - finish: bool, - ) -> Poll> { - let (item, result) = std::task::ready!(self.poll(cx, store, finish)); - destination.set_buffer(item); - Poll::Ready(Ok(result)) - } -} - -#[cfg(test)] -mod tests { - use super::{ComponentAsync, Scope, init, run}; - use crate::oracles::component_async::types::*; - use crate::test::test_n_times; - use Scope::*; - - #[test] - fn smoke() { - init(); - - test_n_times(50, |c, _| { - run(c); - Ok(()) - }); - } - - // ======================================================================== - // A series of fuzz-generated test cases which caused problems during the - // development of this fuzzer. Feel free to delete/edit/etc if the fuzzer - // changes over time. - - #[test] - fn simple() { - init(); - - run(ComponentAsync { - commands: vec![ - (GuestCaller, Command::AsyncPendingImportCall(0)), - (GuestCallee, Command::AsyncPendingImportCall(1)), - (GuestCallee, Command::AsyncPendingExportComplete(0)), - (GuestCaller, Command::AsyncPendingImportAssertReady(0)), - (GuestCaller, Command::AsyncPendingImportCall(2)), - ], - }); - } - - #[test] - fn somewhat_larger() { - static COMMANDS: &[(Scope, Command)] = &[ - (GuestCallee, Command::FutureNew(0)), - (HostCaller, Command::FutureNew(1)), - (GuestCallee, Command::FutureReadPending(0)), - (GuestCaller, Command::AsyncPendingImportCall(2)), - (GuestCaller, Command::AsyncPendingImportCall(3)), - (GuestCaller, Command::AsyncPendingImportCall(4)), - (GuestCaller, Command::AsyncPendingImportCall(5)), - (GuestCallee, Command::AsyncPendingExportComplete(5)), - (GuestCallee, Command::AsyncPendingExportComplete(3)), - (GuestCallee, Command::AsyncPendingExportComplete(4)), - (GuestCallee, Command::AsyncPendingExportComplete(2)), - (GuestCaller, Command::AsyncPendingImportCall(6)), - (GuestCallee, Command::AsyncPendingExportComplete(6)), - (GuestCaller, Command::AsyncPendingImportCall(7)), - (GuestCallee, Command::AsyncPendingExportComplete(7)), - (GuestCaller, Command::AsyncPendingImportCall(8)), - (GuestCallee, Command::AsyncPendingExportComplete(8)), - (GuestCaller, Command::AsyncPendingImportCall(9)), - (GuestCallee, Command::AsyncPendingExportComplete(9)), - (GuestCaller, Command::AsyncPendingImportCall(10)), - (GuestCallee, Command::AsyncPendingExportComplete(10)), - (GuestCaller, Command::AsyncPendingImportCall(11)), - (GuestCallee, Command::AsyncPendingExportComplete(11)), - (GuestCaller, Command::AsyncPendingImportCall(12)), - (GuestCallee, Command::AsyncPendingExportComplete(12)), - (GuestCaller, Command::AsyncPendingImportCall(13)), - (GuestCallee, Command::AsyncPendingExportComplete(13)), - (GuestCaller, Command::AsyncPendingImportCall(14)), - (GuestCallee, Command::AsyncPendingExportComplete(14)), - (GuestCaller, Command::AsyncPendingImportCall(15)), - (GuestCallee, Command::AsyncPendingExportComplete(15)), - (GuestCaller, Command::AsyncPendingImportCall(16)), - (GuestCallee, Command::AsyncPendingExportComplete(16)), - (GuestCaller, Command::AsyncPendingImportCall(17)), - (GuestCallee, Command::AsyncPendingExportComplete(17)), - (GuestCaller, Command::AsyncPendingImportCall(18)), - (GuestCallee, Command::AsyncPendingExportComplete(18)), - (GuestCaller, Command::AsyncPendingImportCall(19)), - (GuestCallee, Command::AsyncPendingExportComplete(19)), - (GuestCaller, Command::AsyncPendingImportCall(20)), - (GuestCallee, Command::AsyncPendingExportComplete(20)), - (GuestCaller, Command::AsyncPendingImportCall(21)), - (GuestCallee, Command::AsyncPendingExportComplete(21)), - (GuestCaller, Command::AsyncPendingImportCall(22)), - (GuestCallee, Command::AsyncPendingExportComplete(22)), - (GuestCaller, Command::AsyncPendingImportCall(23)), - (GuestCallee, Command::AsyncPendingExportComplete(23)), - (GuestCaller, Command::AsyncPendingImportCall(24)), - (GuestCallee, Command::AsyncPendingExportComplete(24)), - (GuestCaller, Command::AsyncPendingImportCall(25)), - (GuestCallee, Command::AsyncPendingExportComplete(25)), - (GuestCaller, Command::AsyncPendingImportCall(26)), - (GuestCallee, Command::AsyncPendingExportComplete(26)), - (GuestCaller, Command::AsyncPendingImportCall(27)), - (GuestCallee, Command::AsyncPendingExportComplete(27)), - (GuestCaller, Command::AsyncPendingImportCall(28)), - (GuestCallee, Command::AsyncPendingExportComplete(28)), - (GuestCaller, Command::AsyncPendingImportCall(29)), - (GuestCallee, Command::AsyncPendingExportComplete(29)), - (GuestCaller, Command::AsyncPendingImportCall(30)), - (GuestCallee, Command::AsyncPendingExportComplete(30)), - (GuestCaller, Command::AsyncPendingImportCall(31)), - (GuestCallee, Command::AsyncPendingExportComplete(31)), - (GuestCaller, Command::AsyncPendingImportCall(32)), - (GuestCallee, Command::AsyncPendingExportComplete(32)), - (GuestCaller, Command::AsyncPendingImportCall(33)), - (GuestCallee, Command::AsyncPendingExportComplete(33)), - (GuestCaller, Command::AsyncPendingImportCall(34)), - (GuestCallee, Command::AsyncPendingExportComplete(34)), - (GuestCaller, Command::AsyncPendingImportCall(35)), - (GuestCallee, Command::AsyncPendingExportComplete(35)), - (GuestCaller, Command::AsyncPendingImportCall(36)), - (GuestCallee, Command::AsyncPendingExportComplete(36)), - (GuestCaller, Command::AsyncPendingImportCall(37)), - (GuestCallee, Command::AsyncPendingExportComplete(37)), - (GuestCaller, Command::AsyncPendingImportAssertReady(36)), - ]; - init(); - - run(ComponentAsync { - commands: COMMANDS.to_vec(), - }); - } - - #[test] - fn simple_stream1() { - init(); - - run(ComponentAsync { - commands: vec![ - (HostCallee, Command::StreamNew(1)), - ( - HostCallee, - Command::StreamReadPending(StreamReadPayload { - stream: 1, - count: 2, - }), - ), - (HostCallee, Command::StreamCancelRead(1)), - (GuestCaller, Command::SyncReadyCall), - ( - HostCallee, - Command::StreamWritePending(StreamWritePayload { - stream: 1, - item: 3, - count: 2, - }), - ), - (HostCallee, Command::StreamCancelWrite(1)), - (HostCallee, Command::StreamDropWritable(1)), - ( - HostCallee, - Command::StreamReadDropped(StreamReadPayload { - stream: 1, - count: 1, - }), - ), - ], - }); - } - - #[test] - fn simple_stream3() { - init(); - - run(ComponentAsync { - commands: vec![ - (GuestCaller, Command::StreamNew(26)), - ( - GuestCaller, - Command::StreamReadPending(StreamReadPayload { - stream: 26, - count: 10, - }), - ), - (GuestCaller, Command::StreamDropWritable(26)), - (GuestCaller, Command::StreamReadAssertDropped(26)), - ], - }); - } - - #[test] - fn simple_stream4() { - init(); - - run(ComponentAsync { - commands: vec![ - (GuestCaller, Command::StreamNew(23)), - ( - GuestCaller, - Command::StreamWritePending(StreamWritePayload { - stream: 23, - item: 24, - count: 2, - }), - ), - (GuestCaller, Command::StreamGive(23)), - (GuestCallee, Command::StreamDropReadable(23)), - ( - GuestCaller, - Command::StreamWriteAssertDropped(StreamReadPayload { - stream: 23, - count: 0, - }), - ), - ], - }); - } - - #[test] - fn zero_length_behavior() { - init(); - - run(ComponentAsync { - commands: vec![ - (GuestCaller, Command::StreamNew(10)), - (HostCaller, Command::StreamTake(10)), - ( - GuestCaller, - Command::StreamWritePending(StreamWritePayload { - stream: 10, - item: 13, - count: 5, - }), - ), - ( - HostCaller, - Command::StreamReadReady(StreamReadyPayload { - stream: 10, - item: 0, - op_count: 0, - ready_count: 0, - }), - ), - ( - HostCaller, - Command::StreamReadReady(StreamReadyPayload { - stream: 10, - item: 0, - op_count: 0, - ready_count: 0, - }), - ), - ], - }); - } -} diff --git a/crates/fuzzing/wit/fuzz.wit b/crates/fuzzing/wit/fuzz.wit deleted file mode 100644 index b98bdf93b32f..000000000000 --- a/crates/fuzzing/wit/fuzz.wit +++ /dev/null @@ -1,139 +0,0 @@ -// For more information on what this is used for, see `fuzz_async.rs` - -package wasmtime-fuzz:fuzz; - -interface types { - variant command { - // invoke the imported `sync-ready` function - sync-ready-call, - - // invoke the imported `async-ready` function, and assert it's ready - async-ready-call, - - // invoke the imported `async-pending` function, assert it's not - // ready, and save it with the id provided. - async-pending-import-call(u32), - // cancel's a prior call of `async-pending` with the id specified - async-pending-import-cancel(u32), - // asserts that a prior call of `async-pending` with the id specified is - // ready. - async-pending-import-assert-ready(u32), - // complete a previous invocation of this component's `async-pending` - // export. - async-pending-export-complete(u32), - // assert a previous invocation of this component's `async-pending` export - // is cancelled. - async-pending-export-assert-cancelled(u32), - - // make a future read/write combo with the `id` specified. - future-new(u32), - // take a future readable end through the `future-take` import. - future-take(u32), - // pass the future specified to this component's `future-receive` import. - future-give(u32), - // drop the future end identified - future-drop-readable(u32), - // ... - future-write-ready(future-payload), - future-read-ready(future-payload), - future-write-pending(future-payload), - future-read-pending(u32), - future-write-dropped(u32), - future-cancel-write(u32), - future-cancel-read(u32), - future-write-assert-complete(u32), - future-write-assert-dropped(u32), - future-read-assert-complete(future-payload), - - // make a stream read/write combo with the `id` specified. - stream-new(u32), - // take a stream readable end through the `stream-take` import. - stream-take(u32), - // pass the stream specified to this component's `stream-receive` import. - stream-give(u32), - // drop the stream end identified - stream-drop-readable(u32), - stream-drop-writable(u32), - // ... - stream-write-ready(stream-ready-payload), - stream-read-ready(stream-ready-payload), - stream-write-pending(stream-write-payload), - stream-read-pending(stream-read-payload), - stream-write-dropped(stream-write-payload), - stream-read-dropped(stream-read-payload), - stream-cancel-write(u32), - stream-cancel-read(u32), - stream-write-assert-complete(stream-read-payload), - stream-write-assert-dropped(stream-read-payload), - stream-read-assert-complete(stream-write-payload), - stream-read-assert-dropped(u32), - - ack, - } - - record future-payload { - %future: u32, - item: u32, - } - - record stream-ready-payload { - %stream: u32, - item: u32, - op-count: u32, - ready-count: u32, - } - - record stream-write-payload { - %stream: u32, - item: u32, - count: u32, - } - - record stream-read-payload { - %stream: u32, - count: u32, - } - - enum scope { - caller, - callee, - } - - get-commands: func(s: scope) -> stream; -} - -interface async-test { - use types.{command, scope}; - - // Initialization function. Invokes `task.return` but keeps running to receive - // commands. Commands come from the `get-commands` function which is passed - // the `scope` provided here. - // - // This invokes the imported `init` function with the `callee` scope. - init: async func(scope: scope); - - sync-ready: func(); - - // Must return immediately. - async-ready: async func(); - - // Must not be ready when called. Status will be resolved through a later - // command using the `id` provided. - async-pending: async func(id: u32); - - // Remove the future reader `id` from this component's state and return it. - future-take: func(id: u32) -> future; - // Receive a future from another component. - future-receive: func(id: u32, f: future); - - // Remove the stream reader `id` from this component's state and return it. - stream-take: func(id: u32) -> stream; - // Receive a stream from another component. - stream-receive: func(id: u32, f: stream); - -} - -world fuzz-async { - import async-test; - export async-test; -} diff --git a/crates/test-programs/Cargo.toml b/crates/test-programs/Cargo.toml index 5201041e08d6..884df0846438 100644 --- a/crates/test-programs/Cargo.toml +++ b/crates/test-programs/Cargo.toml @@ -15,7 +15,7 @@ anyhow = { workspace = true, features = ['std'] } wasi-nn = "0.6.0" wit-bindgen = { workspace = true, features = ['default', 'async-spawn', 'inter-task-wakeup'] } libc = { workspace = true } -futures = { workspace = true, default-features = false, features = ['alloc', 'async-await'] } +futures = { workspace = true, default-features = false, features = ['std', 'async-await'] } url = { workspace = true } sha2 = { workspace = true } base64 = { workspace = true } @@ -23,8 +23,4 @@ wasip1 = { version = "1.0.0", default-features = true } wasip2 = "1.0.0" once_cell = "1.19.0" flate2 = "1.0.28" -log = { workspace = true } -env_logger = { workspace = true } -pin-project-lite = { workspace = true } regex = { workspace = true } - diff --git a/crates/test-programs/src/bin/fuzz_async.rs b/crates/test-programs/src/bin/fuzz_async.rs deleted file mode 100644 index 6f6ac5a1feb0..000000000000 --- a/crates/test-programs/src/bin/fuzz_async.rs +++ /dev/null @@ -1,853 +0,0 @@ -//! Test case used with the `component_async` fuzzer which is part of the `misc` -//! fuzz target of Wasmtime. -//! -//! This test case is a binary that's suited for just that one fuzzer and has an -//! associated WIT world that it works with. This test case is composed with -//! itself and then run within the host as well. The exact semantics of this -//! program and all the exports/imports are defined within the context of the -//! fuzzer. -//! -//! The general idea is that this program creates an "async soup" and make sure -//! that everything works as expected, notably also not leading to any panics -//! anywhere within the runtime. An example of what this fuzzer intermingles -//! are: -//! -//! * Synchronous calls -//! * Async calls that are immediately ready -//! * Async calls that are not immediately ready and left pending -//! * Creation of futures/streams -//! * Moving futures/streams between components -//! * Reading/writing futures/streams -//! * Cancelling reads/writes of futures/streams -//! * Seeing futures/streams get dropped and the effect on active reads/writes -//! * Mixing host<->guest, guest<->guest, guest<->host, and host<->host -//! calls/primitives -//! -//! The purpose of this fuzzer is to stress the management of async stacks, the -//! async runtime, and in theory suss out various edge cases in the handling of -//! async events. This fuzzer does NOT stress lifting/lowering at all because -//! there is a static WIT signature that this fuzzer works with. -//! -//! Much of the code in this file is semi-duplicated in the host except written -//! with host `wasmtime` APIs instead of `wit-bindgen` APIs. The overall -//! structure is roughly the same. -//! -//! # Overall architecture -//! -//! The general structure of this fuzzer is that there's a "host sandwich" which -//! looks like: -//! -//! ```text -//! ╔══════╦══════════════════════════════════════════════════════════╗ -//! ║ Host ║ ║ -//! ╠══════╝ ║ -//! ║ ║ -//! ║ ┍┯┯┯━━━ wasmtime:fuzz/types ║ -//! ║ ││││ ║ -//! ║ ││││ ║ -//! ║ ││││ ╔════════════════════╦════════════════════╗ ║ -//! ║ ││││ ║ component_async.rs ║ ║ ║ -//! ║ ││││ ╠════════════════════╝ ║ ║ -//! ║ ││││ ║ ║ ║ -//! ║ ││││ ║ HostCaller ║ ║ -//! ║ ││││ ║ ║ ║ -//! ║ │││└────────╫─→ stream ║ ║ -//! ║ │││ ╚═══════════════════╤═════════════════════╝ ║ -//! ║ │││ │ ║ -//! ║ │││ ┝ wasmtime-fuzz:fuzz/async-test ║ -//! ║ │││ │ ║ -//! ║ │││ ╔═══════════╦════════════╪═════════════════════════════╗ ║ -//! ║ │││ ║ Component ║ │ ║ ║ -//! ║ │││ ╠═══════════╝ │ ║ ║ -//! ║ │││ ║ ↓ ║ ║ -//! ║ │││ ║ ╔═════════════════╦═══════════════════════╗ ║ ║ -//! ║ │││ ║ ║ fuzz-async.wasm ║ ║ ║ ║ -//! ║ │││ ║ ╠═════════════════╝ ║ ║ ║ -//! ║ │││ ║ ║ ║ ║ ║ -//! ║ │││ ║ ║ GuestCaller ║ ║ ║ -//! ║ │││ ║ ║ ║ ║ ║ -//! ║ ││└────╫────╫─→ stream ║ ║ ║ -//! ║ ││ ║ ╚═══════╤═════════════════════════════════╝ ║ ║ -//! ║ ││ ║ │ ║ ║ -//! ║ ││ ║ ┝ wasmtime-fuzz:fuzz/async-test ║ ║ -//! ║ ││ ║ │ ║ ║ -//! ║ ││ ║ ↓ ║ ║ -//! ║ ││ ║ ╔═════════════════╦═══════════════════════╗ ║ ║ -//! ║ ││ ║ ║ fuzz-async.wasm ║ ║ ║ ║ -//! ║ ││ ║ ╠═════════════════╝ ║ ║ ║ -//! ║ ││ ║ ║ ║ ║ ║ -//! ║ ││ ║ ║ GuestCallee ║ ║ ║ -//! ║ ││ ║ ║ ║ ║ ║ -//! ║ │└─────╫────╫─→ stream ║ ║ ║ -//! ║ │ ║ ╚═══════════════════╤═════════════════════╝ ║ ║ -//! ║ │ ║ │ ║ ║ -//! ║ │ ║ │ ║ ║ -//! ║ │ ╚════════════════════════╪═════════════════════════════╝ ║ -//! ║ │ │ ║ -//! ║ │ ┝ wasmtime-fuzz:fuzz/async-test ║ -//! ║ │ │ ║ -//! ║ │ ↓ ║ -//! ║ │ ╔════════════════════╦════════════════════╗ ║ -//! ║ │ ║ component_async.rs ║ ║ ║ -//! ║ │ ╠════════════════════╝ ║ ║ -//! ║ │ ║ ║ ║ -//! ║ │ ║ HostCallee ║ ║ -//! ║ │ ║ ║ ║ -//! ║ └───────────╫─→ stream ║ ║ -//! ║ ╚═════════════════════════════════════════╝ ║ -//! ║ ║ -//! ╚═════════════════════════════════════════════════════════════════╝ -//! ``` -//! -//! Here `fuzz-async.wasm` appears twice to model all the various types of -//! the host/guest interaction matrix. Everything is driven by a -//! `stream` provided to each component part of the system which -//! serves as a means of forcing one particular component to take action. -//! Commands are then the test case itself where a series of commands are -//! executed for each fuzz iteration. -//! -//! # Yield-loops -//! -//! This program has a function `test_property` which is a similar analog to the -//! one in the host-side as well. The general idea is that while component model -//! async is generally deterministic it does not specify what should happen when -//! multiple events are ready at the same time. This can pretty easily happen in -//! this fuzzer meaning that it's not precise which event happens first. To -//! assist in managing this there are two primary mitigations: -//! -//! * The first is that whenever a command is dispatched to a component it's -//! followed up with an "ack" which is a noop. Delivery of the "ack" can't -//! happen until the previous command is completely finished being processed -//! meaning it's a kludge way of synchronizing the receipt of a message. -//! -//! * The second is that there can still be small races where an async event -//! hasn't quite happened yet but it's queued up to happen. To handle these -//! events calls to `test_property` are sprinkled around which has an -//! internally-bounded yield-loop. It's expected that while yielding other -//! code can run which resolves the property being tested at-hand, and then -//! this yield loop will panic if it turns too many times as it's probably a -//! bug. -//! -//! It's a bit of a hack but it's so far the most effective way of handling this -//! that's (a) not timing-dependent e.g. adding sleeps, (b) is -//! reliable/deterministic, and (c) is flexible where the constant number of -//! yields can be bumped without much concern. The number of yields specifically -//! is arbitrarily chosen and while it can't be said exactly how many yields -//! should be necessary it should be able to say "less than N should always -//! work". - -wit_bindgen::generate!("fuzz-async" in "../fuzzing/wit"); - -use crate::exports::wasmtime_fuzz::fuzz::async_test as e; -use crate::wasmtime_fuzz::fuzz::async_test as i; -use crate::wasmtime_fuzz::fuzz::types::{self, Command, Scope}; -use futures::FutureExt; -use futures::channel::oneshot; -use pin_project_lite::pin_project; -use std::collections::{HashMap, HashSet}; -use std::mem; -use std::pin::{Pin, pin}; -use std::sync::Mutex; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::task::{Context, Poll, Waker}; -use wit_bindgen::{FutureReader, FutureWriter, StreamReader, StreamResult, StreamWriter}; - -struct Component; - -export!(Component); - -// Convenience macro to change the "target" of `log::debug!` based on whether -// this component is a `caller` or `callee` scope to distinguish logs in the -// output. -macro_rules! debug { - ($($arg:tt)*) => { - log::debug!(target: log_target(), $($arg)*); - } -} - -static IS_CALLER: AtomicBool = AtomicBool::new(false); - -fn log_target() -> &'static str { - if IS_CALLER.load(Ordering::Relaxed) { - "wasmtime_fuzzing::fuzz_async::caller" - } else { - "wasmtime_fuzzing::fuzz_async::callee" - } -} - -impl e::Guest for Component { - fn sync_ready() {} - - async fn async_ready() {} - - async fn async_pending(id: u32) { - let (tx, rx) = oneshot::channel(); - State::with(|s| s.async_pending_exports_ready.insert(id, tx)); - let record = RecordCancelOnDrop(id); - rx.await.unwrap(); - mem::forget(record); - debug!("export {id} is complete"); - - struct RecordCancelOnDrop(u32); - - impl Drop for RecordCancelOnDrop { - fn drop(&mut self) { - debug!("export {} was cancelled", self.0); - State::with(|s| { - s.async_pending_exports_cancelled.insert(self.0); - }); - } - } - } - - async fn init(scope: Scope) { - IS_CALLER.store(scope == Scope::Caller, Ordering::Relaxed); - env_logger::init(); - i::init(Scope::Callee).await; - let commands = types::get_commands(scope); - wit_bindgen::spawn(run(commands)); - } - - fn future_take(id: u32) -> FutureReader { - State::with(|s| s.future_readers.remove(&id).unwrap()) - } - - fn future_receive(id: u32, f: FutureReader) { - let prev = State::with(|s| s.future_readers.insert(id, f)); - assert!(prev.is_none()); - } - - fn stream_take(id: u32) -> StreamReader { - State::with(|s| s.stream_readers.remove(&id).unwrap()) - } - - fn stream_receive(id: u32, f: StreamReader) { - let prev = State::with(|s| s.stream_readers.insert(id, f)); - assert!(prev.is_none()); - } -} - -#[derive(Default)] -struct State { - async_pending_imports_ready: HashSet, - async_pending_imports_in_progress: HashMap>, - async_pending_exports_ready: HashMap>, - async_pending_exports_cancelled: HashSet, - - future_readers: HashMap>, - future_writers: HashMap>, - future_write_cancel_signals: HashMap>, - future_read_cancel_signals: HashMap>, - future_writes_completed: HashMap, - future_reads_completed: HashMap, - - stream_readers: HashMap>, - stream_writers: HashMap>, - stream_write_cancel_signals: HashMap>, - stream_read_cancel_signals: HashMap>, - stream_writes_completed: HashMap), (usize, Vec)>>, - stream_reads_completed: HashMap>>, -} - -impl State { - pub fn with(f: impl FnOnce(&mut State) -> R) -> R { - static STATE: Mutex> = Mutex::new(None); - let mut state = STATE.lock().unwrap(); - let state = state.get_or_insert_with(|| State::default()); - f(state) - } - - pub async fn test_property(mut f: impl FnMut(&mut State) -> bool) -> bool { - // Test if the property is ready, but it might require a sibling future - // task to run, so if it's not true yet then pump the executor a - // few times to let it finish. - for _ in 0..1000 { - if State::with(&mut f) { - return true; - } - wit_bindgen::yield_async().await; - } - return false; - } -} - -async fn run(mut commands: StreamReader) { - while let Some(command) = commands.next().await { - match command { - Command::SyncReadyCall => i::sync_ready(), - - Command::AsyncReadyCall => assert_ready(pin!(i::async_ready())), - - Command::AsyncPendingExportComplete(i) => { - assert!( - State::test_property(|s| s.async_pending_exports_ready.contains_key(&i)).await, - "expected async_pending export {i} should be pending", - ); - debug!("finishing export {i}"); - State::with(|s| { - s.async_pending_exports_ready - .remove(&i) - .unwrap() - .send(()) - .unwrap(); - }); - } - Command::AsyncPendingExportAssertCancelled(i) => { - assert!( - State::test_property(|s| s.async_pending_exports_cancelled.remove(&i)).await, - "expected async_pending export {i} to be cancelled", - ); - } - Command::AsyncPendingImportCall(i) => { - let mut future = Box::pin(i::async_pending(i)); - debug!("starting export {i}"); - assert_not_ready(future.as_mut()); - let (cancel_tx, mut cancel_rx) = oneshot::channel(); - State::with(|s| { - s.async_pending_imports_in_progress.insert(i, cancel_tx); - }); - wit_bindgen::spawn(async move { - futures::select! { - _ = cancel_rx => {} - _ = future.fuse() => { - State::with(|s| s.async_pending_imports_ready.insert(i)); - } - } - }); - } - Command::AsyncPendingImportCancel(i) => { - debug!("cancelling import {i}"); - State::with(|s| { - s.async_pending_imports_in_progress - .remove(&i) - .unwrap() - .send(()) - .unwrap(); - }); - } - Command::AsyncPendingImportAssertReady(i) => { - assert!( - State::test_property(|s| s.async_pending_imports_ready.remove(&i)).await, - "expected async_pending import {i} to be ready", - ); - } - - Command::FutureNew(id) => { - let (writer, reader) = wit_future::new(|| unreachable!()); - State::with(|s| { - let prev = s.future_writers.insert(id, writer); - assert!(prev.is_none()); - let prev = s.future_readers.insert(id, reader); - assert!(prev.is_none()); - }); - } - Command::FutureTake(id) => { - let reader = i::future_take(id); - State::with(|s| { - let prev = s.future_readers.insert(id, reader); - assert!(prev.is_none()); - }); - } - Command::FutureGive(id) => { - let reader = State::with(|s| s.future_readers.remove(&id).unwrap()); - i::future_receive(id, reader); - } - Command::FutureDropReadable(id) => { - let _ = State::with(|s| s.future_readers.remove(&id).unwrap()); - } - Command::FutureWriteReady(payload) => { - let writer = State::with(|s| s.future_writers.remove(&payload.future).unwrap()); - assert_ready(pin!(writer.write(payload.item))).unwrap(); - } - Command::FutureReadReady(payload) => { - let reader = State::with(|s| s.future_readers.remove(&payload.future).unwrap()); - assert_eq!(assert_ready(pin!(reader.into_future())), payload.item); - } - Command::FutureWriteDropped(id) => { - let writer = State::with(|s| s.future_writers.remove(&id).unwrap()); - match assert_ready(pin!(writer.write(0))) { - Ok(_) => panic!("should be dropped"), - Err(_) => {} - } - } - Command::FutureWritePending(payload) => { - use wit_bindgen::FutureWriteCancel; - - let writer = State::with(|s| s.future_writers.remove(&payload.future).unwrap()); - let (tx, rx) = oneshot::channel(); - let mut future = Box::pin(CancellableFutureWrite { - cancel: rx, - write: writer.write(payload.item), - }); - assert_not_ready(future.as_mut()); - wit_bindgen::spawn(async move { - let result = future.await; - debug!("future write {} completed: {result:?}", payload.future); - State::with(|s| match result { - FutureWriteCancel::AlreadySent => { - s.future_writes_completed.insert(payload.future, true); - } - FutureWriteCancel::Dropped(_) => { - s.future_writes_completed.insert(payload.future, false); - } - FutureWriteCancel::Cancelled(_, writer) => { - let prev = s.future_writers.insert(payload.future, writer); - assert!(prev.is_none()); - } - }); - }); - State::with(|s| { - let prev = s.future_write_cancel_signals.insert(payload.future, tx); - assert!(prev.is_none()); - }); - - pin_project! { - struct CancellableFutureWrite { - #[pin] - cancel: oneshot::Receiver<()>, - #[pin] - write: wit_bindgen::FutureWrite, - } - } - - impl Future for CancellableFutureWrite { - type Output = FutureWriteCancel; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - match this.cancel.poll(cx) { - Poll::Ready(_) => return Poll::Ready(this.write.cancel()), - Poll::Pending => {} - } - match this.write.poll(cx) { - Poll::Ready(Ok(())) => Poll::Ready(FutureWriteCancel::AlreadySent), - Poll::Ready(Err(val)) => { - Poll::Ready(FutureWriteCancel::Dropped(val.value)) - } - Poll::Pending => Poll::Pending, - } - } - } - } - Command::FutureReadPending(id) => { - let reader = State::with(|s| s.future_readers.remove(&id).unwrap()); - let (tx, rx) = oneshot::channel(); - let mut future = Box::pin(CancellableFutureRead { - cancel: rx, - read: reader.into_future(), - }); - assert_not_ready(future.as_mut()); - wit_bindgen::spawn(async move { - let result = future.await; - State::with(|s| match result { - Ok(result) => { - let prev = s.future_reads_completed.insert(id, result); - assert!(prev.is_none()); - } - Err(reader) => { - let prev = s.future_readers.insert(id, reader); - assert!(prev.is_none()); - } - }); - }); - State::with(|s| { - let prev = s.future_read_cancel_signals.insert(id, tx); - assert!(prev.is_none()); - }); - - pin_project! { - struct CancellableFutureRead { - #[pin] - cancel: oneshot::Receiver<()>, - #[pin] - read: wit_bindgen::FutureRead, - } - } - - impl Future for CancellableFutureRead { - type Output = Result>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - match this.cancel.poll(cx) { - Poll::Ready(_) => return Poll::Ready(this.read.cancel()), - Poll::Pending => {} - } - match this.read.poll(cx) { - Poll::Ready(i) => Poll::Ready(Ok(i)), - Poll::Pending => Poll::Pending, - } - } - } - } - Command::FutureCancelWrite(id) => { - State::with(|s| { - s.future_write_cancel_signals - .remove(&id) - .unwrap() - .send(()) - .unwrap(); - }); - assert!( - State::test_property(|s| s.future_writers.contains_key(&id)).await, - "expected future write {id} to be cancelled", - ); - } - Command::FutureCancelRead(id) => { - State::with(|s| { - s.future_read_cancel_signals - .remove(&id) - .unwrap() - .send(()) - .unwrap(); - }); - assert!( - State::test_property(|s| s.future_readers.contains_key(&id)).await, - "expected future read {id} to be cancelled", - ); - } - Command::FutureWriteAssertComplete(id) => { - assert!( - State::test_property(|s| match s.future_writes_completed.remove(&id) { - Some(true) => true, - Some(false) => panic!("future was dropped"), - None => false, - }) - .await, - "expected future write {id} to be complete", - ); - } - Command::FutureWriteAssertDropped(id) => { - assert!( - State::test_property(|s| match s.future_writes_completed.remove(&id) { - Some(true) => panic!("future write completed"), - Some(false) => true, - None => false, - }) - .await, - "expected future write {id} to be complete", - ); - } - Command::FutureReadAssertComplete(payload) => { - assert!( - State::test_property(|s| { - match s.future_reads_completed.remove(&payload.future) { - Some(i) => { - assert_eq!(i, payload.item); - true - } - None => false, - } - }) - .await, - "expected future read {} to be complete", - payload.future, - ); - } - - Command::StreamNew(id) => { - let (writer, reader) = wit_stream::new(); - State::with(|s| { - let prev = s.stream_writers.insert(id, writer); - assert!(prev.is_none()); - let prev = s.stream_readers.insert(id, reader); - assert!(prev.is_none()); - }); - } - Command::StreamTake(id) => { - let reader = i::stream_take(id); - State::with(|s| { - let prev = s.stream_readers.insert(id, reader); - assert!(prev.is_none()); - }); - } - Command::StreamGive(id) => { - let reader = State::with(|s| s.stream_readers.remove(&id).unwrap()); - i::stream_receive(id, reader); - } - Command::StreamDropReadable(id) => { - let _ = State::with(|s| s.stream_readers.remove(&id).unwrap()); - } - Command::StreamDropWritable(id) => { - let _ = State::with(|s| s.stream_writers.remove(&id).unwrap()); - } - Command::StreamWriteReady(payload) => { - State::with(|s| { - let writer = s.stream_writers.get_mut(&payload.stream).unwrap(); - let (status, buffer) = assert_ready(pin!( - writer.write(stream_payload(payload.item, payload.op_count)) - )); - assert_eq!(status, StreamResult::Complete(payload.ready_count as usize)); - assert_eq!( - buffer.remaining() as u32, - payload.op_count - payload.ready_count - ); - }); - } - Command::StreamWriteDropped(payload) => { - State::with(|s| { - let writer = s.stream_writers.get_mut(&payload.stream).unwrap(); - let (status, buffer) = assert_ready(pin!( - writer.write(stream_payload(payload.item, payload.count)) - )); - assert_eq!(status, StreamResult::Dropped); - assert_eq!(buffer.remaining() as u32, payload.count); - }); - } - Command::StreamReadReady(payload) => { - State::with(|s| { - let reader = s.stream_readers.get_mut(&payload.stream).unwrap(); - let (status, buffer) = assert_ready(pin!( - reader.read(Vec::with_capacity(payload.op_count as usize)) - )); - assert_eq!(status, StreamResult::Complete(payload.ready_count as usize)); - assert_eq!(buffer, stream_payload(payload.item, payload.ready_count)); - }); - } - Command::StreamReadDropped(payload) => { - State::with(|s| { - let reader = s.stream_readers.get_mut(&payload.stream).unwrap(); - let (status, buffer) = assert_ready(pin!( - reader.read(Vec::with_capacity(payload.count as usize)) - )); - assert_eq!(status, StreamResult::Dropped); - assert!(buffer.is_empty()); - }); - } - Command::StreamWritePending(payload) => { - debug!("write pending: {}", payload.stream); - let mut writer = State::with(|s| s.stream_writers.remove(&payload.stream).unwrap()); - let (tx, rx) = oneshot::channel(); - State::with(|s| { - let prev = s.stream_write_cancel_signals.insert(payload.stream, tx); - assert!(prev.is_none()); - }); - let mut future = Box::pin(async move { - debug!("write pending start: {}", payload.stream); - let (result, remaining) = CancellableStreamWrite { - cancel: rx, - write: writer.write(stream_payload(payload.item, payload.count)), - } - .await; - debug!("write pending done: {} {result:?}", payload.stream); - State::with(|s| { - let _ = s.stream_write_cancel_signals.remove(&payload.stream); - match result { - StreamResult::Complete(n) => { - s.stream_writes_completed - .insert(payload.stream, Ok((n, remaining))); - } - StreamResult::Dropped => { - s.stream_writes_completed - .insert(payload.stream, Err((0, remaining))); - } - StreamResult::Cancelled => {} - } - let prev = s.stream_writers.insert(payload.stream, writer); - assert!(prev.is_none()); - }); - }); - assert_not_ready(future.as_mut()); - wit_bindgen::spawn(future); - - pin_project! { - struct CancellableStreamWrite<'a> { - #[pin] - cancel: oneshot::Receiver<()>, - #[pin] - write: wit_bindgen::StreamWrite<'a, u32>, - } - } - - impl Future for CancellableStreamWrite<'_> { - type Output = (StreamResult, Vec); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - let (result, buffer) = match this.cancel.poll(cx) { - Poll::Ready(_) => this.write.cancel(), - Poll::Pending => match this.write.poll(cx) { - Poll::Ready(result) => result, - Poll::Pending => return Poll::Pending, - }, - }; - Poll::Ready((result, buffer.into_vec())) - } - } - } - Command::StreamReadPending(payload) => { - let mut reader = State::with(|s| s.stream_readers.remove(&payload.stream).unwrap()); - let (tx, rx) = oneshot::channel(); - State::with(|s| { - let prev = s.stream_read_cancel_signals.insert(payload.stream, tx); - assert!(prev.is_none()); - }); - let mut future = Box::pin(async move { - let (result, buf) = CancellableStreamRead { - cancel: rx, - read: reader.read(Vec::with_capacity(payload.count as usize)), - } - .await; - State::with(|s| { - let _ = s.stream_read_cancel_signals.remove(&payload.stream); - match result { - StreamResult::Complete(_) => { - s.stream_reads_completed.insert(payload.stream, Some(buf)); - } - StreamResult::Dropped => { - assert!(buf.is_empty(), "dropped but got {}", buf.len()); - s.stream_reads_completed.insert(payload.stream, None); - } - StreamResult::Cancelled => {} - } - let prev = s.stream_readers.insert(payload.stream, reader); - assert!(prev.is_none()); - }); - }); - assert_not_ready(future.as_mut()); - wit_bindgen::spawn(future); - - pin_project! { - struct CancellableStreamRead<'a> { - #[pin] - cancel: oneshot::Receiver<()>, - #[pin] - read: wit_bindgen::StreamRead<'a, u32>, - } - } - - impl Future for CancellableStreamRead<'_> { - type Output = (StreamResult, Vec); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - let (result, buffer) = match this.cancel.poll(cx) { - Poll::Ready(_) => this.read.cancel(), - Poll::Pending => match this.read.poll(cx) { - Poll::Ready(result) => result, - Poll::Pending => return Poll::Pending, - }, - }; - Poll::Ready((result, buffer)) - } - } - } - Command::StreamCancelWrite(id) => { - State::with(|s| { - s.stream_write_cancel_signals - .remove(&id) - .unwrap() - .send(()) - .unwrap(); - }); - assert!( - State::test_property(|s| s.stream_writers.contains_key(&id)).await, - "expected cancel write {id} to be cancelled", - ); - } - Command::StreamCancelRead(id) => { - State::with(|s| { - s.stream_read_cancel_signals - .remove(&id) - .unwrap() - .send(()) - .unwrap(); - }); - assert!( - State::test_property(|s| s.stream_readers.contains_key(&id)).await, - "expected future read {id} to be cancelled", - ); - } - Command::StreamWriteAssertComplete(payload) => { - assert!( - State::test_property(|s| { - match s.stream_writes_completed.remove(&payload.stream) { - Some(Ok((size, _buf))) => { - assert_eq!(size, payload.count as usize); - true - } - Some(Err(_)) => panic!("stream was dropped"), - None => false, - } - }) - .await, - "expected stream write {} to be complete", - payload.stream, - ); - } - Command::StreamWriteAssertDropped(payload) => { - assert!( - State::test_property(|s| { - match s.stream_writes_completed.remove(&payload.stream) { - Some(Err((size, _buf))) => { - assert_eq!(size, payload.count as usize); - true - } - Some(Ok(_)) => panic!("stream was not dropped"), - None => false, - } - }) - .await, - "expected stream write {} to be complete", - payload.stream, - ); - } - Command::StreamReadAssertComplete(payload) => { - assert!( - State::test_property(|s| { - match s.stream_reads_completed.remove(&payload.stream) { - Some(Some(i)) => { - assert_eq!(i, stream_payload(payload.item, payload.count)); - true - } - Some(None) => panic!("stream was dropped"), - None => false, - } - }) - .await, - "expected stream read {} to be complete", - payload.stream, - ); - } - Command::StreamReadAssertDropped(id) => { - assert!( - State::test_property(|s| { - match s.stream_reads_completed.remove(&id) { - Some(None) => true, - Some(Some(_)) => panic!("stream was not dropped"), - None => false, - } - }) - .await, - "expected stream read {id} to be complete", - ); - } - - Command::Ack => {} - } - } -} - -fn stream_payload(init: u32, count: u32) -> Vec { - (init..init + count).collect() -} - -fn assert_ready(f: Pin<&mut F>) -> F::Output { - let mut cx = Context::from_waker(Waker::noop()); - match f.poll(&mut cx) { - Poll::Ready(i) => i, - Poll::Pending => panic!("future was pending"), - } -} - -fn assert_not_ready(f: Pin<&mut F>) { - let mut cx = Context::from_waker(Waker::noop()); - match f.poll(&mut cx) { - Poll::Ready(_) => panic!("future is ready"), - Poll::Pending => {} - } -} - -fn main() { - unreachable!(); -} diff --git a/fuzz/fuzz_targets/misc.rs b/fuzz/fuzz_targets/misc.rs index 5d2b29b64bb1..c2506075dfb1 100644 --- a/fuzz/fuzz_targets/misc.rs +++ b/fuzz/fuzz_targets/misc.rs @@ -68,7 +68,6 @@ run_fuzzers! { stacks api_calls dominator_tree - component_async } fn pulley_roundtrip(u: Unstructured<'_>) -> Result<()> { @@ -181,8 +180,3 @@ fn dominator_tree(mut data: Unstructured<'_>) -> Result<()> { Ok(()) } - -fn component_async(u: Unstructured<'_>) -> Result<()> { - wasmtime_fuzzing::oracles::component_async::run(Arbitrary::arbitrary_take_rest(u)?); - Ok(()) -}