diff --git a/cros_async/src/fd_executor.rs b/cros_async/src/fd_executor.rs index 06924c5897..aa9872af91 100644 --- a/cros_async/src/fd_executor.rs +++ b/cros_async/src/fd_executor.rs @@ -49,6 +49,9 @@ pub enum Error { /// PollContext failure. #[error("PollContext failure: {0}")] PollContextError(sys_util::Error), + /// An error occurred when setting the FD non-blocking. + #[error("An error occurred setting the FD non-blocking: {0}.")] + SettingNonBlocking(sys_util::Error), /// Failed to submit the waker to the polling context. #[error("An error adding to the Aio context: {0}")] SubmittingWaker(sys_util::Error), @@ -70,6 +73,62 @@ enum OpStatus { Completed, } +// An IO source previously registered with an FdExecutor. Used to initiate asynchronous IO with the +// associated executor. +pub struct RegisteredSource { + source: F, + ex: Weak, +} + +impl RegisteredSource { + // Start an asynchronous operation to wait for this source to become readable. The returned + // future will not be ready until the source is readable. + pub fn wait_readable(&self) -> Result { + let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?; + + let token = + ex.add_operation(self.source.as_raw_fd(), WatchingEvents::empty().set_read())?; + + Ok(PendingOperation { + token: Some(token), + ex: self.ex.clone(), + }) + } + + // Start an asynchronous operation to wait for this source to become writable. The returned + // future will not be ready until the source is writable. + pub fn wait_writable(&self) -> Result { + let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?; + + let token = + ex.add_operation(self.source.as_raw_fd(), WatchingEvents::empty().set_write())?; + + Ok(PendingOperation { + token: Some(token), + ex: self.ex.clone(), + }) + } +} + +impl RegisteredSource { + // Consume this RegisteredSource and return the inner IO source. + pub fn into_source(self) -> F { + self.source + } +} + +impl AsRef for RegisteredSource { + fn as_ref(&self) -> &F { + &self.source + } +} + +impl AsMut for RegisteredSource { + fn as_mut(&mut self) -> &mut F { + &mut self.source + } +} + /// A token returned from `add_operation` that can be used to cancel the waker before it completes. /// Used to manage getting the result from the underlying executor for a completed operation. /// Dropping a `PendingOperation` will get the result from the executor. @@ -431,24 +490,10 @@ impl FdExecutor { self.raw.run(&mut ctx, f) } - pub fn wait_readable(&self, f: &F) -> Result { - let token = self - .raw - .add_operation(f.as_raw_fd(), WatchingEvents::empty().set_read())?; - - Ok(PendingOperation { - token: Some(token), - ex: Arc::downgrade(&self.raw), - }) - } - - pub fn wait_writable(&self, f: &F) -> Result { - let token = self - .raw - .add_operation(f.as_raw_fd(), WatchingEvents::empty().set_read())?; - - Ok(PendingOperation { - token: Some(token), + pub(crate) fn register_source(&self, f: F) -> Result> { + add_fd_flags(f.as_raw_fd(), libc::O_NONBLOCK).map_err(Error::SettingNonBlocking)?; + Ok(RegisteredSource { + source: f, ex: Arc::downgrade(&self.raw), }) } @@ -480,7 +525,8 @@ mod test { async fn do_test(ex: &FdExecutor) { let (r, _w) = sys_util::pipe(true).unwrap(); let done = Box::pin(async { 5usize }); - let pending = ex.wait_readable(&r).unwrap(); + let source = ex.register_source(r).unwrap(); + let pending = source.wait_readable().unwrap(); match futures::future::select(pending, done).await { Either::Right((5, pending)) => std::mem::drop(pending), _ => panic!("unexpected select result"), @@ -521,7 +567,8 @@ mod test { let ex = FdExecutor::new().unwrap(); - let op = ex.wait_writable(&tx).unwrap(); + let source = ex.register_source(tx.try_clone().unwrap()).unwrap(); + let op = source.wait_writable().unwrap(); ex.spawn_local(write_value(tx)).detach(); ex.spawn_local(check_op(op)).detach(); diff --git a/cros_async/src/poll_source.rs b/cros_async/src/poll_source.rs index 748da78daf..7fc674c62b 100644 --- a/cros_async/src/poll_source.rs +++ b/cros_async/src/poll_source.rs @@ -10,11 +10,9 @@ use std::ops::{Deref, DerefMut}; use std::os::unix::io::AsRawFd; use std::sync::Arc; -use libc::O_NONBLOCK; -use sys_util::{self, add_fd_flags}; use thiserror::Error as ThisError; -use crate::fd_executor::{self, FdExecutor}; +use crate::fd_executor::{self, FdExecutor, RegisteredSource}; use crate::mem::{BackingMemory, MemRegion}; use crate::{AsyncError, AsyncResult}; use crate::{IoSourceExt, ReadAsync, WriteAsync}; @@ -35,15 +33,11 @@ pub enum Error { #[error("An error occurred when executing fsync synchronously: {0}")] Fsync(sys_util::Error), /// An error occurred when reading the FD. - /// #[error("An error occurred when reading the FD: {0}.")] Read(sys_util::Error), /// Can't seek file. #[error("An error occurred when seeking the FD: {0}.")] Seeking(sys_util::Error), - /// An error occurred when setting the FD non-blocking. - #[error("An error occurred setting the FD non-blocking: {0}.")] - SettingNonBlocking(sys_util::Error), /// An error occurred when writing the FD. #[error("An error occurred when writing the FD: {0}.")] Write(sys_util::Error), @@ -52,25 +46,19 @@ pub type Result = std::result::Result; /// Async wrapper for an IO source that uses the FD executor to drive async operations. /// Used by `IoSourceExt::new` when uring isn't available. -pub struct PollSource { - source: F, - ex: FdExecutor, -} +pub struct PollSource(RegisteredSource); impl PollSource { /// Create a new `PollSource` from the given IO source. pub fn new(f: F, ex: &FdExecutor) -> Result { - let fd = f.as_raw_fd(); - add_fd_flags(fd, O_NONBLOCK).map_err(Error::SettingNonBlocking)?; - Ok(Self { - source: f, - ex: ex.clone(), - }) + ex.register_source(f) + .map(PollSource) + .map_err(Error::Executor) } /// Return the inner source. pub fn into_source(self) -> F { - self.source + self.0.into_source() } } @@ -78,7 +66,13 @@ impl Deref for PollSource { type Target = F; fn deref(&self) -> &Self::Target { - &self.source + self.0.as_ref() + } +} + +impl DerefMut for PollSource { + fn deref_mut(&mut self) -> &mut Self::Target { + self.0.as_mut() } } @@ -94,7 +88,7 @@ impl ReadAsync for PollSource { // Safe because this will only modify `vec` and we check the return value. let res = unsafe { libc::pread64( - self.source.as_raw_fd(), + self.as_raw_fd(), vec.as_mut_ptr() as *mut libc::c_void, vec.len(), file_offset as libc::off64_t, @@ -107,10 +101,7 @@ impl ReadAsync for PollSource { match sys_util::Error::last() { e if e.errno() == libc::EWOULDBLOCK => { - let op = self - .ex - .wait_readable(&self.source) - .map_err(Error::AddingWaker)?; + let op = self.0.wait_readable().map_err(Error::AddingWaker)?; op.await.map_err(Error::Executor)?; } e => return Err(Error::Read(e).into()), @@ -135,7 +126,7 @@ impl ReadAsync for PollSource { // guaranteed to be valid from the pointer by io_slice_mut. let res = unsafe { libc::preadv64( - self.source.as_raw_fd(), + self.as_raw_fd(), iovecs.as_mut_ptr() as *mut _, iovecs.len() as i32, file_offset as libc::off64_t, @@ -148,10 +139,7 @@ impl ReadAsync for PollSource { match sys_util::Error::last() { e if e.errno() == libc::EWOULDBLOCK => { - let op = self - .ex - .wait_readable(&self.source) - .map_err(Error::AddingWaker)?; + let op = self.0.wait_readable().map_err(Error::AddingWaker)?; op.await.map_err(Error::Executor)?; } e => return Err(Error::Read(e).into()), @@ -161,10 +149,7 @@ impl ReadAsync for PollSource { /// Wait for the FD of `self` to be readable. async fn wait_readable(&self) -> AsyncResult<()> { - let op = self - .ex - .wait_readable(&self.source) - .map_err(Error::AddingWaker)?; + let op = self.0.wait_readable().map_err(Error::AddingWaker)?; op.await.map_err(Error::Executor)?; Ok(()) } @@ -175,7 +160,7 @@ impl ReadAsync for PollSource { // Safe because this will only modify `buf` and we check the return value. let res = unsafe { libc::read( - self.source.as_raw_fd(), + self.as_raw_fd(), buf.as_mut_ptr() as *mut libc::c_void, buf.len(), ) @@ -187,10 +172,7 @@ impl ReadAsync for PollSource { match sys_util::Error::last() { e if e.errno() == libc::EWOULDBLOCK => { - let op = self - .ex - .wait_readable(&self.source) - .map_err(Error::AddingWaker)?; + let op = self.0.wait_readable().map_err(Error::AddingWaker)?; op.await.map_err(Error::Executor)?; } e => return Err(Error::Read(e).into()), @@ -211,7 +193,7 @@ impl WriteAsync for PollSource { // Safe because this will not modify any memory and we check the return value. let res = unsafe { libc::pwrite64( - self.source.as_raw_fd(), + self.as_raw_fd(), vec.as_ptr() as *const libc::c_void, vec.len(), file_offset as libc::off64_t, @@ -224,10 +206,7 @@ impl WriteAsync for PollSource { match sys_util::Error::last() { e if e.errno() == libc::EWOULDBLOCK => { - let op = self - .ex - .wait_writable(&self.source) - .map_err(Error::AddingWaker)?; + let op = self.0.wait_writable().map_err(Error::AddingWaker)?; op.await.map_err(Error::Executor)?; } e => return Err(Error::Write(e).into()), @@ -253,7 +232,7 @@ impl WriteAsync for PollSource { // guaranteed to be valid from the pointer by io_slice_mut. let res = unsafe { libc::pwritev64( - self.source.as_raw_fd(), + self.as_raw_fd(), iovecs.as_ptr() as *mut _, iovecs.len() as i32, file_offset as libc::off64_t, @@ -266,10 +245,7 @@ impl WriteAsync for PollSource { match sys_util::Error::last() { e if e.errno() == libc::EWOULDBLOCK => { - let op = self - .ex - .wait_writable(&self.source) - .map_err(Error::AddingWaker)?; + let op = self.0.wait_writable().map_err(Error::AddingWaker)?; op.await.map_err(Error::Executor)?; } e => return Err(Error::Write(e).into()), @@ -281,7 +257,7 @@ impl WriteAsync for PollSource { async fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> AsyncResult<()> { let ret = unsafe { libc::fallocate64( - self.source.as_raw_fd(), + self.as_raw_fd(), mode as libc::c_int, file_offset as libc::off64_t, len as libc::off64_t, @@ -296,7 +272,7 @@ impl WriteAsync for PollSource { /// Sync all completed write operations to the backing storage. async fn fsync(&self) -> AsyncResult<()> { - let ret = unsafe { libc::fsync(self.source.as_raw_fd()) }; + let ret = unsafe { libc::fsync(self.as_raw_fd()) }; if ret == 0 { Ok(()) } else { @@ -309,23 +285,17 @@ impl WriteAsync for PollSource { impl IoSourceExt for PollSource { /// Yields the underlying IO source. fn into_source(self: Box) -> F { - self.source + self.0.into_source() } /// Provides a mutable ref to the underlying IO source. fn as_source_mut(&mut self) -> &mut F { - &mut self.source + self } /// Provides a ref to the underlying IO source. fn as_source(&self) -> &F { - &self.source - } -} - -impl DerefMut for PollSource { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.source + self } } @@ -393,4 +363,23 @@ mod tests { let ex = FdExecutor::new().unwrap(); ex.run_until(go(&ex)).unwrap(); } + + #[test] + fn memory_leak() { + // This test needs to run under ASAN to detect memory leaks. + + async fn owns_poll_source(source: PollSource) { + let _ = source.wait_readable().await; + } + + let (rx, _tx) = sys_util::pipe(true).unwrap(); + let ex = FdExecutor::new().unwrap(); + let source = PollSource::new(rx, &ex).unwrap(); + ex.spawn_local(owns_poll_source(source)).detach(); + + // Drop `ex` without running. This would cause a memory leak if PollSource owned a strong + // reference to the executor because it owns a reference to the future that owns PollSource + // (via its Runnable). The strong reference prevents the drop impl from running, which would + // otherwise poll the future and have it return with an error. + } }