cros_async: Depend on base, not sys_util

base was previously providing some async types which now would
cause a circular dependency. Those have been moved into cros_async.

BUG=b:22320646
TEST=presubmit

Change-Id: I1f526ccfc5882f3a64404f714b13ac92ebfddcd6
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/3533614
Reviewed-by: Daniel Verkamp <dverkamp@chromium.org>
Tested-by: kokoro <noreply+kokoro@google.com>
This commit is contained in:
Dennis Kempin 2022-03-17 13:11:41 -07:00
parent 55c6a3b5cd
commit 4193d87a97
19 changed files with 110 additions and 110 deletions

View file

@ -9,7 +9,6 @@ chromeos = ["sys_util/chromeos"]
[dependencies]
audio_streams = { path = "../common/audio_streams" }
cros_async = { path = "../cros_async" }
data_model = { path = "../common/data_model" }
libc = "*"
remain = "0.2"

View file

@ -7,6 +7,7 @@ pub use sys_util::drop_capabilities;
pub use sys_util::handle_eintr_errno;
pub use sys_util::iov_max;
pub use sys_util::kernel_has_memfd;
pub use sys_util::new_pipe_full;
pub use sys_util::pipe;
pub use sys_util::read_raw_stdin;
pub use sys_util::syscall;
@ -52,7 +53,6 @@ pub use sys_util::{
};
pub use sys_util::{PollToken, WatchingEvents};
mod async_types;
mod event;
mod ioctl;
mod mmap;
@ -61,7 +61,6 @@ mod timer;
mod tube;
mod wait_context;
pub use async_types::*;
pub use event::{Event, EventReadResult, ScopedEvent};
pub use ioctl::{
ioctl, ioctl_with_mut_ptr, ioctl_with_mut_ref, ioctl_with_ptr, ioctl_with_ref, ioctl_with_val,
@ -75,7 +74,7 @@ pub use sys_util::{
FileReadWriteVolatile, FileSetLen, FileSync, WriteZeroesAt,
};
pub use timer::{FakeTimer, Timer};
pub use tube::{AsyncTube, Error as TubeError, Result as TubeResult, Tube};
pub use tube::{Error as TubeError, Result as TubeResult, Tube};
pub use wait_context::{EventToken, EventType, TriggeredEvent, WaitContext};
/// Wraps an AsRawDescriptor in the simple Descriptor struct, which

View file

@ -4,13 +4,11 @@
use std::io::{self, IoSlice};
use std::marker::PhantomData;
use std::ops::Deref;
use std::os::unix::prelude::{AsRawFd, RawFd};
use std::time::Duration;
use crate::{FromRawDescriptor, SafeDescriptor, ScmSocket, UnixSeqpacket, UnsyncMarker};
use cros_async::{Executor, IntoAsync, IoSourceExt};
use remain::sorted;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use sys_util::{
@ -23,8 +21,6 @@ use thiserror::Error as ThisError;
pub enum Error {
#[error("failed to clone UnixSeqpacket: {0}")]
Clone(io::Error),
#[error("failed to create async tube: {0}")]
CreateAsync(cros_async::AsyncError),
#[error("tube was disconnected")]
Disconnected,
#[error("failed to serialize/deserialize json from packet: {0}")]
@ -68,11 +64,6 @@ impl Tube {
}
}
pub fn into_async_tube(self, ex: &Executor) -> Result<AsyncTube> {
let inner = ex.async_from(self).map_err(Error::CreateAsync)?;
Ok(AsyncTube { inner })
}
pub fn try_clone(&self) -> Result<Self> {
self.socket.try_clone().map(Tube::new).map_err(Error::Clone)
}
@ -147,33 +138,6 @@ impl AsRawFd for Tube {
}
}
impl IntoAsync for Tube {}
pub struct AsyncTube {
inner: Box<dyn IoSourceExt<Tube>>,
}
impl AsyncTube {
pub async fn next<T: DeserializeOwned>(&self) -> Result<T> {
self.inner.wait_readable().await.unwrap();
self.inner.as_source().recv()
}
}
impl Deref for AsyncTube {
type Target = Tube;
fn deref(&self) -> &Self::Target {
self.inner.as_source()
}
}
impl From<AsyncTube> for Tube {
fn from(at: AsyncTube) -> Tube {
at.inner.into_source()
}
}
#[cfg(test)]
mod tests {
use super::*;

View file

@ -17,10 +17,11 @@ pin-utils = "0.1.0-alpha.4"
remain = "0.2"
slab = "0.4"
sync = { path = "../common/sync" } # provided by ebuild
sys_util = { path = "../common/sys_util" } # provided by ebuild
base = { path = "../base" } # provided by ebuild
thiserror = "1.0.20"
audio_streams = { path = "../common/audio_streams" } # provided by ebuild
anyhow = "1.0"
serde = "*"
[dependencies.futures]
version = "*"

View file

@ -2,8 +2,10 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::AsRawDescriptor;
use cros_async::IntoAsync;
use crate::{Executor, IntoAsync, IoSourceExt};
use base::{AsRawDescriptor, Tube, TubeResult};
use serde::de::DeserializeOwned;
use std::ops::Deref;
use std::os::unix::io::{AsRawFd, RawFd};
/// Like `cros_async::IntoAsync`, except for use with crosvm's AsRawDescriptor
@ -23,3 +25,36 @@ where
}
}
impl<T> IntoAsync for DescriptorAdapter<T> where T: DescriptorIntoAsync {}
impl IntoAsync for Tube {}
pub struct AsyncTube {
inner: Box<dyn IoSourceExt<Tube>>,
}
impl AsyncTube {
pub fn new(ex: &Executor, tube: Tube) -> std::io::Result<AsyncTube> {
return Ok(AsyncTube {
inner: ex.async_from(tube)?,
});
}
pub async fn next<T: DeserializeOwned>(&self) -> TubeResult<T> {
self.inner.wait_readable().await.unwrap();
self.inner.as_source().recv()
}
}
impl Deref for AsyncTube {
type Target = Tube;
fn deref(&self) -> &Self::Target {
self.inner.as_source()
}
}
impl From<AsyncTube> for Tube {
fn from(at: AsyncTube) -> Tube {
at.inner.into_source()
}
}

