diff --git a/Cargo.lock b/Cargo.lock index 0563448215..8e1c4d5193 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -124,6 +124,7 @@ dependencies = [ name = "cros_async" version = "0.1.0" dependencies = [ + "async-trait 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)", "base 0.1.0", "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "io_uring 0.1.0", @@ -132,6 +133,7 @@ dependencies = [ "pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)", "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "syscall_defines 0.1.0", + "thiserror 1.0.20 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -760,6 +762,24 @@ dependencies = [ "libc 0.2.65 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "thiserror" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "thiserror-impl 1.0.20 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "tpm2" version = "0.1.0" @@ -922,5 +942,7 @@ dependencies = [ "checksum remain 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "99c861227fc40c8da6fdaa3d58144ac84c0537080a43eb1d7d45c28f88dcb888" "checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" "checksum syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)" = "af6f3550d8dff9ef7dc34d384ac6f107e5d31c8f57d9f28e0081503f547ac8f5" +"checksum thiserror 1.0.20 (registry+https://github.com/rust-lang/crates.io-index)" = "7dfdd070ccd8ccb78f4ad66bf1982dc37f620ef696c6b5028fe2ed83dd3d0d08" +"checksum thiserror-impl 1.0.20 (registry+https://github.com/rust-lang/crates.io-index)" = "bd80fc12f73063ac132ac92aceea36734f04a1d93c1240c6944e23a3b8841793" "checksum unicode-width 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "882386231c45df4700b275c7ff55b6f3698780a650026380e72dabe76fa46526" "checksum unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c" diff --git a/cros_async/Cargo.toml b/cros_async/Cargo.toml index 741fcfee7f..b502d24883 100644 --- a/cros_async/Cargo.toml +++ b/cros_async/Cargo.toml @@ -5,6 +5,7 @@ authors = ["The Chromium OS Authors"] edition = "2018" [dependencies] +async-trait = "0.1.36" io_uring = { path = "../io_uring" } libc = "*" paste = "*" @@ -12,6 +13,7 @@ pin-utils = "0.1.0-alpha.4" base = { path = "../base" } syscall_defines = { path = "../syscall_defines" } slab = "0.4" +thiserror = "1.0.20" [dependencies.futures] version = "*" diff --git a/cros_async/src/event.rs b/cros_async/src/event.rs new file mode 100644 index 0000000000..852b58eeaa --- /dev/null +++ b/cros_async/src/event.rs @@ -0,0 +1,47 @@ +// Copyright 2020 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use crate::{new, AsyncResult, IoSourceExt}; +use std::os::unix::io::AsRawFd; + +/// An async version of sys_util::EventFd. +pub struct EventAsync { + io_source: Box + 'static>, +} + +impl EventAsync { + /// Creates a new EventAsync wrapper around the provided eventfd. + #[allow(dead_code)] + pub fn new(f: F) -> AsyncResult> { + Ok(EventAsync { io_source: new(f)? }) + } + + /// Gets the next value from the eventfd. + #[allow(dead_code)] + pub async fn next_val(&self) -> AsyncResult { + self.io_source.read_u64().await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use base::Event; + use futures::pin_mut; + + #[test] + fn next_val_reads_value() { + async fn go(event: Event) -> u64 { + let event_async = EventAsync::new(event).unwrap(); + event_async.next_val().await.unwrap() + } + + let eventfd = Event::new().unwrap(); + eventfd.write(0xaa).unwrap(); + let fut = go(eventfd); + pin_mut!(fut); + let val = crate::run_executor(crate::RunOne::new(fut)).unwrap(); + assert_eq!(val, 0xaa); + } +} diff --git a/cros_async/src/io_ext.rs b/cros_async/src/io_ext.rs index 0a9e8c698b..8e15f98a85 100644 --- a/cros_async/src/io_ext.rs +++ b/cros_async/src/io_ext.rs @@ -4,11 +4,10 @@ //! # `IoSourceExt` //! -//! Extension functions to asynchronously access files. -//! `IoSourceExt` is the interface exposed to users for `IoSource` objects. Using `IoSource` -//! directly is inconvenient and requires dealing with state machines for the backing uring and -//! future libraries. `IoSourceExt` instead provides users with a future that can be `await`ed from -//! async context. +//! User functions to asynchronously access files. +//! Using `IoSource` directly is inconvenient and requires dealing with state +//! machines for the backing uring, future libraries, etc. `IoSourceExt` instead +//! provides users with a future that can be `await`ed from async context. //! //! Each member of `IoSourceExt` returns a future for the supported operation. One or more //! operation can be pending at a time. @@ -16,84 +15,332 @@ //! Operations can only access memory in a `Vec` or an implementor of `BackingMemory`. See the //! `URingExecutor` documentation for an explaination of why. -use std::pin::Pin; +use crate::poll_source::PollSource; +use crate::UringSource; +use async_trait::async_trait; +use std::os::unix::io::AsRawFd; use std::rc::Rc; +use thiserror::Error as ThisError; -use crate::io_source::IoSource; -use crate::uring_futures; use crate::uring_mem::{BackingMemory, MemRegion}; -/// Extends IoSource with ergonomic methods to perform asynchronous IO. -pub trait IoSourceExt: IoSource { - /// Reads from the iosource at `file_offset` and fill the given `vec`. - fn read_to_vec<'a>(&'a self, file_offset: u64, vec: Vec) -> uring_futures::ReadVec<'a, Self> - where - Self: Unpin, - { - uring_futures::ReadVec::new(Pin::new(self), file_offset, vec) - } +#[derive(ThisError, Debug)] +pub enum Error { + /// An error with a polled(FD) source. + #[error("An error with a poll source: {0}")] + Poll(crate::poll_source::Error), + /// An error reading from a wrapped source. + #[error("An error from the source: {0}")] + ReadingInner(isize), + /// An error with a uring source. + #[error("An error with a uring source: {0}")] + Uring(crate::uring_executor::Error), +} +pub type Result = std::result::Result; +/// Ergonomic methods for async reads. +#[async_trait(?Send)] +pub trait ReadAsync { + /// Reads from the iosource at `file_offset` and fill the given `vec`. + async fn read_to_vec<'a>(&'a self, file_offset: u64, vec: Vec) -> Result<(usize, Vec)>; + + /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`. + async fn read_to_mem<'a>( + &'a self, + file_offset: u64, + mem: Rc, + mem_offsets: &'a [MemRegion], + ) -> Result; + + /// Wait for the FD of `self` to be readable. + async fn wait_readable(&self) -> Result<()>; + + /// Reads a single u64 from the current offset. + async fn read_u64(&self) -> Result; +} + +/// Ergonomic methods for async writes. +#[async_trait(?Send)] +pub trait WriteAsync { /// Writes from the given `vec` to the file starting at `file_offset`. - fn write_from_vec<'a>( + async fn write_from_vec<'a>( &'a self, file_offset: u64, vec: Vec, - ) -> uring_futures::WriteVec<'a, Self> - where - Self: Unpin, - { - uring_futures::WriteVec::new(Pin::new(self), file_offset, vec) - } - - /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`. - fn read_to_mem<'a, 'b>( - &'a self, - file_offset: u64, - mem: Rc, - mem_offsets: &'b [MemRegion], - ) -> uring_futures::ReadMem<'a, 'b, Self> - where - Self: Unpin, - { - uring_futures::ReadMem::new(Pin::new(self), file_offset, mem, mem_offsets) - } + ) -> Result<(usize, Vec)>; /// Writes from the given `mem` from the given offsets to the file starting at `file_offset`. - fn write_from_mem<'a, 'b>( + async fn write_from_mem<'a>( &'a self, file_offset: u64, mem: Rc, - mem_offsets: &'b [MemRegion], - ) -> uring_futures::WriteMem<'a, 'b, Self> - where - Self: Unpin, - { - uring_futures::WriteMem::new(Pin::new(self), file_offset, mem, mem_offsets) - } + mem_offsets: &'a [MemRegion], + ) -> Result; - /// See `fallocate(2)` for details. - fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> uring_futures::Fallocate - where - Self: Unpin, - { - uring_futures::Fallocate::new(Pin::new(self), file_offset, len, mode) - } + /// See `fallocate(2)`. Note this op is synchronous when using the Polled backend. + async fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> Result<()>; /// Sync all completed write operations to the backing storage. - fn fsync(&self) -> uring_futures::Fsync - where - Self: Unpin, - { - uring_futures::Fsync::new(Pin::new(self)) - } + async fn fsync(&self) -> Result<()>; +} - /// Wait for the FD of `self` to be readable. - fn wait_readable(&self) -> uring_futures::PollFd<'_, Self> - where - Self: Unpin, - { - uring_futures::PollFd::new(Pin::new(self)) +/// Subtrait for general async IO. +#[async_trait(?Send)] +pub trait IoSourceExt: ReadAsync + WriteAsync { + /// Yields the underlying IO source. + fn into_source(self: Box) -> F; + + /// Provides a mutable ref to the underlying IO source. + fn as_source_mut(&mut self) -> &mut F; + + /// Provides a ref to the underlying IO source. + fn as_source(&self) -> &F; +} + +/// Creates a concrete `IoSourceExt` that uses uring if available or falls back to the fd_executor if not. +/// Note that on older kernels (pre 5.6) FDs such as event or timer FDs are unreliable when +/// having readvwritev performed through io_uring. To deal with EventFd or TimerFd, use +/// `IoSourceExt::read_u64`. +pub fn new<'a, F: AsRawFd + 'a>(f: F) -> Result + 'a>> { + if crate::uring_executor::use_uring() { + new_uring(f) + } else { + new_poll(f) } } -impl IoSourceExt for T {} +/// Creates a concrete `IoSourceExt` using Uring. +pub(crate) fn new_uring<'a, F: AsRawFd + 'a>(f: F) -> Result + 'a>> { + UringSource::new(f) + .map(|u| Box::new(u) as Box>) + .map_err(Error::Uring) +} + +/// Creates a concrete `IoSourceExt` using the fd_executor. +pub(crate) fn new_poll<'a, F: AsRawFd + 'a>(f: F) -> Result + 'a>> { + PollSource::new(f) + .map(|u| Box::new(u) as Box>) + .map_err(Error::Poll) +} + +#[cfg(test)] +mod tests { + use std::fs::{File, OpenOptions}; + + use futures::pin_mut; + + use super::*; + use crate::uring_mem::{MemRegion, VecIoWrapper}; + use crate::Executor; + use std::os::unix::io::AsRawFd; + use std::rc::Rc; + + #[test] + fn readvec() { + async fn go(async_source: Box>) { + let v = vec![0x55u8; 32]; + let v_ptr = v.as_ptr(); + let ret = async_source.read_to_vec(0, v).await.unwrap(); + assert_eq!(ret.0, 32); + let ret_v = ret.1; + assert_eq!(v_ptr, ret_v.as_ptr()); + assert!(ret_v.iter().all(|&b| b == 0)); + } + + let f = File::open("/dev/zero").unwrap(); + let uring_source = new_uring(f).unwrap(); + let fut = go(uring_source); + pin_mut!(fut); + crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut)) + .unwrap() + .run() + .unwrap(); + + let f = File::open("/dev/zero").unwrap(); + let poll_source = new_poll(f).unwrap(); + let fut = go(poll_source); + pin_mut!(fut); + crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut)) + .unwrap() + .run() + .unwrap(); + } + + #[test] + fn writevec() { + async fn go(async_source: Box>) { + let v = vec![0x55u8; 32]; + let v_ptr = v.as_ptr(); + let ret = async_source.write_from_vec(0, v).await.unwrap(); + assert_eq!(ret.0, 32); + let ret_v = ret.1; + assert_eq!(v_ptr, ret_v.as_ptr()); + } + + let f = OpenOptions::new().write(true).open("/dev/null").unwrap(); + let uring_source = new_uring(f).unwrap(); + let fut = go(uring_source); + pin_mut!(fut); + crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut)) + .unwrap() + .run() + .unwrap(); + + let f = OpenOptions::new().write(true).open("/dev/null").unwrap(); + let poll_source = new_poll(f).unwrap(); + let fut = go(poll_source); + pin_mut!(fut); + crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut)) + .unwrap() + .run() + .unwrap(); + } + + #[test] + fn readmem() { + async fn go(async_source: Box>) { + let mem = Rc::new(VecIoWrapper::from(vec![0x55u8; 8192])); + let ret = async_source + .read_to_mem( + 0, + Rc::::clone(&mem), + &[ + MemRegion { offset: 0, len: 32 }, + MemRegion { + offset: 200, + len: 56, + }, + ], + ) + .await + .unwrap(); + assert_eq!(ret, 32 + 56); + let vec: Vec = match Rc::try_unwrap(mem) { + Ok(v) => v.into(), + Err(_) => panic!("Too many vec refs"), + }; + assert!(vec.iter().take(32).all(|&b| b == 0)); + assert!(vec.iter().skip(32).take(168).all(|&b| b == 0x55)); + assert!(vec.iter().skip(200).take(56).all(|&b| b == 0)); + assert!(vec.iter().skip(256).all(|&b| b == 0x55)); + } + + let f = File::open("/dev/zero").unwrap(); + let uring_source = new_uring(f).unwrap(); + let fut = go(uring_source); + pin_mut!(fut); + crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut)) + .unwrap() + .run() + .unwrap(); + + let f = File::open("/dev/zero").unwrap(); + let poll_source = new_poll(f).unwrap(); + let fut = go(poll_source); + pin_mut!(fut); + crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut)) + .unwrap() + .run() + .unwrap(); + } + + #[test] + fn writemem() { + async fn go(async_source: Box>) { + let mem = Rc::new(VecIoWrapper::from(vec![0x55u8; 8192])); + let ret = async_source + .write_from_mem( + 0, + Rc::::clone(&mem), + &[MemRegion { offset: 0, len: 32 }], + ) + .await + .unwrap(); + assert_eq!(ret, 32); + } + + let f = OpenOptions::new().write(true).open("/dev/null").unwrap(); + let uring_source = new_uring(f).unwrap(); + let fut = go(uring_source); + pin_mut!(fut); + crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut)) + .unwrap() + .run() + .unwrap(); + + let f = OpenOptions::new().write(true).open("/dev/null").unwrap(); + let poll_source = new_poll(f).unwrap(); + let fut = go(poll_source); + pin_mut!(fut); + crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut)) + .unwrap() + .run() + .unwrap(); + } + + #[test] + fn read_u64s() { + async fn go(async_source: F) -> u64 { + let source = new(async_source).unwrap(); + source.read_u64().await.unwrap() + } + + let f = File::open("/dev/zero").unwrap(); + let fut = go(f); + pin_mut!(fut); + let val = crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut)) + .unwrap() + .run() + .unwrap(); + assert_eq!(val, 0); + } + + #[test] + fn read_eventfds() { + use base::EventFd; + + async fn go(source: Box>) -> u64 { + source.read_u64().await.unwrap() + } + + let eventfd = EventFd::new().unwrap(); + eventfd.write(0x55).unwrap(); + let fut = go(new(eventfd).unwrap()); + pin_mut!(fut); + let val = crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut)) + .unwrap() + .run() + .unwrap(); + assert_eq!(val, 0x55); + + let eventfd = EventFd::new().unwrap(); + eventfd.write(0xaa).unwrap(); + let fut = go(new_poll(eventfd).unwrap()); + pin_mut!(fut); + let val = crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut)) + .unwrap() + .run() + .unwrap(); + assert_eq!(val, 0xaa); + } + + #[test] + fn fsync() { + async fn go(source: Box>) { + let v = vec![0x55u8; 32]; + let v_ptr = v.as_ptr(); + let ret = source.write_from_vec(0, v).await.unwrap(); + assert_eq!(ret.0, 32); + let ret_v = ret.1; + assert_eq!(v_ptr, ret_v.as_ptr()); + source.fsync().await.unwrap(); + } + + let f = tempfile::tempfile().unwrap(); + let source = new(f).unwrap(); + + let fut = go(source); + pin_mut!(fut); + crate::run_one(fut).unwrap(); + } +} diff --git a/cros_async/src/io_source.rs b/cros_async/src/io_source.rs index bd9ad2e38f..0cfd1e91be 100644 --- a/cros_async/src/io_source.rs +++ b/cros_async/src/io_source.rs @@ -2,7 +2,6 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -use std::pin::Pin; use std::rc::Rc; use std::task::{Context, Poll}; @@ -15,7 +14,7 @@ pub trait IoSource { /// Starts an operation to read from `self` at `file_offset` to `mem` at the addresses and /// lengths given in `mem_offsets`. fn read_to_mem( - self: Pin<&Self>, + &self, file_offset: u64, mem: Rc, mem_offsets: &[MemRegion], @@ -24,32 +23,23 @@ pub trait IoSource { /// Starts an operation to write to `self` at `file_offset` from `mem` at the addresses and /// lengths given in `mem_offsets`. fn write_from_mem( - self: Pin<&Self>, + &self, file_offset: u64, mem: Rc, mem_offsets: &[MemRegion], ) -> Result; /// Asynchronously `fallocate(2)` - fn fallocate( - self: Pin<&Self>, - file_offset: u64, - len: u64, - mode: u32, - ) -> Result; + fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> Result; /// Starts the fsync operation that will sync all completed writes to the backing storage. - fn fsync(self: Pin<&Self>) -> Result; + fn fsync(&self) -> Result; /// Waits for the inner source to be readable. This is similar to calling `poll` with the FD of /// `self`. However, this returns a `PendingOperation` which can be polled asynchronously for /// completion. - fn wait_readable(self: Pin<&Self>) -> Result; + fn wait_readable(&self) -> Result; /// Checks if a `PendingOperation` returned from another trait method is complete. - fn poll_complete( - self: Pin<&Self>, - cx: &mut Context, - token: &mut PendingOperation, - ) -> Poll>; + fn poll_complete(&self, cx: &mut Context, token: &mut PendingOperation) -> Poll>; } diff --git a/cros_async/src/lib.rs b/cros_async/src/lib.rs index 399fd5f032..7438ec818c 100644 --- a/cros_async/src/lib.rs +++ b/cros_async/src/lib.rs @@ -50,20 +50,20 @@ //! long as kernels < 5.4 are supported. //! The other method submits operations to io_uring and is signaled when they complete. This is more //! efficient, but only supported on kernel 5.4+. -//! If `PollOrRing` is used to interface with async IO, then the correct backend will be chosen +//! If `IoSourceExt::new` is used to interface with async IO, then the correct backend will be chosen //! automatically. //! //! # Examples //! -//! See the docs for `PollOrRing` if support for kernels <5.4 is required. Focus on `UringSource` if +//! See the docs for `IoSourceExt` if support for kernels <5.4 is required. Focus on `UringSource` if //! all systems have support for io_uring. mod complete; +mod event; mod executor; mod fd_executor; mod io_ext; mod io_source; -mod poll_or_ring; mod poll_source; mod select; mod uring_executor; @@ -71,44 +71,36 @@ mod uring_futures; pub mod uring_mem; mod waker; +pub use event::EventAsync; pub use executor::Executor; -pub use io_ext::*; -pub use poll_or_ring::Error as AsyncError; -pub use poll_or_ring::{PollOrRing, U64Source}; +pub use io_ext::{ + new, Error as AsyncError, IoSourceExt, ReadAsync, Result as AsyncResult, WriteAsync, +}; +pub use poll_source::PollSource; pub use select::SelectResult; pub use uring_futures::UringSource; pub use uring_mem::{BackingMemory, MemRegion}; -use std::fmt::{self, Display}; use std::future::Future; use std::pin::Pin; use executor::{FutureList, RunOne}; use fd_executor::FdExecutor; +use thiserror::Error as ThisError; use uring_executor::URingExecutor; use waker::WakerToken; -#[derive(Debug)] +#[derive(ThisError, Debug)] pub enum Error { /// Error from the FD executor. + #[error("Failure in the FD executor: {0}")] FdExecutor(fd_executor::Error), /// Error from the uring executor. + #[error("Failure in the uring executor: {0}")] URingExecutor(uring_executor::Error), } pub type Result = std::result::Result; -impl Display for Error { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - use self::Error::*; - - match self { - FdExecutor(e) => write!(f, "Failure in the FD executor: {}", e), - URingExecutor(e) => write!(f, "Failure in the uring executor: {}", e), - } - } -} -impl std::error::Error for Error {} - // Runs an executor with the given future list. // Chooses the uring executor if available, otherwise falls back to the FD executor. fn run_executor(future_list: T) -> Result { diff --git a/cros_async/src/poll_or_ring.rs b/cros_async/src/poll_or_ring.rs deleted file mode 100644 index 1ccd93b045..0000000000 --- a/cros_async/src/poll_or_ring.rs +++ /dev/null @@ -1,502 +0,0 @@ -// Copyright 2020 The Chromium OS Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -//! Temporarily to be used to hide the poll vs uring distinction from the rest of crosvm. - -use std::fmt::{self, Display}; -use std::os::unix::io::AsRawFd; -use std::rc::Rc; - -use crate::io_ext::IoSourceExt; -use crate::poll_source::PollSource; -use crate::uring_mem::{BackingMemory, MemRegion}; -use crate::UringSource; - -#[derive(Debug)] -/// Errors when executing the future. -pub enum Error { - /// An error with a polled(FD) source. - Poll(crate::poll_source::Error), - /// An error reading from a wrapped source. - ReadingInner(isize), - /// An error with a uring source. - Uring(crate::uring_executor::Error), -} -pub type Result = std::result::Result; - -impl Display for Error { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - use self::Error::*; - - match self { - Poll(e) => write!(f, "An error with a poll source: {}.", e), - ReadingInner(e) => write!(f, "An error reading from the source: {}.", e), - Uring(e) => write!(f, "An error with a uring source: {}.", e), - } - } -} -impl std::error::Error for Error {} - -/// Either a polled or uring driven IO source. -/// Provided to allow for a common interface while older kernels without uring support are still -/// used. -/// Because async functions aren't allowed in traits, this wrapper is needed to abstract the two -/// implementations. -/// -/// # Example -/// ```rust -/// use std::fs::File; -/// use cros_async::{PollOrRing}; -/// -/// async fn read_four_bytes(source: &PollOrRing) -> (usize, Vec) { -/// let mem = vec![0u8; 4]; -/// source.read_to_vec(0, mem).await.unwrap() -/// } -/// -/// fn read_file(f: File) -> Result<(), Box> { -/// let async_source = PollOrRing::new(f)?; -/// let read_future = read_four_bytes(&async_source); -/// futures::pin_mut!(read_future); -/// let (nread, vec) = cros_async::run_one(read_future)?; -/// assert_eq!(nread, 4); -/// assert_eq!(vec.len(), 4); -/// Ok(()) -/// } -/// ``` -pub enum PollOrRing { - Poll(PollSource), - Uring(UringSource), -} - -impl PollOrRing { - /// Creates a `PollOrRing` that uses uring if available or falls back to the fd_executor if not. - /// Note that on older kernels (pre 5.6) FDs such as event or timer FDs are unreliable when - /// having readvwritev performed through io_uring. To deal with Event or Timer, use - /// `U64Source` instead. - pub fn new(f: F) -> Result { - if crate::uring_executor::use_uring() { - Self::new_uring(f) - } else { - Self::new_poll(f) - } - } - - /// Creates a `PollOrRing` that uses uring. - pub fn new_uring(f: F) -> Result { - UringSource::new(f) - .map_err(Error::Uring) - .map(PollOrRing::Uring) - } - - /// Creates a `PollOrRing` that uses polled FDs. - pub fn new_poll(f: F) -> Result { - PollSource::new(f) - .map_err(Error::Poll) - .map(PollOrRing::Poll) - } - - pub fn into_source(self) -> F { - match self { - PollOrRing::Poll(s) => s.into_source(), - PollOrRing::Uring(s) => s.into_source(), - } - } - - /// Reads from the iosource at `file_offset` and fill the given `vec`. - pub async fn read_to_vec(&self, file_offset: u64, vec: Vec) -> Result<(usize, Vec)> { - match &self { - PollOrRing::Poll(s) => s - .read_to_vec(file_offset, vec) - .await - .map_err(Error::Poll) - .map(|(n, vec)| (n as usize, vec)), - PollOrRing::Uring(s) => s - .read_to_vec(file_offset, vec) - .await - .map_err(Error::Uring) - .map(|(n, vec)| (n as usize, vec)), - } - } - - /// Writes from the given `vec` to the file starting at `file_offset`. - pub async fn write_from_vec(&self, file_offset: u64, vec: Vec) -> Result<(usize, Vec)> { - match &self { - PollOrRing::Poll(s) => s - .write_from_vec(file_offset, vec) - .await - .map_err(Error::Poll) - .map(|(n, vec)| (n as usize, vec)), - PollOrRing::Uring(s) => s - .write_from_vec(file_offset, vec) - .await - .map_err(Error::Uring) - .map(|(n, vec)| (n as usize, vec)), - } - } - - /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`. - pub async fn read_to_mem<'a>( - &'a self, - file_offset: u64, - mem: Rc, - mem_offsets: &'a [MemRegion], - ) -> Result - where - Self: Unpin, - { - match &self { - PollOrRing::Poll(s) => s - .read_to_mem(file_offset, mem, mem_offsets) - .await - .map_err(Error::Poll) - .map(|n| n as usize), - PollOrRing::Uring(s) => s - .read_to_mem(file_offset, mem, mem_offsets) - .await - .map_err(Error::Uring) - .map(|n| n as usize), - } - } - - /// Writes from the given `mem` from the given offsets to the file starting at `file_offset`. - pub async fn write_from_mem<'a>( - &'a self, - file_offset: u64, - mem: Rc, - mem_offsets: &'a [MemRegion], - ) -> Result - where - Self: Unpin, - { - match &self { - PollOrRing::Poll(s) => s - .write_from_mem(file_offset, mem, mem_offsets) - .await - .map_err(Error::Poll) - .map(|n| n as usize), - PollOrRing::Uring(s) => s - .write_from_mem(file_offset, mem, mem_offsets) - .await - .map_err(Error::Uring) - .map(|n| n as usize), - } - } - - /// See `fallocate(2)`. Note this op is synchronous when using the Polled backend. - pub async fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> Result<()> { - match &self { - PollOrRing::Poll(s) => s.fallocate(file_offset, len, mode).map_err(Error::Poll), - PollOrRing::Uring(s) => s - .fallocate(file_offset, len, mode) - .await - .map_err(Error::Uring), - } - } - - /// Sync all completed operations to the disk. Note this op is synchronous when using the Polled - /// backend. - pub async fn fsync(&self) -> Result<()> { - match &self { - PollOrRing::Poll(s) => s.fsync().map_err(Error::Poll), - PollOrRing::Uring(s) => s.fsync().await.map_err(Error::Uring), - } - } - - /// Wait for the soruce to be readable. - pub async fn wait_readable(&self) -> Result<()> { - match &self { - PollOrRing::Poll(s) => s.wait_readable().await.map_err(Error::Poll), - PollOrRing::Uring(s) => s.wait_readable().await.map_err(Error::Uring), - } - } -} - -impl std::ops::Deref for PollOrRing { - type Target = F; - fn deref(&self) -> &Self::Target { - match &self { - PollOrRing::Poll(s) => &s, - PollOrRing::Uring(s) => &s, - } - } -} - -impl std::ops::DerefMut for PollOrRing { - fn deref_mut(&mut self) -> &mut Self::Target { - match self { - PollOrRing::Poll(s) => s, - PollOrRing::Uring(s) => s, - } - } -} - -/// Convenience helper for reading a series of u64s which is common for timer and event. -pub struct U64Source { - inner: PollOrRing, -} - -impl U64Source { - pub fn new(source: F) -> Result { - Ok(U64Source { - inner: PollOrRing::new(source)?, - }) - } - - pub fn new_poll(source: F) -> Result { - Ok(U64Source { - inner: PollOrRing::new_poll(source)?, - }) - } - - pub async fn next_val(&mut self) -> Result { - match &mut self.inner { - PollOrRing::Poll(s) => s.read_u64().await.map_err(Error::Poll), - PollOrRing::Uring(s) => { - s.wait_readable().await.map_err(Error::Uring)?; - let mut bytes = 0u64.to_ne_bytes(); - // Safe to read to the buffer of known length. - let ret = - unsafe { libc::read(s.as_raw_fd(), bytes.as_mut_ptr() as *mut _, bytes.len()) }; - if ret < 0 { - return Err(Error::ReadingInner(ret)); - } - Ok(u64::from_ne_bytes(bytes)) - } - } - } -} - -impl std::ops::Deref for U64Source { - type Target = F; - fn deref(&self) -> &Self::Target { - &self.inner - } -} - -impl std::ops::DerefMut for U64Source { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.inner - } -} - -#[cfg(test)] -mod tests { - use std::fs::{File, OpenOptions}; - - use futures::pin_mut; - - use crate::uring_mem::{MemRegion, VecIoWrapper}; - use crate::Executor; - - use super::*; - - #[test] - fn readvec() { - async fn go(async_source: PollOrRing) { - let v = vec![0x55u8; 32]; - let v_ptr = v.as_ptr(); - let ret = async_source.read_to_vec(0, v).await.unwrap(); - assert_eq!(ret.0, 32); - let ret_v = ret.1; - assert_eq!(v_ptr, ret_v.as_ptr()); - assert!(ret_v.iter().all(|&b| b == 0)); - } - - let f = File::open("/dev/zero").unwrap(); - let uring_source = PollOrRing::new_uring(f).unwrap(); - let fut = go(uring_source); - pin_mut!(fut); - crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut)) - .unwrap() - .run() - .unwrap(); - - let f = File::open("/dev/zero").unwrap(); - let poll_source = PollOrRing::new_poll(f).unwrap(); - let fut = go(poll_source); - pin_mut!(fut); - crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut)) - .unwrap() - .run() - .unwrap(); - } - - #[test] - fn writevec() { - async fn go(async_source: PollOrRing) { - let v = vec![0x55u8; 32]; - let v_ptr = v.as_ptr(); - let ret = async_source.write_from_vec(0, v).await.unwrap(); - assert_eq!(ret.0, 32); - let ret_v = ret.1; - assert_eq!(v_ptr, ret_v.as_ptr()); - } - - let f = OpenOptions::new().write(true).open("/dev/null").unwrap(); - let uring_source = PollOrRing::new_uring(f).unwrap(); - let fut = go(uring_source); - pin_mut!(fut); - crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut)) - .unwrap() - .run() - .unwrap(); - - let f = OpenOptions::new().write(true).open("/dev/null").unwrap(); - let poll_source = PollOrRing::new_poll(f).unwrap(); - let fut = go(poll_source); - pin_mut!(fut); - crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut)) - .unwrap() - .run() - .unwrap(); - } - - #[test] - fn readmem() { - async fn go(async_source: PollOrRing) { - let mem = Rc::new(VecIoWrapper::from(vec![0x55u8; 8192])); - let ret = async_source - .read_to_mem( - 0, - Rc::::clone(&mem), - &[ - MemRegion { offset: 0, len: 32 }, - MemRegion { - offset: 200, - len: 56, - }, - ], - ) - .await - .unwrap(); - assert_eq!(ret, 32 + 56); - let vec: Vec = match Rc::try_unwrap(mem) { - Ok(v) => v.into(), - Err(_) => panic!("Too many vec refs"), - }; - assert!(vec.iter().take(32).all(|&b| b == 0)); - assert!(vec.iter().skip(32).take(168).all(|&b| b == 0x55)); - assert!(vec.iter().skip(200).take(56).all(|&b| b == 0)); - assert!(vec.iter().skip(256).all(|&b| b == 0x55)); - } - - let f = File::open("/dev/zero").unwrap(); - let uring_source = PollOrRing::new_uring(f).unwrap(); - let fut = go(uring_source); - pin_mut!(fut); - crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut)) - .unwrap() - .run() - .unwrap(); - - let f = File::open("/dev/zero").unwrap(); - let poll_source = PollOrRing::new_poll(f).unwrap(); - let fut = go(poll_source); - pin_mut!(fut); - crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut)) - .unwrap() - .run() - .unwrap(); - } - - #[test] - fn writemem() { - async fn go(async_source: PollOrRing) { - let mem = Rc::new(VecIoWrapper::from(vec![0x55u8; 8192])); - let ret = async_source - .write_from_mem( - 0, - Rc::::clone(&mem), - &[MemRegion { offset: 0, len: 32 }], - ) - .await - .unwrap(); - assert_eq!(ret, 32); - } - - let f = OpenOptions::new().write(true).open("/dev/null").unwrap(); - let uring_source = PollOrRing::new_uring(f).unwrap(); - let fut = go(uring_source); - pin_mut!(fut); - crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut)) - .unwrap() - .run() - .unwrap(); - - let f = OpenOptions::new().write(true).open("/dev/null").unwrap(); - let poll_source = PollOrRing::new_poll(f).unwrap(); - let fut = go(poll_source); - pin_mut!(fut); - crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut)) - .unwrap() - .run() - .unwrap(); - } - - #[test] - fn read_u64s() { - async fn go(async_source: F) -> u64 { - let mut source = U64Source::new(async_source).unwrap(); - source.next_val().await.unwrap() - } - - let f = File::open("/dev/zero").unwrap(); - let fut = go(f); - pin_mut!(fut); - let val = crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut)) - .unwrap() - .run() - .unwrap(); - assert_eq!(val, 0); - } - - #[test] - fn read_events() { - use base::Event; - - async fn go(mut source: U64Source) -> u64 { - source.next_val().await.unwrap() - } - - let event = Event::new().unwrap(); - event.write(0x55).unwrap(); - let fut = go(U64Source::new(event).unwrap()); - pin_mut!(fut); - let val = crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut)) - .unwrap() - .run() - .unwrap(); - assert_eq!(val, 0x55); - - let event = Event::new().unwrap(); - event.write(0xaa).unwrap(); - let fut = go(U64Source::new_poll(event).unwrap()); - pin_mut!(fut); - let val = crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut)) - .unwrap() - .run() - .unwrap(); - assert_eq!(val, 0xaa); - } - - #[test] - fn fsync() { - async fn go(source: PollOrRing) { - let v = vec![0x55u8; 32]; - let v_ptr = v.as_ptr(); - let ret = source.write_from_vec(0, v).await.unwrap(); - assert_eq!(ret.0, 32); - let ret_v = ret.1; - assert_eq!(v_ptr, ret_v.as_ptr()); - source.fsync().await.unwrap(); - } - - let f = tempfile::tempfile().unwrap(); - let source = PollOrRing::new(f).unwrap(); - - let fut = go(source); - pin_mut!(fut); - crate::run_one(fut).unwrap(); - } -} diff --git a/cros_async/src/poll_source.rs b/cros_async/src/poll_source.rs index 0814da53ae..21b295dd9a 100644 --- a/cros_async/src/poll_source.rs +++ b/cros_async/src/poll_source.rs @@ -3,10 +3,10 @@ // found in the LICENSE file. //! A wrapped IO source that uses FdExecutor to drive asynchronous completion. Used from -//! `PollOrRing` when uring isn't available in the kernel. +//! `IoSourceExt::new` when uring isn't available in the kernel. +use async_trait::async_trait; use std::borrow::Borrow; -use std::fmt::{self, Display}; use std::future::Future; use std::ops::{Deref, DerefMut}; use std::os::unix::io::{AsRawFd, RawFd}; @@ -18,61 +18,41 @@ use libc::O_NONBLOCK; use crate::fd_executor::{self, add_read_waker, add_write_waker, PendingWaker}; use crate::uring_mem::{BackingMemory, BorrowedIoVec, MemRegion}; +use crate::AsyncError; +use crate::AsyncResult; +use crate::{IoSourceExt, ReadAsync, WriteAsync}; use base::{self, add_fd_flags}; +use thiserror::Error as ThisError; -#[derive(Debug)] +#[derive(ThisError, Debug)] pub enum Error { /// An error occurred attempting to register a waker with the executor. + #[error("An error occurred attempting to register a waker with the executor: {0}.")] AddingWaker(fd_executor::Error), /// An error occurred when executing fallocate synchronously. + #[error("An error occurred when executing fallocate synchronously: {0}")] Fallocate(base::Error), /// An error occurred when executing fsync synchronously. + #[error("An error occurred when executing fsync synchronously: {0}")] Fsync(base::Error), /// An error occurred when reading the FD. + /// + #[error("An error occurred when reading the FD: {0}.")] Read(base::Error), /// Can't seek file. + #[error("An error occurred when seeking the FD: {0}.")] Seeking(base::Error), /// An error occurred when setting the FD non-blocking. + #[error("An error occurred setting the FD non-blocking: {0}.")] SettingNonBlocking(base::Error), /// An error occurred when writing the FD. + #[error("An error occurred when writing the FD: {0}.")] Write(base::Error), } pub type Result = std::result::Result; -impl std::error::Error for Error {} - -impl Display for Error { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - use self::Error::*; - - match self { - AddingWaker(e) => write!( - f, - "An error occurred attempting to register a waker with the executor: {}.", - e - ), - Fallocate(e) => write!( - f, - "An error occurred when executing fallocate synchronously: {}", - e - ), - Fsync(e) => write!( - f, - "An error occurred when executing fsync synchronously: {}", - e - ), - Read(e) => write!(f, "An error occurred when reading the FD: {}.", e), - Seeking(e) => write!(f, "An error occurred when seeking the FD: {}.", e), - SettingNonBlocking(e) => { - write!(f, "An error occurred setting the FD non-blocking: {}.", e) - } - Write(e) => write!(f, "An error occurred when writing the FD: {}.", e), - } - } -} - /// Async wrapper for an IO source that uses the FD executor to drive async operations. -/// Used by `PollOrRing` when uring isn't available. +/// Used by `IoSourceExt::new` when uring isn't available. pub struct PollSource { source: F, } @@ -85,32 +65,6 @@ impl PollSource { Ok(Self { source: f }) } - /// read from the iosource at `file_offset` and fill the given `vec`. - pub fn read_to_vec<'a>(&'a self, file_offset: u64, vec: Vec) -> PollReadVec<'a, F> - where - Self: Unpin, - { - PollReadVec { - reader: self, - file_offset, - vec: Some(vec), - pending_waker: None, - } - } - - /// write from the given `vec` to the file starting at `file_offset`. - pub fn write_from_vec<'a>(&'a self, file_offset: u64, vec: Vec) -> PollWriteVec<'a, F> - where - Self: Unpin, - { - PollWriteVec { - writer: self, - file_offset, - vec: Some(vec), - pending_waker: None, - } - } - /// Read a u64 from the current offset. Avoid seeking Fds that don't support `pread`. pub fn read_u64(&self) -> PollReadU64<'_, F> { PollReadU64 { @@ -119,82 +73,6 @@ impl PollSource { } } - /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`. - pub fn read_to_mem<'a>( - &'a self, - file_offset: u64, - mem: Rc, - mem_offsets: &'a [MemRegion], - ) -> PollReadMem<'a, F> - where - Self: Unpin, - { - PollReadMem { - reader: self, - file_offset, - mem, - mem_offsets, - pending_waker: None, - } - } - - /// Runs fallocate _synchronously_. There isn't an async equivalent for starting an fallocate. - pub fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> Result<()> { - let ret = unsafe { - libc::fallocate64( - self.source.as_raw_fd(), - mode as libc::c_int, - file_offset as libc::off64_t, - len as libc::off64_t, - ) - }; - if ret == 0 { - Ok(()) - } else { - Err(Error::Fallocate(base::Error::last())) - } - } - - /// Runs fsync _synchronously_. There isn't an async equivalent for starting an fsync. - pub fn fsync(&self) -> Result<()> { - let ret = unsafe { libc::fsync(self.source.as_raw_fd()) }; - if ret == 0 { - Ok(()) - } else { - Err(Error::Fsync(base::Error::last())) - } - } - - /// Writes from the given `mem` from the given offsets to the file starting at `file_offset`. - pub fn write_from_mem<'a>( - &'a self, - file_offset: u64, - mem: Rc, - mem_offsets: &'a [MemRegion], - ) -> PollWriteMem<'a, F> - where - Self: Unpin, - { - PollWriteMem { - writer: self, - file_offset, - mem, - mem_offsets, - pending_waker: None, - } - } - - /// Waits for the soruce to be readable. - pub fn wait_readable(&self) -> PollWaitReadable - where - Self: Unpin, - { - PollWaitReadable { - pollee: self, - pending_waker: None, - } - } - /// Return the inner source. pub fn into_source(self) -> F { self.source @@ -209,6 +87,138 @@ impl Deref for PollSource { } } +#[async_trait(?Send)] +impl ReadAsync for PollSource { + /// Reads from the iosource at `file_offset` and fill the given `vec`. + async fn read_to_vec<'a>( + &'a self, + file_offset: u64, + vec: Vec, + ) -> AsyncResult<(usize, Vec)> { + let fut = PollReadVec { + reader: self, + file_offset, + vec: Some(vec), + pending_waker: None, + }; + fut.await + .map(|(n, vec)| (n as usize, vec)) + .map_err(AsyncError::Poll) + } + + /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`. + async fn read_to_mem<'a>( + &'a self, + file_offset: u64, + mem: Rc, + mem_offsets: &'a [MemRegion], + ) -> AsyncResult { + let fut = PollReadMem { + reader: self, + file_offset, + mem, + mem_offsets, + pending_waker: None, + }; + fut.await.map(|n| n as usize).map_err(AsyncError::Poll) + } + + /// Wait for the FD of `self` to be readable. + async fn wait_readable(&self) -> AsyncResult<()> { + let fut = PollWaitReadable { + pollee: self, + pending_waker: None, + }; + fut.await.map_err(AsyncError::Poll) + } + + async fn read_u64(&self) -> AsyncResult { + PollSource::read_u64(self).await.map_err(AsyncError::Poll) + } +} + +#[async_trait(?Send)] +impl WriteAsync for PollSource { + /// Writes from the given `vec` to the file starting at `file_offset`. + async fn write_from_vec<'a>( + &'a self, + file_offset: u64, + vec: Vec, + ) -> AsyncResult<(usize, Vec)> { + let fut = PollWriteVec { + writer: self, + file_offset, + vec: Some(vec), + pending_waker: None, + }; + fut.await + .map(|(n, vec)| (n as usize, vec)) + .map_err(AsyncError::Poll) + } + + /// Writes from the given `mem` from the given offsets to the file starting at `file_offset`. + async fn write_from_mem<'a>( + &'a self, + file_offset: u64, + mem: Rc, + mem_offsets: &'a [MemRegion], + ) -> AsyncResult { + let fut = PollWriteMem { + writer: self, + file_offset, + mem, + mem_offsets, + pending_waker: None, + }; + fut.await.map(|n| n as usize).map_err(AsyncError::Poll) + } + + /// See `fallocate(2)` for details. + async fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> AsyncResult<()> { + let ret = unsafe { + libc::fallocate64( + self.source.as_raw_fd(), + mode as libc::c_int, + file_offset as libc::off64_t, + len as libc::off64_t, + ) + }; + if ret == 0 { + Ok(()) + } else { + Err(AsyncError::Poll(Error::Fallocate(base::Error::last()))) + } + } + + /// Sync all completed write operations to the backing storage. + async fn fsync(&self) -> AsyncResult<()> { + let ret = unsafe { libc::fsync(self.source.as_raw_fd()) }; + if ret == 0 { + Ok(()) + } else { + Err(AsyncError::Poll(Error::Fsync(base::Error::last()))) + } + } +} + +#[async_trait(?Send)] +impl IoSourceExt for PollSource { + /// Yields the underlying IO source. + fn into_source(self: Box) -> F { + self.source + } + + /// Provides a mutable ref to the underlying IO source. + fn as_source_mut(&mut self) -> &mut F { + &mut self.source + } + + /// Provides a ref to the underlying IO source. + fn as_source(&self) -> &F { + &self.source + } +} + impl DerefMut for PollSource { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.source @@ -621,19 +631,28 @@ mod tests { #[test] fn fallocate() { - let dir = tempfile::TempDir::new().unwrap(); - let mut file_path = PathBuf::from(dir.path()); - file_path.push("test"); + async fn go() { + let dir = tempfile::TempDir::new().unwrap(); + let mut file_path = PathBuf::from(dir.path()); + file_path.push("test"); - let f = OpenOptions::new() - .create(true) - .write(true) - .open(&file_path) + let f = OpenOptions::new() + .create(true) + .write(true) + .open(&file_path) + .unwrap(); + let source = PollSource::new(f).unwrap(); + source.fallocate(0, 4096, 0).await.unwrap(); + + let meta_data = std::fs::metadata(&file_path).unwrap(); + assert_eq!(meta_data.len(), 4096); + } + + let fut = go(); + pin_mut!(fut); + crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut)) + .unwrap() + .run() .unwrap(); - let source = PollSource::new(f).unwrap(); - source.fallocate(0, 4096, 0).unwrap(); - - let meta_data = std::fs::metadata(&file_path).unwrap(); - assert_eq!(meta_data.len(), 4096); } } diff --git a/cros_async/src/uring_futures/fallocate.rs b/cros_async/src/uring_futures/fallocate.rs index 4dfdd9009c..1cc60d3f1a 100644 --- a/cros_async/src/uring_futures/fallocate.rs +++ b/cros_async/src/uring_futures/fallocate.rs @@ -15,12 +15,12 @@ use super::uring_fut::UringFutState; #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Fallocate<'a, R: IoSource + ?Sized> { - reader: Pin<&'a R>, + reader: &'a R, state: UringFutState<(u64, u64, u32), ()>, } impl<'a, R: IoSource + ?Sized> Fallocate<'a, R> { - pub(crate) fn new(reader: Pin<&'a R>, file_offset: u64, len: u64, mode: u32) -> Self { + pub(crate) fn new(reader: &'a R, file_offset: u64, len: u64, mode: u32) -> Self { Fallocate { reader, state: UringFutState::new((file_offset, len, mode)), @@ -34,10 +34,8 @@ impl Future for Fallocate<'_, R> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let state = std::mem::replace(&mut self.state, UringFutState::Processing); let (new_state, ret) = match state.advance( - |(file_offset, len, mode)| { - Ok((self.reader.as_ref().fallocate(file_offset, len, mode)?, ())) - }, - |op| self.reader.as_ref().poll_complete(cx, op), + |(file_offset, len, mode)| Ok((self.reader.fallocate(file_offset, len, mode)?, ())), + |op| self.reader.poll_complete(cx, op), ) { Ok(d) => d, Err(e) => return Poll::Ready(Err(e)), @@ -62,7 +60,7 @@ mod tests { use futures::pin_mut; - use crate::io_ext::IoSourceExt; + use crate::io_ext::WriteAsync; use crate::UringSource; #[test] @@ -80,7 +78,7 @@ mod tests { let source = UringSource::new(f).unwrap(); if let Err(e) = source.fallocate(0, 4096, 0).await { match e { - crate::uring_executor::Error::Io(io_err) => { + crate::io_ext::Error::Uring(crate::uring_executor::Error::Io(io_err)) => { if io_err.kind() == std::io::ErrorKind::InvalidInput { // Skip the test on kernels before fallocate support. return; diff --git a/cros_async/src/uring_futures/fsync.rs b/cros_async/src/uring_futures/fsync.rs index 16bf0e957c..caeed57325 100644 --- a/cros_async/src/uring_futures/fsync.rs +++ b/cros_async/src/uring_futures/fsync.rs @@ -15,12 +15,12 @@ use super::uring_fut::UringFutState; #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Fsync<'a, R: IoSource + ?Sized> { - reader: Pin<&'a R>, + reader: &'a R, state: UringFutState<(), ()>, } impl<'a, R: IoSource + ?Sized> Fsync<'a, R> { - pub(crate) fn new(reader: Pin<&'a R>) -> Self { + pub(crate) fn new(reader: &'a R) -> Self { Fsync { reader, state: UringFutState::new(()), @@ -34,8 +34,8 @@ impl Future for Fsync<'_, R> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let state = std::mem::replace(&mut self.state, UringFutState::Processing); let (new_state, ret) = match state.advance( - |()| Ok((self.reader.as_ref().fsync()?, ())), - |op| self.reader.as_ref().poll_complete(cx, op), + |()| Ok((self.reader.fsync()?, ())), + |op| self.reader.poll_complete(cx, op), ) { Ok(d) => d, Err(e) => return Poll::Ready(Err(e)), @@ -57,7 +57,7 @@ impl Future for Fsync<'_, R> { mod tests { use futures::pin_mut; - use crate::io_ext::IoSourceExt; + use crate::io_ext::WriteAsync; use crate::UringSource; #[test] diff --git a/cros_async/src/uring_futures/poll_fd.rs b/cros_async/src/uring_futures/poll_fd.rs index 5c0973a776..6e88701b35 100644 --- a/cros_async/src/uring_futures/poll_fd.rs +++ b/cros_async/src/uring_futures/poll_fd.rs @@ -15,12 +15,12 @@ use super::uring_fut::UringFutState; #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct PollFd<'a, R: IoSource + ?Sized> { - reader: Pin<&'a R>, + reader: &'a R, state: UringFutState<(), ()>, } impl<'a, R: IoSource + ?Sized> PollFd<'a, R> { - pub(crate) fn new(reader: Pin<&'a R>) -> Self { + pub(crate) fn new(reader: &'a R) -> Self { PollFd { reader, state: UringFutState::new(()), @@ -34,8 +34,8 @@ impl Future for PollFd<'_, R> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let state = std::mem::replace(&mut self.state, UringFutState::Processing); let (new_state, ret) = match state.advance( - |()| Ok((self.reader.as_ref().wait_readable()?, ())), - |op| self.reader.as_ref().poll_complete(cx, op), + |()| Ok((self.reader.wait_readable()?, ())), + |op| self.reader.poll_complete(cx, op), ) { Ok(d) => d, Err(e) => return Poll::Ready(Err(e)), @@ -59,7 +59,7 @@ mod tests { use futures::pin_mut; - use crate::io_ext::IoSourceExt; + use crate::io_ext::ReadAsync; use crate::UringSource; #[test] diff --git a/cros_async/src/uring_futures/read_mem.rs b/cros_async/src/uring_futures/read_mem.rs index ada9feeb0d..c3b1188dfa 100644 --- a/cros_async/src/uring_futures/read_mem.rs +++ b/cros_async/src/uring_futures/read_mem.rs @@ -16,13 +16,13 @@ use super::uring_fut::UringFutState; /// Future for the `read_to_mem` function. #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct ReadMem<'a, 'b, R: IoSource + ?Sized> { - reader: Pin<&'a R>, + reader: &'a R, state: UringFutState<(u64, Rc, &'b [MemRegion]), Rc>, } impl<'a, 'b, R: IoSource + ?Sized> ReadMem<'a, 'b, R> { pub(crate) fn new( - reader: Pin<&'a R>, + reader: &'a R, file_offset: u64, mem: Rc, mem_offsets: &'b [MemRegion], @@ -50,12 +50,11 @@ impl Future for ReadMem<'_, '_, R> { |(file_offset, mem, mem_offsets)| { Ok(( self.reader - .as_ref() .read_to_mem(file_offset, Rc::clone(&mem), mem_offsets)?, mem, )) }, - |op| self.reader.as_ref().poll_complete(cx, op), + |op| self.reader.poll_complete(cx, op), ) { Ok(d) => d, Err(e) => return Poll::Ready(Err(e)), @@ -77,7 +76,7 @@ mod tests { use futures::pin_mut; - use crate::io_ext::IoSourceExt; + use crate::io_ext::ReadAsync; use crate::uring_mem::{MemRegion, VecIoWrapper}; use crate::UringSource; diff --git a/cros_async/src/uring_futures/read_vec.rs b/cros_async/src/uring_futures/read_vec.rs index 7f1aff1db0..f80472b21d 100644 --- a/cros_async/src/uring_futures/read_vec.rs +++ b/cros_async/src/uring_futures/read_vec.rs @@ -16,12 +16,12 @@ use super::uring_fut::UringFutState; /// Future for the `read_to_vec` function. #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct ReadVec<'a, R: IoSource + ?Sized> { - reader: Pin<&'a R>, + reader: &'a R, state: UringFutState<(u64, Rc), Rc>, } impl<'a, R: IoSource + ?Sized> ReadVec<'a, R> { - pub(crate) fn new(reader: Pin<&'a R>, file_offset: u64, vec: Vec) -> Self { + pub(crate) fn new(reader: &'a R, file_offset: u64, vec: Vec) -> Self { ReadVec { reader, state: UringFutState::new((file_offset, Rc::new(VecIoWrapper::from(vec)))), @@ -37,7 +37,7 @@ impl Future for ReadVec<'_, R> { let (new_state, ret) = match state.advance( |(file_offset, wrapped_vec)| { Ok(( - self.reader.as_ref().read_to_mem( + self.reader.read_to_mem( file_offset, Rc::::clone(&wrapped_vec), &[MemRegion { @@ -48,7 +48,7 @@ impl Future for ReadVec<'_, R> { wrapped_vec, )) }, - |op| self.reader.as_ref().poll_complete(cx, op), + |op| self.reader.poll_complete(cx, op), ) { Ok(d) => d, Err(e) => return Poll::Ready(Err(e)), @@ -81,7 +81,7 @@ mod tests { use futures::pin_mut; - use crate::io_ext::IoSourceExt; + use crate::io_ext::ReadAsync; use crate::UringSource; #[test] diff --git a/cros_async/src/uring_futures/uring_source.rs b/cros_async/src/uring_futures/uring_source.rs index 893ddb3a76..be447b998e 100644 --- a/cros_async/src/uring_futures/uring_source.rs +++ b/cros_async/src/uring_futures/uring_source.rs @@ -4,13 +4,16 @@ use std::ops::{Deref, DerefMut}; use std::os::unix::io::AsRawFd; -use std::pin::Pin; use std::rc::Rc; use std::task::{Context, Poll}; use crate::io_source::IoSource; use crate::uring_executor::{self, PendingOperation, RegisteredSource, Result}; +use crate::uring_futures; use crate::uring_mem::{BackingMemory, MemRegion}; +use crate::AsyncError; +use crate::AsyncResult; +use async_trait::async_trait; /// `UringSource` wraps FD backed IO sources for use with io_uring. It is a thin wrapper around /// registering an IO source with the uring that provides an `IoSource` implementation. @@ -19,9 +22,9 @@ use crate::uring_mem::{BackingMemory, MemRegion}; /// # Example /// ```rust /// use std::fs::File; -/// use cros_async::{UringSource, IoSourceExt}; +/// use cros_async::{UringSource, ReadAsync}; /// -/// async fn read_four_bytes(source: &UringSource) -> (u32, Vec) { +/// async fn read_four_bytes(source: &UringSource) -> (usize, Vec) { /// let mem = vec![0u8; 4]; /// source.read_to_vec(0, mem).await.unwrap() /// } @@ -59,7 +62,7 @@ impl UringSource { impl IoSource for UringSource { fn read_to_mem( - self: Pin<&Self>, + &self, file_offset: u64, mem: Rc, mem_offsets: &[MemRegion], @@ -69,7 +72,7 @@ impl IoSource for UringSource { } fn write_from_mem( - self: Pin<&Self>, + &self, file_offset: u64, mem: Rc, mem_offsets: &[MemRegion], @@ -78,34 +81,133 @@ impl IoSource for UringSource { .start_write_from_mem(file_offset, mem, mem_offsets) } - fn fallocate( - self: Pin<&Self>, - file_offset: u64, - len: u64, - mode: u32, - ) -> Result { + fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> Result { self.registered_source .start_fallocate(file_offset, len, mode) } - fn fsync(self: Pin<&Self>) -> Result { + fn fsync(&self) -> Result { self.registered_source.start_fsync() } // wait for the inner source to be readable and return a refernce to it. - fn wait_readable(self: Pin<&Self>) -> Result { + fn wait_readable(&self) -> Result { self.registered_source.poll_fd_readable() } - fn poll_complete( - self: Pin<&Self>, - cx: &mut Context, - token: &mut PendingOperation, - ) -> Poll> { + fn poll_complete(&self, cx: &mut Context, token: &mut PendingOperation) -> Poll> { self.registered_source.poll_complete(cx, token) } } +#[async_trait(?Send)] +impl crate::ReadAsync for UringSource { + /// Reads from the iosource at `file_offset` and fill the given `vec`. + async fn read_to_vec<'a>( + &'a self, + file_offset: u64, + vec: Vec, + ) -> AsyncResult<(usize, Vec)> { + uring_futures::ReadVec::new(self, file_offset, vec) + .await + .map(|(n, vec)| (n as usize, vec)) + .map_err(AsyncError::Uring) + } + + /// Wait for the FD of `self` to be readable. + async fn wait_readable(&self) -> AsyncResult<()> { + uring_futures::PollFd::new(self) + .await + .map_err(AsyncError::Uring) + } + + /// Reads a single u64 from the current offset. + async fn read_u64(&self) -> AsyncResult { + crate::IoSourceExt::wait_readable(self).await?; + let mut bytes = 0u64.to_ne_bytes(); + // Safe to read to the buffer of known length. + let ret = + unsafe { libc::read(self.as_raw_fd(), bytes.as_mut_ptr() as *mut _, bytes.len()) }; + if ret < 0 { + return Err(AsyncError::ReadingInner(ret)); + } + Ok(u64::from_ne_bytes(bytes)) + } + + /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`. + async fn read_to_mem<'a>( + &'a self, + file_offset: u64, + mem: Rc, + mem_offsets: &'a [MemRegion], + ) -> AsyncResult { + uring_futures::ReadMem::new(self, file_offset, mem, mem_offsets) + .await + .map(|n| n as usize) + .map_err(AsyncError::Uring) + } +} + +#[async_trait(?Send)] +impl crate::WriteAsync for UringSource { + /// Writes from the given `vec` to the file starting at `file_offset`. + async fn write_from_vec<'a>( + &'a self, + file_offset: u64, + vec: Vec, + ) -> AsyncResult<(usize, Vec)> { + uring_futures::WriteVec::new(self, file_offset, vec) + .await + .map(|(n, vec)| (n as usize, vec)) + .map_err(AsyncError::Uring) + } + + /// Writes from the given `mem` from the given offsets to the file starting at `file_offset`. + async fn write_from_mem<'a>( + &'a self, + file_offset: u64, + mem: Rc, + mem_offsets: &'a [MemRegion], + ) -> AsyncResult { + uring_futures::WriteMem::new(self, file_offset, mem, mem_offsets) + .await + .map(|n| n as usize) + .map_err(AsyncError::Uring) + } + + /// See `fallocate(2)`. Note this op is synchronous when using the Polled backend. + async fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> AsyncResult<()> { + uring_futures::Fallocate::new(self, file_offset, len, mode) + .await + .map_err(AsyncError::Uring) + } + + /// Sync all completed write operations to the backing storage. + async fn fsync(&self) -> AsyncResult<()> { + uring_futures::Fsync::new(self) + .await + .map_err(AsyncError::Uring) + } +} + +#[async_trait(?Send)] +impl crate::IoSourceExt for UringSource { + /// Yields the underlying IO source. + fn into_source(self: Box) -> F { + self.source + } + + /// Provides a mutable ref to the underlying IO source. + fn as_source(&self) -> &F { + &self.source + } + + /// Provides a ref to the underlying IO source. + fn as_source_mut(&mut self) -> &mut F { + &mut self.source + } +} + impl Deref for UringSource { type Target = F; @@ -124,6 +226,7 @@ impl DerefMut for UringSource { mod tests { use futures::pin_mut; use std::future::Future; + use std::pin::Pin; use super::*; diff --git a/cros_async/src/uring_futures/write_mem.rs b/cros_async/src/uring_futures/write_mem.rs index 20633bd631..f5eebbe26d 100644 --- a/cros_async/src/uring_futures/write_mem.rs +++ b/cros_async/src/uring_futures/write_mem.rs @@ -16,13 +16,13 @@ use super::uring_fut::UringFutState; /// Future for the `write_from_mem` function. #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct WriteMem<'a, 'b, W: IoSource + ?Sized> { - writer: Pin<&'a W>, + writer: &'a W, state: UringFutState<(u64, Rc, &'b [MemRegion]), Rc>, } impl<'a, 'b, W: IoSource + ?Sized> WriteMem<'a, 'b, W> { pub(crate) fn new( - writer: Pin<&'a W>, + writer: &'a W, file_offset: u64, mem: Rc, mem_offsets: &'b [MemRegion], @@ -49,15 +49,12 @@ impl Future for WriteMem<'_, '_, W> { let (new_state, ret) = match state.advance( |(file_offset, mem, mem_offsets)| { Ok(( - self.writer.as_ref().write_from_mem( - file_offset, - Rc::clone(&mem), - mem_offsets, - )?, + self.writer + .write_from_mem(file_offset, Rc::clone(&mem), mem_offsets)?, mem, )) }, - |op| self.writer.as_ref().poll_complete(cx, op), + |op| self.writer.poll_complete(cx, op), ) { Ok(d) => d, Err(e) => return Poll::Ready(Err(e)), @@ -79,7 +76,7 @@ mod tests { use futures::pin_mut; - use crate::io_ext::IoSourceExt; + use crate::io_ext::WriteAsync; use crate::uring_mem::MemRegion; use crate::UringSource; diff --git a/cros_async/src/uring_futures/write_vec.rs b/cros_async/src/uring_futures/write_vec.rs index 8d84de1a37..564facfc86 100644 --- a/cros_async/src/uring_futures/write_vec.rs +++ b/cros_async/src/uring_futures/write_vec.rs @@ -17,12 +17,12 @@ use super::uring_fut::UringFutState; /// Future for the `write_to_vec` function. #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct WriteVec<'a, W: IoSource + ?Sized> { - writer: Pin<&'a W>, + writer: &'a W, state: UringFutState<(u64, Rc), Rc>, } impl<'a, W: IoSource + ?Sized> WriteVec<'a, W> { - pub(crate) fn new(writer: Pin<&'a W>, file_offset: u64, vec: Vec) -> Self { + pub(crate) fn new(writer: &'a W, file_offset: u64, vec: Vec) -> Self { WriteVec { writer, state: UringFutState::new((file_offset, Rc::new(VecIoWrapper::from(vec)))), @@ -38,7 +38,7 @@ impl Future for WriteVec<'_, W> { let (new_state, ret) = match state.advance( |(file_offset, wrapped_vec)| { Ok(( - self.writer.as_ref().write_from_mem( + self.writer.write_from_mem( file_offset, Rc::::clone(&wrapped_vec), &[MemRegion { @@ -49,7 +49,7 @@ impl Future for WriteVec<'_, W> { wrapped_vec, )) }, - |op| self.writer.as_ref().poll_complete(cx, op), + |op| self.writer.poll_complete(cx, op), ) { Ok(d) => d, Err(e) => return Poll::Ready(Err(e)), @@ -80,7 +80,7 @@ mod tests { use futures::pin_mut; use std::fs::OpenOptions; - use crate::io_ext::IoSourceExt; + use crate::io_ext::WriteAsync; use crate::UringSource; #[test] diff --git a/devices/src/virtio/queue.rs b/devices/src/virtio/queue.rs index eabcf9a902..60a7fc6193 100644 --- a/devices/src/virtio/queue.rs +++ b/devices/src/virtio/queue.rs @@ -10,7 +10,7 @@ use std::rc::Rc; use std::sync::atomic::{fence, Ordering}; use base::error; -use cros_async::{AsyncError, U64Source}; +use cros_async::{AsyncError, EventAsync}; use virtio_sys::virtio_ring::VIRTIO_RING_F_EVENT_IDX; use vm_memory::{GuestAddress, GuestMemory}; @@ -389,14 +389,14 @@ impl Queue { pub async fn next_async( &mut self, mem: &GuestMemory, - event: &mut U64Source, + eventfd: &mut EventAsync, ) -> std::result::Result { loop { // Check if there are more descriptors available. if let Some(chain) = self.pop(mem) { return Ok(chain); } - event.next_val().await?; + eventfd.next_val().await?; } } diff --git a/disk/Cargo.toml b/disk/Cargo.toml index 468eef58ed..7182dafc6f 100644 --- a/disk/Cargo.toml +++ b/disk/Cargo.toml @@ -11,7 +11,7 @@ path = "src/disk.rs" composite-disk = ["protos", "protobuf"] [dependencies] -async-trait = "*" +async-trait = "0.1.36" base = { path = "../base" } libc = "*" protobuf = { version = "2.3", optional = true } diff --git a/disk/src/disk.rs b/disk/src/disk.rs index 0b0b4d5799..70d58b280d 100644 --- a/disk/src/disk.rs +++ b/disk/src/disk.rs @@ -372,17 +372,17 @@ pub trait AsyncDisk: DiskGetLen + FileSetLen + FileAllocate { async fn write_zeroes_at(&self, file_offset: u64, length: u64) -> Result<()>; } -use cros_async::PollOrRing; +use cros_async::IoSourceExt; /// A disk backed by a single file that implements `AsyncDisk` for access. pub struct SingleFileDisk { - inner: PollOrRing, + inner: Box>, } impl TryFrom for SingleFileDisk { type Error = Error; fn try_from(inner: File) -> Result { - PollOrRing::new(inner) + cros_async::new(inner) .map_err(Error::CreateSingleFileDisk) .map(|inner| SingleFileDisk { inner }) } @@ -390,19 +390,19 @@ impl TryFrom for SingleFileDisk { impl DiskGetLen for SingleFileDisk { fn get_len(&self) -> io::Result { - self.inner.get_len() + self.inner.as_source().get_len() } } impl FileSetLen for SingleFileDisk { fn set_len(&self, len: u64) -> io::Result<()> { - self.inner.set_len(len) + self.inner.as_source().set_len(len) } } impl FileAllocate for SingleFileDisk { fn allocate(&mut self, offset: u64, len: u64) -> io::Result<()> { - self.inner.allocate(offset, len) + self.inner.as_source_mut().allocate(offset, len) } } diff --git a/msg_socket/src/lib.rs b/msg_socket/src/lib.rs index b40a8740fc..0a9dc24edb 100644 --- a/msg_socket/src/lib.rs +++ b/msg_socket/src/lib.rs @@ -4,13 +4,11 @@ mod msg_on_socket; +use base::{handle_eintr, net::UnixSeqpacket, Error as SysError, ScmSocket, UnsyncMarker}; use std::io::{IoSlice, Result}; use std::marker::PhantomData; use std::os::unix::io::{AsRawFd, RawFd}; -use base::{handle_eintr, net::UnixSeqpacket, Error as SysError, ScmSocket, UnsyncMarker}; -use cros_async::PollOrRing; - pub use crate::msg_on_socket::*; pub use msg_on_socket_derive::*; @@ -213,7 +211,7 @@ impl<'a, I: MsgOnSocket, O: MsgOnSocket> AsyncReceiver<'a, I, O> { } pub async fn next(&mut self) -> MsgResult { - let p = PollOrRing::new(self.inner).unwrap(); + let p = cros_async::new(self.inner).unwrap(); p.wait_readable().await.unwrap(); self.inner.recv() }