From 38254a65eb674ee29aaa061c68d9e00d2b20c0b5 Mon Sep 17 00:00:00 2001 From: Patrick Mooney Date: Thu, 30 Sep 2021 19:26:29 -0500 Subject: [PATCH 1/3] Reshape block interfaces for a "pull" model This temporarily excludes the NVMe emulation from the build as parallel efforts to update its internals are underway. --- propolis/src/block.rs | 450 ---------------------------- propolis/src/block/file.rs | 228 ++++++++++++++ propolis/src/block/mod.rs | 210 +++++++++++++ propolis/src/dispatch/sync_tasks.rs | 23 +- propolis/src/hw/chipset/i440fx.rs | 8 +- propolis/src/hw/mod.rs | 3 +- propolis/src/hw/pci/device.rs | 29 +- propolis/src/hw/virtio/block.rs | 239 +++++++-------- propolis/src/hw/virtio/mod.rs | 9 +- propolis/src/hw/virtio/pci.rs | 62 ++-- propolis/src/hw/virtio/queue.rs | 96 ++++-- propolis/src/hw/virtio/viona.rs | 49 ++- propolis/src/inventory.rs | 46 ++- propolis/src/vmm/mapping.rs | 24 +- server/src/lib/config.rs | 26 +- server/src/lib/initializer.rs | 24 +- server/src/lib/server.rs | 37 ++- standalone/src/config.rs | 24 +- standalone/src/main.rs | 101 ++++--- 19 files changed, 881 insertions(+), 807 deletions(-) delete mode 100644 propolis/src/block.rs create mode 100644 propolis/src/block/file.rs create mode 100644 propolis/src/block/mod.rs diff --git a/propolis/src/block.rs b/propolis/src/block.rs deleted file mode 100644 index d5d8d74b3..000000000 --- a/propolis/src/block.rs +++ /dev/null @@ -1,450 +0,0 @@ -//! Implements an interface to virtualized block devices. - -use std::collections::VecDeque; -use std::fs::{metadata, File, OpenOptions}; -use std::io::Result; -use std::io::{Error, ErrorKind}; -use std::path::Path; -use std::sync::Condvar; -use std::sync::{Arc, Mutex, Weak}; - -use crate::common::*; -use crate::dispatch::{DispCtx, Dispatcher, SyncCtx}; -use crate::vmm::{MappingExt, MemCtx, SubMapping}; - -/// Type of operations which may be issued to a virtual block device. -#[derive(Copy, Clone, Debug, PartialEq)] -pub enum BlockOp { - Flush, - Read, - Write, -} - -#[derive(Copy, Clone, Debug)] -pub enum BlockResult { - Success, - Failure, - Unsupported, -} - -/// Trait indicating that a type may be used as a request to a block device. -pub trait BlockReq: Send + Sync + 'static { - /// Type of operation being issued. - fn oper(&self) -> BlockOp; - - /// Offset within the block device, in bytes. - fn offset(&self) -> usize; - - /// Returns the next region of memory within a request to a block device. - fn next_buf(&mut self) -> Option; - - /// Signals to the device emulation that a block operation has been completed. - fn complete(self, res: BlockResult, ctx: &DispCtx); -} - -/// Metadata regarding a virtualized block device. -#[derive(Debug)] -pub struct BlockInquiry { - /// Device size in blocks (see below) - pub total_size: u64, - /// Size (in bytes) per block - pub block_size: u32, - pub writable: bool, -} - -/// API to access a virtualized block device. -pub trait BlockDev: Send + Sync + 'static { - /// Enqueues a [`BlockReq`] to the underlying device. - fn enqueue(&self, req: R); - - /// Requests metadata about the block device. - fn inquire(&self) -> BlockInquiry; - - /// Spawns a new thread named `name` on the dispatcher `disp` which - /// begins processing incoming requests. - fn start_dispatch(self: Arc, name: String, disp: &Dispatcher); -} - -/// Standard [`BlockDev`] implementation. -pub struct FileBdev { - fp: File, - is_ro: bool, - - block_size: usize, - sectors: usize, - reqs: Mutex>, - cond: Condvar, -} - -impl FileBdev { - /// Creates a new block device from a device at `path`. - pub fn create(path: impl AsRef, readonly: bool) -> Result> { - let p: &Path = path.as_ref(); - - let meta = metadata(p)?; - let is_ro = readonly || meta.permissions().readonly(); - - let fp = OpenOptions::new().read(true).write(!is_ro).open(p)?; - let len = fp.metadata().unwrap().len() as usize; - - let this = Self { - fp: fp, - is_ro, - - block_size: 512, - sectors: len / 512, - reqs: Mutex::new(VecDeque::new()), - cond: Condvar::new(), - }; - - Ok(Arc::new(this)) - } - - /// Consume enqueued requests and process them. Signal completion when done. - fn process_loop(&self, sctx: &mut SyncCtx) { - loop { - // Check for the yield state prior to acquiring the `reqs` guard. - // This prevents other threads from becoming stuck while attempting - // to enqueue a request while this thread has yielded. - if sctx.check_yield() { - break; - } - - let mut reqs = self.reqs.lock().unwrap(); - if let Some(mut req) = reqs.pop_front() { - let ctx = sctx.dispctx(); - let result = self.process_request(&mut req, &ctx); - req.complete(result, &ctx); - } else { - let _reqs = self.cond.wait(reqs).unwrap(); - } - } - } - - /// Gather all buffers from the request and pass as a group to the appropriate processing function. - fn process_request(&self, req: &mut R, ctx: &DispCtx) -> BlockResult { - let mem = ctx.mctx.memctx(); - - let offset = req.offset(); - - let mut bufs = vec![]; - - while let Some(buf) = req.next_buf() { - bufs.push(buf); - } - - let result = match req.oper() { - BlockOp::Read => self.process_rw_request(true, offset, &mem, bufs), - BlockOp::Write if self.is_ro => Ok(BlockResult::Failure), - BlockOp::Write => { - self.process_rw_request(false, offset, &mem, bufs) - } - BlockOp::Flush => self.process_flush(), - }; - - match result { - Ok(status) => status, - Err(_) => BlockResult::Failure, - } - } - - /// Delegate a block device read or write to the file. - fn process_rw_request( - &self, - is_read: bool, - offset: usize, - mem: &MemCtx, - bufs: Vec, - ) -> Result { - let mappings: Option> = bufs - .iter() - .map(|buf| { - if is_read { - mem.writable_region(buf) - } else { - mem.readable_region(buf) - } - }) - .collect(); - - let mappings = mappings.ok_or_else(|| { - Error::new(ErrorKind::Other, "getting a region failed!") - })?; - - let total_size: usize = mappings.iter().map(|x| x.len()).sum(); - - let nbytes = if is_read { - mappings.preadv(&self.fp, offset as i64) - } else { - mappings.pwritev(&self.fp, offset as i64) - }?; - - assert_eq!(nbytes as usize, total_size); - Ok(BlockResult::Success) - } - - /// Send flush to the file - fn process_flush(&self) -> Result { - self.fp.sync_data()?; - Ok(BlockResult::Success) - } -} - -impl BlockDev for FileBdev { - fn enqueue(&self, req: R) { - self.reqs.lock().unwrap().push_back(req); - self.cond.notify_all(); - } - - fn inquire(&self) -> BlockInquiry { - BlockInquiry { - total_size: self.sectors as u64, - block_size: self.block_size as u32, - writable: !self.is_ro, - } - } - - /// Spawns a new thread named `name` on the dispatcher `disp` which - /// begins processing incoming requests. - fn start_dispatch(self: Arc, name: String, disp: &Dispatcher) { - let ww = Arc::downgrade(&self); - - let bdev = Arc::clone(&self); - disp.spawn_sync( - name, - Box::new(move |sctx| { - bdev.process_loop(sctx); - }), - Some(Box::new(move |_ctx| { - if let Some(this) = Weak::upgrade(&ww) { - this.cond.notify_all() - } - })), - ) - .unwrap(); - } -} - -/* -#[cfg(test)] -mod test { - use std::collections::VecDeque; - use std::fs::File; - use std::io::{Read, Seek, SeekFrom, Write}; - use std::sync::mpsc; - use std::thread; - - use tempfile::tempdir; - - use crate::block::{BlockOp, BlockDev, BlockReq, FileBdev, BlockResult}; - use crate::vmm::mapping::{GuardSpace, Prot}; - use crate::common::{GuestAddr, GuestRegion}; - use crate::{DispCtx, Dispatcher, VcpuHdl}; - - pub struct TestBlockReq { - op: BlockOp, - offset: usize, - len: usize, - bufs: VecDeque, - send: mpsc::SyncSender, - } - - impl BlockReq for TestBlockReq { - fn oper(&self) -> BlockOp { - self.op - } - - fn offset(&self) -> usize { - self.offset - } - - fn next_buf(&mut self) -> Option { - self.bufs.pop_front() - } - - fn complete(self, _res: BlockResult, _ctx: &DispCtx) { - self.send.send(0).expect("send failed!"); - } - } - - impl TestBlockReq { - fn read(offset: usize, bufs: VecDeque) -> (mpsc::Receiver, TestBlockReq) { - let (send, recv) = mpsc::sync_channel(1); - (recv, TestBlockReq { - op: BlockOp::Read, - offset, - len: 0, - bufs, - send, - }) - } - - fn write(offset: usize, bufs: VecDeque) -> (mpsc::Receiver, TestBlockReq) { - let (send, recv) = mpsc::sync_channel(1); - (recv, TestBlockReq { - op: BlockOp::Write, - offset, - len: 0, - bufs, - send, - }) - } - - fn flush() -> (mpsc::Receiver, TestBlockReq) { - let (send, recv) = mpsc::sync_channel(1); - (recv, TestBlockReq { - op: BlockOp::Flush, - offset: 0, - len: 0, - bufs: VecDeque::new(), - send, - }) - } - } - - - #[test] - fn test_plainbdev() { - /* - * Test the following: - * - seed a 512 byte file with a known pattern - * - read that file into four mappings - * - verify that the known pattern was read correctly into those four mappings - * - write another known pattern into another mapping - * - write from that mapping into the file - * - verify that the file received the full write - */ - let dir = tempdir().expect("cannot create tempdir!"); - let file_path = dir.path().join("disk.img"); - - let mut file = - File::create(file_path.clone()).expect("cannot create tempfile!"); - file.set_len(512).unwrap(); - - let bdev = - FileBdev::create(file_path.clone(), false) - .expect("could not create FileBackingStore!"); - - let inquiry = bdev.inquire(); - assert_eq!(512, inquiry.total_size * inquiry.block_size as u64); - - /// XXX: Dispatcher context and `bdev.start_dispatch` required for this to work - - let guard_len = 4096; - let mut guard = GuardSpace::new(guard_len).unwrap(); - let vmm = crate::vmm::mapping::tests::test_vmm(guard_len as u64); - let mapping = guard - .mapping(guard_len, Prot::READ | Prot::WRITE, &vmm, 0) - .unwrap(); - - // write into file - file.seek(SeekFrom::Start(0)).expect("seek failed!"); - file.write(&vec![0; 128][..]).expect("write failed!"); - file.write(&vec![1; 128][..]).expect("write failed!"); - file.write(&vec![2; 128][..]).expect("write failed!"); - file.write(&vec![3; 128][..]).expect("write failed!"); - - // read into mappings - let (read_recv, read_req) = TestBlockReq::read(0, VecDeque::from(vec![ - GuestRegion(GuestAddr(25), 7), - GuestRegion(GuestAddr(75), 256), - GuestRegion(GuestAddr(1350), 128), - GuestRegion(GuestAddr(2048), 121), - ])); - bdev.enqueue(read_req); - let (flush_recv, flush_req) = TestBlockReq::flush(); - bdev.enqueue(flush_req); - - let _ = read_recv.recv(); - let _ = flush_recv.recv(); - - // verify mapping[0] only got zeros - let mut bytes = vec![100; 7]; - mapping - .as_ref() - .subregion(25, 7) - .unwrap() - .read_bytes(&mut bytes[..]) - .unwrap(); - assert_eq!(&bytes, &vec![0u8; 7]); - - // verify mapping[1] got zeros, ones, and twos - // 000000011111111111111111111111122222222 - // ^ ^ ^ ^ - // 7 128 256 263 - // - let mut bytes = vec![100; 256]; - mapping - .as_ref() - .subregion(75, 256) - .unwrap() - .read_bytes(&mut bytes[..]) - .unwrap(); - - let mut expected = vec![0u8; 128 - 7]; - expected.append(&mut vec![1u8; 128]); - expected.append(&mut vec![2u8; 7]); - assert_eq!(bytes, expected); - - // verify mapping[2] got twos and threes - // 222222222222223333333 - // ^ ^ ^ - // 263 384 391 - let mut bytes = vec![100; 128]; - mapping - .as_ref() - .subregion(1350, 128) - .unwrap() - .read_bytes(&mut bytes[..]) - .unwrap(); - - let mut expected = vec![2u8; 384 - 263]; - expected.append(&mut vec![3u8; 391 - 384]); - assert_eq!(bytes, expected); - - // verify mapping[3] got threes - let mut bytes = vec![100; 121]; - mapping - .as_ref() - .subregion(2048, 121) - .unwrap() - .read_bytes(&mut bytes[..]) - .unwrap(); - - let expected = vec![3u8; 121]; - assert_eq!(bytes, expected); - - // write into file - - let mut bytes = vec![100u8; 512]; - mapping - .as_ref() - .subregion(3072, 512) - .unwrap() - .write_bytes(&mut bytes[..]) - .unwrap(); - - let (write_recv, write_req) = TestBlockReq::write(0, VecDeque::from(vec![ - GuestRegion(GuestAddr(3072), 512), - ])); - bdev.enqueue(write_req); - let (flush_recv, flush_req) = TestBlockReq::flush(); - bdev.enqueue(flush_req); - - let _ = write_recv.recv(); - let _ = flush_recv.recv(); - - // verify write to file - - let mut file = File::open(file_path).expect("cannot open tempfile!"); - - let mut buffer = vec![0; 512]; - file.seek(SeekFrom::Start(0)).expect("seek failed!"); - file.read(&mut buffer[..]).expect("buffer read failed!"); - assert_eq!(buffer, vec![100u8; 512]); - - drop(file); - dir.close().expect("could not close dir!"); - } -} -*/ diff --git a/propolis/src/block/file.rs b/propolis/src/block/file.rs new file mode 100644 index 000000000..7104ffe89 --- /dev/null +++ b/propolis/src/block/file.rs @@ -0,0 +1,228 @@ +use std::collections::VecDeque; +use std::fs::{metadata, File, OpenOptions}; +use std::io::{Error, ErrorKind, Result}; +use std::num::NonZeroUsize; +use std::os::unix::io::{AsRawFd, RawFd}; +use std::path::Path; +use std::sync::{Arc, Condvar, Mutex}; + +use super::DeviceInfo; +use crate::block; +use crate::dispatch::{AsyncCtx, DispCtx, Dispatcher, SyncCtx, WakeFn}; +use crate::inventory::Entity; +use crate::vmm::MappingExt; + +use libc::fdatasync; +use tokio::sync::Semaphore; + +// XXX: completely arb for now +const MAX_WORKERS: usize = 32; + +/// Standard [`BlockDev`] implementation. +pub struct FileBackend { + fp: File, + + driver: Mutex>>, + worker_count: NonZeroUsize, + + is_ro: bool, + block_size: usize, + sectors: usize, +} + +impl FileBackend { + /// Creates a new block device from a device at `path`. + pub fn create( + path: impl AsRef, + readonly: bool, + worker_count: NonZeroUsize, + ) -> Result> { + if worker_count.get() > MAX_WORKERS { + return Err(Error::new( + ErrorKind::InvalidInput, + "too many workers", + )); + } + let p: &Path = path.as_ref(); + + let meta = metadata(p)?; + let is_ro = readonly || meta.permissions().readonly(); + + let fp = OpenOptions::new().read(true).write(!is_ro).open(p)?; + let len = fp.metadata().unwrap().len() as usize; + + let this = Self { + fp, + + driver: Mutex::new(None), + worker_count, + + is_ro, + block_size: 512, + sectors: len / 512, + }; + + Ok(Arc::new(this)) + } +} +impl block::Backend for FileBackend { + fn info(&self) -> DeviceInfo { + DeviceInfo { + block_size: self.block_size as u32, + total_size: self.sectors as u64, + writable: !self.is_ro, + } + } + + fn attach(&self, dev: Arc, disp: &Dispatcher) { + let mut driverg = self.driver.lock().unwrap(); + assert!(driverg.is_none()); + + let driver = Driver::new(&self.fp, dev); + driver.spawn(self.worker_count, disp); + *driverg = Some(driver); + } +} +impl Entity for FileBackend {} + +struct Driver { + fd: RawFd, + cv: Condvar, + queue: Mutex>, + idle_threads: Semaphore, + dev: Arc, + waiter: block::AsyncWaiter, +} +impl Driver { + fn new(fp: &File, dev: Arc) -> Arc { + let waiter = block::AsyncWaiter::new(dev.as_ref()); + Arc::new(Self { + fd: fp.as_raw_fd(), + cv: Condvar::new(), + queue: Mutex::new(VecDeque::new()), + idle_threads: Semaphore::new(0), + dev, + waiter, + }) + } + fn blocking_loop(&self, sctx: &mut SyncCtx) { + loop { + if sctx.check_yield() { + break; + } + + let mut guard = self.queue.lock().unwrap(); + if let Some(req) = guard.pop_front() { + drop(guard); + let ctx = sctx.dispctx(); + match process_request(self.fd, &req, &ctx) { + Ok(_) => req.complete(block::Result::Success, &ctx), + Err(_) => req.complete(block::Result::Failure, &ctx), + } + } else { + // wait until more requests are available + self.idle_threads.add_permits(1); + let _guard = self + .cv + .wait_while(guard, |g| { + // While `sctx.check_yield()` is tempting here, it will + // block if this thread goes into a quiesce state, + // excluding all others from the queue lock. + g.is_empty() && !sctx.pending_reqs() + }) + .unwrap(); + } + } + } + + async fn do_scheduling(&self, actx: &mut AsyncCtx) { + loop { + let avail = self.idle_threads.acquire().await.unwrap(); + avail.forget(); + + if let Some(req) = self.waiter.next(self.dev.as_ref(), actx).await { + let mut queue = self.queue.lock().unwrap(); + queue.push_back(req); + drop(queue); + self.cv.notify_one(); + } + } + } + + fn spawn(self: &Arc, worker_count: NonZeroUsize, disp: &Dispatcher) { + for i in 0..worker_count.get() { + let tself = Arc::clone(self); + + // Configure a waker to help threads to reach their yield points + // Doing this once (from thread 0) is adequate to wake them all. + let wake = if i == 0 { + let tnotify = Arc::downgrade(self); + Some(Box::new(move |_ctx: &DispCtx| { + if let Some(this) = tnotify.upgrade() { + let _guard = this.queue.lock().unwrap(); + this.cv.notify_all(); + } + }) as Box) + } else { + None + }; + + let _ = disp + .spawn_sync( + format!("file bdev worker {}", i), + Box::new(move |mut sctx| { + tself.blocking_loop(&mut sctx); + }), + wake, + ) + .unwrap(); + } + + // TODO: do we need the task for later? + let sched_self = Arc::clone(self); + let _sched_task = disp.spawn_async(|mut actx| async move { + let _ = sched_self.do_scheduling(&mut actx).await; + }); + } +} + +fn process_request( + fd: RawFd, + req: &block::Request, + ctx: &DispCtx, +) -> Result<()> { + let mem = ctx.mctx.memctx(); + match req.oper() { + block::Operation::Read(off) => { + let maps = req.mappings(&mem).ok_or_else(|| { + Error::new(ErrorKind::Other, "bad guest region") + })?; + + let nbytes = maps.preadv(fd, off as i64)?; + if nbytes != req.len() { + return Err(Error::new(ErrorKind::Other, "bad read length")); + } + } + block::Operation::Write(off) => { + let maps = req.mappings(&mem).ok_or_else(|| { + Error::new(ErrorKind::Other, "bad guest region") + })?; + + let nbytes = maps.pwritev(fd, off as i64)?; + if nbytes != req.len() { + return Err(Error::new(ErrorKind::Other, "bad write length")); + } + } + block::Operation::Flush(_off, _len) => { + // SAFETY: The backing fd should be valid and fdatasync() is + // doing nothing besides the flush. + unsafe { + let res = fdatasync(fd); + if res != 0 { + return Err(Error::from_raw_os_error(res)); + } + }; + } + } + Ok(()) +} diff --git a/propolis/src/block/mod.rs b/propolis/src/block/mod.rs new file mode 100644 index 000000000..aea02becc --- /dev/null +++ b/propolis/src/block/mod.rs @@ -0,0 +1,210 @@ +//! Implements an interface to virtualized block devices. + +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex}; + +use crate::common::*; +use crate::dispatch::{AsyncCtx, DispCtx, Dispatcher}; +use crate::vmm::{MemCtx, SubMapping}; + +use tokio::sync::Notify; + +mod file; +pub use file::FileBackend; + +pub type ByteOffset = usize; +pub type ByteLen = usize; + +/// Type of operations which may be issued to a virtual block device. +#[derive(Copy, Clone, Debug, PartialEq)] +pub enum Operation { + /// Read from offset + Read(ByteOffset), + /// Write to offset + Write(ByteOffset), + /// Flush buffer(s) for [offset, offset + len) + Flush(ByteOffset, ByteLen), +} + +#[derive(Copy, Clone, Debug)] +pub enum Result { + Success, + Failure, + Unsupported, +} + +pub type CompleteFn = dyn FnOnce(Result, &DispCtx) + Send + Sync + 'static; + +/// Trait indicating that a type may be used as a request to a block device. +pub struct Request { + op: Operation, + regions: Vec, + donef: Option>, +} +impl Request { + pub fn new_read( + off: usize, + regions: Vec, + donef: Box, + ) -> Self { + let op = Operation::Read(off); + Self { op, regions, donef: Some(donef) } + } + + pub fn new_write( + off: usize, + regions: Vec, + donef: Box, + ) -> Self { + let op = Operation::Write(off); + Self { op, regions, donef: Some(donef) } + } + + pub fn new_flush(off: usize, len: usize, donef: Box) -> Self { + let op = Operation::Flush(off, len); + Self { op, regions: Vec::new(), donef: Some(donef) } + } + + /// Type of operation being issued. + pub fn oper(&self) -> Operation { + self.op + } + + /// Guest memory regions underlying the request + pub fn regions(&self) -> &[GuestRegion] { + &self.regions[..] + } + + pub fn mappings<'a>(&self, mem: &'a MemCtx) -> Option>> { + match &self.op { + Operation::Read(_) => { + self.regions.iter().map(|r| mem.writable_region(r)).collect() + } + Operation::Write(_) => { + self.regions.iter().map(|r| mem.readable_region(r)).collect() + } + Operation::Flush(_, _) => None, + } + } + + /// Total length of operation + pub fn len(&self) -> usize { + match &self.op { + Operation::Read(_) | Operation::Write(_) => { + self.regions.iter().map(|r| r.1).sum() + } + Operation::Flush(_, len) => *len, + } + } + + /// Indiciate disposition of completed request + pub fn complete(mut self, res: Result, ctx: &DispCtx) { + let func = self.donef.take().unwrap(); + func(res, ctx); + } +} +impl Drop for Request { + fn drop(&mut self) { + if self.donef.is_some() { + panic!("request dropped prior to completion"); + } + } +} + +/// Metadata regarding a virtualized block device. +#[derive(Debug, Copy, Clone)] +pub struct DeviceInfo { + /// Size (in bytes) per block + pub block_size: u32, + /// Device size in blocks (see above) + pub total_size: u64, + /// Is the device writable + pub writable: bool, +} + +/// API to access a virtualized block device. +pub trait Device: Send + Sync + 'static { + /// Retreive the next request (if any) + fn next(&self, ctx: &DispCtx) -> Option; + + fn set_notifier(&self, f: Option>); +} + +pub trait Backend: Send + Sync + 'static { + fn attach(&self, dev: Arc, disp: &Dispatcher); + fn info(&self) -> DeviceInfo; +} + +pub type NotifierFn = dyn Fn(&dyn Device, &DispCtx) + Send + Sync + 'static; + +pub struct Notifier { + armed: AtomicBool, + notifier: Mutex>>, +} +impl Notifier { + pub fn new() -> Self { + Self { armed: AtomicBool::new(false), notifier: Mutex::new(None) } + } + pub fn next_arming( + &self, + nextf: impl Fn() -> Option, + ) -> Option { + self.armed.store(false, Ordering::Release); + let res = nextf(); + if res.is_some() { + // Since a result was successfully retrieved, no need to rearm the + // notification trigger. + return res; + } + + // On the off chance that the underlying resource became available after + // rearming the notification trigger, check again. + self.armed.store(true, Ordering::Release); + if let Some(r) = nextf() { + self.armed.store(false, Ordering::Release); + Some(r) + } else { + None + } + } + pub fn notify(&self, dev: &dyn Device, ctx: &DispCtx) { + if self.armed.load(Ordering::Acquire) { + let inner = self.notifier.lock().unwrap(); + if let Some(func) = inner.as_ref() { + func(dev, ctx); + } + } + } + pub fn set(&self, val: Option>) { + let mut inner = self.notifier.lock().unwrap(); + *inner = val; + } +} + +pub struct AsyncWaiter { + wake: Arc, +} +impl AsyncWaiter { + pub fn new(attach_dev: &dyn Device) -> Self { + let this = Self { wake: Arc::new(Notify::new()) }; + let wake = Arc::clone(&this.wake); + attach_dev + .set_notifier(Some(Box::new(move |_dev, _ctx| wake.notify_one()))); + this + } + pub async fn next( + &self, + dev: &dyn Device, + actx: &AsyncCtx, + ) -> Option { + loop { + { + let ctx = actx.dispctx().await?; + if let Some(r) = dev.next(&ctx) { + return Some(r); + } + } + self.wake.notified().await; + } + } +} diff --git a/propolis/src/dispatch/sync_tasks.rs b/propolis/src/dispatch/sync_tasks.rs index 8aeb43bc6..59d716696 100644 --- a/propolis/src/dispatch/sync_tasks.rs +++ b/propolis/src/dispatch/sync_tasks.rs @@ -350,6 +350,14 @@ impl WorkerCtrl { } } + fn pending_reqs(&self) -> bool { + if self.active_req.load(Ordering::Acquire) { + let inner = self.inner.lock().unwrap(); + return inner.req_hold || inner.req_exit; + } + false + } + fn wait_until_held(&self) { let inner = self.inner.lock().unwrap(); assert!(inner.req_hold); @@ -376,13 +384,18 @@ impl SyncCtx { } /// Returns true if the function holding this [`DispCtx`] object /// should yield control back to the dispatcher. + /// + /// If this thread has been requested to yield, this will block until the + /// yield condition passes. pub fn check_yield(&mut self) -> bool { - if let Some(ctrl) = self.ctrl.as_ref() { - ctrl.check_yield() - } else { - false - } + self.ctrl.as_ref().map_or(false, |c| c.check_yield()) } + + /// Are there pending requests for this thread to yield or exit? + pub fn pending_reqs(&self) -> bool { + self.ctrl.as_ref().map_or(false, |c| c.pending_reqs()) + } + pub fn dispctx(&mut self) -> DispCtx { DispCtx { mctx: &self.shared.mctx, diff --git a/propolis/src/hw/chipset/i440fx.rs b/propolis/src/hw/chipset/i440fx.rs index 53f733bea..8a53834b1 100644 --- a/propolis/src/hw/chipset/i440fx.rs +++ b/propolis/src/hw/chipset/i440fx.rs @@ -75,14 +75,10 @@ impl I440Fx { // XXX: hardcoded attachments for now let lpc = bus.device_at(1, 0).unwrap(); - lpc.as_devinst().unwrap().with_inner(|lpc: Arc| { - lpc.attach(mctx.pio()); - }); + lpc.as_devinst().unwrap().inner_dev::().attach(mctx.pio()); let pm = bus.device_at(1, 3).unwrap(); - pm.as_devinst().unwrap().with_inner(|pm: Arc| { - pm.attach(mctx); - }); + pm.as_devinst().unwrap().inner_dev::().attach(mctx); } fn set_lnk_route(&self, idx: usize, irq: Option) { diff --git a/propolis/src/hw/mod.rs b/propolis/src/hw/mod.rs index 9e57162b3..471965195 100644 --- a/propolis/src/hw/mod.rs +++ b/propolis/src/hw/mod.rs @@ -1,6 +1,7 @@ pub mod chipset; pub mod ibmpc; -pub mod nvme; +// XXX: skip nvme for now +//pub mod nvme; pub mod pci; pub mod ps2ctrl; pub mod qemu; diff --git a/propolis/src/hw/pci/device.rs b/propolis/src/hw/pci/device.rs index 8d04200eb..17a174265 100644 --- a/propolis/src/hw/pci/device.rs +++ b/propolis/src/hw/pci/device.rs @@ -356,15 +356,19 @@ pub struct DeviceInst { } impl DeviceInst { - fn new( + fn new( ident: Ident, cfg_space: RegMap, msix_cfg: Option>, caps: Vec, bars: Bars, - inner: Arc, - inner_any: Arc, - ) -> Self { + inner: Arc, + ) -> Self + where + D: Device + Send + Sync + 'static, + { + let inner_any = + Arc::clone(&inner) as Arc; Self { ident, lintr_req: false, @@ -381,7 +385,7 @@ impl DeviceInst { sa_cell: SelfArcCell::new(), - inner, + inner: inner as Arc, inner_any, } } @@ -734,13 +738,12 @@ impl DeviceInst { fn notify_msi_update(&self, info: MsiUpdate, ctx: &DispCtx) { self.inner.msi_update(info, ctx); } - pub fn with_inner(&self, f: F) -> R - where - T: Send + Sync + 'static, - F: FnOnce(Arc) -> R, - { + /// Get access to the inner device emulation. + /// + /// This will panic if the provided type does not match. + pub fn inner_dev(&self) -> Arc { let inner = Arc::clone(&self.inner_any); - f(Arc::downcast(inner).unwrap()) + Arc::downcast(inner).unwrap() } fn do_reset(&self, ctx: &DispCtx) { let state = self.state.lock().unwrap(); @@ -1422,9 +1425,6 @@ impl Builder { pub fn finish(self, inner: Arc) -> Arc { let bars = Bars::new(&self.bars); - let inner_any = - Arc::clone(&inner) as Arc; - let mut inst = DeviceInst::new( self.ident, self.cfgmap, @@ -1432,7 +1432,6 @@ impl Builder { self.caps, bars, inner, - inner_any, ); inst.lintr_req = self.lintr_req; diff --git a/propolis/src/hw/virtio/block.rs b/propolis/src/hw/virtio/block.rs index fc6796d78..bfaf92678 100644 --- a/propolis/src/hw/virtio/block.rs +++ b/propolis/src/hw/virtio/block.rs @@ -1,6 +1,7 @@ +use std::num::NonZeroU16; use std::sync::Arc; -use crate::block::*; +use crate::block; use crate::common::*; use crate::dispatch::DispCtx; use crate::hw::pci; @@ -8,7 +9,7 @@ use crate::util::regmap::RegMap; use super::bits::*; use super::pci::PciVirtio; -use super::queue::{Chain, VirtQueue}; +use super::queue::{Chain, VirtQueue, VirtQueues}; use super::VirtioDevice; use lazy_static::lazy_static; @@ -17,31 +18,37 @@ use lazy_static::lazy_static; const SECTOR_SZ: usize = 512; pub struct VirtioBlock { - bdev: Arc>, + info: block::DeviceInfo, + queues: VirtQueues, + notifier: block::Notifier, } impl VirtioBlock { pub fn create( queue_size: u16, - bdev: Arc>, + info: block::DeviceInfo, ) -> Arc { // virtio-block only needs two MSI-X entries for its interrupt needs: // - device config changes // - queue 0 notification let msix_count = Some(2); + let queues = VirtQueues::new( + NonZeroU16::new(queue_size).unwrap(), + NonZeroU16::new(1).unwrap(), + ); + + let notifier = block::Notifier::new(); PciVirtio::create( - queue_size, - 1, msix_count, VIRTIO_DEV_BLOCK, pci::bits::CLASS_STORAGE, VIRTIO_BLK_CFG_SIZE, - Arc::new(Self { bdev }), + Arc::new(Self { info, queues, notifier }), ) } fn block_cfg_read(&self, id: &BlockReg, ro: &mut ReadOp) { - let info = self.bdev.inquire(); + let info = self.info; let total_bytes = info.total_size * info.block_size as u64; match id { BlockReg::Capacity => { @@ -61,6 +68,70 @@ impl VirtioBlock { } } } + + fn next_req(&self, ctx: &DispCtx) -> Option { + let vq = &self.queues[0]; + let mem = &ctx.mctx.memctx(); + + let mut chain = Chain::with_capacity(4); + let _clen = vq.pop_avail(&mut chain, mem)?; + + let mut breq = VbReq::default(); + if !chain.read(&mut breq, mem) { + todo!("error handling"); + } + let req = match breq.rtype { + VIRTIO_BLK_T_IN => { + // should be (blocksize * 512) + 1 remaining writable byte for status + // TODO: actually enforce block size + let blocks = (chain.remain_write_bytes() - 1) / SECTOR_SZ; + + if let Some(regions) = chain.writable_bufs(blocks * SECTOR_SZ) { + let mvq = Arc::clone(vq); + Ok(block::Request::new_read( + breq.sector as usize * SECTOR_SZ, + regions, + Box::new(move |res, ctx| { + complete_blockreq(res, chain, mvq, ctx); + }), + )) + } else { + Err(chain) + } + } + VIRTIO_BLK_T_OUT => { + // should be (blocksize * 512) remaining read bytes + let blocks = chain.remain_read_bytes() / SECTOR_SZ; + + if let Some(regions) = chain.readable_bufs(blocks * SECTOR_SZ) { + let mvq = Arc::clone(vq); + Ok(block::Request::new_write( + breq.sector as usize * SECTOR_SZ, + regions, + Box::new(move |res, ctx| { + complete_blockreq(res, chain, mvq, ctx); + }), + )) + } else { + Err(chain) + } + } + _ => Err(chain), + }; + match req { + Err(mut chain) => { + // try to set the status byte to failed + let remain = chain.remain_write_bytes(); + if remain >= 1 { + chain.write_skip(remain - 1); + chain.write(&VIRTIO_BLK_S_UNSUPP, mem); + } + vq.push_used(&mut chain, mem, ctx); + None + } + Ok(r) => Some(r), + } + } } impl VirtioDevice for VirtioBlock { fn device_cfg_rw(&self, mut rwo: RWOp) { @@ -75,8 +146,7 @@ impl VirtioDevice for VirtioBlock { let mut feat = VIRTIO_BLK_F_BLK_SIZE; feat |= VIRTIO_BLK_F_SEG_MAX; - let dev_data = self.bdev.inquire(); - if !dev_data.writable { + if !self.info.writable { feat |= VIRTIO_BLK_F_RO; } feat @@ -85,142 +155,39 @@ impl VirtioDevice for VirtioBlock { // XXX: real features } - fn queue_notify(&self, vq: &Arc, ctx: &DispCtx) { - let mem = &ctx.mctx.memctx(); - - loop { - let mut chain = Chain::with_capacity(4); - let clen = vq.pop_avail(&mut chain, mem); - if clen.is_none() { - break; - } - - let mut breq = VbReq::default(); - if !chain.read(&mut breq, mem) { - todo!("error handling"); - } - match breq.rtype { - VIRTIO_BLK_T_IN => { - // should be (blocksize * 512) + 1 remaining write bytes - let remain = chain.remain_write_bytes(); - let blocks = (remain - 1) / SECTOR_SZ; - - self.bdev.enqueue(Request::new_read( - chain, - Arc::clone(vq), - breq.sector as usize * SECTOR_SZ, - blocks * SECTOR_SZ, - )); - } - VIRTIO_BLK_T_OUT => { - // should be (blocksize * 512) remaining read bytes - let blocks = chain.remain_read_bytes() / SECTOR_SZ; - self.bdev.enqueue(Request::new_write( - chain, - Arc::clone(vq), - breq.sector as usize * SECTOR_SZ, - blocks * SECTOR_SZ, - )); - } - _ => { - // try to set the status byte to failed - let remain = chain.remain_write_bytes(); - if remain >= 1 { - chain.write_skip(remain - 1); - chain.write(&VIRTIO_BLK_S_UNSUPP, mem); - } - vq.push_used(&mut chain, mem, ctx); - } - } - } + fn queue_notify(&self, _vq: &Arc, ctx: &DispCtx) { + self.notifier.notify(self, ctx); + } + fn queues(&self) -> &VirtQueues { + &self.queues } } -impl Entity for VirtioBlock {} -pub struct Request { - op: BlockOp, - off: usize, - xfer_left: usize, - xfer_used: usize, - chain: Chain, +fn complete_blockreq( + res: block::Result, + mut chain: Chain, vq: Arc, + ctx: &DispCtx, +) { + let mem = &ctx.mctx.memctx(); + let _ = match res { + block::Result::Success => chain.write(&VIRTIO_BLK_S_OK, mem), + block::Result::Failure => chain.write(&VIRTIO_BLK_S_IOERR, mem), + block::Result::Unsupported => chain.write(&VIRTIO_BLK_S_UNSUPP, mem), + }; + vq.push_used(&mut chain, mem, ctx); } -impl Request { - fn new_read( - chain: Chain, - vq: Arc, - off: usize, - size: usize, - ) -> Self { - assert_eq!(chain.remain_write_bytes(), size + 1); - Self { - op: BlockOp::Read, - off, - xfer_left: size, - xfer_used: 0, - chain, - vq, - } - } - fn new_write( - chain: Chain, - vq: Arc, - off: usize, - size: usize, - ) -> Self { - assert_eq!(chain.remain_read_bytes(), size); - assert_eq!(chain.remain_write_bytes(), 1); - Self { - op: BlockOp::Write, - off, - xfer_left: size, - xfer_used: 0, - chain, - vq, - } - } -} - -impl BlockReq for Request { - fn oper(&self) -> BlockOp { - self.op - } - - fn offset(&self) -> usize { - self.off +impl block::Device for VirtioBlock { + fn next(&self, ctx: &DispCtx) -> Option { + self.notifier.next_arming(|| self.next_req(ctx)) } - fn next_buf(&mut self) -> Option { - if self.xfer_left == 0 { - return None; - } - let res = match self.op { - BlockOp::Flush => return None, - BlockOp::Read => self.chain.writable_buf(self.xfer_left), - BlockOp::Write => self.chain.readable_buf(self.xfer_left), - }; - if let Some(region) = res.as_ref() { - assert!(self.xfer_left >= region.1); - self.xfer_left -= region.1; - self.xfer_used += region.1; - } - res - } - - fn complete(mut self, res: BlockResult, ctx: &DispCtx) { - assert_eq!(self.chain.remain_write_bytes(), 1); - let mem = &ctx.mctx.memctx(); - match res { - BlockResult::Success => self.chain.write(&VIRTIO_BLK_S_OK, mem), - BlockResult::Failure => self.chain.write(&VIRTIO_BLK_S_IOERR, mem), - BlockResult::Unsupported => { - self.chain.write(&VIRTIO_BLK_S_UNSUPP, mem) - } - }; - self.vq.push_used(&mut self.chain, mem, ctx); + fn set_notifier(&self, val: Option>) { + self.notifier.set(val); } } +impl Entity for VirtioBlock {} #[derive(Copy, Clone, Debug, Default)] #[repr(C)] diff --git a/propolis/src/hw/virtio/mod.rs b/propolis/src/hw/virtio/mod.rs index e46f29078..3d5f37293 100644 --- a/propolis/src/hw/virtio/mod.rs +++ b/propolis/src/hw/virtio/mod.rs @@ -4,27 +4,26 @@ use std::sync::Arc; mod bits; pub mod block; -mod pci; +pub mod pci; mod queue; pub mod viona; use crate::common::*; use crate::dispatch::DispCtx; -use queue::VirtQueue; +use queue::{VirtQueue, VirtQueues}; pub use block::VirtioBlock; -pub trait VirtioDevice: Send + Sync + 'static + Entity { +trait VirtioDevice: Send + Sync + 'static + Entity { fn device_cfg_rw(&self, ro: RWOp); fn device_get_features(&self) -> u32; fn device_set_features(&self, feat: u32); fn queue_notify(&self, vq: &Arc, ctx: &DispCtx); + fn queues(&self) -> &VirtQueues; #[allow(unused_variables)] fn device_reset(&self, ctx: &DispCtx) {} #[allow(unused_variables)] - fn attach(&self, queues: &[Arc]) {} - #[allow(unused_variables)] fn queue_change( &self, vq: &Arc, diff --git a/propolis/src/hw/virtio/pci.rs b/propolis/src/hw/virtio/pci.rs index 7dbfa8587..d0119eed8 100644 --- a/propolis/src/hw/virtio/pci.rs +++ b/propolis/src/hw/virtio/pci.rs @@ -1,3 +1,4 @@ +use std::any::Any; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Condvar, Mutex, MutexGuard, Weak}; @@ -87,30 +88,23 @@ pub struct PciVirtio { state: Mutex, state_cv: Condvar, - queue_size: u16, - queues: Vec>, sa_cell: SelfArcCell, dev: Arc, + dev_any: Arc, } impl PciVirtio { - pub fn create( - queue_size: u16, - num_queues: u16, + pub(super) fn create( msix_count: Option, dev_id: u16, dev_class: u8, cfg_sz: usize, - inner: Arc, - ) -> Arc { - assert!(queue_size > 1 && queue_size.is_power_of_two()); - - let mut queues = Vec::new(); - for id in 0..num_queues { - queues.push(Arc::new(VirtQueue::new(id, queue_size))); - } - + inner: Arc, + ) -> Arc + where + D: VirtioDevice + Send + Sync + 'static, + { let layout = [ (VirtioTop::LegacyConfig, LEGACY_REG_SZ), (VirtioTop::DeviceConfig, cfg_sz), @@ -120,6 +114,8 @@ impl PciVirtio { (VirtioTop::DeviceConfig, cfg_sz), ]; + let dev_any = + Arc::clone(&inner) as Arc; let mut this = Arc::new(Self { map: RegMap::create_packed_passthru( cfg_sz + LEGACY_REG_SZ, @@ -131,18 +127,17 @@ impl PciVirtio { ), map_which: AtomicBool::new(false), - state: Mutex::new(VirtioState::new(num_queues)), + state: Mutex::new(VirtioState::new(inner.queues().count().get())), state_cv: Condvar::new(), - queue_size, - queues, dev: inner, + dev_any, sa_cell: SelfArcCell::new(), }); SelfArc::self_arc_init(&mut this); - for queue in this.queues.iter() { + for queue in this.dev.queues()[..].iter() { queue.set_interrupt(IsrIntr::new(this.self_weak())); } @@ -175,7 +170,7 @@ impl PciVirtio { } LegacyReg::QueuePfn => { let state = self.state.lock().unwrap(); - if let Some(queue) = self.queues.get(state.queue_sel as usize) { + if let Some(queue) = self.vq(state.queue_sel) { let addr = queue.ctrl.lock().unwrap().gpa_desc.0; ro.write_u32((addr >> PAGE_SHIFT) as u32); } else { @@ -184,7 +179,7 @@ impl PciVirtio { } } LegacyReg::QueueSize => { - ro.write_u16(self.queue_size); + ro.write_u16(self.dev.queues().queue_size().get()); } LegacyReg::QueueSelect => { let state = self.state.lock().unwrap(); @@ -233,7 +228,7 @@ impl PciVirtio { let mut state = self.state.lock().unwrap(); let mut success = false; let pfn = wo.read_u32(); - if let Some(queue) = self.queues.get(state.queue_sel as usize) { + if let Some(queue) = self.vq(state.queue_sel) { success = queue.map_legacy((pfn as u64) << PAGE_SHIFT); self.queue_change(queue, VqChange::Address, ctx); } @@ -259,7 +254,7 @@ impl PciVirtio { LegacyReg::MsixVectorQueue => { let mut state = self.state.lock().unwrap(); let sel = state.queue_sel as usize; - if let Some(queue) = self.queues.get(sel) { + if let Some(queue) = self.vq(state.queue_sel) { let val = wo.read_u16(); if state.intr_mode != IntrMode::Msi { @@ -309,7 +304,7 @@ impl PciVirtio { } fn queue_notify(&self, queue: u16, ctx: &DispCtx) { probe_virtio_vq_notify!(|| (self as *const PciVirtio as u64, queue)); - if let Some(vq) = self.queues.get(queue as usize) { + if let Some(vq) = self.vq(queue) { self.dev.queue_notify(vq, ctx); } } @@ -322,7 +317,7 @@ impl PciVirtio { self.dev.queue_change(vq, change, ctx); } fn device_reset(&self, mut state: MutexGuard, ctx: &DispCtx) { - for queue in self.queues.iter() { + for queue in self.dev.queues()[..].iter() { queue.reset(); self.queue_change(queue, VqChange::Reset, ctx); } @@ -362,7 +357,7 @@ impl PciVirtio { // To avoid deadlock, the state lock must be dropped while // updating the interrupts handlers on queues. drop(state); - for queue in self.queues.iter() { + for queue in self.dev.queues()[..].iter() { queue.set_interrupt(IsrIntr::new(self.self_weak())); } state = self.state.lock().unwrap(); @@ -380,7 +375,7 @@ impl PciVirtio { } } IntrMode::Msi => { - for (idx, queue) in self.queues.iter().enumerate() { + for (idx, queue) in self.dev.queues()[..].iter().enumerate() { let vec = *state.msix_queue_vec.get(idx).unwrap(); let hdl = state.msix_hdl.as_ref().unwrap().clone(); @@ -396,6 +391,18 @@ impl PciVirtio { state.intr_mode_updating = false; self.state_cv.notify_all(); } + + fn vq(&self, qid: u16) -> Option<&Arc> { + self.dev.queues().get(qid) + } + + /// Get access to the inner device emulation. + /// + /// This will panic if the provided type does not match. + pub fn inner_dev(&self) -> Arc { + let inner = Arc::clone(&self.dev_any); + Arc::downcast(inner).unwrap() + } } impl SelfArc for PciVirtio { @@ -429,7 +436,6 @@ impl pci::Device for PciVirtio { let mut state = self.state.lock().unwrap(); state.lintr_pin = lintr_pin; state.msix_hdl = msix_hdl; - self.dev.attach(&self.queues[..]); } fn interrupt_mode_change(&self, mode: pci::IntrMode) { self.set_intr_mode(match mode { @@ -450,7 +456,7 @@ impl pci::Device for PciVirtio { self.state_cv.wait_while(state, |s| s.intr_mode_updating).unwrap(); state.intr_mode_updating = true; - for vq in self.queues.iter() { + for vq in self.dev.queues()[..].iter() { let val = *state.msix_queue_vec.get(vq.id as usize).unwrap(); // avoid deadlock while modify per-VQ interrupt config diff --git a/propolis/src/hw/virtio/queue.rs b/propolis/src/hw/virtio/queue.rs index 1b4760668..306dfc603 100644 --- a/propolis/src/hw/virtio/queue.rs +++ b/propolis/src/hw/virtio/queue.rs @@ -1,7 +1,9 @@ use std::mem; -use std::num::Wrapping; +use std::num::{NonZeroU16, Wrapping}; +use std::ops::Index; +use std::slice::SliceIndex; use std::sync::atomic::{fence, Ordering}; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; use super::bits::*; use super::VirtioIntr; @@ -193,19 +195,6 @@ impl VirtQueue { None } } - pub fn avail_count(&self, mem: &MemCtx) -> u16 { - let avail = self.avail.lock().unwrap(); - if !avail.valid { - return 0; - } - if let Some(idx) = mem.read::(avail.gpa_idx) { - let ndesc = Wrapping(idx) - avail.cur_avail_idx; - if ndesc.0 != 0 && ndesc.0 < self.size { - return ndesc.0; - } - } - 0 - } pub fn pop_avail(&self, chain: &mut Chain, mem: &MemCtx) -> Option { assert!(chain.idx.is_none()); let mut avail = self.avail.lock().unwrap(); @@ -404,19 +393,27 @@ impl Chain { }); total == item_sz } - pub fn readable_buf(&mut self, limit: usize) -> Option { - if limit == 0 || self.read_stat.bytes_remain == 0 { + /// Fetch a string of readable guest regions from the chain, provided there + /// are enough to cover a specified length. + pub fn readable_bufs(&mut self, len: usize) -> Option> { + if len == 0 || (self.read_stat.bytes_remain as usize) < len { return None; } - let mut res: Option = None; + let mut bufs = Vec::new(); + let mut remain = len; self.for_remaining_type(true, |addr, blen| { - let to_consume = usize::min(blen, limit); + let to_consume = usize::min(blen, remain); - res = Some(GuestRegion(addr, to_consume)); - (to_consume, false) + bufs.push(GuestRegion(addr, to_consume)); + + // Since we checked for enough remaining bytes ahead of time, there + // should be no risk of this failing. + remain = remain.checked_sub(to_consume).unwrap(); + (to_consume, remain != 0) }); - res + assert_eq!(remain, 0); + Some(bufs) } pub fn write(&mut self, item: &T, mem: &MemCtx) -> bool { let item_sz = mem::size_of::(); @@ -464,19 +461,27 @@ impl Chain { }); true } - pub fn writable_buf(&mut self, limit: usize) -> Option { - if limit == 0 || self.write_stat.bytes_remain == 0 { + /// Fetch a string of writable guest regions from the chain, provided there + /// are enough to cover a specified length. + pub fn writable_bufs(&mut self, len: usize) -> Option> { + if len == 0 || (self.write_stat.bytes_remain as usize) < len { return None; } - let mut res: Option = None; + let mut bufs = Vec::new(); + let mut remain = len; self.for_remaining_type(false, |addr, blen| { - let to_consume = usize::min(blen, limit); + let to_consume = usize::min(blen, remain); + + bufs.push(GuestRegion(addr, to_consume)); - res = Some(GuestRegion(addr, to_consume)); - (to_consume, false) + // Since we checked for enough remaining bytes ahead of time, there + // should be no risk of this failing. + remain = remain.checked_sub(to_consume).unwrap(); + (to_consume, remain != 0) }); - res + assert_eq!(remain, 0); + Some(bufs) } pub fn remain_write_bytes(&self) -> usize { @@ -555,3 +560,36 @@ pub struct MapInfo { pub avail_addr: u64, pub used_addr: u64, } + +pub(super) struct VirtQueues { + queues: Vec>, + size: NonZeroU16, + num: NonZeroU16, +} +impl VirtQueues { + pub fn new(size: NonZeroU16, num: NonZeroU16) -> Self { + assert!(size.get().is_power_of_two()); + let mut queues = Vec::with_capacity(size.get() as usize); + for id in 0..num.get() { + queues.push(Arc::new(VirtQueue::new(id, size.get()))); + } + Self { queues, size, num } + } + pub fn queue_size(&self) -> NonZeroU16 { + self.size + } + pub fn count(&self) -> NonZeroU16 { + self.num + } + pub fn get(&self, qid: u16) -> Option<&Arc> { + self.queues.get(qid as usize) + } +} + +impl]>> Index for VirtQueues { + type Output = S::Output; + + fn index(&self, index: S) -> &Self::Output { + Index::index(&self.queues, index) + } +} diff --git a/propolis/src/hw/virtio/viona.rs b/propolis/src/hw/virtio/viona.rs index 38d89d707..c97db103b 100644 --- a/propolis/src/hw/virtio/viona.rs +++ b/propolis/src/hw/virtio/viona.rs @@ -1,5 +1,6 @@ use std::fs::{File, OpenOptions}; use std::io::{Error, ErrorKind, Result}; +use std::num::NonZeroU16; use std::os::unix::io::{AsRawFd, RawFd}; use std::sync::{Arc, Mutex, Weak}; @@ -14,7 +15,7 @@ use crate::vmm::VmmHdl; use super::bits::*; use super::pci::PciVirtio; -use super::queue::VirtQueue; +use super::queue::{VirtQueue, VirtQueues}; use super::{VirtioDevice, VqChange, VqIntr}; use lazy_static::lazy_static; @@ -24,12 +25,11 @@ use tokio::io::Interest; const ETHERADDRL: usize = 6; struct Inner { - queues: Vec>, poller: Option<(Arc, AsyncTaskId)>, } impl Inner { fn new() -> Self { - Self { queues: Vec::new(), poller: None } + Self { poller: None } } } @@ -41,6 +41,7 @@ pub struct VirtioViona { mtu: Option, hdl: VionaHdl, inner: Mutex, + queues: VirtQueues, sa_cell: SelfArcCell, } @@ -54,12 +55,21 @@ impl VirtioViona { let info = dlhdl.query_vnic(vnic_name)?; let hdl = VionaHdl::new(info.link_id, vm.fd())?; + // TX and RX + let queue_count = NonZeroU16::new(2).unwrap(); + // interrupts for TX, RX, and device config + let msix_count = Some(3); + + let queues = + VirtQueues::new(NonZeroU16::new(queue_size).unwrap(), queue_count); + let mut this = VirtioViona { dev_features: hdl.get_avail_features()?, mac_addr: [0; ETHERADDRL], mtu: info.mtu, hdl, inner: Mutex::new(Inner::new()), + queues, sa_cell: SelfArcCell::new(), }; this.mac_addr.copy_from_slice(&info.mac_addr); @@ -68,14 +78,7 @@ impl VirtioViona { let mut this = Arc::new(this); SelfArc::self_arc_init(&mut this); - // TX and RX - let queue_count = 2; - // interrupts for TX, RX, and device config - let msix_count = Some(3); - Ok(PciVirtio::create( - queue_size, - queue_count, msix_count, VIRTIO_DEV_NET, pci::bits::CLASS_NETWORK, @@ -85,17 +88,14 @@ impl VirtioViona { } fn process_interrupts(&self, ctx: &DispCtx) { - let inner = self.inner.lock().unwrap(); self.hdl .intr_poll(|vq_idx| { self.hdl.ring_intr_clear(vq_idx).unwrap(); - if let Some(vq) = inner.queues.get(vq_idx as usize) { - vq.with_intr(|intr| { - if let Some(intr) = intr { - intr.notify(ctx); - } - }); - } + self.queues[vq_idx as usize].with_intr(|intr| { + if let Some(intr) = intr { + intr.notify(ctx); + } + }); }) .unwrap(); } @@ -196,15 +196,8 @@ impl VirtioDevice for VirtioViona { } } } - - fn attach(&self, queues: &[Arc]) { - let mut inner = self.inner.lock().unwrap(); - // Keep references to all of the virtqueues around so we can issue - // interrupts to them. This is necessary when MSI is not configured or - // is masked (device-wide or for a given queue). - for vq in queues { - inner.queues.push(Arc::clone(vq)); - } + fn queues(&self) -> &VirtQueues { + &self.queues } } impl Entity for VirtioViona { @@ -226,7 +219,7 @@ impl Entity for VirtioViona { let (poller, task) = inner.poller.take().unwrap(); ctx.cancel_async(task); drop(poller); - for vq in inner.queues.iter() { + for vq in self.queues[..].iter() { let _ = self.hdl.ring_reset(vq.id); } } diff --git a/propolis/src/inventory.rs b/propolis/src/inventory.rs index 44efb2ad8..36b4546fd 100644 --- a/propolis/src/inventory.rs +++ b/propolis/src/inventory.rs @@ -78,7 +78,32 @@ impl Inventory { parent_id: Option, ) -> Result { let mut inv = self.inner.lock().unwrap(); - inv.register(ent, name, parent_id) + let to_register = ent.child_register(); + let res = inv.register(ent, name, parent_id)?; + + if let Some(children) = to_register { + for child in children { + // Since the parent successfully registered, the children should + // have no issues. + inv.register_inner( + child.ent, + child.ent_any, + child.name, + Some(res), + ) + .unwrap(); + } + } + Ok(res) + } + + pub fn register_child( + &self, + reg: ChildRegister, + parent_id: EntityID, + ) -> Result { + let mut inv = self.inner.lock().unwrap(); + inv.register_inner(reg.ent, reg.ent_any, reg.name, Some(parent_id)) } /// Access the concrete type of an entity by ID. @@ -405,6 +430,25 @@ pub trait Entity: Send + Sync + 'static { ctx: &DispCtx, ) { } + #[allow(unused_variables)] + fn child_register(&self) -> Option> { + None + } +} + +pub struct ChildRegister { + ent: Arc, + ent_any: Arc, + name: String, +} +impl ChildRegister { + pub fn new(ent: &Arc, name: String) -> Self { + Self { + ent: Arc::clone(ent) as Arc, + ent_any: Arc::clone(ent) as Arc, + name, + } + } } /// ID referencing an entity stored within the inventory. diff --git a/propolis/src/vmm/mapping.rs b/propolis/src/vmm/mapping.rs index 739a2791c..a63aafa6b 100644 --- a/propolis/src/vmm/mapping.rs +++ b/propolis/src/vmm/mapping.rs @@ -10,7 +10,7 @@ use std::fs::File; use std::io::{Error, ErrorKind, Result}; use std::marker::PhantomData; use std::mem::ManuallyDrop; -use std::os::unix::io::AsRawFd; +use std::os::unix::io::{AsRawFd, RawFd}; use std::ptr::{copy_nonoverlapping, NonNull}; // 2MB guard length @@ -521,14 +521,14 @@ impl<'a> SubMapping<'a> { pub trait MappingExt { /// preadv from `file` into multiple mappings - fn preadv(&self, file: &File, offset: i64) -> Result; + fn preadv(&self, fd: RawFd, offset: i64) -> Result; /// pwritev from multiple mappings to `file` - fn pwritev(&self, file: &File, offset: i64) -> Result; + fn pwritev(&self, fd: RawFd, offset: i64) -> Result; } impl<'a, T: AsRef<[SubMapping<'a>]>> MappingExt for T { - fn preadv(&self, file: &File, offset: i64) -> Result { + fn preadv(&self, fd: RawFd, offset: i64) -> Result { if !self .as_ref() .iter() @@ -550,12 +550,7 @@ impl<'a, T: AsRef<[SubMapping<'a>]>> MappingExt for T { .collect::>(); let read = unsafe { - libc::preadv( - file.as_raw_fd(), - iov.as_ptr(), - iov.len() as libc::c_int, - offset, - ) + libc::preadv(fd, iov.as_ptr(), iov.len() as libc::c_int, offset) }; if read == -1 { return Err(Error::last_os_error()); @@ -564,7 +559,7 @@ impl<'a, T: AsRef<[SubMapping<'a>]>> MappingExt for T { Ok(read as usize) } - fn pwritev(&self, file: &File, offset: i64) -> Result { + fn pwritev(&self, fd: RawFd, offset: i64) -> Result { if !self .as_ref() .iter() @@ -586,12 +581,7 @@ impl<'a, T: AsRef<[SubMapping<'a>]>> MappingExt for T { .collect::>(); let written = unsafe { - libc::pwritev( - file.as_raw_fd(), - iov.as_ptr(), - iov.len() as libc::c_int, - offset, - ) + libc::pwritev(fd, iov.as_ptr(), iov.len() as libc::c_int, offset) }; if written == -1 { return Err(Error::last_os_error()); diff --git a/server/src/lib/config.rs b/server/src/lib/config.rs index 76e4c5e59..6cbd559e6 100644 --- a/server/src/lib/config.rs +++ b/server/src/lib/config.rs @@ -1,6 +1,7 @@ //! Describes a server config which may be parsed from a TOML file. use std::collections::{btree_map, BTreeMap}; +use std::num::NonZeroUsize; use std::path::{Path, PathBuf}; use std::str::FromStr; use std::sync::Arc; @@ -8,6 +9,9 @@ use std::sync::Arc; use serde_derive::Deserialize; use thiserror::Error; +use propolis::block; +use propolis::inventory; + /// Errors which may be returned when parsing the server configuration. #[derive(Error, Debug)] pub enum ParseError { @@ -60,14 +64,15 @@ impl Config { IterDevs { inner: self.devices.iter() } } - pub fn create_block_device( + pub fn create_block_backend( &self, name: &str, - ) -> Result>, ParseError> { + ) -> Result<(Arc, inventory::ChildRegister), ParseError> + { let entry = self.block_devs.get(name).ok_or_else(|| { ParseError::KeyNotFound(name.to_string(), "block_dev".to_string()) })?; - entry.create_block_device::() + entry.create_block_backend() } } @@ -101,9 +106,10 @@ pub struct BlockDevice { } impl BlockDevice { - pub fn create_block_device( + pub fn create_block_backend( &self, - ) -> Result>, ParseError> { + ) -> Result<(Arc, inventory::ChildRegister), ParseError> + { match &self.bdtype as &str { "file" => { let path = self @@ -127,8 +133,14 @@ impl BlockDevice { self.options.get("readonly")?.as_str()?.parse().ok() }() .unwrap_or(false); - - Ok(propolis::block::FileBdev::::create(path, readonly)?) + let nworkers = NonZeroUsize::new(8).unwrap(); + let be = propolis::block::FileBackend::create( + path, readonly, nworkers, + )?; + let child = + inventory::ChildRegister::new(&be, "backend".to_string()); + + Ok((be, child)) } _ => { panic!("unrecognized block dev type {}!", self.bdtype); diff --git a/server/src/lib/initializer.rs b/server/src/lib/initializer.rs index dad1fceef..1f4cc109e 100644 --- a/server/src/lib/initializer.rs +++ b/server/src/lib/initializer.rs @@ -17,7 +17,7 @@ use propolis::hw::qemu::{debug::QemuDebugPort, fwcfg, ramfb}; use propolis::hw::uart::LpcUart; use propolis::hw::virtio; use propolis::instance::Instance; -use propolis::inventory::{EntityID, Inventory}; +use propolis::inventory::{ChildRegister, EntityID, Inventory}; use propolis::vmm::{self, Builder, Machine, MachineCtx, Prot}; use crate::serial::Serial; @@ -197,23 +197,27 @@ impl<'a> MachineInitializer<'a> { Ok(()) } - pub fn initialize_block( + pub fn initialize_virtio_block( &self, chipset: &RegisteredChipset, bdf: pci::Bdf, - block_dev_name: &str, - block_dev: Arc>, + backend: Arc, + be_register: ChildRegister, ) -> Result<(), Error> { - let vioblk = virtio::VirtioBlock::create(0x100, Arc::clone(&block_dev)); - self.inv + let be_info = backend.info(); + let vioblk = virtio::VirtioBlock::create(0x100, be_info); + let id = self + .inv .register(&vioblk, format!("vioblk-{}", bdf), None) .map_err(|e| -> std::io::Error { e.into() })?; + let _ = self.inv.register_child(be_register, id).unwrap(); + + let blk = vioblk + .inner_dev::() + .inner_dev::(); + backend.attach(blk, self.disp); chipset.device().pci_attach(bdf, vioblk); - block_dev.start_dispatch( - format!("bdev-{} thread", block_dev_name), - &self.disp, - ); Ok(()) } diff --git a/server/src/lib/server.rs b/server/src/lib/server.rs index 90b24250d..92519928b 100644 --- a/server/src/lib/server.rs +++ b/server/src/lib/server.rs @@ -268,20 +268,34 @@ async fn instance_ensure( .options .get("block_dev") .ok_or_else(|| { - Error::new(ErrorKind::InvalidData, format!("no block_dev key for {}!", devname)) + Error::new( + ErrorKind::InvalidData, + format!( + "no block_dev key for {}!", + devname + ), + ) })? .as_str() .ok_or_else(|| { - Error::new(ErrorKind::InvalidData, format!("as_str() failed for {}'s block_dev!", devname)) + Error::new( + ErrorKind::InvalidData, + format!( + "as_str() failed for {}'s block_dev!", + devname + ), + ) })?; - let block_dev = server_context + let (backend, creg) = server_context .config - .create_block_device::( - block_dev_name, - ).map_err(|e| { - Error::new(ErrorKind::InvalidData, format!("ParseError: {:?}", e)) - })?; + .create_block_backend(block_dev_name) + .map_err(|e| { + Error::new( + ErrorKind::InvalidData, + format!("ParseError: {:?}", e), + ) + })?; let bdf: pci::Bdf = dev.get("pci-path").ok_or_else(|| { @@ -291,11 +305,8 @@ async fn instance_ensure( ) })?; - init.initialize_block( - &chipset, - bdf, - block_dev_name, - block_dev, + init.initialize_virtio_block( + &chipset, bdf, backend, creg, )?; } "pci-virtio-viona" => { diff --git a/standalone/src/config.rs b/standalone/src/config.rs index 4c3e523c7..882ce0629 100644 --- a/standalone/src/config.rs +++ b/standalone/src/config.rs @@ -1,11 +1,13 @@ use std::collections::{btree_map, BTreeMap}; +use std::num::NonZeroUsize; use std::str::FromStr; use std::sync::Arc; use serde_derive::Deserialize; use crate::hw::pci; -use propolis::block::{BlockDev, BlockReq}; +use propolis::block; +use propolis::inventory::ChildRegister; #[derive(Deserialize, Debug)] struct Top { @@ -46,9 +48,12 @@ impl Config { IterDevs { inner: self.inner.devices.iter() } } - pub fn block_dev(&self, name: &str) -> Arc> { + pub fn block_dev( + &self, + name: &str, + ) -> (Arc, ChildRegister) { let entry = self.inner.block_devs.get(name).unwrap(); - entry.block_dev::() + entry.block_dev() } } @@ -72,9 +77,7 @@ pub struct BlockDevice { } impl BlockDevice { - pub fn block_dev( - &self, - ) -> Arc> { + pub fn block_dev(&self) -> (Arc, ChildRegister) { match &self.bdtype as &str { "file" => { let path = self.options.get("path").unwrap().as_str().unwrap(); @@ -84,7 +87,14 @@ impl BlockDevice { }() .unwrap_or(false); - propolis::block::FileBdev::::create(path, readonly).unwrap() + let be = block::FileBackend::create( + path, + readonly, + NonZeroUsize::new(8).unwrap(), + ) + .unwrap(); + let creg = ChildRegister::new(&be, "backend".to_string()); + (be, creg) } _ => { panic!("unrecognized block dev type {}!", self.bdtype); diff --git a/standalone/src/main.rs b/standalone/src/main.rs index 311ef4db5..74ce95036 100644 --- a/standalone/src/main.rs +++ b/standalone/src/main.rs @@ -7,7 +7,6 @@ extern crate serde; extern crate serde_derive; extern crate toml; -use std::collections::HashMap; use std::fs::File; use std::io::{Error, ErrorKind, Result}; use std::path::Path; @@ -185,7 +184,7 @@ fn main() { inv.register(&debug_device, "debug".to_string(), None) .map_err(|e| -> std::io::Error { e.into() })?; - let mut devices = HashMap::new(); + // let mut devices = HashMap::new(); for (name, dev) in config.devs() { let driver = &dev.driver as &str; @@ -201,19 +200,23 @@ fn main() { let block_dev = dev.options.get("block_dev").unwrap().as_str().unwrap(); - let block_dev = config - .block_dev::(block_dev); + let (backend, creg) = config.block_dev(block_dev); - let vioblk = hw::virtio::VirtioBlock::create( - 0x100, - Arc::clone(&block_dev), - ); - inv.register(&vioblk, format!("vioblk-{}", name), None) + let info = backend.info(); + let vioblk = hw::virtio::VirtioBlock::create(0x100, info); + let id = inv + .register(&vioblk, format!("vioblk-{}", name), None) .map_err(|e| -> std::io::Error { e.into() })?; - chipset.pci_attach(bdf.unwrap(), vioblk); + let _be_id = inv + .register_child(creg, id) + .map_err(|e| -> std::io::Error { e.into() })?; + + let blk = vioblk + .inner_dev::() + .inner_dev::(); + backend.attach(blk as Arc, disp); - block_dev - .start_dispatch(format!("bdev-{} thread", name), disp); + chipset.pci_attach(bdf.unwrap(), vioblk); } "pci-virtio-viona" => { let vnic_name = @@ -227,43 +230,43 @@ fn main() { .map_err(|e| -> std::io::Error { e.into() })?; chipset.pci_attach(bdf.unwrap(), viona); } - "pci-nvme" => { - let nvme = hw::nvme::PciNvme::create(0x1de, 0x1000); - devices.insert(&**name, nvme.clone()); - chipset.pci_attach(bdf.unwrap(), nvme); - } - "nvme-ns" => { - let nvme_ctrl = dev - .options - .get("controller") - .unwrap() - .as_str() - .unwrap(); - - let nvme = devices.get(nvme_ctrl).unwrap_or_else(|| { - panic!("no such nvme controller: {}", nvme_ctrl) - }); - - let block_dev = - dev.options.get("block_dev").unwrap().as_str().unwrap(); - - let block_dev = - config.block_dev::(block_dev); - - let ns = hw::nvme::NvmeNs::create(block_dev.clone()); - - if let Err(e) = - nvme.with_inner(|nvme: Arc| { - nvme.add_ns(ns) - }) - { - eprintln!("failed to attach nvme-ns: {}", e); - std::process::exit(libc::EXIT_FAILURE); - } - - block_dev - .start_dispatch(format!("bdev-{} thread", name), disp); - } + // "pci-nvme" => { + // let nvme = hw::nvme::PciNvme::create(0x1de, 0x1000); + // devices.insert(&**name, nvme.clone()); + // chipset.pci_attach(bdf.unwrap(), nvme); + // } + // "nvme-ns" => { + // let nvme_ctrl = dev + // .options + // .get("controller") + // .unwrap() + // .as_str() + // .unwrap(); + + // let nvme = devices.get(nvme_ctrl).unwrap_or_else(|| { + // panic!("no such nvme controller: {}", nvme_ctrl) + // }); + + // let block_dev = + // dev.options.get("block_dev").unwrap().as_str().unwrap(); + + // let block_dev = + // config.block_dev::(block_dev); + + // let ns = hw::nvme::NvmeNs::create(block_dev.clone()); + + // if let Err(e) = + // nvme.with_inner(|nvme: Arc| { + // nvme.add_ns(ns) + // }) + // { + // eprintln!("failed to attach nvme-ns: {}", e); + // std::process::exit(libc::EXIT_FAILURE); + // } + + // block_dev + // .start_dispatch(format!("bdev-{} thread", name), disp); + // } _ => { eprintln!("unrecognized driver: {}", name); std::process::exit(libc::EXIT_FAILURE); From e2fb77f7334d5025e76f82db1483b6df5771faa7 Mon Sep 17 00:00:00 2001 From: Patrick Mooney Date: Thu, 7 Oct 2021 13:36:21 -0500 Subject: [PATCH 2/3] Feedback from James --- propolis/src/block/file.rs | 32 ++++++++++++-------------------- propolis/src/block/mod.rs | 2 +- server/src/lib/config.rs | 2 +- standalone/src/config.rs | 2 +- 4 files changed, 15 insertions(+), 23 deletions(-) diff --git a/propolis/src/block/file.rs b/propolis/src/block/file.rs index 7104ffe89..d5aaf1049 100644 --- a/propolis/src/block/file.rs +++ b/propolis/src/block/file.rs @@ -2,7 +2,7 @@ use std::collections::VecDeque; use std::fs::{metadata, File, OpenOptions}; use std::io::{Error, ErrorKind, Result}; use std::num::NonZeroUsize; -use std::os::unix::io::{AsRawFd, RawFd}; +use std::os::unix::io::AsRawFd; use std::path::Path; use std::sync::{Arc, Condvar, Mutex}; @@ -12,7 +12,6 @@ use crate::dispatch::{AsyncCtx, DispCtx, Dispatcher, SyncCtx, WakeFn}; use crate::inventory::Entity; use crate::vmm::MappingExt; -use libc::fdatasync; use tokio::sync::Semaphore; // XXX: completely arb for now @@ -20,7 +19,7 @@ const MAX_WORKERS: usize = 32; /// Standard [`BlockDev`] implementation. pub struct FileBackend { - fp: File, + fp: Arc, driver: Mutex>>, worker_count: NonZeroUsize, @@ -52,7 +51,7 @@ impl FileBackend { let len = fp.metadata().unwrap().len() as usize; let this = Self { - fp, + fp: Arc::new(fp), driver: Mutex::new(None), worker_count, @@ -78,7 +77,7 @@ impl block::Backend for FileBackend { let mut driverg = self.driver.lock().unwrap(); assert!(driverg.is_none()); - let driver = Driver::new(&self.fp, dev); + let driver = Driver::new(self.fp.clone(), dev); driver.spawn(self.worker_count, disp); *driverg = Some(driver); } @@ -86,7 +85,7 @@ impl block::Backend for FileBackend { impl Entity for FileBackend {} struct Driver { - fd: RawFd, + fp: Arc, cv: Condvar, queue: Mutex>, idle_threads: Semaphore, @@ -94,10 +93,10 @@ struct Driver { waiter: block::AsyncWaiter, } impl Driver { - fn new(fp: &File, dev: Arc) -> Arc { + fn new(fp: Arc, dev: Arc) -> Arc { let waiter = block::AsyncWaiter::new(dev.as_ref()); Arc::new(Self { - fd: fp.as_raw_fd(), + fp, cv: Condvar::new(), queue: Mutex::new(VecDeque::new()), idle_threads: Semaphore::new(0), @@ -115,7 +114,7 @@ impl Driver { if let Some(req) = guard.pop_front() { drop(guard); let ctx = sctx.dispctx(); - match process_request(self.fd, &req, &ctx) { + match process_request(&self.fp, &req, &ctx) { Ok(_) => req.complete(block::Result::Success, &ctx), Err(_) => req.complete(block::Result::Failure, &ctx), } @@ -187,7 +186,7 @@ impl Driver { } fn process_request( - fd: RawFd, + fp: &File, req: &block::Request, ctx: &DispCtx, ) -> Result<()> { @@ -198,7 +197,7 @@ fn process_request( Error::new(ErrorKind::Other, "bad guest region") })?; - let nbytes = maps.preadv(fd, off as i64)?; + let nbytes = maps.preadv(fp.as_raw_fd(), off as i64)?; if nbytes != req.len() { return Err(Error::new(ErrorKind::Other, "bad read length")); } @@ -208,20 +207,13 @@ fn process_request( Error::new(ErrorKind::Other, "bad guest region") })?; - let nbytes = maps.pwritev(fd, off as i64)?; + let nbytes = maps.pwritev(fp.as_raw_fd(), off as i64)?; if nbytes != req.len() { return Err(Error::new(ErrorKind::Other, "bad write length")); } } block::Operation::Flush(_off, _len) => { - // SAFETY: The backing fd should be valid and fdatasync() is - // doing nothing besides the flush. - unsafe { - let res = fdatasync(fd); - if res != 0 { - return Err(Error::from_raw_os_error(res)); - } - }; + fp.sync_data()?; } } Ok(()) diff --git a/propolis/src/block/mod.rs b/propolis/src/block/mod.rs index aea02becc..6bdec99aa 100644 --- a/propolis/src/block/mod.rs +++ b/propolis/src/block/mod.rs @@ -35,7 +35,7 @@ pub enum Result { pub type CompleteFn = dyn FnOnce(Result, &DispCtx) + Send + Sync + 'static; -/// Trait indicating that a type may be used as a request to a block device. +/// Block device operation request pub struct Request { op: Operation, regions: Vec, diff --git a/server/src/lib/config.rs b/server/src/lib/config.rs index 6cbd559e6..06a541fbe 100644 --- a/server/src/lib/config.rs +++ b/server/src/lib/config.rs @@ -138,7 +138,7 @@ impl BlockDevice { path, readonly, nworkers, )?; let child = - inventory::ChildRegister::new(&be, "backend".to_string()); + inventory::ChildRegister::new(&be, "backend-file".to_string()); Ok((be, child)) } diff --git a/standalone/src/config.rs b/standalone/src/config.rs index 882ce0629..c4311bed5 100644 --- a/standalone/src/config.rs +++ b/standalone/src/config.rs @@ -93,7 +93,7 @@ impl BlockDevice { NonZeroUsize::new(8).unwrap(), ) .unwrap(); - let creg = ChildRegister::new(&be, "backend".to_string()); + let creg = ChildRegister::new(&be, "backend-file".to_string()); (be, creg) } _ => { From f043524f58eed6aa7a850af2f767ca99ff40f794 Mon Sep 17 00:00:00 2001 From: Patrick Mooney Date: Thu, 7 Oct 2021 14:03:30 -0500 Subject: [PATCH 3/3] Fix fmt --- server/src/lib/config.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/lib/config.rs b/server/src/lib/config.rs index 06a541fbe..729d4fb37 100644 --- a/server/src/lib/config.rs +++ b/server/src/lib/config.rs @@ -137,8 +137,10 @@ impl BlockDevice { let be = propolis::block::FileBackend::create( path, readonly, nworkers, )?; - let child = - inventory::ChildRegister::new(&be, "backend-file".to_string()); + let child = inventory::ChildRegister::new( + &be, + "backend-file".to_string(), + ); Ok((be, child)) }