mirror of
https://chromium.googlesource.com/crosvm/crosvm
synced 2025-02-06 02:25:23 +00:00
cros_async: Fix circular reference memory leak
PollSource keeps a strong reference to the FdExecutor, which can lead to a memory leak via a circular reference if the caller spawns a future that owns a PollSource and then detaches it. Avoid this by using weak references instead. With this change, we now only use weak references internally. The only way to increase the strong reference count is by cloning the FdExecutor. BUG=none TEST=unit tests Change-Id: Ic58ff475a31c6fca831c3ced73b26b87ceeda028 Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2760378 Tested-by: kokoro <noreply+kokoro@google.com> Reviewed-by: Dylan Reid <dgreid@chromium.org> Commit-Queue: Daniel Verkamp <dverkamp@chromium.org>
This commit is contained in:
parent
12d17e90b8
commit
8e80902ce3
2 changed files with 115 additions and 79 deletions
|
@ -49,6 +49,9 @@ pub enum Error {
|
|||
/// PollContext failure.
|
||||
#[error("PollContext failure: {0}")]
|
||||
PollContextError(sys_util::Error),
|
||||
/// An error occurred when setting the FD non-blocking.
|
||||
#[error("An error occurred setting the FD non-blocking: {0}.")]
|
||||
SettingNonBlocking(sys_util::Error),
|
||||
/// Failed to submit the waker to the polling context.
|
||||
#[error("An error adding to the Aio context: {0}")]
|
||||
SubmittingWaker(sys_util::Error),
|
||||
|
@ -70,6 +73,62 @@ enum OpStatus {
|
|||
Completed,
|
||||
}
|
||||
|
||||
// An IO source previously registered with an FdExecutor. Used to initiate asynchronous IO with the
|
||||
// associated executor.
|
||||
pub struct RegisteredSource<F> {
|
||||
source: F,
|
||||
ex: Weak<RawExecutor>,
|
||||
}
|
||||
|
||||
impl<F: AsRawFd> RegisteredSource<F> {
|
||||
// Start an asynchronous operation to wait for this source to become readable. The returned
|
||||
// future will not be ready until the source is readable.
|
||||
pub fn wait_readable(&self) -> Result<PendingOperation> {
|
||||
let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
|
||||
|
||||
let token =
|
||||
ex.add_operation(self.source.as_raw_fd(), WatchingEvents::empty().set_read())?;
|
||||
|
||||
Ok(PendingOperation {
|
||||
token: Some(token),
|
||||
ex: self.ex.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
// Start an asynchronous operation to wait for this source to become writable. The returned
|
||||
// future will not be ready until the source is writable.
|
||||
pub fn wait_writable(&self) -> Result<PendingOperation> {
|
||||
let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
|
||||
|
||||
let token =
|
||||
ex.add_operation(self.source.as_raw_fd(), WatchingEvents::empty().set_write())?;
|
||||
|
||||
Ok(PendingOperation {
|
||||
token: Some(token),
|
||||
ex: self.ex.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<F> RegisteredSource<F> {
|
||||
// Consume this RegisteredSource and return the inner IO source.
|
||||
pub fn into_source(self) -> F {
|
||||
self.source
|
||||
}
|
||||
}
|
||||
|
||||
impl<F> AsRef<F> for RegisteredSource<F> {
|
||||
fn as_ref(&self) -> &F {
|
||||
&self.source
|
||||
}
|
||||
}
|
||||
|
||||
impl<F> AsMut<F> for RegisteredSource<F> {
|
||||
fn as_mut(&mut self) -> &mut F {
|
||||
&mut self.source
|
||||
}
|
||||
}
|
||||
|
||||
/// A token returned from `add_operation` that can be used to cancel the waker before it completes.
|
||||
/// Used to manage getting the result from the underlying executor for a completed operation.
|
||||
/// Dropping a `PendingOperation` will get the result from the executor.
|
||||
|
@ -431,24 +490,10 @@ impl FdExecutor {
|
|||
self.raw.run(&mut ctx, f)
|
||||
}
|
||||
|
||||
pub fn wait_readable<F: AsRawFd>(&self, f: &F) -> Result<PendingOperation> {
|
||||
let token = self
|
||||
.raw
|
||||
.add_operation(f.as_raw_fd(), WatchingEvents::empty().set_read())?;
|
||||
|
||||
Ok(PendingOperation {
|
||||
token: Some(token),
|
||||
ex: Arc::downgrade(&self.raw),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn wait_writable<F: AsRawFd>(&self, f: &F) -> Result<PendingOperation> {
|
||||
let token = self
|
||||
.raw
|
||||
.add_operation(f.as_raw_fd(), WatchingEvents::empty().set_read())?;
|
||||
|
||||
Ok(PendingOperation {
|
||||
token: Some(token),
|
||||
pub(crate) fn register_source<F: AsRawFd>(&self, f: F) -> Result<RegisteredSource<F>> {
|
||||
add_fd_flags(f.as_raw_fd(), libc::O_NONBLOCK).map_err(Error::SettingNonBlocking)?;
|
||||
Ok(RegisteredSource {
|
||||
source: f,
|
||||
ex: Arc::downgrade(&self.raw),
|
||||
})
|
||||
}
|
||||
|
@ -480,7 +525,8 @@ mod test {
|
|||
async fn do_test(ex: &FdExecutor) {
|
||||
let (r, _w) = sys_util::pipe(true).unwrap();
|
||||
let done = Box::pin(async { 5usize });
|
||||
let pending = ex.wait_readable(&r).unwrap();
|
||||
let source = ex.register_source(r).unwrap();
|
||||
let pending = source.wait_readable().unwrap();
|
||||
match futures::future::select(pending, done).await {
|
||||
Either::Right((5, pending)) => std::mem::drop(pending),
|
||||
_ => panic!("unexpected select result"),
|
||||
|
@ -521,7 +567,8 @@ mod test {
|
|||
|
||||
let ex = FdExecutor::new().unwrap();
|
||||
|
||||
let op = ex.wait_writable(&tx).unwrap();
|
||||
let source = ex.register_source(tx.try_clone().unwrap()).unwrap();
|
||||
let op = source.wait_writable().unwrap();
|
||||
|
||||
ex.spawn_local(write_value(tx)).detach();
|
||||
ex.spawn_local(check_op(op)).detach();
|
||||
|
|
|
@ -10,11 +10,9 @@ use std::ops::{Deref, DerefMut};
|
|||
use std::os::unix::io::AsRawFd;
|
||||
use std::sync::Arc;
|
||||
|
||||
use libc::O_NONBLOCK;
|
||||
use sys_util::{self, add_fd_flags};
|
||||
use thiserror::Error as ThisError;
|
||||
|
||||
use crate::fd_executor::{self, FdExecutor};
|
||||
use crate::fd_executor::{self, FdExecutor, RegisteredSource};
|
||||
use crate::mem::{BackingMemory, MemRegion};
|
||||
use crate::{AsyncError, AsyncResult};
|
||||
use crate::{IoSourceExt, ReadAsync, WriteAsync};
|
||||
|
@ -35,15 +33,11 @@ pub enum Error {
|
|||
#[error("An error occurred when executing fsync synchronously: {0}")]
|
||||
Fsync(sys_util::Error),
|
||||
/// An error occurred when reading the FD.
|
||||
///
|
||||
#[error("An error occurred when reading the FD: {0}.")]
|
||||
Read(sys_util::Error),
|
||||
/// Can't seek file.
|
||||
#[error("An error occurred when seeking the FD: {0}.")]
|
||||
Seeking(sys_util::Error),
|
||||
/// An error occurred when setting the FD non-blocking.
|
||||
#[error("An error occurred setting the FD non-blocking: {0}.")]
|
||||
SettingNonBlocking(sys_util::Error),
|
||||
/// An error occurred when writing the FD.
|
||||
#[error("An error occurred when writing the FD: {0}.")]
|
||||
Write(sys_util::Error),
|
||||
|
@ -52,25 +46,19 @@ pub type Result<T> = std::result::Result<T, Error>;
|
|||
|
||||
/// Async wrapper for an IO source that uses the FD executor to drive async operations.
|
||||
/// Used by `IoSourceExt::new` when uring isn't available.
|
||||
pub struct PollSource<F: AsRawFd> {
|
||||
source: F,
|
||||
ex: FdExecutor,
|
||||
}
|
||||
pub struct PollSource<F>(RegisteredSource<F>);
|
||||
|
||||
impl<F: AsRawFd> PollSource<F> {
|
||||
/// Create a new `PollSource` from the given IO source.
|
||||
pub fn new(f: F, ex: &FdExecutor) -> Result<Self> {
|
||||
let fd = f.as_raw_fd();
|
||||
add_fd_flags(fd, O_NONBLOCK).map_err(Error::SettingNonBlocking)?;
|
||||
Ok(Self {
|
||||
source: f,
|
||||
ex: ex.clone(),
|
||||
})
|
||||
ex.register_source(f)
|
||||
.map(PollSource)
|
||||
.map_err(Error::Executor)
|
||||
}
|
||||
|
||||
/// Return the inner source.
|
||||
pub fn into_source(self) -> F {
|
||||
self.source
|
||||
self.0.into_source()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -78,7 +66,13 @@ impl<F: AsRawFd> Deref for PollSource<F> {
|
|||
type Target = F;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.source
|
||||
self.0.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl<F: AsRawFd> DerefMut for PollSource<F> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
self.0.as_mut()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -94,7 +88,7 @@ impl<F: AsRawFd> ReadAsync for PollSource<F> {
|
|||
// Safe because this will only modify `vec` and we check the return value.
|
||||
let res = unsafe {
|
||||
libc::pread64(
|
||||
self.source.as_raw_fd(),
|
||||
self.as_raw_fd(),
|
||||
vec.as_mut_ptr() as *mut libc::c_void,
|
||||
vec.len(),
|
||||
file_offset as libc::off64_t,
|
||||
|
@ -107,10 +101,7 @@ impl<F: AsRawFd> ReadAsync for PollSource<F> {
|
|||
|
||||
match sys_util::Error::last() {
|
||||
e if e.errno() == libc::EWOULDBLOCK => {
|
||||
let op = self
|
||||
.ex
|
||||
.wait_readable(&self.source)
|
||||
.map_err(Error::AddingWaker)?;
|
||||
let op = self.0.wait_readable().map_err(Error::AddingWaker)?;
|
||||
op.await.map_err(Error::Executor)?;
|
||||
}
|
||||
e => return Err(Error::Read(e).into()),
|
||||
|
@ -135,7 +126,7 @@ impl<F: AsRawFd> ReadAsync for PollSource<F> {
|
|||
// guaranteed to be valid from the pointer by io_slice_mut.
|
||||
let res = unsafe {
|
||||
libc::preadv64(
|
||||
self.source.as_raw_fd(),
|
||||
self.as_raw_fd(),
|
||||
iovecs.as_mut_ptr() as *mut _,
|
||||
iovecs.len() as i32,
|
||||
file_offset as libc::off64_t,
|
||||
|
@ -148,10 +139,7 @@ impl<F: AsRawFd> ReadAsync for PollSource<F> {
|
|||
|
||||
match sys_util::Error::last() {
|
||||
e if e.errno() == libc::EWOULDBLOCK => {
|
||||
let op = self
|
||||
.ex
|
||||
.wait_readable(&self.source)
|
||||
.map_err(Error::AddingWaker)?;
|
||||
let op = self.0.wait_readable().map_err(Error::AddingWaker)?;
|
||||
op.await.map_err(Error::Executor)?;
|
||||
}
|
||||
e => return Err(Error::Read(e).into()),
|
||||
|
@ -161,10 +149,7 @@ impl<F: AsRawFd> ReadAsync for PollSource<F> {
|
|||
|
||||
/// Wait for the FD of `self` to be readable.
|
||||
async fn wait_readable(&self) -> AsyncResult<()> {
|
||||
let op = self
|
||||
.ex
|
||||
.wait_readable(&self.source)
|
||||
.map_err(Error::AddingWaker)?;
|
||||
let op = self.0.wait_readable().map_err(Error::AddingWaker)?;
|
||||
op.await.map_err(Error::Executor)?;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -175,7 +160,7 @@ impl<F: AsRawFd> ReadAsync for PollSource<F> {
|
|||
// Safe because this will only modify `buf` and we check the return value.
|
||||
let res = unsafe {
|
||||
libc::read(
|
||||
self.source.as_raw_fd(),
|
||||
self.as_raw_fd(),
|
||||
buf.as_mut_ptr() as *mut libc::c_void,
|
||||
buf.len(),
|
||||
)
|
||||
|
@ -187,10 +172,7 @@ impl<F: AsRawFd> ReadAsync for PollSource<F> {
|
|||
|
||||
match sys_util::Error::last() {
|
||||
e if e.errno() == libc::EWOULDBLOCK => {
|
||||
let op = self
|
||||
.ex
|
||||
.wait_readable(&self.source)
|
||||
.map_err(Error::AddingWaker)?;
|
||||
let op = self.0.wait_readable().map_err(Error::AddingWaker)?;
|
||||
op.await.map_err(Error::Executor)?;
|
||||
}
|
||||
e => return Err(Error::Read(e).into()),
|
||||
|
@ -211,7 +193,7 @@ impl<F: AsRawFd> WriteAsync for PollSource<F> {
|
|||
// Safe because this will not modify any memory and we check the return value.
|
||||
let res = unsafe {
|
||||
libc::pwrite64(
|
||||
self.source.as_raw_fd(),
|
||||
self.as_raw_fd(),
|
||||
vec.as_ptr() as *const libc::c_void,
|
||||
vec.len(),
|
||||
file_offset as libc::off64_t,
|
||||
|
@ -224,10 +206,7 @@ impl<F: AsRawFd> WriteAsync for PollSource<F> {
|
|||
|
||||
match sys_util::Error::last() {
|
||||
e if e.errno() == libc::EWOULDBLOCK => {
|
||||
let op = self
|
||||
.ex
|
||||
.wait_writable(&self.source)
|
||||
.map_err(Error::AddingWaker)?;
|
||||
let op = self.0.wait_writable().map_err(Error::AddingWaker)?;
|
||||
op.await.map_err(Error::Executor)?;
|
||||
}
|
||||
e => return Err(Error::Write(e).into()),
|
||||
|
@ -253,7 +232,7 @@ impl<F: AsRawFd> WriteAsync for PollSource<F> {
|
|||
// guaranteed to be valid from the pointer by io_slice_mut.
|
||||
let res = unsafe {
|
||||
libc::pwritev64(
|
||||
self.source.as_raw_fd(),
|
||||
self.as_raw_fd(),
|
||||
iovecs.as_ptr() as *mut _,
|
||||
iovecs.len() as i32,
|
||||
file_offset as libc::off64_t,
|
||||
|
@ -266,10 +245,7 @@ impl<F: AsRawFd> WriteAsync for PollSource<F> {
|
|||
|
||||
match sys_util::Error::last() {
|
||||
e if e.errno() == libc::EWOULDBLOCK => {
|
||||
let op = self
|
||||
.ex
|
||||
.wait_writable(&self.source)
|
||||
.map_err(Error::AddingWaker)?;
|
||||
let op = self.0.wait_writable().map_err(Error::AddingWaker)?;
|
||||
op.await.map_err(Error::Executor)?;
|
||||
}
|
||||
e => return Err(Error::Write(e).into()),
|
||||
|
@ -281,7 +257,7 @@ impl<F: AsRawFd> WriteAsync for PollSource<F> {
|
|||
async fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> AsyncResult<()> {
|
||||
let ret = unsafe {
|
||||
libc::fallocate64(
|
||||
self.source.as_raw_fd(),
|
||||
self.as_raw_fd(),
|
||||
mode as libc::c_int,
|
||||
file_offset as libc::off64_t,
|
||||
len as libc::off64_t,
|
||||
|
@ -296,7 +272,7 @@ impl<F: AsRawFd> WriteAsync for PollSource<F> {
|
|||
|
||||
/// Sync all completed write operations to the backing storage.
|
||||
async fn fsync(&self) -> AsyncResult<()> {
|
||||
let ret = unsafe { libc::fsync(self.source.as_raw_fd()) };
|
||||
let ret = unsafe { libc::fsync(self.as_raw_fd()) };
|
||||
if ret == 0 {
|
||||
Ok(())
|
||||
} else {
|
||||
|
@ -309,23 +285,17 @@ impl<F: AsRawFd> WriteAsync for PollSource<F> {
|
|||
impl<F: AsRawFd> IoSourceExt<F> for PollSource<F> {
|
||||
/// Yields the underlying IO source.
|
||||
fn into_source(self: Box<Self>) -> F {
|
||||
self.source
|
||||
self.0.into_source()
|
||||
}
|
||||
|
||||
/// Provides a mutable ref to the underlying IO source.
|
||||
fn as_source_mut(&mut self) -> &mut F {
|
||||
&mut self.source
|
||||
self
|
||||
}
|
||||
|
||||
/// Provides a ref to the underlying IO source.
|
||||
fn as_source(&self) -> &F {
|
||||
&self.source
|
||||
}
|
||||
}
|
||||
|
||||
impl<F: AsRawFd> DerefMut for PollSource<F> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.source
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -393,4 +363,23 @@ mod tests {
|
|||
let ex = FdExecutor::new().unwrap();
|
||||
ex.run_until(go(&ex)).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn memory_leak() {
|
||||
// This test needs to run under ASAN to detect memory leaks.
|
||||
|
||||
async fn owns_poll_source(source: PollSource<File>) {
|
||||
let _ = source.wait_readable().await;
|
||||
}
|
||||
|
||||
let (rx, _tx) = sys_util::pipe(true).unwrap();
|
||||
let ex = FdExecutor::new().unwrap();
|
||||
let source = PollSource::new(rx, &ex).unwrap();
|
||||
ex.spawn_local(owns_poll_source(source)).detach();
|
||||
|
||||
// Drop `ex` without running. This would cause a memory leak if PollSource owned a strong
|
||||
// reference to the executor because it owns a reference to the future that owns PollSource
|
||||
// (via its Runnable). The strong reference prevents the drop impl from running, which would
|
||||
// otherwise poll the future and have it return with an error.
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue