diff --git a/base/Cargo.toml b/base/Cargo.toml index 8deacd0d6a..56d4c6d107 100644 --- a/base/Cargo.toml +++ b/base/Cargo.toml @@ -9,7 +9,6 @@ chromeos = ["sys_util/chromeos"] [dependencies] audio_streams = { path = "../common/audio_streams" } -cros_async = { path = "../cros_async" } data_model = { path = "../common/data_model" } libc = "*" remain = "0.2" diff --git a/base/src/lib.rs b/base/src/lib.rs index 856c761ed2..2fb2552bee 100644 --- a/base/src/lib.rs +++ b/base/src/lib.rs @@ -7,6 +7,7 @@ pub use sys_util::drop_capabilities; pub use sys_util::handle_eintr_errno; pub use sys_util::iov_max; pub use sys_util::kernel_has_memfd; +pub use sys_util::new_pipe_full; pub use sys_util::pipe; pub use sys_util::read_raw_stdin; pub use sys_util::syscall; @@ -52,7 +53,6 @@ pub use sys_util::{ }; pub use sys_util::{PollToken, WatchingEvents}; -mod async_types; mod event; mod ioctl; mod mmap; @@ -61,7 +61,6 @@ mod timer; mod tube; mod wait_context; -pub use async_types::*; pub use event::{Event, EventReadResult, ScopedEvent}; pub use ioctl::{ ioctl, ioctl_with_mut_ptr, ioctl_with_mut_ref, ioctl_with_ptr, ioctl_with_ref, ioctl_with_val, @@ -75,7 +74,7 @@ pub use sys_util::{ FileReadWriteVolatile, FileSetLen, FileSync, WriteZeroesAt, }; pub use timer::{FakeTimer, Timer}; -pub use tube::{AsyncTube, Error as TubeError, Result as TubeResult, Tube}; +pub use tube::{Error as TubeError, Result as TubeResult, Tube}; pub use wait_context::{EventToken, EventType, TriggeredEvent, WaitContext}; /// Wraps an AsRawDescriptor in the simple Descriptor struct, which diff --git a/base/src/tube.rs b/base/src/tube.rs index 784d53e533..56f006a185 100644 --- a/base/src/tube.rs +++ b/base/src/tube.rs @@ -4,13 +4,11 @@ use std::io::{self, IoSlice}; use std::marker::PhantomData; -use std::ops::Deref; use std::os::unix::prelude::{AsRawFd, RawFd}; use std::time::Duration; use crate::{FromRawDescriptor, SafeDescriptor, ScmSocket, UnixSeqpacket, UnsyncMarker}; -use cros_async::{Executor, IntoAsync, IoSourceExt}; use remain::sorted; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use sys_util::{ @@ -23,8 +21,6 @@ use thiserror::Error as ThisError; pub enum Error { #[error("failed to clone UnixSeqpacket: {0}")] Clone(io::Error), - #[error("failed to create async tube: {0}")] - CreateAsync(cros_async::AsyncError), #[error("tube was disconnected")] Disconnected, #[error("failed to serialize/deserialize json from packet: {0}")] @@ -68,11 +64,6 @@ impl Tube { } } - pub fn into_async_tube(self, ex: &Executor) -> Result { - let inner = ex.async_from(self).map_err(Error::CreateAsync)?; - Ok(AsyncTube { inner }) - } - pub fn try_clone(&self) -> Result { self.socket.try_clone().map(Tube::new).map_err(Error::Clone) } @@ -147,33 +138,6 @@ impl AsRawFd for Tube { } } -impl IntoAsync for Tube {} - -pub struct AsyncTube { - inner: Box>, -} - -impl AsyncTube { - pub async fn next(&self) -> Result { - self.inner.wait_readable().await.unwrap(); - self.inner.as_source().recv() - } -} - -impl Deref for AsyncTube { - type Target = Tube; - - fn deref(&self) -> &Self::Target { - self.inner.as_source() - } -} - -impl From for Tube { - fn from(at: AsyncTube) -> Tube { - at.inner.into_source() - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/cros_async/Cargo.toml b/cros_async/Cargo.toml index 7f3e924a7a..a2c6a47be0 100644 --- a/cros_async/Cargo.toml +++ b/cros_async/Cargo.toml @@ -17,10 +17,11 @@ pin-utils = "0.1.0-alpha.4" remain = "0.2" slab = "0.4" sync = { path = "../common/sync" } # provided by ebuild -sys_util = { path = "../common/sys_util" } # provided by ebuild +base = { path = "../base" } # provided by ebuild thiserror = "1.0.20" audio_streams = { path = "../common/audio_streams" } # provided by ebuild anyhow = "1.0" +serde = "*" [dependencies.futures] version = "*" diff --git a/base/src/async_types.rs b/cros_async/src/async_types.rs similarity index 50% rename from base/src/async_types.rs rename to cros_async/src/async_types.rs index 4fbe53e93e..de49f64119 100644 --- a/base/src/async_types.rs +++ b/cros_async/src/async_types.rs @@ -2,8 +2,10 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -use crate::AsRawDescriptor; -use cros_async::IntoAsync; +use crate::{Executor, IntoAsync, IoSourceExt}; +use base::{AsRawDescriptor, Tube, TubeResult}; +use serde::de::DeserializeOwned; +use std::ops::Deref; use std::os::unix::io::{AsRawFd, RawFd}; /// Like `cros_async::IntoAsync`, except for use with crosvm's AsRawDescriptor @@ -23,3 +25,36 @@ where } } impl IntoAsync for DescriptorAdapter where T: DescriptorIntoAsync {} + +impl IntoAsync for Tube {} + +pub struct AsyncTube { + inner: Box>, +} + +impl AsyncTube { + pub fn new(ex: &Executor, tube: Tube) -> std::io::Result { + return Ok(AsyncTube { + inner: ex.async_from(tube)?, + }); + } + + pub async fn next(&self) -> TubeResult { + self.inner.wait_readable().await.unwrap(); + self.inner.as_source().recv() + } +} + +impl Deref for AsyncTube { + type Target = Tube; + + fn deref(&self) -> &Self::Target { + self.inner.as_source() + } +} + +impl From for Tube { + fn from(at: AsyncTube) -> Tube { + at.inner.into_source() + } +} diff --git a/cros_async/src/blocking/pool.rs b/cros_async/src/blocking/pool.rs index 4bf45020ee..26df32e974 100644 --- a/cros_async/src/blocking/pool.rs +++ b/cros_async/src/blocking/pool.rs @@ -16,9 +16,9 @@ use std::{ }; use async_task::{Runnable, Task}; +use base::{error, warn}; use slab::Slab; use sync::{Condvar, Mutex}; -use sys_util::{error, warn}; const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10); diff --git a/cros_async/src/event.rs b/cros_async/src/event.rs index 318ad4ed0f..6f65d0cbcd 100644 --- a/cros_async/src/event.rs +++ b/cros_async/src/event.rs @@ -2,11 +2,11 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -use sys_util::EventFd; +use base::EventFd; use super::{AsyncResult, Executor, IntoAsync, IoSourceExt}; -/// An async version of `sys_util::EventFd`. +/// An async version of `base::EventFd`. pub struct EventAsync { io_source: Box>, } diff --git a/cros_async/src/executor.rs b/cros_async/src/executor.rs index 7f6cfce930..e79790901b 100644 --- a/cros_async/src/executor.rs +++ b/cros_async/src/executor.rs @@ -86,7 +86,7 @@ pub(crate) fn async_poll_from<'a, F: IntoAsync + Send + 'a>( /// # fn do_it() -> Result<(), Box> { /// let ex = Executor::new()?; /// -/// let (rx, tx) = sys_util::pipe(true)?; +/// let (rx, tx) = base::pipe(true)?; /// let zero = File::open("/dev/zero")?; /// let zero_bytes = CHUNK_SIZE * 7; /// let zero_to_pipe = transfer_data( diff --git a/cros_async/src/fd_executor.rs b/cros_async/src/fd_executor.rs index 9439749f27..85ee796373 100644 --- a/cros_async/src/fd_executor.rs +++ b/cros_async/src/fd_executor.rs @@ -23,12 +23,12 @@ use std::{ }; use async_task::Task; +use base::{add_fd_flags, warn, EpollContext, EpollEvents, EventFd, WatchingEvents}; use futures::task::noop_waker; use pin_utils::pin_mut; use remain::sorted; use slab::Slab; use sync::Mutex; -use sys_util::{add_fd_flags, warn, EpollContext, EpollEvents, EventFd, WatchingEvents}; use thiserror::Error as ThisError; use super::{ @@ -42,28 +42,28 @@ use super::{ pub enum Error { /// Failed to clone the EventFd for waking the executor. #[error("Failed to clone the EventFd for waking the executor: {0}")] - CloneEventFd(sys_util::Error), + CloneEventFd(base::Error), /// Failed to create the EventFd for waking the executor. #[error("Failed to create the EventFd for waking the executor: {0}")] - CreateEventFd(sys_util::Error), + CreateEventFd(base::Error), /// Creating a context to wait on FDs failed. #[error("An error creating the fd waiting context: {0}")] - CreatingContext(sys_util::Error), + CreatingContext(base::Error), /// Failed to copy the FD for the polling context. #[error("Failed to copy the FD for the polling context: {0}")] - DuplicatingFd(sys_util::Error), + DuplicatingFd(base::Error), /// The Executor is gone. #[error("The FDExecutor is gone")] ExecutorGone, /// PollContext failure. #[error("PollContext failure: {0}")] - PollContextError(sys_util::Error), + PollContextError(base::Error), /// An error occurred when setting the FD non-blocking. #[error("An error occurred setting the FD non-blocking: {0}.")] - SettingNonBlocking(sys_util::Error), + SettingNonBlocking(base::Error), /// Failed to submit the waker to the polling context. #[error("An error adding to the Aio context: {0}")] - SubmittingWaker(sys_util::Error), + SubmittingWaker(base::Error), /// A Waker was canceled, but the operation isn't running. #[error("Unknown waker")] UnknownWaker, @@ -551,7 +551,7 @@ impl FdExecutor { unsafe fn dup_fd(fd: RawFd) -> Result { let ret = libc::fcntl(fd, libc::F_DUPFD_CLOEXEC, 0); if ret < 0 { - Err(Error::DuplicatingFd(sys_util::Error::last())) + Err(Error::DuplicatingFd(base::Error::last())) } else { Ok(ret) } @@ -572,7 +572,7 @@ mod test { #[test] fn test_it() { async fn do_test(ex: &FdExecutor) { - let (r, _w) = sys_util::pipe(true).unwrap(); + let (r, _w) = base::pipe(true).unwrap(); let done = Box::pin(async { 5usize }); let source = ex.register_source(r).unwrap(); let pending = source.wait_readable().unwrap(); @@ -612,7 +612,7 @@ mod test { } } - let (mut rx, tx) = sys_util::pipe(true).expect("Pipe failed"); + let (mut rx, tx) = base::pipe(true).expect("Pipe failed"); let ex = FdExecutor::new().unwrap(); diff --git a/cros_async/src/io_ext.rs b/cros_async/src/io_ext.rs index 010b716e3f..6b3e6e827e 100644 --- a/cros_async/src/io_ext.rs +++ b/cros_async/src/io_ext.rs @@ -24,8 +24,8 @@ use std::{ }; use async_trait::async_trait; +use base::UnixSeqpacket; use remain::sorted; -use sys_util::net::UnixSeqpacket; use thiserror::Error as ThisError; use super::{BackingMemory, MemRegion}; @@ -116,7 +116,7 @@ pub trait IoSourceExt: ReadAsync + WriteAsync { } /// Marker trait signifying that the implementor is suitable for use with -/// cros_async. Examples of this include File, and sys_util::net::UnixSeqpacket. +/// cros_async. Examples of this include File, and base::net::UnixSeqpacket. /// /// (Note: it'd be really nice to implement a TryFrom for any implementors, and /// remove our factory functions. Unfortunately @@ -434,7 +434,7 @@ mod tests { if !use_uring() { return; } - use sys_util::EventFd; + use base::EventFd; async fn go(source: Box>) -> u64 { source.read_u64().await.unwrap() diff --git a/cros_async/src/lib.rs b/cros_async/src/lib.rs index b353a2e035..e266f232b2 100644 --- a/cros_async/src/lib.rs +++ b/cros_async/src/lib.rs @@ -58,6 +58,7 @@ //! See the docs for `IoSourceExt` if support for kernels <5.4 is required. Focus on `UringSource` if //! all systems have support for io_uring. +mod async_types; pub mod audio_streams_async; mod blocking; mod complete; @@ -75,6 +76,8 @@ mod uring_executor; mod uring_source; mod waker; +pub use async_types::*; +pub use base; pub use blocking::{block_on, BlockingPool}; pub use event::EventAsync; pub use executor::Executor; @@ -86,7 +89,6 @@ pub use io_ext::{ pub use mem::{BackingMemory, MemRegion}; pub use poll_source::PollSource; pub use select::SelectResult; -pub use sys_util; pub use timer::TimerAsync; pub use uring_executor::URingExecutor; pub use uring_source::UringSource; @@ -113,7 +115,7 @@ pub enum Error { TimerAsync(AsyncError), /// Error from TimerFd. #[error("Failure in TimerFd: {0}")] - TimerFd(sys_util::Error), + TimerFd(base::Error), /// Error from the uring executor. #[error("Failure in the uring executor: {0}")] URingExecutor(uring_executor::Error), diff --git a/cros_async/src/poll_source.rs b/cros_async/src/poll_source.rs index 4a904ba790..26866b0bf5 100644 --- a/cros_async/src/poll_source.rs +++ b/cros_async/src/poll_source.rs @@ -36,19 +36,19 @@ pub enum Error { Executor(fd_executor::Error), /// An error occurred when executing fallocate synchronously. #[error("An error occurred when executing fallocate synchronously: {0}")] - Fallocate(sys_util::Error), + Fallocate(base::Error), /// An error occurred when executing fsync synchronously. #[error("An error occurred when executing fsync synchronously: {0}")] - Fsync(sys_util::Error), + Fsync(base::Error), /// An error occurred when reading the FD. #[error("An error occurred when reading the FD: {0}.")] - Read(sys_util::Error), + Read(base::Error), /// Can't seek file. #[error("An error occurred when seeking the FD: {0}.")] - Seeking(sys_util::Error), + Seeking(base::Error), /// An error occurred when writing the FD. #[error("An error occurred when writing the FD: {0}.")] - Write(sys_util::Error), + Write(base::Error), } pub type Result = std::result::Result; @@ -132,7 +132,7 @@ impl ReadAsync for PollSource { return Ok((res as usize, vec)); } - match sys_util::Error::last() { + match base::Error::last() { e if e.errno() == libc::EWOULDBLOCK => { let op = self.0.wait_readable().map_err(Error::AddingWaker)?; op.await.map_err(Error::Executor)?; @@ -180,7 +180,7 @@ impl ReadAsync for PollSource { return Ok(res as usize); } - match sys_util::Error::last() { + match base::Error::last() { e if e.errno() == libc::EWOULDBLOCK => { let op = self.0.wait_readable().map_err(Error::AddingWaker)?; op.await.map_err(Error::Executor)?; @@ -213,7 +213,7 @@ impl ReadAsync for PollSource { return Ok(u64::from_ne_bytes(buf)); } - match sys_util::Error::last() { + match base::Error::last() { e if e.errno() == libc::EWOULDBLOCK => { let op = self.0.wait_readable().map_err(Error::AddingWaker)?; op.await.map_err(Error::Executor)?; @@ -257,7 +257,7 @@ impl WriteAsync for PollSource { return Ok((res as usize, vec)); } - match sys_util::Error::last() { + match base::Error::last() { e if e.errno() == libc::EWOULDBLOCK => { let op = self.0.wait_writable().map_err(Error::AddingWaker)?; op.await.map_err(Error::Executor)?; @@ -306,7 +306,7 @@ impl WriteAsync for PollSource { return Ok(res as usize); } - match sys_util::Error::last() { + match base::Error::last() { e if e.errno() == libc::EWOULDBLOCK => { let op = self.0.wait_writable().map_err(Error::AddingWaker)?; op.await.map_err(Error::Executor)?; @@ -329,7 +329,7 @@ impl WriteAsync for PollSource { if ret == 0 { Ok(()) } else { - Err(AsyncError::Poll(Error::Fallocate(sys_util::Error::last()))) + Err(AsyncError::Poll(Error::Fallocate(base::Error::last()))) } } @@ -339,7 +339,7 @@ impl WriteAsync for PollSource { if ret == 0 { Ok(()) } else { - Err(AsyncError::Poll(Error::Fsync(sys_util::Error::last()))) + Err(AsyncError::Poll(Error::Fsync(base::Error::last()))) } } } @@ -437,7 +437,7 @@ mod tests { let _ = source.wait_readable().await; } - let (rx, _tx) = sys_util::pipe(true).unwrap(); + let (rx, _tx) = base::pipe(true).unwrap(); let ex = FdExecutor::new().unwrap(); let source = PollSource::new(rx, &ex).unwrap(); ex.spawn_local(owns_poll_source(source)).detach(); diff --git a/cros_async/src/timer.rs b/cros_async/src/timer.rs index f9624e0e9c..52df7053ce 100644 --- a/cros_async/src/timer.rs +++ b/cros_async/src/timer.rs @@ -4,14 +4,14 @@ use std::time::Duration; -use sys_util::{Result as SysResult, TimerFd}; +use base::{Result as SysResult, TimerFd}; use super::{AsyncResult, Error, Executor, IntoAsync, IoSourceExt}; #[cfg(test)] use super::{FdExecutor, URingExecutor}; -/// An async version of sys_util::TimerFd. +/// An async version of base::TimerFd. pub struct TimerAsync { io_source: Box>, } diff --git a/cros_async/src/uring_executor.rs b/cros_async/src/uring_executor.rs index 63c0bb7453..743939b755 100644 --- a/cros_async/src/uring_executor.rs +++ b/cros_async/src/uring_executor.rs @@ -73,6 +73,7 @@ use std::{ }; use async_task::Task; +use base::{warn, WatchingEvents}; use futures::task::noop_waker; use io_uring::URingContext; use once_cell::sync::Lazy; @@ -80,7 +81,6 @@ use pin_utils::pin_mut; use remain::sorted; use slab::Slab; use sync::Mutex; -use sys_util::{warn, WatchingEvents}; use thiserror::Error as ThisError; use super::{ @@ -98,7 +98,7 @@ pub enum Error { CreatingContext(io_uring::Error), /// Failed to copy the FD for the polling context. #[error("Failed to copy the FD for the polling context: {0}")] - DuplicatingFd(sys_util::Error), + DuplicatingFd(base::Error), /// The Executor is gone. #[error("The URingExecutor is gone")] ExecutorGone, @@ -529,7 +529,7 @@ impl RawExecutor { fn submit_poll( &self, source: &RegisteredSource, - events: &sys_util::WatchingEvents, + events: &base::WatchingEvents, ) -> Result { let mut ring = self.ring.lock(); let src = ring @@ -836,7 +836,7 @@ impl URingExecutor { unsafe fn dup_fd(fd: RawFd) -> Result { let ret = libc::fcntl(fd, libc::F_DUPFD_CLOEXEC, 0); if ret < 0 { - Err(Error::DuplicatingFd(sys_util::Error::last())) + Err(Error::DuplicatingFd(base::Error::last())) } else { Ok(ret) } @@ -944,7 +944,7 @@ mod tests { Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc; // Use pipes to create a future that will block forever. - let (rx, mut tx) = sys_util::pipe(true).unwrap(); + let (rx, mut tx) = base::pipe(true).unwrap(); // Set up the TLS for the uring_executor by creating one. let ex = URingExecutor::new().unwrap(); @@ -988,7 +988,7 @@ mod tests { Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc; // Use pipes to create a future that will block forever. - let (mut rx, tx) = sys_util::new_pipe_full().expect("Pipe failed"); + let (mut rx, tx) = base::new_pipe_full().expect("Pipe failed"); // Set up the TLS for the uring_executor by creating one. let ex = URingExecutor::new().unwrap(); @@ -1014,7 +1014,7 @@ mod tests { // Finishing the operation should put the Arc count back to 1. // write to the pipe to wake the read pipe and then wait for the uring result in the // executor. - let mut buf = vec![0u8; sys_util::round_up_to_page_size(1)]; + let mut buf = vec![0u8; base::round_up_to_page_size(1)]; rx.read_exact(&mut buf).expect("read to empty failed"); ex.run_until(UringQueueEmpty { ex: &ex }) .expect("Failed to wait for write pipe ready"); @@ -1039,7 +1039,7 @@ mod tests { let bm = Arc::new(VecIoWrapper::from(vec![0u8; 16])) as Arc; - let (rx, tx) = sys_util::pipe(true).expect("Pipe failed"); + let (rx, tx) = base::pipe(true).expect("Pipe failed"); let ex = URingExecutor::new().unwrap(); @@ -1079,7 +1079,7 @@ mod tests { } } - let (mut rx, mut tx) = sys_util::pipe(true).expect("Pipe failed"); + let (mut rx, mut tx) = base::pipe(true).expect("Pipe failed"); let ex = URingExecutor::new().unwrap(); @@ -1127,7 +1127,7 @@ mod tests { // Leave an uncompleted operation in the queue so that the drop impl will try to drive it to // completion. - let (_rx, tx) = sys_util::pipe(true).expect("Pipe failed"); + let (_rx, tx) = base::pipe(true).expect("Pipe failed"); let tx = ex.register_source(&tx).expect("Failed to register source"); let bm = Arc::new(VecIoWrapper::from(0xf2e96u64.to_ne_bytes().to_vec())); let op = tx diff --git a/cros_async/src/uring_source.rs b/cros_async/src/uring_source.rs index c5d44452c8..9d8525eae5 100644 --- a/cros_async/src/uring_source.rs +++ b/cros_async/src/uring_source.rs @@ -343,7 +343,7 @@ mod tests { return; } - use sys_util::EventFd; + use base::EventFd; async fn write_event(ev: EventFd, wait: EventFd, ex: &URingExecutor) { let wait = UringSource::new(wait, ex).unwrap(); @@ -389,7 +389,7 @@ mod tests { use futures::future::Either; async fn do_test(ex: &URingExecutor) { - let (read_source, mut w) = sys_util::pipe(true).unwrap(); + let (read_source, mut w) = base::pipe(true).unwrap(); let source = UringSource::new(read_source, ex).unwrap(); let done = Box::pin(async { 5usize }); let pending = Box::pin(read_u64(&source)); diff --git a/devices/src/virtio/balloon.rs b/devices/src/virtio/balloon.rs index a72cea0110..a4235fe453 100644 --- a/devices/src/virtio/balloon.rs +++ b/devices/src/virtio/balloon.rs @@ -12,8 +12,10 @@ use remain::sorted; use thiserror::Error as ThisError; use balloon_control::{BalloonStats, BalloonTubeCommand, BalloonTubeResult}; -use base::{self, error, warn, AsRawDescriptor, AsyncTube, Event, RawDescriptor, Tube}; -use cros_async::{block_on, select6, select7, sync::Mutex as AsyncMutex, EventAsync, Executor}; +use base::{self, error, warn, AsRawDescriptor, Event, RawDescriptor, Tube}; +use cros_async::{ + block_on, select6, select7, sync::Mutex as AsyncMutex, AsyncTube, EventAsync, Executor, +}; use data_model::{DataInit, Le16, Le32, Le64}; use vm_memory::{GuestAddress, GuestMemory}; @@ -458,7 +460,7 @@ fn run_worker( let interrupt = Rc::new(RefCell::new(interrupt)); let ex = Executor::new().unwrap(); - let command_tube = command_tube.into_async_tube(&ex).unwrap(); + let command_tube = AsyncTube::new(&ex, command_tube).unwrap(); // We need a block to release all references to command_tube at the end before returning it. { diff --git a/devices/src/virtio/block/asynchronous.rs b/devices/src/virtio/block/asynchronous.rs index fdaa69b114..58a89fe173 100644 --- a/devices/src/virtio/block/asynchronous.rs +++ b/devices/src/virtio/block/asynchronous.rs @@ -21,11 +21,11 @@ use thiserror::Error as ThisError; use base::Error as SysError; use base::Result as SysResult; use base::{ - error, info, iov_max, warn, AsRawDescriptor, AsyncTube, Event, RawDescriptor, Timer, Tube, - TubeError, + error, info, iov_max, warn, AsRawDescriptor, Event, RawDescriptor, Timer, Tube, TubeError, }; use cros_async::{ - select5, sync::Mutex as AsyncMutex, AsyncError, EventAsync, Executor, SelectResult, TimerAsync, + select5, sync::Mutex as AsyncMutex, AsyncError, AsyncTube, EventAsync, Executor, SelectResult, + TimerAsync, }; use data_model::DataInit; use disk::{AsyncDisk, ToAsyncDisk}; @@ -767,7 +767,7 @@ impl VirtioDevice for BlockAsync { let ex = Executor::new().expect("Failed to create an executor"); let async_control = control_tube - .map(|c| c.into_async_tube(&ex).expect("failed to create async tube")); + .map(|c| AsyncTube::new(&ex, c).expect("failed to create async tube")); let async_image = match disk_image.to_async_disk(&ex) { Ok(d) => d, Err(e) => panic!("Failed to create async disk {}", e), diff --git a/devices/src/virtio/iommu.rs b/devices/src/virtio/iommu.rs index c73e37193f..c7a7ea2277 100644 --- a/devices/src/virtio/iommu.rs +++ b/devices/src/virtio/iommu.rs @@ -15,10 +15,10 @@ use std::{result, thread}; use acpi_tables::sdt::SDT; use anyhow::Context; use base::{ - error, pagesize, warn, AsRawDescriptor, AsyncTube, Error as SysError, Event, RawDescriptor, + error, pagesize, warn, AsRawDescriptor, Error as SysError, Event, RawDescriptor, Result as SysResult, Tube, TubeError, }; -use cros_async::{AsyncError, EventAsync, Executor}; +use cros_async::{AsyncError, AsyncTube, EventAsync, Executor}; use data_model::{DataInit, Le64}; use futures::{select, FutureExt}; use remain::sorted; @@ -618,17 +618,14 @@ impl Worker { let f_resample = async_utils::handle_irq_resample(&ex, interrupt.clone()); let f_kill = async_utils::await_and_exit(&ex, kill_evt); - let request_tube = translate_request_rx.map(|t| { - t.into_async_tube(&ex) - .expect("Failed to create async tube for rx") - }); + let request_tube = translate_request_rx + .map(|t| AsyncTube::new(&ex, t).expect("Failed to create async tube for rx")); let response_tubes = translate_response_senders.map(|m| { m.into_iter() .map(|x| { ( x.0, - x.1.into_async_tube(&ex) - .expect("Failed to create async tube"), + AsyncTube::new(&ex, x.1).expect("Failed to create async tube"), ) }) .collect() @@ -638,7 +635,7 @@ impl Worker { handle_translate_request(&endpoints, request_tube, response_tubes); let f_request = self.request_queue(req_queue, req_evt, interrupt_ref, &endpoints); - let command_tube = iommu_device_tube.into_async_tube(&ex).unwrap(); + let command_tube = AsyncTube::new(&ex, iommu_device_tube).unwrap(); // Future to handle command messages from host, such as passing vfio containers. let f_cmd = Self::handle_command_tube(&mem, command_tube, &endpoints, &hp_endpoints_ranges); diff --git a/devices/src/virtio/vhost/user/device/gpu.rs b/devices/src/virtio/vhost/user/device/gpu.rs index 08c9301c72..a10c38e4a2 100644 --- a/devices/src/virtio/vhost/user/device/gpu.rs +++ b/devices/src/virtio/vhost/user/device/gpu.rs @@ -11,7 +11,7 @@ use base::{ clone_descriptor, error, warn, Event, FromRawDescriptor, IntoRawDescriptor, SafeDescriptor, TimerFd, Tube, UnixSeqpacketListener, UnlinkUnixSeqpacketListener, }; -use cros_async::{AsyncWrapper, EventAsync, Executor, IoSourceExt, TimerAsync}; +use cros_async::{AsyncTube, AsyncWrapper, EventAsync, Executor, IoSourceExt, TimerAsync}; use futures::{ future::{select, Either}, pin_mut, @@ -194,9 +194,10 @@ impl VhostUserBackend for GpuBackend { } fn set_device_request_channel(&mut self, channel: File) -> anyhow::Result<()> { - let tube = unsafe { Tube::from_raw_descriptor(channel.into_raw_descriptor()) } - .into_async_tube(&self.ex) - .context("failed to create AsyncTube")?; + let tube = AsyncTube::new(&self.ex, unsafe { + Tube::from_raw_descriptor(channel.into_raw_descriptor()) + }) + .context("failed to create AsyncTube")?; // We need a PciAddress in order to initialize the pci bar but this isn't part of the // vhost-user protocol. Instead we expect this to be the first message the crosvm main