From 5767987e5f717710b1bbf5f3d21657ed4ad237c8 Mon Sep 17 00:00:00 2001 From: Noah Gold Date: Wed, 16 Sep 2020 18:21:58 -0700 Subject: [PATCH] Replace PollOrRing w/ async trait IoSourceExt. This CL makes the following fundamental changes to cros_async: 1. Removes PollOrRing and replaces it with IoSourceExt, and the subtraits ReadAsync & WriteAsync. The blanket implementation of IoSourceExt has been dropped, and replaced with source specific implementations of the trait. Those implementations are where the code from PollOrRing has been moved. 2. Pinning for IoSource has been dropped from UringSource & the uring futures. This appears to be safe because the IoSource doesn't contain any self refs, or perform any operations beyond forwarding to the RegisteredSource. (The FD is duped before being passed to RingWakerState by RegisteredSource, so there doesn't seem to be any data which would require pinning.) 3. U64Source was replaced by EventAsync. It also switches all Error enums to use thiserror, which reduces boilerplate. BUG=None TEST=cargo test -p cros_async Cq-Depend: chromium:2421742 Change-Id: Ie1dd958da2e1f8dec1ae1fd8c0b4e754223d330d Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2416996 Reviewed-by: Chirantan Ekbote Tested-by: kokoro Commit-Queue: Noah Gold --- Cargo.lock | 22 + cros_async/Cargo.toml | 2 + cros_async/src/event.rs | 47 ++ cros_async/src/io_ext.rs | 377 +++++++++++--- cros_async/src/io_source.rs | 22 +- cros_async/src/lib.rs | 32 +- cros_async/src/poll_or_ring.rs | 502 ------------------- cros_async/src/poll_source.rs | 319 ++++++------ cros_async/src/uring_futures/fallocate.rs | 14 +- cros_async/src/uring_futures/fsync.rs | 10 +- cros_async/src/uring_futures/poll_fd.rs | 10 +- cros_async/src/uring_futures/read_mem.rs | 9 +- cros_async/src/uring_futures/read_vec.rs | 10 +- cros_async/src/uring_futures/uring_source.rs | 139 ++++- cros_async/src/uring_futures/write_mem.rs | 15 +- cros_async/src/uring_futures/write_vec.rs | 10 +- devices/src/virtio/queue.rs | 6 +- disk/Cargo.toml | 2 +- disk/src/disk.rs | 12 +- msg_socket/src/lib.rs | 6 +- 20 files changed, 739 insertions(+), 827 deletions(-) create mode 100644 cros_async/src/event.rs delete mode 100644 cros_async/src/poll_or_ring.rs diff --git a/Cargo.lock b/Cargo.lock index 0563448215..8e1c4d5193 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -124,6 +124,7 @@ dependencies = [ name = "cros_async" version = "0.1.0" dependencies = [ + "async-trait 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)", "base 0.1.0", "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "io_uring 0.1.0", @@ -132,6 +133,7 @@ dependencies = [ "pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)", "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "syscall_defines 0.1.0", + "thiserror 1.0.20 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -760,6 +762,24 @@ dependencies = [ "libc 0.2.65 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "thiserror" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "thiserror-impl 1.0.20 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "tpm2" version = "0.1.0" @@ -922,5 +942,7 @@ dependencies = [ "checksum remain 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "99c861227fc40c8da6fdaa3d58144ac84c0537080a43eb1d7d45c28f88dcb888" "checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" "checksum syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)" = "af6f3550d8dff9ef7dc34d384ac6f107e5d31c8f57d9f28e0081503f547ac8f5" +"checksum thiserror 1.0.20 (registry+https://github.com/rust-lang/crates.io-index)" = "7dfdd070ccd8ccb78f4ad66bf1982dc37f620ef696c6b5028fe2ed83dd3d0d08" +"checksum thiserror-impl 1.0.20 (registry+https://github.com/rust-lang/crates.io-index)" = "bd80fc12f73063ac132ac92aceea36734f04a1d93c1240c6944e23a3b8841793" "checksum unicode-width 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "882386231c45df4700b275c7ff55b6f3698780a650026380e72dabe76fa46526" "checksum unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c" diff --git a/cros_async/Cargo.toml b/cros_async/Cargo.toml index 741fcfee7f..b502d24883 100644 --- a/cros_async/Cargo.toml +++ b/cros_async/Cargo.toml @@ -5,6 +5,7 @@ authors = ["The Chromium OS Authors"] edition = "2018" [dependencies] +async-trait = "0.1.36" io_uring = { path = "../io_uring" } libc = "*" paste = "*" @@ -12,6 +13,7 @@ pin-utils = "0.1.0-alpha.4" base = { path = "../base" } syscall_defines = { path = "../syscall_defines" } slab = "0.4" +thiserror = "1.0.20" [dependencies.futures] version = "*" diff --git a/cros_async/src/event.rs b/cros_async/src/event.rs new file mode 100644 index 0000000000..852b58eeaa --- /dev/null +++ b/cros_async/src/event.rs @@ -0,0 +1,47 @@ +// Copyright 2020 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use crate::{new, AsyncResult, IoSourceExt}; +use std::os::unix::io::AsRawFd; + +/// An async version of sys_util::EventFd. +pub struct EventAsync { + io_source: Box + 'static>, +} + +impl EventAsync { + /// Creates a new EventAsync wrapper around the provided eventfd. + #[allow(dead_code)] + pub fn new(f: F) -> AsyncResult> { + Ok(EventAsync { io_source: new(f)? }) + } + + /// Gets the next value from the eventfd. + #[allow(dead_code)] + pub async fn next_val(&self) -> AsyncResult { + self.io_source.read_u64().await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use base::Event; + use futures::pin_mut; + + #[test] + fn next_val_reads_value() { + async fn go(event: Event) -> u64 { + let event_async = EventAsync::new(event).unwrap(); + event_async.next_val().await.unwrap() + } + + let eventfd = Event::new().unwrap(); + eventfd.write(0xaa).unwrap(); + let fut = go(eventfd); + pin_mut!(fut); + let val = crate::run_executor(crate::RunOne::new(fut)).unwrap(); + assert_eq!(val, 0xaa); + } +} diff --git a/cros_async/src/io_ext.rs b/cros_async/src/io_ext.rs index 0a9e8c698b..8e15f98a85 100644 --- a/cros_async/src/io_ext.rs +++ b/cros_async/src/io_ext.rs @@ -4,11 +4,10 @@ //! # `IoSourceExt` //! -//! Extension functions to asynchronously access files. -//! `IoSourceExt` is the interface exposed to users for `IoSource` objects. Using `IoSource` -//! directly is inconvenient and requires dealing with state machines for the backing uring and -//! future libraries. `IoSourceExt` instead provides users with a future that can be `await`ed from -//! async context. +//! User functions to asynchronously access files. +//! Using `IoSource` directly is inconvenient and requires dealing with state +//! machines for the backing uring, future libraries, etc. `IoSourceExt` instead +//! provides users with a future that can be `await`ed from async context. //! //! Each member of `IoSourceExt` returns a future for the supported operation. One or more //! operation can be pending at a time. @@ -16,84 +15,332 @@ //! Operations can only access memory in a `Vec` or an implementor of `BackingMemory`. See the //! `URingExecutor` documentation for an explaination of why. -use std::pin::Pin; +use crate::poll_source::PollSource; +use crate::UringSource; +use async_trait::async_trait; +use std::os::unix::io::AsRawFd; use std::rc::Rc; +use thiserror::Error as ThisError; -use crate::io_source::IoSource; -use crate::uring_futures; use crate::uring_mem::{BackingMemory, MemRegion}; -/// Extends IoSource with ergonomic methods to perform asynchronous IO. -pub trait IoSourceExt: IoSource { - /// Reads from the iosource at `file_offset` and fill the given `vec`. - fn read_to_vec<'a>(&'a self, file_offset: u64, vec: Vec) -> uring_futures::ReadVec<'a, Self> - where - Self: Unpin, - { - uring_futures::ReadVec::new(Pin::new(self), file_offset, vec) - } +#[derive(ThisError, Debug)] +pub enum Error { + /// An error with a polled(FD) source. + #[error("An error with a poll source: {0}")] + Poll(crate::poll_source::Error), + /// An error reading from a wrapped source. + #[error("An error from the source: {0}")] + ReadingInner(isize), + /// An error with a uring source. + #[error("An error with a uring source: {0}")] + Uring(crate::uring_executor::Error), +} +pub type Result = std::result::Result; +/// Ergonomic methods for async reads. +#[async_trait(?Send)] +pub trait ReadAsync { + /// Reads from the iosource at `file_offset` and fill the given `vec`. + async fn read_to_vec<'a>(&'a self, file_offset: u64, vec: Vec) -> Result<(usize, Vec)>; + + /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`. + async fn read_to_mem<'a>( + &'a self, + file_offset: u64, + mem: Rc, + mem_offsets: &'a [MemRegion], + ) -> Result; + + /// Wait for the FD of `self` to be readable. + async fn wait_readable(&self) -> Result<()>; + + /// Reads a single u64 from the current offset. + async fn read_u64(&self) -> Result; +} + +/// Ergonomic methods for async writes. +#[async_trait(?Send)] +pub trait WriteAsync { /// Writes from the given `vec` to the file starting at `file_offset`. - fn write_from_vec<'a>( + async fn write_from_vec<'a>( &'a self, file_offset: u64, vec: Vec, - ) -> uring_futures::WriteVec<'a, Self> - where - Self: Unpin, - { - uring_futures::WriteVec::new(Pin::new(self), file_offset, vec) - } - - /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`. - fn read_to_mem<'a, 'b>( - &'a self, - file_offset: u64, - mem: Rc, - mem_offsets: &'b [MemRegion], - ) -> uring_futures::ReadMem<'a, 'b, Self> - where - Self: Unpin, - { - uring_futures::ReadMem::new(Pin::new(self), file_offset, mem, mem_offsets) - } + ) -> Result<(usize, Vec)>; /// Writes from the given `mem` from the given offsets to the file starting at `file_offset`. - fn write_from_mem<'a, 'b>( + async fn write_from_mem<'a>( &'a self, file_offset: u64, mem: Rc, - mem_offsets: &'b [MemRegion], - ) -> uring_futures::WriteMem<'a, 'b, Self> - where - Self: Unpin, - { - uring_futures::WriteMem::new(Pin::new(self), file_offset, mem, mem_offsets) - } + mem_offsets: &'a [MemRegion], + ) -> Result; - /// See `fallocate(2)` for details. - fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> uring_futures::Fallocate - where - Self: Unpin, - { - uring_futures::Fallocate::new(Pin::new(self), file_offset, len, mode) - } + /// See `fallocate(2)`. Note this op is synchronous when using the Polled backend. + async fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> Result<()>; /// Sync all completed write operations to the backing storage. - fn fsync(&self) -> uring_futures::Fsync - where - Self: Unpin, - { - uring_futures::Fsync::new(Pin::new(self)) - } + async fn fsync(&self) -> Result<()>; +} - /// Wait for the FD of `self` to be readable. - fn wait_readable(&self) -> uring_futures::PollFd<'_, Self> - where - Self: Unpin, - { - uring_futures::PollFd::new(Pin::new(self)) +/// Subtrait for general async IO. +#[async_trait(?Send)] +pub trait IoSourceExt: ReadAsync + WriteAsync { + /// Yields the underlying IO source. + fn into_source(self: Box) -> F; + + /// Provides a mutable ref to the underlying IO source. + fn as_source_mut(&mut self) -> &mut F; + + /// Provides a ref to the underlying IO source. + fn as_source(&self) -> &F; +} + +/// Creates a concrete `IoSourceExt` that uses uring if available or falls back to the fd_executor if not. +/// Note that on older kernels (pre 5.6) FDs such as event or timer FDs are unreliable when +/// having readvwritev performed through io_uring. To deal with EventFd or TimerFd, use +/// `IoSourceExt::read_u64`. +pub fn new<'a, F: AsRawFd + 'a>(f: F) -> Result + 'a>> { + if crate::uring_executor::use_uring() { + new_uring(f) + } else { + new_poll(f) } } -impl IoSourceExt for T {} +/// Creates a concrete `IoSourceExt` using Uring. +pub(crate) fn new_uring<'a, F: AsRawFd + 'a>(f: F) -> Result + 'a>> { + UringSource::new(f) + .map(|u| Box::new(u) as Box>) + .map_err(Error::Uring) +} + +/// Creates a concrete `IoSourceExt` using the fd_executor. +pub(crate) fn new_poll<'a, F: AsRawFd + 'a>(f: F) -> Result + 'a>> { + PollSource::new(f) + .map(|u| Box::new(u) as Box>) + .map_err(Error::Poll) +} + +#[cfg(test)] +mod tests { + use std::fs::{File, OpenOptions}; + + use futures::pin_mut; + + use super::*; + use crate::uring_mem::{MemRegion, VecIoWrapper}; + use crate::Executor; + use std::os::unix::io::AsRawFd; + use std::rc::Rc; + + #[test] + fn readvec() { + async fn go(async_source: Box>) { + let v = vec![0x55u8; 32]; + let v_ptr = v.as_ptr(); + let ret = async_source.read_to_vec(0, v).await.unwrap(); + assert_eq!(ret.0, 32); + let ret_v = ret.1; + assert_eq!(v_ptr, ret_v.as_ptr()); + assert!(ret_v.iter().all(|&b| b == 0)); + } + + let f = File::open("/dev/zero").unwrap(); + let uring_source = new_uring(f).unwrap(); + let fut = go(uring_source); + pin_mut!(fut); + crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut)) + .unwrap() + .run() + .unwrap(); + + let f = File::open("/dev/zero").unwrap(); + let poll_source = new_poll(f).unwrap(); + let fut = go(poll_source); + pin_mut!(fut); + crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut)) + .unwrap() + .run() + .unwrap(); + } + + #[test] + fn writevec() { + async fn go(async_source: Box>) { + let v = vec![0x55u8; 32]; + let v_ptr = v.as_ptr(); + let ret = async_source.write_from_vec(0, v).await.unwrap(); + assert_eq!(ret.0, 32); + let ret_v = ret.1; + assert_eq!(v_ptr, ret_v.as_ptr()); + } + + let f = OpenOptions::new().write(true).open("/dev/null").unwrap(); + let uring_source = new_uring(f).unwrap(); + let fut = go(uring_source); + pin_mut!(fut); + crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut)) + .unwrap() + .run() + .unwrap(); + + let f = OpenOptions::new().write(true).open("/dev/null").unwrap(); + let poll_source = new_poll(f).unwrap(); + let fut = go(poll_source); + pin_mut!(fut); + crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut)) + .unwrap() + .run() + .unwrap(); + } + + #[test] + fn readmem() { + async fn go(async_source: Box>) { + let mem = Rc::new(VecIoWrapper::from(vec![0x55u8; 8192])); + let ret = async_source + .read_to_mem( + 0, + Rc::::clone(&mem), + &[ + MemRegion { offset: 0, len: 32 }, + MemRegion { + offset: 200, + len: 56, + }, + ], + ) + .await + .unwrap(); + assert_eq!(ret, 32 + 56); + let vec: Vec = match Rc::try_unwrap(mem) { + Ok(v) => v.into(), + Err(_) => panic!("Too many vec refs"), + }; + assert!(vec.iter().take(32).all(|&b| b == 0)); + assert!(vec.iter().skip(32).take(168).all(|&b| b == 0x55)); + assert!(vec.iter().skip(200).take(56).all(|&b| b == 0)); + assert!(vec.iter().skip(256).all(|&b| b == 0x55)); + } + + let f = File::open("/dev/zero").unwrap(); + let uring_source = new_uring(f).unwrap(); + let fut = go(uring_source); + pin_mut!(fut); + crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut)) + .unwrap() + .run() + .unwrap(); + + let f = File::open("/dev/zero").unwrap(); + let poll_source = new_poll(f).unwrap(); + let fut = go(poll_source); + pin_mut!(fut); + crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut)) + .unwrap() + .run() + .unwrap(); + } + + #[test] + fn writemem() { + async fn go(async_source: Box>) { + let mem = Rc::new(VecIoWrapper::from(vec![0x55u8; 8192])); + let ret = async_source + .write_from_mem( + 0, + Rc::::clone(&mem), + &[MemRegion { offset: 0, len: 32 }], + ) + .await + .unwrap(); + assert_eq!(ret, 32); + } + + let f = OpenOptions::new().write(true).open("/dev/null").unwrap(); + let uring_source = new_uring(f).unwrap(); + let fut = go(uring_source); + pin_mut!(fut); + crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut)) + .unwrap() + .run() + .unwrap(); + + let f = OpenOptions::new().write(true).open("/dev/null").unwrap(); + let poll_source = new_poll(f).unwrap(); + let fut = go(poll_source); + pin_mut!(fut); + crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut)) + .unwrap() + .run() + .unwrap(); + } + + #[test] + fn read_u64s() { + async fn go(async_source: F) -> u64 { + let source = new(async_source).unwrap(); + source.read_u64().await.unwrap() + } + + let f = File::open("/dev/zero").unwrap(); + let fut = go(f); + pin_mut!(fut); + let val = crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut)) + .unwrap() + .run() + .unwrap(); + assert_eq!(val, 0); + } + + #[test] + fn read_eventfds() { + use base::EventFd; + + async fn go(source: Box>) -> u64 { + source.read_u64().await.unwrap() + } + + let eventfd = EventFd::new().unwrap(); + eventfd.write(0x55).unwrap(); + let fut = go(new(eventfd).unwrap()); + pin_mut!(fut); + let val = crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut)) + .unwrap() + .run() + .unwrap(); + assert_eq!(val, 0x55); + + let eventfd = EventFd::new().unwrap(); + eventfd.write(0xaa).unwrap(); + let fut = go(new_poll(eventfd).unwrap()); + pin_mut!(fut); + let val = crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut)) + .unwrap() + .run() + .unwrap(); + assert_eq!(val, 0xaa); + } + + #[test] + fn fsync() { + async fn go(source: Box>) { + let v = vec![0x55u8; 32]; + let v_ptr = v.as_ptr(); + let ret = source.write_from_vec(0, v).await.unwrap(); + assert_eq!(ret.0, 32); + let ret_v = ret.1; + assert_eq!(v_ptr, ret_v.as_ptr()); + source.fsync().await.unwrap(); + } + + let f = tempfile::tempfile().unwrap(); + let source = new(f).unwrap(); + + let fut = go(source); + pin_mut!(fut); + crate::run_one(fut).unwrap(); + } +} diff --git a/cros_async/src/io_source.rs b/cros_async/src/io_source.rs index bd9ad2e38f..0cfd1e91be 100644 --- a/cros_async/src/io_source.rs +++ b/cros_async/src/io_source.rs @@ -2,7 +2,6 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -use std::pin::Pin; use std::rc::Rc; use std::task::{Context, Poll}; @@ -15,7 +14,7 @@ pub trait IoSource { /// Starts an operation to read from `self` at `file_offset` to `mem` at the addresses and /// lengths given in `mem_offsets`. fn read_to_mem( - self: Pin<&Self>, + &self, file_offset: u64, mem: Rc, mem_offsets: &[MemRegion], @@ -24,32 +23,23 @@ pub trait IoSource { /// Starts an operation to write to `self` at `file_offset` from `mem` at the addresses and /// lengths given in `mem_offsets`. fn write_from_mem( - self: Pin<&Self>, + &self, file_offset: u64, mem: Rc, mem_offsets: &[MemRegion], ) -> Result; /// Asynchronously `fallocate(2)` - fn fallocate( - self: Pin<&Self>, - file_offset: u64, - len: u64, - mode: u32, - ) -> Result; + fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> Result; /// Starts the fsync operation that will sync all completed writes to the backing storage. - fn fsync(self: Pin<&Self>) -> Result; + fn fsync(&self) -> Result; /// Waits for the inner source to be readable. This is similar to calling `poll` with the FD of /// `self`. However, this returns a `PendingOperation` which can be polled asynchronously for /// completion. - fn wait_readable(self: Pin<&Self>) -> Result; + fn wait_readable(&self) -> Result; /// Checks if a `PendingOperation` returned from another trait method is complete. - fn poll_complete( - self: Pin<&Self>, - cx: &mut Context, - token: &mut PendingOperation, - ) -> Poll>; + fn poll_complete(&self, cx: &mut Context, token: &mut PendingOperation) -> Poll>; } diff --git a/cros_async/src/lib.rs b/cros_async/src/lib.rs index 399fd5f032..7438ec818c 100644 --- a/cros_async/src/lib.rs +++ b/cros_async/src/lib.rs @@ -50,20 +50,20 @@ //! long as kernels < 5.4 are supported. //! The other method submits operations to io_uring and is signaled when they complete. This is more //! efficient, but only supported on kernel 5.4+. -//! If `PollOrRing` is used to interface with async IO, then the correct backend will be chosen +//! If `IoSourceExt::new` is used to interface with async IO, then the correct backend will be chosen //! automatically. //! //! # Examples //! -//! See the docs for `PollOrRing` if support for kernels <5.4 is required. Focus on `UringSource` if +//! See the docs for `IoSourceExt` if support for kernels <5.4 is required. Focus on `UringSource` if //! all systems have support for io_uring. mod complete; +mod event; mod executor; mod fd_executor; mod io_ext; mod io_source; -mod poll_or_ring; mod poll_source; mod select; mod uring_executor; @@ -71,44 +71,36 @@ mod uring_futures; pub mod uring_mem; mod waker; +pub use event::EventAsync; pub use executor::Executor; -pub use io_ext::*; -pub use poll_or_ring::Error as AsyncError; -pub use poll_or_ring::{PollOrRing, U64Source}; +pub use io_ext::{ + new, Error as AsyncError, IoSourceExt, ReadAsync, Result as AsyncResult, WriteAsync, +}; +pub use poll_source::PollSource; pub use select::SelectResult; pub use uring_futures::UringSource; pub use uring_mem::{BackingMemory, MemRegion}; -use std::fmt::{self, Display}; use std::future::Future; use std::pin::Pin; use executor::{FutureList, RunOne}; use fd_executor::FdExecutor; +use thiserror::Error as ThisError; use uring_executor::URingExecutor; use waker::WakerToken; -#[derive(Debug)] +#[derive(ThisError, Debug)] pub enum Error { /// Error from the FD executor. + #[error("Failure in the FD executor: {0}")] FdExecutor(fd_executor::Error), /// Error from the uring executor. + #[error("Failure in the uring executor: {0}")] URingExecutor(uring_executor::Error), } pub type Result = std::result::Result; -impl Display for Error { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - use self::Error::*; - - match self { - FdExecutor(e) => write!(f, "Failure in the FD executor: {}", e), - URingExecutor(e) => write!(f, "Failure in the uring executor: {}", e), - } - } -} -impl std::error::Error for Error {} - // Runs an executor with the given future list. // Chooses the uring executor if available, otherwise falls back to the FD executor. fn run_executor(future_list: T) -> Result { diff --git a/cros_async/src/poll_or_ring.rs b/cros_async/src/poll_or_ring.rs deleted file mode 100644 index 1ccd93b045..0000000000 --- a/cros_async/src/poll_or_ring.rs +++ /dev/null @@ -1,502 +0,0 @@ -// Copyright 2020 The Chromium OS Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -//! Temporarily to be used to hide the poll vs uring distinction from the rest of crosvm. - -use std::fmt::{self, Display}; -use std::os::unix::io::AsRawFd; -use std::rc::Rc; - -use crate::io_ext::IoSourceExt; -use crate::poll_source::PollSource; -use crate::uring_mem::{BackingMemory, MemRegion}; -use crate::UringSource; - -#[derive(Debug)] -/// Errors when executing the future. -pub enum Error { - /// An error with a polled(FD) source. - Poll(crate::poll_source::Error), - /// An error reading from a wrapped source. - ReadingInner(isize), - /// An error with a uring source. - Uring(crate::uring_executor::Error), -} -pub type Result = std::result::Result; - -impl Display for Error { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - use self::Error::*; - - match self { - Poll(e) => write!(f, "An error with a poll source: {}.", e), - ReadingInner(e) => write!(f, "An error reading from the source: {}.", e), - Uring(e) => write!(f, "An error with a uring source: {}.", e), - } - } -} -impl std::error::Error for Error {} - -/// Either a polled or uring driven IO source. -/// Provided to allow for a common interface while older kernels without uring support are still -/// used. -/// Because async functions aren't allowed in traits, this wrapper is needed to abstract the two -/// implementations. -/// -/// # Example -/// ```rust -/// use std::fs::File; -/// use cros_async::{PollOrRing}; -/// -/// async fn read_four_bytes(source: &PollOrRing) -> (usize, Vec) { -/// let mem = vec![0u8; 4]; -/// source.read_to_vec(0, mem).await.unwrap() -/// } -/// -/// fn read_file(f: File) -> Result<(), Box> { -/// let async_source = PollOrRing::new(f)?; -/// let read_future = read_four_bytes(&async_source); -/// futures::pin_mut!(read_future); -/// let (nread, vec) = cros_async::run_one(read_future)?; -/// assert_eq!(nread, 4); -/// assert_eq!(vec.len(), 4); -/// Ok(()) -/// } -/// ``` -pub enum PollOrRing { - Poll(PollSource), - Uring(UringSource), -} - -impl PollOrRing { - /// Creates a `PollOrRing` that uses uring if available or falls back to the fd_executor if not. - /// Note that on older kernels (pre 5.6) FDs such as event or timer FDs are unreliable when - /// having readvwritev performed through io_uring. To deal with Event or Timer, use - /// `U64Source` instead. - pub fn new(f: F) -> Result { - if crate::uring_executor::use_uring() { - Self::new_uring(f) - } else { - Self::new_poll(f) - } - } - - /// Creates a `PollOrRing` that uses uring. - pub fn new_uring(f: F) -> Result { - UringSource::new(f) - .map_err(Error::Uring) - .map(PollOrRing::Uring) - } - - /// Creates a `PollOrRing` that uses polled FDs. - pub fn new_poll(f: F) -> Result { - PollSource::new(f) - .map_err(Error::Poll) - .map(PollOrRing::Poll) - } - - pub fn into_source(self) -> F { - match self { - PollOrRing::Poll(s) => s.into_source(), - PollOrRing::Uring(s) => s.into_source(), - } - } - - /// Reads from the iosource at `file_offset` and fill the given `vec`. - pub async fn read_to_vec(&self, file_offset: u64, vec: Vec) -> Result<(usize, Vec)> { - match &self { - PollOrRing::Poll(s) => s - .read_to_vec(file_offset, vec) - .await - .map_err(Error::Poll) - .map(|(n, vec)| (n as usize, vec)), - PollOrRing::Uring(s) => s - .read_to_vec(file_offset, vec) - .await - .map_err(Error::Uring) - .map(|(n, vec)| (n as usize, vec)), - } - } - - /// Writes from the given `vec` to the file starting at `file_offset`. - pub async fn write_from_vec(&self, file_offset: u64, vec: Vec) -> Result<(usize, Vec)> { - match &self { - PollOrRing::Poll(s) => s - .write_from_vec(file_offset, vec) - .await - .map_err(Error::Poll) - .map(|(n, vec)| (n as usize, vec)), - PollOrRing::Uring(s) => s - .write_from_vec(file_offset, vec) - .await - .map_err(Error::Uring) - .map(|(n, vec)| (n as usize, vec)), - } - } - - /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`. - pub async fn read_to_mem<'a>( - &'a self, - file_offset: u64, - mem: Rc, - mem_offsets: &'a [MemRegion], - ) -> Result - where - Self: Unpin, - { - match &self { - PollOrRing::Poll(s) => s - .read_to_mem(file_offset, mem, mem_offsets) - .await - .map_err(Error::Poll) - .map(|n| n as usize), - PollOrRing::Uring(s) => s - .read_to_mem(file_offset, mem, mem_offsets) - .await - .map_err(Error::Uring) - .map(|n| n as usize), - } - } - - /// Writes from the given `mem` from the given offsets to the file starting at `file_offset`. - pub async fn write_from_mem<'a>( - &'a self, - file_offset: u64, - mem: Rc, - mem_offsets: &'a [MemRegion], - ) -> Result - where - Self: Unpin, - { - match &self { - PollOrRing::Poll(s) => s - .write_from_mem(file_offset, mem, mem_offsets) - .await - .map_err(Error::Poll) - .map(|n| n as usize), - PollOrRing::Uring(s) => s - .write_from_mem(file_offset, mem, mem_offsets) - .await - .map_err(Error::Uring) - .map(|n| n as usize), - } - } - - /// See `fallocate(2)`. Note this op is synchronous when using the Polled backend. - pub async fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> Result<()> { - match &self { - PollOrRing::Poll(s) => s.fallocate(file_offset, len, mode).map_err(Error::Poll), - PollOrRing::Uring(s) => s - .fallocate(file_offset, len, mode) - .await - .map_err(Error::Uring), - } - } - - /// Sync all completed operations to the disk. Note this op is synchronous when using the Polled - /// backend. - pub async fn fsync(&self) -> Result<()> { - match &self { - PollOrRing::Poll(s) => s.fsync().map_err(Error::Poll), - PollOrRing::Uring(s) => s.fsync().await.map_err(Error::Uring), - } - } - - /// Wait for the soruce to be readable. - pub async fn wait_readable(&self) -> Result<()> { - match &self { - PollOrRing::Poll(s) => s.wait_readable().await.map_err(Error::Poll), - PollOrRing::Uring(s) => s.wait_readable().await.map_err(Error::Uring), - } - } -} - -impl std::ops::Deref for PollOrRing { - type Target = F; - fn deref(&self) -> &Self::Target { - match &self { - PollOrRing::Poll(s) => &s, - PollOrRing::Uring(s) => &s, - } - } -} - -impl std::ops::DerefMut for PollOrRing { - fn deref_mut(&mut self) -> &mut Self::Target { - match self { - PollOrRing::Poll(s) => s, - PollOrRing::Uring(s) => s, - } - } -} - -/// Convenience helper for reading a series of u64s which is common for timer and event. -pub struct U64Source { - inner: PollOrRing, -} - -impl U64Source { - pub fn new(source: F) -> Result { - Ok(U64Source { - inner: PollOrRing::new(source)?, - }) - } - - pub fn new_poll(source: F) -> Result { - Ok(U64Source { - inner: PollOrRing::new_poll(source)?, - }) - } - - pub async fn next_val(&mut self) -> Result { - match &mut self.inner { - PollOrRing::Poll(s) => s.read_u64().await.map_err(Error::Poll), - PollOrRing::Uring(s) => { - s.wait_readable().await.map_err(Error::Uring)?; - let mut bytes = 0u64.to_ne_bytes(); - // Safe to read to the buffer of known length. - let ret = - unsafe { libc::read(s.as_raw_fd(), bytes.as_mut_ptr() as *mut _, bytes.len()) }; - if ret < 0 { - return Err(Error::ReadingInner(ret)); - } - Ok(u64::from_ne_bytes(bytes)) - } - } - } -} - -impl std::ops::Deref for U64Source { - type Target = F; - fn deref(&self) -> &Self::Target { - &self.inner - } -} - -impl std::ops::DerefMut for U64Source { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.inner - } -} - -#[cfg(test)] -mod tests { - use std::fs::{File, OpenOptions}; - - use futures::pin_mut; - - use crate::uring_mem::{MemRegion, VecIoWrapper}; - use crate::Executor; - - use super::*; - - #[test] - fn readvec() { - async fn go(async_source: PollOrRing) { - let v = vec![0x55u8; 32]; - let v_ptr = v.as_ptr(); - let ret = async_source.read_to_vec(0, v).await.unwrap(); - assert_eq!(ret.0, 32); - let ret_v = ret.1; - assert_eq!(v_ptr, ret_v.as_ptr()); - assert!(ret_v.iter().all(|&b| b == 0)); - } - - let f = File::open("/dev/zero").unwrap(); - let uring_source = PollOrRing::new_uring(f).unwrap(); - let fut = go(uring_source); - pin_mut!(fut); - crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut)) - .unwrap() - .run() - .unwrap(); - - let f = File::open("/dev/zero").unwrap(); - let poll_source = PollOrRing::new_poll(f).unwrap(); - let fut = go(poll_source); - pin_mut!(fut); - crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut)) - .unwrap() - .run() - .unwrap(); - } - - #[test] - fn writevec() { - async fn go(async_source: PollOrRing) { - let v = vec![0x55u8; 32]; - let v_ptr = v.as_ptr(); - let ret = async_source.write_from_vec(0, v).await.unwrap(); - assert_eq!(ret.0, 32); - let ret_v = ret.1; - assert_eq!(v_ptr, ret_v.as_ptr()); - } - - let f = OpenOptions::new().write(true).open("/dev/null").unwrap(); - let uring_source = PollOrRing::new_uring(f).unwrap(); - let fut = go(uring_source); - pin_mut!(fut); - crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut)) - .unwrap() - .run() - .unwrap(); - - let f = OpenOptions::new().write(true).open("/dev/null").unwrap(); - let poll_source = PollOrRing::new_poll(f).unwrap(); - let fut = go(poll_source); - pin_mut!(fut); - crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut)) - .unwrap() - .run() - .unwrap(); - } - - #[test] - fn readmem() { - async fn go(async_source: PollOrRing) { - let mem = Rc::new(VecIoWrapper::from(vec![0x55u8; 8192])); - let ret = async_source - .read_to_mem( - 0, - Rc::::clone(&mem), - &[ - MemRegion { offset: 0, len: 32 }, - MemRegion { - offset: 200, - len: 56, - }, - ], - ) - .await - .unwrap(); - assert_eq!(ret, 32 + 56); - let vec: Vec = match Rc::try_unwrap(mem) { - Ok(v) => v.into(), - Err(_) => panic!("Too many vec refs"), - }; - assert!(vec.iter().take(32).all(|&b| b == 0)); - assert!(vec.iter().skip(32).take(168).all(|&b| b == 0x55)); - assert!(vec.iter().skip(200).take(56).all(|&b| b == 0)); - assert!(vec.iter().skip(256).all(|&b| b == 0x55)); - } - - let f = File::open("/dev/zero").unwrap(); - let uring_source = PollOrRing::new_uring(f).unwrap(); - let fut = go(uring_source); - pin_mut!(fut); - crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut)) - .unwrap() - .run() - .unwrap(); - - let f = File::open("/dev/zero").unwrap(); - let poll_source = PollOrRing::new_poll(f).unwrap(); - let fut = go(poll_source); - pin_mut!(fut); - crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut)) - .unwrap() - .run() - .unwrap(); - } - - #[test] - fn writemem() { - async fn go(async_source: PollOrRing) { - let mem = Rc::new(VecIoWrapper::from(vec![0x55u8; 8192])); - let ret = async_source - .write_from_mem( - 0, - Rc::::clone(&mem), - &[MemRegion { offset: 0, len: 32 }], - ) - .await - .unwrap(); - assert_eq!(ret, 32); - } - - let f = OpenOptions::new().write(true).open("/dev/null").unwrap(); - let uring_source = PollOrRing::new_uring(f).unwrap(); - let fut = go(uring_source); - pin_mut!(fut); - crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut)) - .unwrap() - .run() - .unwrap(); - - let f = OpenOptions::new().write(true).open("/dev/null").unwrap(); - let poll_source = PollOrRing::new_poll(f).unwrap(); - let fut = go(poll_source); - pin_mut!(fut); - crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut)) - .unwrap() - .run() - .unwrap(); - } - - #[test] - fn read_u64s() { - async fn go(async_source: F) -> u64 { - let mut source = U64Source::new(async_source).unwrap(); - source.next_val().await.unwrap() - } - - let f = File::open("/dev/zero").unwrap(); - let fut = go(f); - pin_mut!(fut); - let val = crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut)) - .unwrap() - .run() - .unwrap(); - assert_eq!(val, 0); - } - - #[test] - fn read_events() { - use base::Event; - - async fn go(mut source: U64Source) -> u64 { - source.next_val().await.unwrap() - } - - let event = Event::new().unwrap(); - event.write(0x55).unwrap(); - let fut = go(U64Source::new(event).unwrap()); - pin_mut!(fut); - let val = crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut)) - .unwrap() - .run() - .unwrap(); - assert_eq!(val, 0x55); - - let event = Event::new().unwrap(); - event.write(0xaa).unwrap(); - let fut = go(U64Source::new_poll(event).unwrap()); - pin_mut!(fut); - let val = crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut)) - .unwrap() - .run() - .unwrap(); - assert_eq!(val, 0xaa); - } - - #[test] - fn fsync() { - async fn go(source: PollOrRing) { - let v = vec![0x55u8; 32]; - let v_ptr = v.as_ptr(); - let ret = source.write_from_vec(0, v).await.unwrap(); - assert_eq!(ret.0, 32); - let ret_v = ret.1; - assert_eq!(v_ptr, ret_v.as_ptr()); - source.fsync().await.unwrap(); - } - - let f = tempfile::tempfile().unwrap(); - let source = PollOrRing::new(f).unwrap(); - - let fut = go(source); - pin_mut!(fut); - crate::run_one(fut).unwrap(); - } -} diff --git a/cros_async/src/poll_source.rs b/cros_async/src/poll_source.rs index 0814da53ae..21b295dd9a 100644 --- a/cros_async/src/poll_source.rs +++ b/cros_async/src/poll_source.rs @@ -3,10 +3,10 @@ // found in the LICENSE file. //! A wrapped IO source that uses FdExecutor to drive asynchronous completion. Used from -//! `PollOrRing` when uring isn't available in the kernel. +//! `IoSourceExt::new` when uring isn't available in the kernel. +use async_trait::async_trait; use std::borrow::Borrow; -use std::fmt::{self, Display}; use std::future::Future; use std::ops::{Deref, DerefMut}; use std::os::unix::io::{AsRawFd, RawFd}; @@ -18,61 +18,41 @@ use libc::O_NONBLOCK; use crate::fd_executor::{self, add_read_waker, add_write_waker, PendingWaker}; use crate::uring_mem::{BackingMemory, BorrowedIoVec, MemRegion}; +use crate::AsyncError; +use crate::AsyncResult; +use crate::{IoSourceExt, ReadAsync, WriteAsync}; use base::{self, add_fd_flags}; +use thiserror::Error as ThisError; -#[derive(Debug)] +#[derive(ThisError, Debug)] pub enum Error { /// An error occurred attempting to register a waker with the executor. + #[error("An error occurred attempting to register a waker with the executor: {0}.")] AddingWaker(fd_executor::Error), /// An error occurred when executing fallocate synchronously. + #[error("An error occurred when executing fallocate synchronously: {0}")] Fallocate(base::Error), /// An error occurred when executing fsync synchronously. + #[error("An error occurred when executing fsync synchronously: {0}")] Fsync(base::Error), /// An error occurred when reading the FD. + /// + #[error("An error occurred when reading the FD: {0}.")] Read(base::Error), /// Can't seek file. + #[error("An error occurred when seeking the FD: {0}.")] Seeking(base::Error), /// An error occurred when setting the FD non-blocking. + #[error("An error occurred setting the FD non-blocking: {0}.")] SettingNonBlocking(base::Error), /// An error occurred when writing the FD. + #[error("An error occurred when writing the FD: {0}.")] Write(base::Error), } pub type Result = std::result::Result; -impl std::error::Error for Error {} - -impl Display for Error { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - use self::Error::*; - - match self { - AddingWaker(e) => write!( - f, - "An error occurred attempting to register a waker with the executor: {}.", - e - ), - Fallocate(e) => write!( - f, - "An error occurred when executing fallocate synchronously: {}", - e - ), - Fsync(e) => write!( - f, - "An error occurred when executing fsync synchronously: {}", - e - ), - Read(e) => write!(f, "An error occurred when reading the FD: {}.", e), - Seeking(e) => write!(f, "An error occurred when seeking the FD: {}.", e), - SettingNonBlocking(e) => { - write!(f, "An error occurred setting the FD non-blocking: {}.", e) - } - Write(e) => write!(f, "An error occurred when writing the FD: {}.", e), - } - } -} - /// Async wrapper for an IO source that uses the FD executor to drive async operations. -/// Used by `PollOrRing` when uring isn't available. +/// Used by `IoSourceExt::new` when uring isn't available. pub struct PollSource { source: F, } @@ -85,32 +65,6 @@ impl PollSource { Ok(Self { source: f }) } - /// read from the iosource at `file_offset` and fill the given `vec`. - pub fn read_to_vec<'a>(&'a self, file_offset: u64, vec: Vec) -> PollReadVec<'a, F> - where - Self: Unpin, - { - PollReadVec { - reader: self, - file_offset, - vec: Some(vec), - pending_waker: None, - } - } - - /// write from the given `vec` to the file starting at `file_offset`. - pub fn write_from_vec<'a>(&'a self, file_offset: u64, vec: Vec) -> PollWriteVec<'a, F> - where - Self: Unpin, - { - PollWriteVec { - writer: self, - file_offset, - vec: Some(vec), - pending_waker: None, - } - } - /// Read a u64 from the current offset. Avoid seeking Fds that don't support `pread`. pub fn read_u64(&self) -> PollReadU64<'_, F> { PollReadU64 { @@ -119,82 +73,6 @@ impl PollSource { } } - /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`. - pub fn read_to_mem<'a>( - &'a self, - file_offset: u64, - mem: Rc, - mem_offsets: &'a [MemRegion], - ) -> PollReadMem<'a, F> - where - Self: Unpin, - { - PollReadMem { - reader: self, - file_offset, - mem, - mem_offsets, - pending_waker: None, - } - } - - /// Runs fallocate _synchronously_. There isn't an async equivalent for starting an fallocate. - pub fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> Result<()> { - let ret = unsafe { - libc::fallocate64( - self.source.as_raw_fd(), - mode as libc::c_int, - file_offset as libc::off64_t, - len as libc::off64_t, - ) - }; - if ret == 0 { - Ok(()) - } else { - Err(Error::Fallocate(base::Error::last())) - } - } - - /// Runs fsync _synchronously_. There isn't an async equivalent for starting an fsync. - pub fn fsync(&self) -> Result<()> { - let ret = unsafe { libc::fsync(self.source.as_raw_fd()) }; - if ret == 0 { - Ok(()) - } else { - Err(Error::Fsync(base::Error::last())) - } - } - - /// Writes from the given `mem` from the given offsets to the file starting at `file_offset`. - pub fn write_from_mem<'a>( - &'a self, - file_offset: u64, - mem: Rc, - mem_offsets: &'a [MemRegion], - ) -> PollWriteMem<'a, F> - where - Self: Unpin, - { - PollWriteMem { - writer: self, - file_offset, - mem, - mem_offsets, - pending_waker: None, - } - } - - /// Waits for the soruce to be readable. - pub fn wait_readable(&self) -> PollWaitReadable - where - Self: Unpin, - { - PollWaitReadable { - pollee: self, - pending_waker: None, - } - } - /// Return the inner source. pub fn into_source(self) -> F { self.source @@ -209,6 +87,138 @@ impl Deref for PollSource { } } +#[async_trait(?Send)] +impl ReadAsync for PollSource { + /// Reads from the iosource at `file_offset` and fill the given `vec`. + async fn read_to_vec<'a>( + &'a self, + file_offset: u64, + vec: Vec, + ) -> AsyncResult<(usize, Vec)> { + let fut = PollReadVec { + reader: self, + file_offset, + vec: Some(vec), + pending_waker: None, + }; + fut.await + .map(|(n, vec)| (n as usize, vec)) + .map_err(AsyncError::Poll) + } + + /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`. + async fn read_to_mem<'a>( + &'a self, + file_offset: u64, + mem: Rc, + mem_offsets: &'a [MemRegion], + ) -> AsyncResult { + let fut = PollReadMem { + reader: self, + file_offset, + mem, + mem_offsets, + pending_waker: None, + }; + fut.await.map(|n| n as usize).map_err(AsyncError::Poll) + } + + /// Wait for the FD of `self` to be readable. + async fn wait_readable(&self) -> AsyncResult<()> { + let fut = PollWaitReadable { + pollee: self, + pending_waker: None, + }; + fut.await.map_err(AsyncError::Poll) + } + + async fn read_u64(&self) -> AsyncResult { + PollSource::read_u64(self).await.map_err(AsyncError::Poll) + } +} + +#[async_trait(?Send)] +impl WriteAsync for PollSource { + /// Writes from the given `vec` to the file starting at `file_offset`. + async fn write_from_vec<'a>( + &'a self, + file_offset: u64, + vec: Vec, + ) -> AsyncResult<(usize, Vec)> { + let fut = PollWriteVec { + writer: self, + file_offset, + vec: Some(vec), + pending_waker: None, + }; + fut.await + .map(|(n, vec)| (n as usize, vec)) + .map_err(AsyncError::Poll) + } + + /// Writes from the given `mem` from the given offsets to the file starting at `file_offset`. + async fn write_from_mem<'a>( + &'a self, + file_offset: u64, + mem: Rc, + mem_offsets: &'a [MemRegion], + ) -> AsyncResult { + let fut = PollWriteMem { + writer: self, + file_offset, + mem, + mem_offsets, + pending_waker: None, + }; + fut.await.map(|n| n as usize).map_err(AsyncError::Poll) + } + + /// See `fallocate(2)` for details. + async fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> AsyncResult<()> { + let ret = unsafe { + libc::fallocate64( + self.source.as_raw_fd(), + mode as libc::c_int, + file_offset as libc::off64_t, + len as libc::off64_t, + ) + }; + if ret == 0 { + Ok(()) + } else { + Err(AsyncError::Poll(Error::Fallocate(base::Error::last()))) + } + } + + /// Sync all completed write operations to the backing storage. + async fn fsync(&self) -> AsyncResult<()> { + let ret = unsafe { libc::fsync(self.source.as_raw_fd()) }; + if ret == 0 { + Ok(()) + } else { + Err(AsyncError::Poll(Error::Fsync(base::Error::last()))) + } + } +} + +#[async_trait(?Send)] +impl IoSourceExt for PollSource { + /// Yields the underlying IO source. + fn into_source(self: Box) -> F { + self.source + } + + /// Provides a mutable ref to the underlying IO source. + fn as_source_mut(&mut self) -> &mut F { + &mut self.source + } + + /// Provides a ref to the underlying IO source. + fn as_source(&self) -> &F { + &self.source + } +} + impl DerefMut for PollSource { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.source @@ -621,19 +631,28 @@ mod tests { #[test] fn fallocate() { - let dir = tempfile::TempDir::new().unwrap(); - let mut file_path = PathBuf::from(dir.path()); - file_path.push("test"); + async fn go() { + let dir = tempfile::TempDir::new().unwrap(); + let mut file_path = PathBuf::from(dir.path()); + file_path.push("test"); - let f = OpenOptions::new() - .create(true) - .write(true) - .open(&file_path) + let f = OpenOptions::new() + .create(true) + .write(true) + .open(&file_path) + .unwrap(); + let source = PollSource::new(f).unwrap(); + source.fallocate(0, 4096, 0).await.unwrap(); + + let meta_data = std::fs::metadata(&file_path).unwrap(); + assert_eq!(meta_data.len(), 4096); + } + + let fut = go(); + pin_mut!(fut); + crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut)) + .unwrap() + .run() .unwrap(); - let source = PollSource::new(f).unwrap(); - source.fallocate(0, 4096, 0).unwrap(); - - let meta_data = std::fs::metadata(&file_path).unwrap(); - assert_eq!(meta_data.len(), 4096); } } diff --git a/cros_async/src/uring_futures/fallocate.rs b/cros_async/src/uring_futures/fallocate.rs index 4dfdd9009c..1cc60d3f1a 100644 --- a/cros_async/src/uring_futures/fallocate.rs +++ b/cros_async/src/uring_futures/fallocate.rs @@ -15,12 +15,12 @@ use super::uring_fut::UringFutState; #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Fallocate<'a, R: IoSource + ?Sized> { - reader: Pin<&'a R>, + reader: &'a R, state: UringFutState<(u64, u64, u32), ()>, } impl<'a, R: IoSource + ?Sized> Fallocate<'a, R> { - pub(crate) fn new(reader: Pin<&'a R>, file_offset: u64, len: u64, mode: u32) -> Self { + pub(crate) fn new(reader: &'a R, file_offset: u64, len: u64, mode: u32) -> Self { Fallocate { reader, state: UringFutState::new((file_offset, len, mode)), @@ -34,10 +34,8 @@ impl Future for Fallocate<'_, R> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let state = std::mem::replace(&mut self.state, UringFutState::Processing); let (new_state, ret) = match state.advance( - |(file_offset, len, mode)| { - Ok((self.reader.as_ref().fallocate(file_offset, len, mode)?, ())) - }, - |op| self.reader.as_ref().poll_complete(cx, op), + |(file_offset, len, mode)| Ok((self.reader.fallocate(file_offset, len, mode)?, ())), + |op| self.reader.poll_complete(cx, op), ) { Ok(d) => d, Err(e) => return Poll::Ready(Err(e)), @@ -62,7 +60,7 @@ mod tests { use futures::pin_mut; - use crate::io_ext::IoSourceExt; + use crate::io_ext::WriteAsync; use crate::UringSource; #[test] @@ -80,7 +78,7 @@ mod tests { let source = UringSource::new(f).unwrap(); if let Err(e) = source.fallocate(0, 4096, 0).await { match e { - crate::uring_executor::Error::Io(io_err) => { + crate::io_ext::Error::Uring(crate::uring_executor::Error::Io(io_err)) => { if io_err.kind() == std::io::ErrorKind::InvalidInput { // Skip the test on kernels before fallocate support. return; diff --git a/cros_async/src/uring_futures/fsync.rs b/cros_async/src/uring_futures/fsync.rs index 16bf0e957c..caeed57325 100644 --- a/cros_async/src/uring_futures/fsync.rs +++ b/cros_async/src/uring_futures/fsync.rs @@ -15,12 +15,12 @@ use super::uring_fut::UringFutState; #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Fsync<'a, R: IoSource + ?Sized> { - reader: Pin<&'a R>, + reader: &'a R, state: UringFutState<(), ()>, } impl<'a, R: IoSource + ?Sized> Fsync<'a, R> { - pub(crate) fn new(reader: Pin<&'a R>) -> Self { + pub(crate) fn new(reader: &'a R) -> Self { Fsync { reader, state: UringFutState::new(()), @@ -34,8 +34,8 @@ impl Future for Fsync<'_, R> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let state = std::mem::replace(&mut self.state, UringFutState::Processing); let (new_state, ret) = match state.advance( - |()| Ok((self.reader.as_ref().fsync()?, ())), - |op| self.reader.as_ref().poll_complete(cx, op), + |()| Ok((self.reader.fsync()?, ())), + |op| self.reader.poll_complete(cx, op), ) { Ok(d) => d, Err(e) => return Poll::Ready(Err(e)), @@ -57,7 +57,7 @@ impl Future for Fsync<'_, R> { mod tests { use futures::pin_mut; - use crate::io_ext::IoSourceExt; + use crate::io_ext::WriteAsync; use crate::UringSource; #[test] diff --git a/cros_async/src/uring_futures/poll_fd.rs b/cros_async/src/uring_futures/poll_fd.rs index 5c0973a776..6e88701b35 100644 --- a/cros_async/src/uring_futures/poll_fd.rs +++ b/cros_async/src/uring_futures/poll_fd.rs @@ -15,12 +15,12 @@ use super::uring_fut::UringFutState; #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct PollFd<'a, R: IoSource + ?Sized> { - reader: Pin<&'a R>, + reader: &'a R, state: UringFutState<(), ()>, } impl<'a, R: IoSource + ?Sized> PollFd<'a, R> { - pub(crate) fn new(reader: Pin<&'a R>) -> Self { + pub(crate) fn new(reader: &'a R) -> Self { PollFd { reader, state: UringFutState::new(()), @@ -34,8 +34,8 @@ impl Future for PollFd<'_, R> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let state = std::mem::replace(&mut self.state, UringFutState::Processing); let (new_state, ret) = match state.advance( - |()| Ok((self.reader.as_ref().wait_readable()?, ())), - |op| self.reader.as_ref().poll_complete(cx, op), + |()| Ok((self.reader.wait_readable()?, ())), + |op| self.reader.poll_complete(cx, op), ) { Ok(d) => d, Err(e) => return Poll::Ready(Err(e)), @@ -59,7 +59,7 @@ mod tests { use futures::pin_mut; - use crate::io_ext::IoSourceExt; + use crate::io_ext::ReadAsync; use crate::UringSource; #[test] diff --git a/cros_async/src/uring_futures/read_mem.rs b/cros_async/src/uring_futures/read_mem.rs index ada9feeb0d..c3b1188dfa 100644 --- a/cros_async/src/uring_futures/read_mem.rs +++ b/cros_async/src/uring_futures/read_mem.rs @@ -16,13 +16,13 @@ use super::uring_fut::UringFutState; /// Future for the `read_to_mem` function. #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct ReadMem<'a, 'b, R: IoSource + ?Sized> { - reader: Pin<&'a R>, + reader: &'a R, state: UringFutState<(u64, Rc, &'b [MemRegion]), Rc>, } impl<'a, 'b, R: IoSource + ?Sized> ReadMem<'a, 'b, R> { pub(crate) fn new( - reader: Pin<&'a R>, + reader: &'a R, file_offset: u64, mem: Rc, mem_offsets: &'b [MemRegion], @@ -50,12 +50,11 @@ impl Future for ReadMem<'_, '_, R> { |(file_offset, mem, mem_offsets)| { Ok(( self.reader - .as_ref() .read_to_mem(file_offset, Rc::clone(&mem), mem_offsets)?, mem, )) }, - |op| self.reader.as_ref().poll_complete(cx, op), + |op| self.reader.poll_complete(cx, op), ) { Ok(d) => d, Err(e) => return Poll::Ready(Err(e)), @@ -77,7 +76,7 @@ mod tests { use futures::pin_mut; - use crate::io_ext::IoSourceExt; + use crate::io_ext::ReadAsync; use crate::uring_mem::{MemRegion, VecIoWrapper}; use crate::UringSource; diff --git a/cros_async/src/uring_futures/read_vec.rs b/cros_async/src/uring_futures/read_vec.rs index 7f1aff1db0..f80472b21d 100644 --- a/cros_async/src/uring_futures/read_vec.rs +++ b/cros_async/src/uring_futures/read_vec.rs @@ -16,12 +16,12 @@ use super::uring_fut::UringFutState; /// Future for the `read_to_vec` function. #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct ReadVec<'a, R: IoSource + ?Sized> { - reader: Pin<&'a R>, + reader: &'a R, state: UringFutState<(u64, Rc), Rc>, } impl<'a, R: IoSource + ?Sized> ReadVec<'a, R> { - pub(crate) fn new(reader: Pin<&'a R>, file_offset: u64, vec: Vec) -> Self { + pub(crate) fn new(reader: &'a R, file_offset: u64, vec: Vec) -> Self { ReadVec { reader, state: UringFutState::new((file_offset, Rc::new(VecIoWrapper::from(vec)))), @@ -37,7 +37,7 @@ impl Future for ReadVec<'_, R> { let (new_state, ret) = match state.advance( |(file_offset, wrapped_vec)| { Ok(( - self.reader.as_ref().read_to_mem( + self.reader.read_to_mem( file_offset, Rc::::clone(&wrapped_vec), &[MemRegion { @@ -48,7 +48,7 @@ impl Future for ReadVec<'_, R> { wrapped_vec, )) }, - |op| self.reader.as_ref().poll_complete(cx, op), + |op| self.reader.poll_complete(cx, op), ) { Ok(d) => d, Err(e) => return Poll::Ready(Err(e)), @@ -81,7 +81,7 @@ mod tests { use futures::pin_mut; - use crate::io_ext::IoSourceExt; + use crate::io_ext::ReadAsync; use crate::UringSource; #[test] diff --git a/cros_async/src/uring_futures/uring_source.rs b/cros_async/src/uring_futures/uring_source.rs index 893ddb3a76..be447b998e 100644 --- a/cros_async/src/uring_futures/uring_source.rs +++ b/cros_async/src/uring_futures/uring_source.rs @@ -4,13 +4,16 @@ use std::ops::{Deref, DerefMut}; use std::os::unix::io::AsRawFd; -use std::pin::Pin; use std::rc::Rc; use std::task::{Context, Poll}; use crate::io_source::IoSource; use crate::uring_executor::{self, PendingOperation, RegisteredSource, Result}; +use crate::uring_futures; use crate::uring_mem::{BackingMemory, MemRegion}; +use crate::AsyncError; +use crate::AsyncResult; +use async_trait::async_trait; /// `UringSource` wraps FD backed IO sources for use with io_uring. It is a thin wrapper around /// registering an IO source with the uring that provides an `IoSource` implementation. @@ -19,9 +22,9 @@ use crate::uring_mem::{BackingMemory, MemRegion}; /// # Example /// ```rust /// use std::fs::File; -/// use cros_async::{UringSource, IoSourceExt}; +/// use cros_async::{UringSource, ReadAsync}; /// -/// async fn read_four_bytes(source: &UringSource) -> (u32, Vec) { +/// async fn read_four_bytes(source: &UringSource) -> (usize, Vec) { /// let mem = vec![0u8; 4]; /// source.read_to_vec(0, mem).await.unwrap() /// } @@ -59,7 +62,7 @@ impl UringSource { impl IoSource for UringSource { fn read_to_mem( - self: Pin<&Self>, + &self, file_offset: u64, mem: Rc, mem_offsets: &[MemRegion], @@ -69,7 +72,7 @@ impl IoSource for UringSource { } fn write_from_mem( - self: Pin<&Self>, + &self, file_offset: u64, mem: Rc, mem_offsets: &[MemRegion], @@ -78,34 +81,133 @@ impl IoSource for UringSource { .start_write_from_mem(file_offset, mem, mem_offsets) } - fn fallocate( - self: Pin<&Self>, - file_offset: u64, - len: u64, - mode: u32, - ) -> Result { + fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> Result { self.registered_source .start_fallocate(file_offset, len, mode) } - fn fsync(self: Pin<&Self>) -> Result { + fn fsync(&self) -> Result { self.registered_source.start_fsync() } // wait for the inner source to be readable and return a refernce to it. - fn wait_readable(self: Pin<&Self>) -> Result { + fn wait_readable(&self) -> Result { self.registered_source.poll_fd_readable() } - fn poll_complete( - self: Pin<&Self>, - cx: &mut Context, - token: &mut PendingOperation, - ) -> Poll> { + fn poll_complete(&self, cx: &mut Context, token: &mut PendingOperation) -> Poll> { self.registered_source.poll_complete(cx, token) } } +#[async_trait(?Send)] +impl crate::ReadAsync for UringSource { + /// Reads from the iosource at `file_offset` and fill the given `vec`. + async fn read_to_vec<'a>( + &'a self, + file_offset: u64, + vec: Vec, + ) -> AsyncResult<(usize, Vec)> { + uring_futures::ReadVec::new(self, file_offset, vec) + .await + .map(|(n, vec)| (n as usize, vec)) + .map_err(AsyncError::Uring) + } + + /// Wait for the FD of `self` to be readable. + async fn wait_readable(&self) -> AsyncResult<()> { + uring_futures::PollFd::new(self) + .await + .map_err(AsyncError::Uring) + } + + /// Reads a single u64 from the current offset. + async fn read_u64(&self) -> AsyncResult { + crate::IoSourceExt::wait_readable(self).await?; + let mut bytes = 0u64.to_ne_bytes(); + // Safe to read to the buffer of known length. + let ret = + unsafe { libc::read(self.as_raw_fd(), bytes.as_mut_ptr() as *mut _, bytes.len()) }; + if ret < 0 { + return Err(AsyncError::ReadingInner(ret)); + } + Ok(u64::from_ne_bytes(bytes)) + } + + /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`. + async fn read_to_mem<'a>( + &'a self, + file_offset: u64, + mem: Rc, + mem_offsets: &'a [MemRegion], + ) -> AsyncResult { + uring_futures::ReadMem::new(self, file_offset, mem, mem_offsets) + .await + .map(|n| n as usize) + .map_err(AsyncError::Uring) + } +} + +#[async_trait(?Send)] +impl crate::WriteAsync for UringSource { + /// Writes from the given `vec` to the file starting at `file_offset`. + async fn write_from_vec<'a>( + &'a self, + file_offset: u64, + vec: Vec, + ) -> AsyncResult<(usize, Vec)> { + uring_futures::WriteVec::new(self, file_offset, vec) + .await + .map(|(n, vec)| (n as usize, vec)) + .map_err(AsyncError::Uring) + } + + /// Writes from the given `mem` from the given offsets to the file starting at `file_offset`. + async fn write_from_mem<'a>( + &'a self, + file_offset: u64, + mem: Rc, + mem_offsets: &'a [MemRegion], + ) -> AsyncResult { + uring_futures::WriteMem::new(self, file_offset, mem, mem_offsets) + .await + .map(|n| n as usize) + .map_err(AsyncError::Uring) + } + + /// See `fallocate(2)`. Note this op is synchronous when using the Polled backend. + async fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> AsyncResult<()> { + uring_futures::Fallocate::new(self, file_offset, len, mode) + .await + .map_err(AsyncError::Uring) + } + + /// Sync all completed write operations to the backing storage. + async fn fsync(&self) -> AsyncResult<()> { + uring_futures::Fsync::new(self) + .await + .map_err(AsyncError::Uring) + } +} + +#[async_trait(?Send)] +impl crate::IoSourceExt for UringSource { + /// Yields the underlying IO source. + fn into_source(self: Box) -> F { + self.source + } + + /// Provides a mutable ref to the underlying IO source. + fn as_source(&self) -> &F { + &self.source + } + + /// Provides a ref to the underlying IO source. + fn as_source_mut(&mut self) -> &mut F { + &mut self.source + } +} + impl Deref for UringSource { type Target = F; @@ -124,6 +226,7 @@ impl DerefMut for UringSource { mod tests { use futures::pin_mut; use std::future::Future; + use std::pin::Pin; use super::*; diff --git a/cros_async/src/uring_futures/write_mem.rs b/cros_async/src/uring_futures/write_mem.rs index 20633bd631..f5eebbe26d 100644 --- a/cros_async/src/uring_futures/write_mem.rs +++ b/cros_async/src/uring_futures/write_mem.rs @@ -16,13 +16,13 @@ use super::uring_fut::UringFutState; /// Future for the `write_from_mem` function. #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct WriteMem<'a, 'b, W: IoSource + ?Sized> { - writer: Pin<&'a W>, + writer: &'a W, state: UringFutState<(u64, Rc, &'b [MemRegion]), Rc>, } impl<'a, 'b, W: IoSource + ?Sized> WriteMem<'a, 'b, W> { pub(crate) fn new( - writer: Pin<&'a W>, + writer: &'a W, file_offset: u64, mem: Rc, mem_offsets: &'b [MemRegion], @@ -49,15 +49,12 @@ impl Future for WriteMem<'_, '_, W> { let (new_state, ret) = match state.advance( |(file_offset, mem, mem_offsets)| { Ok(( - self.writer.as_ref().write_from_mem( - file_offset, - Rc::clone(&mem), - mem_offsets, - )?, + self.writer + .write_from_mem(file_offset, Rc::clone(&mem), mem_offsets)?, mem, )) }, - |op| self.writer.as_ref().poll_complete(cx, op), + |op| self.writer.poll_complete(cx, op), ) { Ok(d) => d, Err(e) => return Poll::Ready(Err(e)), @@ -79,7 +76,7 @@ mod tests { use futures::pin_mut; - use crate::io_ext::IoSourceExt; + use crate::io_ext::WriteAsync; use crate::uring_mem::MemRegion; use crate::UringSource; diff --git a/cros_async/src/uring_futures/write_vec.rs b/cros_async/src/uring_futures/write_vec.rs index 8d84de1a37..564facfc86 100644 --- a/cros_async/src/uring_futures/write_vec.rs +++ b/cros_async/src/uring_futures/write_vec.rs @@ -17,12 +17,12 @@ use super::uring_fut::UringFutState; /// Future for the `write_to_vec` function. #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct WriteVec<'a, W: IoSource + ?Sized> { - writer: Pin<&'a W>, + writer: &'a W, state: UringFutState<(u64, Rc), Rc>, } impl<'a, W: IoSource + ?Sized> WriteVec<'a, W> { - pub(crate) fn new(writer: Pin<&'a W>, file_offset: u64, vec: Vec) -> Self { + pub(crate) fn new(writer: &'a W, file_offset: u64, vec: Vec) -> Self { WriteVec { writer, state: UringFutState::new((file_offset, Rc::new(VecIoWrapper::from(vec)))), @@ -38,7 +38,7 @@ impl Future for WriteVec<'_, W> { let (new_state, ret) = match state.advance( |(file_offset, wrapped_vec)| { Ok(( - self.writer.as_ref().write_from_mem( + self.writer.write_from_mem( file_offset, Rc::::clone(&wrapped_vec), &[MemRegion { @@ -49,7 +49,7 @@ impl Future for WriteVec<'_, W> { wrapped_vec, )) }, - |op| self.writer.as_ref().poll_complete(cx, op), + |op| self.writer.poll_complete(cx, op), ) { Ok(d) => d, Err(e) => return Poll::Ready(Err(e)), @@ -80,7 +80,7 @@ mod tests { use futures::pin_mut; use std::fs::OpenOptions; - use crate::io_ext::IoSourceExt; + use crate::io_ext::WriteAsync; use crate::UringSource; #[test] diff --git a/devices/src/virtio/queue.rs b/devices/src/virtio/queue.rs index eabcf9a902..60a7fc6193 100644 --- a/devices/src/virtio/queue.rs +++ b/devices/src/virtio/queue.rs @@ -10,7 +10,7 @@ use std::rc::Rc; use std::sync::atomic::{fence, Ordering}; use base::error; -use cros_async::{AsyncError, U64Source}; +use cros_async::{AsyncError, EventAsync}; use virtio_sys::virtio_ring::VIRTIO_RING_F_EVENT_IDX; use vm_memory::{GuestAddress, GuestMemory}; @@ -389,14 +389,14 @@ impl Queue { pub async fn next_async( &mut self, mem: &GuestMemory, - event: &mut U64Source, + eventfd: &mut EventAsync, ) -> std::result::Result { loop { // Check if there are more descriptors available. if let Some(chain) = self.pop(mem) { return Ok(chain); } - event.next_val().await?; + eventfd.next_val().await?; } } diff --git a/disk/Cargo.toml b/disk/Cargo.toml index 468eef58ed..7182dafc6f 100644 --- a/disk/Cargo.toml +++ b/disk/Cargo.toml @@ -11,7 +11,7 @@ path = "src/disk.rs" composite-disk = ["protos", "protobuf"] [dependencies] -async-trait = "*" +async-trait = "0.1.36" base = { path = "../base" } libc = "*" protobuf = { version = "2.3", optional = true } diff --git a/disk/src/disk.rs b/disk/src/disk.rs index 0b0b4d5799..70d58b280d 100644 --- a/disk/src/disk.rs +++ b/disk/src/disk.rs @@ -372,17 +372,17 @@ pub trait AsyncDisk: DiskGetLen + FileSetLen + FileAllocate { async fn write_zeroes_at(&self, file_offset: u64, length: u64) -> Result<()>; } -use cros_async::PollOrRing; +use cros_async::IoSourceExt; /// A disk backed by a single file that implements `AsyncDisk` for access. pub struct SingleFileDisk { - inner: PollOrRing, + inner: Box>, } impl TryFrom for SingleFileDisk { type Error = Error; fn try_from(inner: File) -> Result { - PollOrRing::new(inner) + cros_async::new(inner) .map_err(Error::CreateSingleFileDisk) .map(|inner| SingleFileDisk { inner }) } @@ -390,19 +390,19 @@ impl TryFrom for SingleFileDisk { impl DiskGetLen for SingleFileDisk { fn get_len(&self) -> io::Result { - self.inner.get_len() + self.inner.as_source().get_len() } } impl FileSetLen for SingleFileDisk { fn set_len(&self, len: u64) -> io::Result<()> { - self.inner.set_len(len) + self.inner.as_source().set_len(len) } } impl FileAllocate for SingleFileDisk { fn allocate(&mut self, offset: u64, len: u64) -> io::Result<()> { - self.inner.allocate(offset, len) + self.inner.as_source_mut().allocate(offset, len) } } diff --git a/msg_socket/src/lib.rs b/msg_socket/src/lib.rs index b40a8740fc..0a9dc24edb 100644 --- a/msg_socket/src/lib.rs +++ b/msg_socket/src/lib.rs @@ -4,13 +4,11 @@ mod msg_on_socket; +use base::{handle_eintr, net::UnixSeqpacket, Error as SysError, ScmSocket, UnsyncMarker}; use std::io::{IoSlice, Result}; use std::marker::PhantomData; use std::os::unix::io::{AsRawFd, RawFd}; -use base::{handle_eintr, net::UnixSeqpacket, Error as SysError, ScmSocket, UnsyncMarker}; -use cros_async::PollOrRing; - pub use crate::msg_on_socket::*; pub use msg_on_socket_derive::*; @@ -213,7 +211,7 @@ impl<'a, I: MsgOnSocket, O: MsgOnSocket> AsyncReceiver<'a, I, O> { } pub async fn next(&mut self) -> MsgResult { - let p = PollOrRing::new(self.inner).unwrap(); + let p = cros_async::new(self.inner).unwrap(); p.wait_readable().await.unwrap(); self.inner.recv() }