View file

@ -16,9 +16,9 @@ use std::{
};
use async_task::{Runnable, Task};
use base::{error, warn};
use slab::Slab;
use sync::{Condvar, Mutex};
use sys_util::{error, warn};
const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);

View file

@ -2,11 +2,11 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use sys_util::EventFd;
use base::EventFd;
use super::{AsyncResult, Executor, IntoAsync, IoSourceExt};
/// An async version of `sys_util::EventFd`.
/// An async version of `base::EventFd`.
pub struct EventAsync {
io_source: Box<dyn IoSourceExt<EventFd>>,
}

View file

@ -86,7 +86,7 @@ pub(crate) fn async_poll_from<'a, F: IntoAsync + Send + 'a>(
/// # fn do_it() -> Result<(), Box<dyn Error>> {
/// let ex = Executor::new()?;
///
/// let (rx, tx) = sys_util::pipe(true)?;
/// let (rx, tx) = base::pipe(true)?;
/// let zero = File::open("/dev/zero")?;
/// let zero_bytes = CHUNK_SIZE * 7;
/// let zero_to_pipe = transfer_data(

View file

@ -23,12 +23,12 @@ use std::{
};
use async_task::Task;
use base::{add_fd_flags, warn, EpollContext, EpollEvents, EventFd, WatchingEvents};
use futures::task::noop_waker;
use pin_utils::pin_mut;
use remain::sorted;
use slab::Slab;
use sync::Mutex;
use sys_util::{add_fd_flags, warn, EpollContext, EpollEvents, EventFd, WatchingEvents};
use thiserror::Error as ThisError;
use super::{
@ -42,28 +42,28 @@ use super::{
pub enum Error {
/// Failed to clone the EventFd for waking the executor.
#[error("Failed to clone the EventFd for waking the executor: {0}")]
CloneEventFd(sys_util::Error),
CloneEventFd(base::Error),
/// Failed to create the EventFd for waking the executor.
#[error("Failed to create the EventFd for waking the executor: {0}")]
CreateEventFd(sys_util::Error),
CreateEventFd(base::Error),
/// Creating a context to wait on FDs failed.
#[error("An error creating the fd waiting context: {0}")]
CreatingContext(sys_util::Error),
CreatingContext(base::Error),
/// Failed to copy the FD for the polling context.
#[error("Failed to copy the FD for the polling context: {0}")]
DuplicatingFd(sys_util::Error),
DuplicatingFd(base::Error),
/// The Executor is gone.
#[error("The FDExecutor is gone")]
ExecutorGone,
/// PollContext failure.
#[error("PollContext failure: {0}")]
PollContextError(sys_util::Error),
PollContextError(base::Error),
/// An error occurred when setting the FD non-blocking.
#[error("An error occurred setting the FD non-blocking: {0}.")]
SettingNonBlocking(sys_util::Error),
SettingNonBlocking(base::Error),
/// Failed to submit the waker to the polling context.
#[error("An error adding to the Aio context: {0}")]
SubmittingWaker(sys_util::Error),
SubmittingWaker(base::Error),
/// A Waker was canceled, but the operation isn't running.
#[error("Unknown waker")]
UnknownWaker,
@ -551,7 +551,7 @@ impl FdExecutor {
unsafe fn dup_fd(fd: RawFd) -> Result<RawFd> {
let ret = libc::fcntl(fd, libc::F_DUPFD_CLOEXEC, 0);
if ret < 0 {
Err(Error::DuplicatingFd(sys_util::Error::last()))
Err(Error::DuplicatingFd(base::Error::last()))
} else {
Ok(ret)
}
@ -572,7 +572,7 @@ mod test {
#[test]
fn test_it() {
async fn do_test(ex: &FdExecutor) {
let (r, _w) = sys_util::pipe(true).unwrap();
let (r, _w) = base::pipe(true).unwrap();
let done = Box::pin(async { 5usize });
let source = ex.register_source(r).unwrap();
let pending = source.wait_readable().unwrap();
@ -612,7 +612,7 @@ mod test {
}
}
let (mut rx, tx) = sys_util::pipe(true).expect("Pipe failed");
let (mut rx, tx) = base::pipe(true).expect("Pipe failed");
let ex = FdExecutor::new().unwrap();

View file

@ -24,8 +24,8 @@ use std::{
};
use async_trait::async_trait;
use base::UnixSeqpacket;
use remain::sorted;
use sys_util::net::UnixSeqpacket;
use thiserror::Error as ThisError;
use super::{BackingMemory, MemRegion};
@ -116,7 +116,7 @@ pub trait IoSourceExt<F>: ReadAsync + WriteAsync {
}
/// Marker trait signifying that the implementor is suitable for use with
/// cros_async. Examples of this include File, and sys_util::net::UnixSeqpacket.
/// cros_async. Examples of this include File, and base::net::UnixSeqpacket.
///
/// (Note: it'd be really nice to implement a TryFrom for any implementors, and
/// remove our factory functions. Unfortunately
@ -434,7 +434,7 @@ mod tests {
if !use_uring() {
return;
}
use sys_util::EventFd;
use base::EventFd;
async fn go<F: AsRawFd>(source: Box<dyn IoSourceExt<F>>) -> u64 {
source.read_u64().await.unwrap()

View file

@ -58,6 +58,7 @@
//! See the docs for `IoSourceExt` if support for kernels <5.4 is required. Focus on `UringSource` if
//! all systems have support for io_uring.
mod async_types;
pub mod audio_streams_async;
mod blocking;
mod complete;
@ -75,6 +76,8 @@ mod uring_executor;
mod uring_source;
mod waker;
pub use async_types::*;
pub use base;
pub use blocking::{block_on, BlockingPool};
pub use event::EventAsync;
pub use executor::Executor;
@ -86,7 +89,6 @@ pub use io_ext::{
pub use mem::{BackingMemory, MemRegion};
pub use poll_source::PollSource;
pub use select::SelectResult;
pub use sys_util;
pub use timer::TimerAsync;
pub use uring_executor::URingExecutor;
pub use uring_source::UringSource;
@ -113,7 +115,7 @@ pub enum Error {
TimerAsync(AsyncError),
/// Error from TimerFd.
#[error("Failure in TimerFd: {0}")]
TimerFd(sys_util::Error),
TimerFd(base::Error),
/// Error from the uring executor.
#[error("Failure in the uring executor: {0}")]
URingExecutor(uring_executor::Error),

View file

@ -36,19 +36,19 @@ pub enum Error {
Executor(fd_executor::Error),
/// An error occurred when executing fallocate synchronously.
#[error("An error occurred when executing fallocate synchronously: {0}")]
Fallocate(sys_util::Error),
Fallocate(base::Error),
/// An error occurred when executing fsync synchronously.
#[error("An error occurred when executing fsync synchronously: {0}")]
Fsync(sys_util::Error),
Fsync(base::Error),
/// An error occurred when reading the FD.
#[error("An error occurred when reading the FD: {0}.")]
Read(sys_util::Error),
Read(base::Error),
/// Can't seek file.
#[error("An error occurred when seeking the FD: {0}.")]
Seeking(sys_util::Error),
Seeking(base::Error),
/// An error occurred when writing the FD.
#[error("An error occurred when writing the FD: {0}.")]
Write(sys_util::Error),
Write(base::Error),
}
pub type Result<T> = std::result::Result<T, Error>;
@ -132,7 +132,7 @@ impl<F: AsRawFd> ReadAsync for PollSource<F> {
return Ok((res as usize, vec));
}
match sys_util::Error::last() {
match base::Error::last() {
e if e.errno() == libc::EWOULDBLOCK => {
let op = self.0.wait_readable().map_err(Error::AddingWaker)?;
op.await.map_err(Error::Executor)?;
@ -180,7 +180,7 @@ impl<F: AsRawFd> ReadAsync for PollSource<F> {
return Ok(res as usize);
}
match sys_util::Error::last() {
match base::Error::last() {
e if e.errno() == libc::EWOULDBLOCK => {
let op = self.0.wait_readable().map_err(Error::AddingWaker)?;
op.await.map_err(Error::Executor)?;
@ -213,7 +213,7 @@ impl<F: AsRawFd> ReadAsync for PollSource<F> {
return Ok(u64::from_ne_bytes(buf));
}
match sys_util::Error::last() {
match base::Error::last() {
e if e.errno() == libc::EWOULDBLOCK => {
let op = self.0.wait_readable().map_err(Error::AddingWaker)?;
op.await.map_err(Error::Executor)?;
@ -257,7 +257,7 @@ impl<F: AsRawFd> WriteAsync for PollSource<F> {
return Ok((res as usize, vec));
}
match sys_util::Error::last() {
match base::Error::last() {
e if e.errno() == libc::EWOULDBLOCK => {
let op = self.0.wait_writable().map_err(Error::AddingWaker)?;
op.await.map_err(Error::Executor)?;
@ -306,7 +306,7 @@ impl<F: AsRawFd> WriteAsync for PollSource<F> {
return Ok(res as usize);
}
match sys_util::Error::last() {
match base::Error::last() {
e if e.errno() == libc::EWOULDBLOCK => {
let op = self.0.wait_writable().map_err(Error::AddingWaker)?;
op.await.map_err(Error::Executor)?;
@ -329,7 +329,7 @@ impl<F: AsRawFd> WriteAsync for PollSource<F> {
if ret == 0 {
Ok(())
} else {
Err(AsyncError::Poll(Error::Fallocate(sys_util::Error::last())))
Err(AsyncError::Poll(Error::Fallocate(base::Error::last())))
}
}
@ -339,7 +339,7 @@ impl<F: AsRawFd> WriteAsync for PollSource<F> {
if ret == 0 {
Ok(())
} else {
Err(AsyncError::Poll(Error::Fsync(sys_util::Error::last())))
Err(AsyncError::Poll(Error::Fsync(base::Error::last())))
}
}
}
@ -437,7 +437,7 @@ mod tests {
let _ = source.wait_readable().await;
}
let (rx, _tx) = sys_util::pipe(true).unwrap();
let (rx, _tx) = base::pipe(true).unwrap();
let ex = FdExecutor::new().unwrap();
let source = PollSource::new(rx, &ex).unwrap();
ex.spawn_local(owns_poll_source(source)).detach();

View file

@ -4,14 +4,14 @@
use std::time::Duration;
use sys_util::{Result as SysResult, TimerFd};
use base::{Result as SysResult, TimerFd};
use super::{AsyncResult, Error, Executor, IntoAsync, IoSourceExt};
#[cfg(test)]
use super::{FdExecutor, URingExecutor};
/// An async version of sys_util::TimerFd.
/// An async version of base::TimerFd.
pub struct TimerAsync {
io_source: Box<dyn IoSourceExt<TimerFd>>,
}

View file

@ -73,6 +73,7 @@ use std::{
};
use async_task::Task;
use base::{warn, WatchingEvents};
use futures::task::noop_waker;
use io_uring::URingContext;
use once_cell::sync::Lazy;
@ -80,7 +81,6 @@ use pin_utils::pin_mut;
use remain::sorted;
use slab::Slab;
use sync::Mutex;
use sys_util::{warn, WatchingEvents};
use thiserror::Error as ThisError;
use super::{
@ -98,7 +98,7 @@ pub enum Error {
CreatingContext(io_uring::Error),
/// Failed to copy the FD for the polling context.
#[error("Failed to copy the FD for the polling context: {0}")]
DuplicatingFd(sys_util::Error),
DuplicatingFd(base::Error),
/// The Executor is gone.
#[error("The URingExecutor is gone")]
ExecutorGone,
@ -529,7 +529,7 @@ impl RawExecutor {
fn submit_poll(
&self,
source: &RegisteredSource,
events: &sys_util::WatchingEvents,
events: &base::WatchingEvents,
) -> Result<WakerToken> {
let mut ring = self.ring.lock();
let src = ring
@ -836,7 +836,7 @@ impl URingExecutor {
unsafe fn dup_fd(fd: RawFd) -> Result<RawFd> {
let ret = libc::fcntl(fd, libc::F_DUPFD_CLOEXEC, 0);
if ret < 0 {
Err(Error::DuplicatingFd(sys_util::Error::last()))
Err(Error::DuplicatingFd(base::Error::last()))
} else {
Ok(ret)
}
@ -944,7 +944,7 @@ mod tests {
Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;
// Use pipes to create a future that will block forever.
let (rx, mut tx) = sys_util::pipe(true).unwrap();
let (rx, mut tx) = base::pipe(true).unwrap();
// Set up the TLS for the uring_executor by creating one.
let ex = URingExecutor::new().unwrap();
@ -988,7 +988,7 @@ mod tests {
Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;
// Use pipes to create a future that will block forever.
let (mut rx, tx) = sys_util::new_pipe_full().expect("Pipe failed");
let (mut rx, tx) = base::new_pipe_full().expect("Pipe failed");
// Set up the TLS for the uring_executor by creating one.
let ex = URingExecutor::new().unwrap();
@ -1014,7 +1014,7 @@ mod tests {
// Finishing the operation should put the Arc count back to 1.
// write to the pipe to wake the read pipe and then wait for the uring result in the
// executor.
let mut buf = vec![0u8; sys_util::round_up_to_page_size(1)];
let mut buf = vec![0u8; base::round_up_to_page_size(1)];
rx.read_exact(&mut buf).expect("read to empty failed");
ex.run_until(UringQueueEmpty { ex: &ex })
.expect("Failed to wait for write pipe ready");
@ -1039,7 +1039,7 @@ mod tests {
let bm =
Arc::new(VecIoWrapper::from(vec![0u8; 16])) as Arc<dyn BackingMemory + Send + Sync>;
let (rx, tx) = sys_util::pipe(true).expect("Pipe failed");
let (rx, tx) = base::pipe(true).expect("Pipe failed");
let ex = URingExecutor::new().unwrap();
@ -1079,7 +1079,7 @@ mod tests {
}
}
let (mut rx, mut tx) = sys_util::pipe(true).expect("Pipe failed");
let (mut rx, mut tx) = base::pipe(true).expect("Pipe failed");
let ex = URingExecutor::new().unwrap();
@ -1127,7 +1127,7 @@ mod tests {
// Leave an uncompleted operation in the queue so that the drop impl will try to drive it to
// completion.
let (_rx, tx) = sys_util::pipe(true).expect("Pipe failed");
let (_rx, tx) = base::pipe(true).expect("Pipe failed");
let tx = ex.register_source(&tx).expect("Failed to register source");
let bm = Arc::new(VecIoWrapper::from(0xf2e96u64.to_ne_bytes().to_vec()));
let op = tx

View file

@ -343,7 +343,7 @@ mod tests {
return;
}
use sys_util::EventFd;
use base::EventFd;
async fn write_event(ev: EventFd, wait: EventFd, ex: &URingExecutor) {
let wait = UringSource::new(wait, ex).unwrap();
@ -389,7 +389,7 @@ mod tests {
use futures::future::Either;
async fn do_test(ex: &URingExecutor) {
let (read_source, mut w) = sys_util::pipe(true).unwrap();
let (read_source, mut w) = base::pipe(true).unwrap();
let source = UringSource::new(read_source, ex).unwrap();
let done = Box::pin(async { 5usize });
let pending = Box::pin(read_u64(&source));

View file

@ -12,8 +12,10 @@ use remain::sorted;
use thiserror::Error as ThisError;
use balloon_control::{BalloonStats, BalloonTubeCommand, BalloonTubeResult};
use base::{self, error, warn, AsRawDescriptor, AsyncTube, Event, RawDescriptor, Tube};
use cros_async::{block_on, select6, select7, sync::Mutex as AsyncMutex, EventAsync, Executor};
use base::{self, error, warn, AsRawDescriptor, Event, RawDescriptor, Tube};
use cros_async::{
block_on, select6, select7, sync::Mutex as AsyncMutex, AsyncTube, EventAsync, Executor,
};
use data_model::{DataInit, Le16, Le32, Le64};
use vm_memory::{GuestAddress, GuestMemory};
@ -458,7 +460,7 @@ fn run_worker(
let interrupt = Rc::new(RefCell::new(interrupt));
let ex = Executor::new().unwrap();
let command_tube = command_tube.into_async_tube(&ex).unwrap();
let command_tube = AsyncTube::new(&ex, command_tube).unwrap();
// We need a block to release all references to command_tube at the end before returning it.
{

View file

@ -21,11 +21,11 @@ use thiserror::Error as ThisError;
use base::Error as SysError;
use base::Result as SysResult;
use base::{
error, info, iov_max, warn, AsRawDescriptor, AsyncTube, Event, RawDescriptor, Timer, Tube,
TubeError,
error, info, iov_max, warn, AsRawDescriptor, Event, RawDescriptor, Timer, Tube, TubeError,
};
use cros_async::{
select5, sync::Mutex as AsyncMutex, AsyncError, EventAsync, Executor, SelectResult, TimerAsync,
select5, sync::Mutex as AsyncMutex, AsyncError, AsyncTube, EventAsync, Executor, SelectResult,
TimerAsync,
};
use data_model::DataInit;
use disk::{AsyncDisk, ToAsyncDisk};
@ -767,7 +767,7 @@ impl VirtioDevice for BlockAsync {
let ex = Executor::new().expect("Failed to create an executor");
let async_control = control_tube
.map(|c| c.into_async_tube(&ex).expect("failed to create async tube"));
.map(|c| AsyncTube::new(&ex, c).expect("failed to create async tube"));
let async_image = match disk_image.to_async_disk(&ex) {
Ok(d) => d,
Err(e) => panic!("Failed to create async disk {}", e),

View file

@ -15,10 +15,10 @@ use std::{result, thread};
use acpi_tables::sdt::SDT;
use anyhow::Context;
use base::{
error, pagesize, warn, AsRawDescriptor, AsyncTube, Error as SysError, Event, RawDescriptor,
error, pagesize, warn, AsRawDescriptor, Error as SysError, Event, RawDescriptor,
Result as SysResult, Tube, TubeError,
};
use cros_async::{AsyncError, EventAsync, Executor};
use cros_async::{AsyncError, AsyncTube, EventAsync, Executor};
use data_model::{DataInit, Le64};
use futures::{select, FutureExt};
use remain::sorted;
@ -618,17 +618,14 @@ impl Worker {
let f_resample = async_utils::handle_irq_resample(&ex, interrupt.clone());
let f_kill = async_utils::await_and_exit(&ex, kill_evt);
let request_tube = translate_request_rx.map(|t| {
t.into_async_tube(&ex)
.expect("Failed to create async tube for rx")
});
let request_tube = translate_request_rx
.map(|t| AsyncTube::new(&ex, t).expect("Failed to create async tube for rx"));
let response_tubes = translate_response_senders.map(|m| {
m.into_iter()
.map(|x| {
(
x.0,
x.1.into_async_tube(&ex)
.expect("Failed to create async tube"),
AsyncTube::new(&ex, x.1).expect("Failed to create async tube"),
)
})
.collect()
@ -638,7 +635,7 @@ impl Worker {
handle_translate_request(&endpoints, request_tube, response_tubes);
let f_request = self.request_queue(req_queue, req_evt, interrupt_ref, &endpoints);
let command_tube = iommu_device_tube.into_async_tube(&ex).unwrap();
let command_tube = AsyncTube::new(&ex, iommu_device_tube).unwrap();
// Future to handle command messages from host, such as passing vfio containers.
let f_cmd = Self::handle_command_tube(&mem, command_tube, &endpoints, &hp_endpoints_ranges);

View file

@ -11,7 +11,7 @@ use base::{
clone_descriptor, error, warn, Event, FromRawDescriptor, IntoRawDescriptor, SafeDescriptor,
TimerFd, Tube, UnixSeqpacketListener, UnlinkUnixSeqpacketListener,
};
use cros_async::{AsyncWrapper, EventAsync, Executor, IoSourceExt, TimerAsync};
use cros_async::{AsyncTube, AsyncWrapper, EventAsync, Executor, IoSourceExt, TimerAsync};
use futures::{
future::{select, Either},
pin_mut,
@ -194,9 +194,10 @@ impl VhostUserBackend for GpuBackend {
}
fn set_device_request_channel(&mut self, channel: File) -> anyhow::Result<()> {
let tube = unsafe { Tube::from_raw_descriptor(channel.into_raw_descriptor()) }
.into_async_tube(&self.ex)
.context("failed to create AsyncTube")?;
let tube = AsyncTube::new(&self.ex, unsafe {
Tube::from_raw_descriptor(channel.into_raw_descriptor())
})
.context("failed to create AsyncTube")?;
// We need a PciAddress in order to initialize the pci bar but this isn't part of the
// vhost-user protocol. Instead we expect this to be the first message the crosvm main