From 8e80902ce3f94e6c5595569be2d93beb158c1bd7 Mon Sep 17 00:00:00 2001 From: Chirantan Ekbote Date: Mon, 15 Mar 2021 18:18:20 +0900 Subject: [PATCH] cros_async: Fix circular reference memory leak PollSource keeps a strong reference to the FdExecutor, which can lead to a memory leak via a circular reference if the caller spawns a future that owns a PollSource and then detaches it. Avoid this by using weak references instead. With this change, we now only use weak references internally. The only way to increase the strong reference count is by cloning the FdExecutor. BUG=none TEST=unit tests Change-Id: Ic58ff475a31c6fca831c3ced73b26b87ceeda028 Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2760378 Tested-by: kokoro Reviewed-by: Dylan Reid Commit-Queue: Daniel Verkamp --- cros_async/src/fd_executor.rs | 87 ++++++++++++++++++++------- cros_async/src/poll_source.rs | 107 +++++++++++++++------------------- 2 files changed, 115 insertions(+), 79 deletions(-) diff --git a/cros_async/src/fd_executor.rs b/cros_async/src/fd_executor.rs index 06924c5897..aa9872af91 100644 --- a/cros_async/src/fd_executor.rs +++ b/cros_async/src/fd_executor.rs @@ -49,6 +49,9 @@ pub enum Error { /// PollContext failure. #[error("PollContext failure: {0}")] PollContextError(sys_util::Error), + /// An error occurred when setting the FD non-blocking. + #[error("An error occurred setting the FD non-blocking: {0}.")] + SettingNonBlocking(sys_util::Error), /// Failed to submit the waker to the polling context. #[error("An error adding to the Aio context: {0}")] SubmittingWaker(sys_util::Error), @@ -70,6 +73,62 @@ enum OpStatus { Completed, } +// An IO source previously registered with an FdExecutor. Used to initiate asynchronous IO with the +// associated executor. +pub struct RegisteredSource { + source: F, + ex: Weak, +} + +impl RegisteredSource { + // Start an asynchronous operation to wait for this source to become readable. The returned + // future will not be ready until the source is readable. + pub fn wait_readable(&self) -> Result { + let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?; + + let token = + ex.add_operation(self.source.as_raw_fd(), WatchingEvents::empty().set_read())?; + + Ok(PendingOperation { + token: Some(token), + ex: self.ex.clone(), + }) + } + + // Start an asynchronous operation to wait for this source to become writable. The returned + // future will not be ready until the source is writable. + pub fn wait_writable(&self) -> Result { + let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?; + + let token = + ex.add_operation(self.source.as_raw_fd(), WatchingEvents::empty().set_write())?; + + Ok(PendingOperation { + token: Some(token), + ex: self.ex.clone(), + }) + } +} + +impl RegisteredSource { + // Consume this RegisteredSource and return the inner IO source. + pub fn into_source(self) -> F { + self.source + } +} + +impl AsRef for RegisteredSource { + fn as_ref(&self) -> &F { + &self.source + } +} + +impl AsMut for RegisteredSource { + fn as_mut(&mut self) -> &mut F { + &mut self.source + } +} + /// A token returned from `add_operation` that can be used to cancel the waker before it completes. /// Used to manage getting the result from the underlying executor for a completed operation. /// Dropping a `PendingOperation` will get the result from the executor. @@ -431,24 +490,10 @@ impl FdExecutor { self.raw.run(&mut ctx, f) } - pub fn wait_readable(&self, f: &F) -> Result { - let token = self - .raw - .add_operation(f.as_raw_fd(), WatchingEvents::empty().set_read())?; - - Ok(PendingOperation { - token: Some(token), - ex: Arc::downgrade(&self.raw), - }) - } - - pub fn wait_writable(&self, f: &F) -> Result { - let token = self - .raw - .add_operation(f.as_raw_fd(), WatchingEvents::empty().set_read())?; - - Ok(PendingOperation { - token: Some(token), + pub(crate) fn register_source(&self, f: F) -> Result> { + add_fd_flags(f.as_raw_fd(), libc::O_NONBLOCK).map_err(Error::SettingNonBlocking)?; + Ok(RegisteredSource { + source: f, ex: Arc::downgrade(&self.raw), }) } @@ -480,7 +525,8 @@ mod test { async fn do_test(ex: &FdExecutor) { let (r, _w) = sys_util::pipe(true).unwrap(); let done = Box::pin(async { 5usize }); - let pending = ex.wait_readable(&r).unwrap(); + let source = ex.register_source(r).unwrap(); + let pending = source.wait_readable().unwrap(); match futures::future::select(pending, done).await { Either::Right((5, pending)) => std::mem::drop(pending), _ => panic!("unexpected select result"), @@ -521,7 +567,8 @@ mod test { let ex = FdExecutor::new().unwrap(); - let op = ex.wait_writable(&tx).unwrap(); + let source = ex.register_source(tx.try_clone().unwrap()).unwrap(); + let op = source.wait_writable().unwrap(); ex.spawn_local(write_value(tx)).detach(); ex.spawn_local(check_op(op)).detach(); diff --git a/cros_async/src/poll_source.rs b/cros_async/src/poll_source.rs index 748da78daf..7fc674c62b 100644 --- a/cros_async/src/poll_source.rs +++ b/cros_async/src/poll_source.rs @@ -10,11 +10,9 @@ use std::ops::{Deref, DerefMut}; use std::os::unix::io::AsRawFd; use std::sync::Arc; -use libc::O_NONBLOCK; -use sys_util::{self, add_fd_flags}; use thiserror::Error as ThisError; -use crate::fd_executor::{self, FdExecutor}; +use crate::fd_executor::{self, FdExecutor, RegisteredSource}; use crate::mem::{BackingMemory, MemRegion}; use crate::{AsyncError, AsyncResult}; use crate::{IoSourceExt, ReadAsync, WriteAsync}; @@ -35,15 +33,11 @@ pub enum Error { #[error("An error occurred when executing fsync synchronously: {0}")] Fsync(sys_util::Error), /// An error occurred when reading the FD. - /// #[error("An error occurred when reading the FD: {0}.")] Read(sys_util::Error), /// Can't seek file. #[error("An error occurred when seeking the FD: {0}.")] Seeking(sys_util::Error), - /// An error occurred when setting the FD non-blocking. - #[error("An error occurred setting the FD non-blocking: {0}.")] - SettingNonBlocking(sys_util::Error), /// An error occurred when writing the FD. #[error("An error occurred when writing the FD: {0}.")] Write(sys_util::Error), @@ -52,25 +46,19 @@ pub type Result = std::result::Result; /// Async wrapper for an IO source that uses the FD executor to drive async operations. /// Used by `IoSourceExt::new` when uring isn't available. -pub struct PollSource { - source: F, - ex: FdExecutor, -} +pub struct PollSource(RegisteredSource); impl PollSource { /// Create a new `PollSource` from the given IO source. pub fn new(f: F, ex: &FdExecutor) -> Result { - let fd = f.as_raw_fd(); - add_fd_flags(fd, O_NONBLOCK).map_err(Error::SettingNonBlocking)?; - Ok(Self { - source: f, - ex: ex.clone(), - }) + ex.register_source(f) + .map(PollSource) + .map_err(Error::Executor) } /// Return the inner source. pub fn into_source(self) -> F { - self.source + self.0.into_source() } } @@ -78,7 +66,13 @@ impl Deref for PollSource { type Target = F; fn deref(&self) -> &Self::Target { - &self.source + self.0.as_ref() + } +} + +impl DerefMut for PollSource { + fn deref_mut(&mut self) -> &mut Self::Target { + self.0.as_mut() } } @@ -94,7 +88,7 @@ impl ReadAsync for PollSource { // Safe because this will only modify `vec` and we check the return value. let res = unsafe { libc::pread64( - self.source.as_raw_fd(), + self.as_raw_fd(), vec.as_mut_ptr() as *mut libc::c_void, vec.len(), file_offset as libc::off64_t, @@ -107,10 +101,7 @@ impl ReadAsync for PollSource { match sys_util::Error::last() { e if e.errno() == libc::EWOULDBLOCK => { - let op = self - .ex - .wait_readable(&self.source) - .map_err(Error::AddingWaker)?; + let op = self.0.wait_readable().map_err(Error::AddingWaker)?; op.await.map_err(Error::Executor)?; } e => return Err(Error::Read(e).into()), @@ -135,7 +126,7 @@ impl ReadAsync for PollSource { // guaranteed to be valid from the pointer by io_slice_mut. let res = unsafe { libc::preadv64( - self.source.as_raw_fd(), + self.as_raw_fd(), iovecs.as_mut_ptr() as *mut _, iovecs.len() as i32, file_offset as libc::off64_t, @@ -148,10 +139,7 @@ impl ReadAsync for PollSource { match sys_util::Error::last() { e if e.errno() == libc::EWOULDBLOCK => { - let op = self - .ex - .wait_readable(&self.source) - .map_err(Error::AddingWaker)?; + let op = self.0.wait_readable().map_err(Error::AddingWaker)?; op.await.map_err(Error::Executor)?; } e => return Err(Error::Read(e).into()), @@ -161,10 +149,7 @@ impl ReadAsync for PollSource { /// Wait for the FD of `self` to be readable. async fn wait_readable(&self) -> AsyncResult<()> { - let op = self - .ex - .wait_readable(&self.source) - .map_err(Error::AddingWaker)?; + let op = self.0.wait_readable().map_err(Error::AddingWaker)?; op.await.map_err(Error::Executor)?; Ok(()) } @@ -175,7 +160,7 @@ impl ReadAsync for PollSource { // Safe because this will only modify `buf` and we check the return value. let res = unsafe { libc::read( - self.source.as_raw_fd(), + self.as_raw_fd(), buf.as_mut_ptr() as *mut libc::c_void, buf.len(), ) @@ -187,10 +172,7 @@ impl ReadAsync for PollSource { match sys_util::Error::last() { e if e.errno() == libc::EWOULDBLOCK => { - let op = self - .ex - .wait_readable(&self.source) - .map_err(Error::AddingWaker)?; + let op = self.0.wait_readable().map_err(Error::AddingWaker)?; op.await.map_err(Error::Executor)?; } e => return Err(Error::Read(e).into()), @@ -211,7 +193,7 @@ impl WriteAsync for PollSource { // Safe because this will not modify any memory and we check the return value. let res = unsafe { libc::pwrite64( - self.source.as_raw_fd(), + self.as_raw_fd(), vec.as_ptr() as *const libc::c_void, vec.len(), file_offset as libc::off64_t, @@ -224,10 +206,7 @@ impl WriteAsync for PollSource { match sys_util::Error::last() { e if e.errno() == libc::EWOULDBLOCK => { - let op = self - .ex - .wait_writable(&self.source) - .map_err(Error::AddingWaker)?; + let op = self.0.wait_writable().map_err(Error::AddingWaker)?; op.await.map_err(Error::Executor)?; } e => return Err(Error::Write(e).into()), @@ -253,7 +232,7 @@ impl WriteAsync for PollSource { // guaranteed to be valid from the pointer by io_slice_mut. let res = unsafe { libc::pwritev64( - self.source.as_raw_fd(), + self.as_raw_fd(), iovecs.as_ptr() as *mut _, iovecs.len() as i32, file_offset as libc::off64_t, @@ -266,10 +245,7 @@ impl WriteAsync for PollSource { match sys_util::Error::last() { e if e.errno() == libc::EWOULDBLOCK => { - let op = self - .ex - .wait_writable(&self.source) - .map_err(Error::AddingWaker)?; + let op = self.0.wait_writable().map_err(Error::AddingWaker)?; op.await.map_err(Error::Executor)?; } e => return Err(Error::Write(e).into()), @@ -281,7 +257,7 @@ impl WriteAsync for PollSource { async fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> AsyncResult<()> { let ret = unsafe { libc::fallocate64( - self.source.as_raw_fd(), + self.as_raw_fd(), mode as libc::c_int, file_offset as libc::off64_t, len as libc::off64_t, @@ -296,7 +272,7 @@ impl WriteAsync for PollSource { /// Sync all completed write operations to the backing storage. async fn fsync(&self) -> AsyncResult<()> { - let ret = unsafe { libc::fsync(self.source.as_raw_fd()) }; + let ret = unsafe { libc::fsync(self.as_raw_fd()) }; if ret == 0 { Ok(()) } else { @@ -309,23 +285,17 @@ impl WriteAsync for PollSource { impl IoSourceExt for PollSource { /// Yields the underlying IO source. fn into_source(self: Box) -> F { - self.source + self.0.into_source() } /// Provides a mutable ref to the underlying IO source. fn as_source_mut(&mut self) -> &mut F { - &mut self.source + self } /// Provides a ref to the underlying IO source. fn as_source(&self) -> &F { - &self.source - } -} - -impl DerefMut for PollSource { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.source + self } } @@ -393,4 +363,23 @@ mod tests { let ex = FdExecutor::new().unwrap(); ex.run_until(go(&ex)).unwrap(); } + + #[test] + fn memory_leak() { + // This test needs to run under ASAN to detect memory leaks. + + async fn owns_poll_source(source: PollSource) { + let _ = source.wait_readable().await; + } + + let (rx, _tx) = sys_util::pipe(true).unwrap(); + let ex = FdExecutor::new().unwrap(); + let source = PollSource::new(rx, &ex).unwrap(); + ex.spawn_local(owns_poll_source(source)).detach(); + + // Drop `ex` without running. This would cause a memory leak if PollSource owned a strong + // reference to the executor because it owns a reference to the future that owns PollSource + // (via its Runnable). The strong reference prevents the drop impl from running, which would + // otherwise poll the future and have it return with an error. + } }