mirror of
https://chromium.googlesource.com/crosvm/crosvm
synced 2025-02-05 18:20:34 +00:00
cros_async: Refactor executors
This CL includes several smaller changes to how executors work:

* Replace BTreeMap with Slab, which should give us some small performance benefits by giving O(1) lookups and reducing memory allocations when adding new I/O operations. It also gives some improvements to readability as we no longer have to carry around "next_*" variables. Slab has no dependencies and we're already pulling it in via the futures crate.
* WakerToken no longer implements Clone.
* Merge pending_ops and completed_ops in URingExecutor into a single `ops` Slab and introduce an OpStatus enum that indicates whether an operation is pending or completed. This also fixes a resource leak where an operation that was canceled before completion would end up staying in completed_ops ~forever. Add a test for this leak.
* Add a generation number to RingWakerState and include it in all RegisteredSources. Since a RegisteredSource can outlive the RingWakerState that created it, the generation number ensures that it will only affect the RingWakerState that created it. Add a test for this.
* Poison RegisteredSource so that it doesn't implement Send or Sync. Since it's associated with a thread-local executor, sending it across thread boundaries is not ok.

BUG=none
TEST=unit tests

Change-Id: I43dcfbb8166002995ec8773522c22fab8fb2da9e
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2374885
Tested-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Daniel Verkamp <dverkamp@chromium.org>
Commit-Queue: Chirantan Ekbote <chirantan@chromium.org>
This commit is contained in:
parent
c677fb49ce
commit
8ea889fccf
5 changed files with 316 additions and 152 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -128,6 +128,7 @@ dependencies = [
|
|||
"libc 0.2.65 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"paste 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"syscall_defines 0.1.0",
|
||||
]
|
||||
|
||||
|
|
|
@ -11,6 +11,7 @@ paste = "*"
|
|||
pin-utils = "0.1.0-alpha.4"
|
||||
base = { path = "../base" }
|
||||
syscall_defines = { path = "../syscall_defines" }
|
||||
slab = "0.4"
|
||||
|
||||
[dependencies.futures]
|
||||
version = "*"
|
||||
|
|
|
@ -10,7 +10,7 @@
|
|||
//! utility functions to combine futures.
|
||||
|
||||
use std::cell::RefCell;
|
||||
use std::collections::{BTreeMap, VecDeque};
|
||||
use std::collections::VecDeque;
|
||||
use std::fmt::{self, Display};
|
||||
use std::fs::File;
|
||||
use std::future::Future;
|
||||
|
@ -19,6 +19,8 @@ use std::os::unix::io::RawFd;
|
|||
use std::pin::Pin;
|
||||
use std::task::Waker;
|
||||
|
||||
use slab::Slab;
|
||||
|
||||
use base::{PollContext, WatchingEvents};
|
||||
|
||||
use crate::executor::{ExecutableFuture, Executor, FutureList};
|
||||
|
@ -94,7 +96,7 @@ impl PendingWaker {
|
|||
impl Drop for PendingWaker {
|
||||
fn drop(&mut self) {
|
||||
if let Some(token) = self.token.take() {
|
||||
let _ = cancel_waker(&token);
|
||||
let _ = cancel_waker(token);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -120,7 +122,7 @@ pub(crate) fn add_write_waker(fd: RawFd, waker: Waker) -> Result<PendingWaker> {
|
|||
}
|
||||
|
||||
/// Cancels the waker that returned the given token if the waker hasn't yet fired.
|
||||
pub(crate) fn cancel_waker(token: &WakerToken) -> Result<()> {
|
||||
pub(crate) fn cancel_waker(token: WakerToken) -> Result<()> {
|
||||
STATE.with(|state| {
|
||||
let mut state = state.borrow_mut();
|
||||
if let Some(state) = state.as_mut() {
|
||||
|
@ -147,9 +149,8 @@ pub(crate) fn add_future(future: Pin<Box<dyn Future<Output = ()>>>) -> Result<()
|
|||
|
||||
// Tracks active wakers and associates wakers with the futures that registered them.
|
||||
struct FdWakerState {
|
||||
poll_ctx: PollContext<u64>,
|
||||
token_map: BTreeMap<WakerToken, (File, Waker)>,
|
||||
next_token: u64, // Next token for adding to the context.
|
||||
poll_ctx: PollContext<usize>,
|
||||
tokens: Slab<(File, Waker)>,
|
||||
new_futures: VecDeque<ExecutableFuture<()>>,
|
||||
}
|
||||
|
||||
|
@ -157,8 +158,7 @@ impl FdWakerState {
|
|||
fn new() -> Result<Self> {
|
||||
Ok(FdWakerState {
|
||||
poll_ctx: PollContext::new().map_err(Error::CreatingContext)?,
|
||||
token_map: BTreeMap::new(),
|
||||
next_token: 0,
|
||||
tokens: Slab::with_capacity(64),
|
||||
new_futures: VecDeque::new(),
|
||||
})
|
||||
}
|
||||
|
@ -170,33 +170,31 @@ impl FdWakerState {
|
|||
// will only be added to the poll loop.
|
||||
File::from_raw_fd(dup_fd(fd)?)
|
||||
};
|
||||
let entry = self.tokens.vacant_entry();
|
||||
let next_token = entry.key();
|
||||
self.poll_ctx
|
||||
.add_fd_with_events(&duped_fd, events, self.next_token)
|
||||
.add_fd_with_events(&duped_fd, events, next_token)
|
||||
.map_err(Error::SubmittingWaker)?;
|
||||
let next_token = WakerToken(self.next_token);
|
||||
self.token_map.insert(next_token.clone(), (duped_fd, waker));
|
||||
self.next_token += 1;
|
||||
Ok(next_token)
|
||||
entry.insert((duped_fd, waker));
|
||||
Ok(WakerToken(next_token))
|
||||
}
|
||||
|
||||
// Waits until one of the FDs is readable and wakes the associated waker.
|
||||
fn wait_wake_event(&mut self) -> Result<()> {
|
||||
let events = self.poll_ctx.wait().map_err(Error::PollContextError)?;
|
||||
for e in events.iter() {
|
||||
let waker_token = WakerToken(e.token());
|
||||
if let Some((fd, waker)) = self.token_map.remove(&waker_token) {
|
||||
self.poll_ctx.delete(&fd).map_err(Error::PollContextError)?;
|
||||
waker.wake_by_ref();
|
||||
}
|
||||
let token = e.token();
|
||||
let (fd, waker) = self.tokens.remove(token);
|
||||
self.poll_ctx.delete(&fd).map_err(Error::PollContextError)?;
|
||||
waker.wake();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Remove the waker for the given token if it hasn't fired yet.
|
||||
fn cancel_waker(&mut self, token: &WakerToken) -> Result<()> {
|
||||
if let Some((fd, _waker)) = self.token_map.remove(token) {
|
||||
self.poll_ctx.delete(&fd).map_err(Error::PollContextError)?;
|
||||
}
|
||||
fn cancel_waker(&mut self, token: WakerToken) -> Result<()> {
|
||||
let (fd, _waker) = self.tokens.remove(token.0);
|
||||
self.poll_ctx.delete(&fd).map_err(Error::PollContextError)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
@ -299,7 +297,7 @@ mod test {
|
|||
fn pending_ops() -> usize {
|
||||
STATE.with(|state| {
|
||||
let state = state.borrow_mut();
|
||||
state.as_ref().unwrap().token_map.len()
|
||||
state.as_ref().unwrap().tokens.len()
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -52,7 +52,8 @@
|
|||
//! ensure it lives long enough.
|
||||
|
||||
use std::cell::RefCell;
|
||||
use std::collections::{BTreeMap, VecDeque};
|
||||
use std::collections::VecDeque;
|
||||
use std::convert::TryInto;
|
||||
use std::fmt::{self, Display};
|
||||
use std::fs::File;
|
||||
use std::future::Future;
|
||||
|
@ -60,11 +61,12 @@ use std::io;
|
|||
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
|
||||
use std::pin::Pin;
|
||||
use std::rc::Rc;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
|
||||
use std::task::Waker;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use futures::pin_mut;
|
||||
use slab::Slab;
|
||||
|
||||
use base::WatchingEvents;
|
||||
use io_uring::URingContext;
|
||||
|
@ -168,9 +170,21 @@ thread_local!(static STATE: RefCell<Option<RingWakerState>> = RefCell::new(None)
|
|||
thread_local!(static NEW_FUTURES: RefCell<VecDeque<ExecutableFuture<()>>>
|
||||
= RefCell::new(VecDeque::new()));
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Ord)]
|
||||
struct RegisteredSourceTag(u64);
|
||||
pub struct RegisteredSource(RegisteredSourceTag);
|
||||
// Tracks `RingWakerState` instances and prevents `RegisteredSource`s created by an older executor
|
||||
// from affecting a newer one.
|
||||
static GENERATION: AtomicU64 = AtomicU64::new(0);
|
||||
|
||||
pub struct RegisteredSource {
|
||||
generation: u64,
|
||||
tag: usize,
|
||||
|
||||
// Since a `RegisteredSource` is associated with a thread-local executor, it cannot be Send or
|
||||
// Sync. However, negative trait impls are not supported yet so use an Rc, which is neither Send
|
||||
// nor Sync, to poison the struct. TODO: Consider using negative trait impls once
|
||||
// https://github.com/rust-lang/rust/issues/68318 is fixed.
|
||||
_block_send_sync: Rc<()>,
|
||||
}
|
||||
|
||||
impl RegisteredSource {
|
||||
pub fn start_read_to_mem(
|
||||
&self,
|
||||
|
@ -181,7 +195,7 @@ impl RegisteredSource {
|
|||
let token = STATE.with(|state| {
|
||||
let mut state = state.borrow_mut();
|
||||
if let Some(state) = state.as_mut() {
|
||||
state.submit_read_to_vectored(&self.0, mem, file_offset, addrs)
|
||||
state.submit_read_to_vectored(self, mem, file_offset, addrs)
|
||||
} else {
|
||||
Err(Error::InvalidContext)
|
||||
}
|
||||
|
@ -201,7 +215,7 @@ impl RegisteredSource {
|
|||
let token = STATE.with(|state| {
|
||||
let mut state = state.borrow_mut();
|
||||
if let Some(state) = state.as_mut() {
|
||||
state.submit_write_from_vectored(&self.0, mem, file_offset, addrs)
|
||||
state.submit_write_from_vectored(self, mem, file_offset, addrs)
|
||||
} else {
|
||||
Err(Error::InvalidContext)
|
||||
}
|
||||
|
@ -216,7 +230,7 @@ impl RegisteredSource {
|
|||
let token = STATE.with(|state| {
|
||||
let mut state = state.borrow_mut();
|
||||
if let Some(state) = state.as_mut() {
|
||||
state.submit_fallocate(&self.0, offset, len, mode)
|
||||
state.submit_fallocate(self, offset, len, mode)
|
||||
} else {
|
||||
Err(Error::InvalidContext)
|
||||
}
|
||||
|
@ -231,7 +245,7 @@ impl RegisteredSource {
|
|||
let token = STATE.with(|state| {
|
||||
let mut state = state.borrow_mut();
|
||||
if let Some(state) = state.as_mut() {
|
||||
state.submit_fsync(&self.0)
|
||||
state.submit_fsync(self)
|
||||
} else {
|
||||
Err(Error::InvalidContext)
|
||||
}
|
||||
|
@ -247,7 +261,7 @@ impl RegisteredSource {
|
|||
let token = STATE.with(|state| {
|
||||
let mut state = state.borrow_mut();
|
||||
if let Some(state) = state.as_mut() {
|
||||
state.submit_poll(&self.0, &events)
|
||||
state.submit_poll(self, &events)
|
||||
} else {
|
||||
Err(Error::InvalidContext)
|
||||
}
|
||||
|
@ -266,7 +280,7 @@ impl RegisteredSource {
|
|||
|
||||
impl Drop for RegisteredSource {
|
||||
fn drop(&mut self) {
|
||||
let _ = RingWakerState::deregister_source(&self.0);
|
||||
let _ = RingWakerState::deregister_source(self);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -275,27 +289,30 @@ struct OpData {
|
|||
_file: Rc<File>,
|
||||
_mem: Option<Rc<dyn BackingMemory>>,
|
||||
waker: Option<Waker>,
|
||||
canceled: bool,
|
||||
}
|
||||
|
||||
// The current status of an operation that's been submitted to the uring.
|
||||
enum OpStatus {
|
||||
Pending(OpData),
|
||||
Completed(Option<::std::io::Result<u32>>),
|
||||
}
|
||||
|
||||
// Tracks active wakers and associates wakers with the futures that registered them.
|
||||
struct RingWakerState {
|
||||
ctx: URingContext,
|
||||
pending_ops: BTreeMap<WakerToken, OpData>,
|
||||
next_op_token: u64, // Next token for adding to the context.
|
||||
completed_ops: BTreeMap<WakerToken, std::io::Result<u32>>,
|
||||
registered_sources: BTreeMap<RegisteredSourceTag, Rc<File>>,
|
||||
next_source_token: u64, // Next token for registering sources.
|
||||
ops: Slab<OpStatus>,
|
||||
registered_sources: Slab<Rc<File>>,
|
||||
generation: u64,
|
||||
}
|
||||
|
||||
impl RingWakerState {
|
||||
fn new() -> Result<Self> {
|
||||
Ok(RingWakerState {
|
||||
ctx: URingContext::new(256).map_err(Error::CreatingContext)?,
|
||||
pending_ops: BTreeMap::new(),
|
||||
next_op_token: 0,
|
||||
completed_ops: BTreeMap::new(),
|
||||
registered_sources: BTreeMap::new(),
|
||||
next_source_token: 0,
|
||||
ops: Slab::with_capacity(256),
|
||||
registered_sources: Slab::with_capacity(256),
|
||||
generation: GENERATION.fetch_add(1, Ordering::Relaxed),
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -306,100 +323,105 @@ impl RingWakerState {
|
|||
// will only be added to the poll loop.
|
||||
File::from_raw_fd(dup_fd(fd.as_raw_fd())?)
|
||||
};
|
||||
let tag = RegisteredSourceTag(state.next_source_token);
|
||||
state
|
||||
.registered_sources
|
||||
.insert(tag.clone(), Rc::new(duped_fd));
|
||||
state.next_source_token += 1;
|
||||
Ok(RegisteredSource(tag))
|
||||
let tag = state.registered_sources.insert(Rc::new(duped_fd));
|
||||
Ok(RegisteredSource {
|
||||
generation: state.generation,
|
||||
tag,
|
||||
_block_send_sync: Rc::new(()),
|
||||
})
|
||||
})?
|
||||
}
|
||||
|
||||
fn deregister_source(tag: &RegisteredSourceTag) {
|
||||
fn deregister_source(source: &RegisteredSource) {
|
||||
// There isn't any need to pull pending ops out; they all have Rc's to the file and mem they
|
||||
// need. Let them complete. Deregistering with pending ops is not a common path, so there is no need to
|
||||
// optimize that case yet.
|
||||
let _ = Self::with(|state| {
|
||||
state.registered_sources.remove(tag);
|
||||
if source.generation == state.generation {
|
||||
state.registered_sources.remove(source.tag);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
fn submit_poll(
|
||||
&mut self,
|
||||
source_tag: &RegisteredSourceTag,
|
||||
source: &RegisteredSource,
|
||||
events: &base::WatchingEvents,
|
||||
) -> Result<WakerToken> {
|
||||
let source = self
|
||||
let src = self
|
||||
.registered_sources
|
||||
.get(source_tag)
|
||||
.get(source.tag)
|
||||
.ok_or(Error::InvalidSource)?;
|
||||
let entry = self.ops.vacant_entry();
|
||||
let next_op_token = entry.key();
|
||||
self.ctx
|
||||
.add_poll_fd(source.as_raw_fd(), events, self.next_op_token)
|
||||
.add_poll_fd(src.as_raw_fd(), events, usize_to_u64(next_op_token))
|
||||
.map_err(Error::SubmittingOp)?;
|
||||
let next_op_token = WakerToken(self.next_op_token);
|
||||
self.pending_ops.insert(
|
||||
next_op_token.clone(),
|
||||
OpData {
|
||||
_file: Rc::clone(&source),
|
||||
_mem: None,
|
||||
waker: None,
|
||||
},
|
||||
);
|
||||
self.next_op_token += 1;
|
||||
Ok(next_op_token)
|
||||
|
||||
entry.insert(OpStatus::Pending(OpData {
|
||||
_file: Rc::clone(&src),
|
||||
_mem: None,
|
||||
waker: None,
|
||||
canceled: false,
|
||||
}));
|
||||
Ok(WakerToken(next_op_token))
|
||||
}
|
||||
|
||||
fn submit_fallocate(
|
||||
&mut self,
|
||||
source_tag: &RegisteredSourceTag,
|
||||
source: &RegisteredSource,
|
||||
offset: u64,
|
||||
len: u64,
|
||||
mode: u32,
|
||||
) -> Result<WakerToken> {
|
||||
let source = self
|
||||
let src = self
|
||||
.registered_sources
|
||||
.get(source_tag)
|
||||
.get(source.tag)
|
||||
.ok_or(Error::InvalidSource)?;
|
||||
let entry = self.ops.vacant_entry();
|
||||
let next_op_token = entry.key();
|
||||
self.ctx
|
||||
.add_fallocate(source.as_raw_fd(), offset, len, mode, self.next_op_token)
|
||||
.add_fallocate(
|
||||
src.as_raw_fd(),
|
||||
offset,
|
||||
len,
|
||||
mode,
|
||||
usize_to_u64(next_op_token),
|
||||
)
|
||||
.map_err(Error::SubmittingOp)?;
|
||||
let next_op_token = WakerToken(self.next_op_token);
|
||||
self.pending_ops.insert(
|
||||
next_op_token.clone(),
|
||||
OpData {
|
||||
_file: Rc::clone(&source),
|
||||
_mem: None,
|
||||
waker: None,
|
||||
},
|
||||
);
|
||||
self.next_op_token += 1;
|
||||
Ok(next_op_token)
|
||||
|
||||
entry.insert(OpStatus::Pending(OpData {
|
||||
_file: Rc::clone(&src),
|
||||
_mem: None,
|
||||
waker: None,
|
||||
canceled: false,
|
||||
}));
|
||||
Ok(WakerToken(next_op_token))
|
||||
}
|
||||
|
||||
fn submit_fsync(&mut self, source_tag: &RegisteredSourceTag) -> Result<WakerToken> {
|
||||
let source = self
|
||||
fn submit_fsync(&mut self, source: &RegisteredSource) -> Result<WakerToken> {
|
||||
let src = self
|
||||
.registered_sources
|
||||
.get(source_tag)
|
||||
.get(source.tag)
|
||||
.ok_or(Error::InvalidSource)?;
|
||||
let entry = self.ops.vacant_entry();
|
||||
let next_op_token = entry.key();
|
||||
self.ctx
|
||||
.add_fsync(source.as_raw_fd(), self.next_op_token)
|
||||
.add_fsync(src.as_raw_fd(), usize_to_u64(next_op_token))
|
||||
.map_err(Error::SubmittingOp)?;
|
||||
let next_op_token = WakerToken(self.next_op_token);
|
||||
self.pending_ops.insert(
|
||||
next_op_token.clone(),
|
||||
OpData {
|
||||
_file: Rc::clone(&source),
|
||||
_mem: None,
|
||||
waker: None,
|
||||
},
|
||||
);
|
||||
self.next_op_token += 1;
|
||||
Ok(next_op_token)
|
||||
|
||||
entry.insert(OpStatus::Pending(OpData {
|
||||
_file: Rc::clone(&src),
|
||||
_mem: None,
|
||||
waker: None,
|
||||
canceled: false,
|
||||
}));
|
||||
Ok(WakerToken(next_op_token))
|
||||
}
|
||||
|
||||
fn submit_read_to_vectored(
|
||||
&mut self,
|
||||
source_tag: &RegisteredSourceTag,
|
||||
source: &RegisteredSource,
|
||||
mem: Rc<dyn BackingMemory>,
|
||||
offset: u64,
|
||||
addrs: &[MemRegion],
|
||||
|
@ -411,40 +433,43 @@ impl RingWakerState {
|
|||
return Err(Error::InvalidOffset);
|
||||
}
|
||||
|
||||
let source = self
|
||||
let src = self
|
||||
.registered_sources
|
||||
.get(source_tag)
|
||||
.get(source.tag)
|
||||
.ok_or(Error::InvalidSource)?;
|
||||
|
||||
// We can't insert the OpData into the slab yet because `iovecs` borrows `mem` below.
|
||||
let entry = self.ops.vacant_entry();
|
||||
let next_op_token = entry.key();
|
||||
|
||||
// The addresses have already been validated, so unwrapping them will succeed.
|
||||
// validate their addresses before submitting.
|
||||
let iovecs = addrs
|
||||
.iter()
|
||||
.map(|&mem_range| mem.get_iovec(mem_range).unwrap().iovec());
|
||||
|
||||
unsafe {
|
||||
// Safe because all the addresses are within the Memory that an Rc is kept for the
|
||||
// duration to ensure the memory is valid while the kernel accesses it.
|
||||
// Tested by `dont_drop_backing_mem_read` unit test.
|
||||
self.ctx
|
||||
.add_readv_iter(iovecs, source.as_raw_fd(), offset, self.next_op_token)
|
||||
.add_readv_iter(iovecs, src.as_raw_fd(), offset, usize_to_u64(next_op_token))
|
||||
.map_err(Error::SubmittingOp)?;
|
||||
}
|
||||
let next_op_token = WakerToken(self.next_op_token);
|
||||
self.pending_ops.insert(
|
||||
next_op_token.clone(),
|
||||
OpData {
|
||||
_file: Rc::clone(&source),
|
||||
_mem: Some(mem),
|
||||
waker: None,
|
||||
},
|
||||
);
|
||||
self.next_op_token += 1;
|
||||
Ok(next_op_token)
|
||||
|
||||
entry.insert(OpStatus::Pending(OpData {
|
||||
_file: Rc::clone(&src),
|
||||
_mem: Some(mem),
|
||||
waker: None,
|
||||
canceled: false,
|
||||
}));
|
||||
|
||||
Ok(WakerToken(next_op_token))
|
||||
}
|
||||
|
||||
fn submit_write_from_vectored(
|
||||
&mut self,
|
||||
source_tag: &RegisteredSourceTag,
|
||||
source: &RegisteredSource,
|
||||
mem: Rc<dyn BackingMemory>,
|
||||
offset: u64,
|
||||
addrs: &[MemRegion],
|
||||
|
@ -456,49 +481,63 @@ impl RingWakerState {
|
|||
return Err(Error::InvalidOffset);
|
||||
}
|
||||
|
||||
let source = self
|
||||
let src = self
|
||||
.registered_sources
|
||||
.get(source_tag)
|
||||
.get(source.tag)
|
||||
.ok_or(Error::InvalidSource)?;
|
||||
|
||||
// We can't insert the OpData into the slab yet because `iovecs` borrows `mem` below.
|
||||
let entry = self.ops.vacant_entry();
|
||||
let next_op_token = entry.key();
|
||||
|
||||
// The addresses have already been validated, so unwrapping them will succeed.
|
||||
// validate their addresses before submitting.
|
||||
let iovecs = addrs
|
||||
.iter()
|
||||
.map(|&mem_range| mem.get_iovec(mem_range).unwrap().iovec());
|
||||
|
||||
unsafe {
|
||||
// Safe because all the addresses are within the Memory that an Rc is kept for the
|
||||
// duration to ensure the memory is valid while the kernel accesses it.
|
||||
// Tested by `dont_drop_backing_mem_write` unit test.
|
||||
self.ctx
|
||||
.add_writev_iter(iovecs, source.as_raw_fd(), offset, self.next_op_token)
|
||||
.add_writev_iter(iovecs, src.as_raw_fd(), offset, usize_to_u64(next_op_token))
|
||||
.map_err(Error::SubmittingOp)?;
|
||||
}
|
||||
let next_op_token = WakerToken(self.next_op_token);
|
||||
self.pending_ops.insert(
|
||||
next_op_token.clone(),
|
||||
OpData {
|
||||
_file: Rc::clone(&source),
|
||||
_mem: Some(mem),
|
||||
waker: None,
|
||||
},
|
||||
);
|
||||
self.next_op_token += 1;
|
||||
Ok(next_op_token)
|
||||
|
||||
entry.insert(OpStatus::Pending(OpData {
|
||||
_file: Rc::clone(&src),
|
||||
_mem: Some(mem),
|
||||
waker: None,
|
||||
canceled: false,
|
||||
}));
|
||||
|
||||
Ok(WakerToken(next_op_token))
|
||||
}
|
||||
|
||||
// Remove the waker for the given token if it hasn't fired yet.
|
||||
fn cancel_waker(token: &WakerToken) -> Result<()> {
|
||||
fn cancel_waker(token: WakerToken) -> Result<()> {
|
||||
Self::with(|state| {
|
||||
// Clear the waker as its no longer needed, keep the pending_op in the map because the
|
||||
// uring might still be accessing either the source of the backing memory and pending op
|
||||
// will ensure those live until completion.
|
||||
if let Some(op) = state.pending_ops.get_mut(&token) {
|
||||
op.waker = None;
|
||||
if let Some(op) = state.ops.get_mut(token.0) {
|
||||
match op {
|
||||
OpStatus::Pending(data) => {
|
||||
if data.canceled {
|
||||
panic!("uring operation canceled more than once");
|
||||
}
|
||||
|
||||
// Clear the waker as it is no longer needed.
|
||||
data.waker = None;
|
||||
data.canceled = true;
|
||||
|
||||
// Keep the rest of the op data as the uring might still be accessing either
|
||||
// the source or the backing memory, so the data needs to live until the kernel
|
||||
// completes the operation. TODO: cancel the operation in the uring.
|
||||
}
|
||||
OpStatus::Completed(_) => {
|
||||
state.ops.remove(token.0);
|
||||
}
|
||||
}
|
||||
}
|
||||
// TODO - handle canceling ops in the uring
|
||||
// For now the op will complete but the response will be dropped.
|
||||
let _ = state.completed_ops.remove(token);
|
||||
Ok(())
|
||||
})?
|
||||
}
|
||||
|
@ -508,14 +547,33 @@ impl RingWakerState {
|
|||
Self::with(|state| {
|
||||
let events = state.ctx.wait().map_err(Error::URingEnter)?;
|
||||
for (raw_token, result) in events {
|
||||
let token = WakerToken(raw_token);
|
||||
// if the op is still in pending_ops then it hasn't been cancelled and someone is
|
||||
// interested in the result, so save it. Otherwise, drop it.
|
||||
if let Some(op) = state.pending_ops.remove(&token) {
|
||||
if let Some(waker) = op.waker {
|
||||
waker.wake_by_ref();
|
||||
// While the `expect()` might fail on arbitrary `u64`s, the `raw_token` was
|
||||
// something that we originally gave to the kernel and that was created from a
|
||||
// `usize` so we should always be able to convert it back into a `usize`.
|
||||
let token = raw_token
|
||||
.try_into()
|
||||
.expect("`u64` doesn't fit inside a `usize`");
|
||||
|
||||
if let Some(op) = state.ops.get_mut(token) {
|
||||
match op {
|
||||
OpStatus::Pending(data) => {
|
||||
if data.canceled {
|
||||
// No one is waiting for this operation and the uring is done with
|
||||
// it so it's safe to remove.
|
||||
state.ops.remove(token);
|
||||
} else {
|
||||
let waker = data.waker.take();
|
||||
*op = OpStatus::Completed(Some(result));
|
||||
|
||||
if let Some(waker) = waker {
|
||||
waker.wake();
|
||||
}
|
||||
}
|
||||
}
|
||||
OpStatus::Completed(_) => {
|
||||
panic!("uring operation completed more than once")
|
||||
}
|
||||
}
|
||||
state.completed_ops.insert(token, result);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
@ -524,12 +582,22 @@ impl RingWakerState {
|
|||
|
||||
fn get_result(token: &WakerToken, waker: Waker) -> Result<Option<io::Result<u32>>> {
|
||||
Self::with(|state| {
|
||||
if let Some(result) = state.completed_ops.remove(token) {
|
||||
Some(result)
|
||||
} else {
|
||||
if let Some(op) = state.pending_ops.get_mut(token) {
|
||||
op.waker = Some(waker);
|
||||
if let Some(op) = state.ops.get_mut(token.0) {
|
||||
match op {
|
||||
OpStatus::Pending(data) => {
|
||||
if data.canceled {
|
||||
panic!("`get_result` called on canceled operation");
|
||||
}
|
||||
data.waker = Some(waker);
|
||||
None
|
||||
}
|
||||
OpStatus::Completed(res) => {
|
||||
let out = res.take();
|
||||
state.ops.remove(token.0);
|
||||
out
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
|
@ -617,6 +685,12 @@ unsafe fn dup_fd(fd: RawFd) -> Result<RawFd> {
|
|||
}
|
||||
}
|
||||
|
||||
// Converts a `usize` into a `u64` and panics if the conversion fails.
|
||||
#[inline]
|
||||
fn usize_to_u64(val: usize) -> u64 {
|
||||
val.try_into().expect("`usize` doesn't fit inside a `u64`")
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PendingOperation {
|
||||
waker_token: Option<WakerToken>,
|
||||
|
@ -639,7 +713,7 @@ impl Future for PendingOperation {
|
|||
impl Drop for PendingOperation {
|
||||
fn drop(&mut self) {
|
||||
if let Some(waker_token) = self.waker_token.take() {
|
||||
let _ = RingWakerState::cancel_waker(&waker_token);
|
||||
let _ = RingWakerState::cancel_waker(waker_token);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -647,6 +721,7 @@ impl Drop for PendingOperation {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::io::{Read, Write};
|
||||
use std::mem;
|
||||
|
||||
use super::*;
|
||||
use crate::uring_mem::{BackingMemory, MemRegion, VecIoWrapper};
|
||||
|
@ -727,4 +802,93 @@ mod tests {
|
|||
RingWakerState::wait_wake_event().expect("Failed to wait for read pipe ready");
|
||||
assert_eq!(Rc::strong_count(&bm), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn registered_source_outlives_executor() {
|
||||
let bm = Rc::new(VecIoWrapper::from(vec![0u8; 4096])) as Rc<dyn BackingMemory>;
|
||||
let (rx, tx) = base::pipe(true).unwrap();
|
||||
|
||||
// Register a source before creating the executor.
|
||||
let rx_source = register_source(&rx).expect("register source failed");
|
||||
|
||||
let ex = URingExecutor::new(crate::executor::UnitFutures::new()).unwrap();
|
||||
let _pending_op = rx_source
|
||||
.start_read_to_mem(0, Rc::clone(&bm), &[MemRegion { offset: 0, len: 8 }])
|
||||
.expect("failed to start read to mem");
|
||||
|
||||
// Now drop the executor without finishing the operation.
|
||||
mem::drop(ex);
|
||||
|
||||
// Register another source.
|
||||
let tx_source = register_source(&tx).expect("register source failed");
|
||||
|
||||
assert!(RingWakerState::with(|state| state
|
||||
.registered_sources
|
||||
.get(tx_source.tag)
|
||||
.is_some())
|
||||
.expect("failed to check registered source"));
|
||||
|
||||
// Since they were created by separate executors, they should both have the same tag.
|
||||
assert_eq!(tx_source.tag, rx_source.tag);
|
||||
|
||||
// Dropping `rx_source` shouldn't affect `tx_source`.
|
||||
mem::drop(rx_source);
|
||||
|
||||
assert!(RingWakerState::with(|state| state
|
||||
.registered_sources
|
||||
.get(tx_source.tag)
|
||||
.is_some())
|
||||
.expect("failed to check registered source"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn canceled_before_completion() {
|
||||
async fn cancel_io(op: PendingOperation) {
|
||||
mem::drop(op);
|
||||
}
|
||||
|
||||
async fn check_result(op: PendingOperation, expected: u32) {
|
||||
let actual = op.await.expect("operation failed to complete");
|
||||
assert_eq!(expected, actual);
|
||||
}
|
||||
|
||||
let bm = Rc::new(VecIoWrapper::from(vec![0u8; 16])) as Rc<dyn BackingMemory>;
|
||||
|
||||
let (rx, tx) = base::pipe(true).expect("Pipe failed");
|
||||
|
||||
let mut ex = URingExecutor::new(crate::executor::UnitFutures::new()).unwrap();
|
||||
|
||||
let rx_source = register_source(&rx).expect("register source failed");
|
||||
let tx_source = register_source(&tx).expect("register source failed");
|
||||
|
||||
let read_op = rx_source
|
||||
.start_read_to_mem(0, Rc::clone(&bm), &[MemRegion { offset: 0, len: 8 }])
|
||||
.expect("failed to start read to mem");
|
||||
|
||||
let read_token = read_op
|
||||
.waker_token
|
||||
.as_ref()
|
||||
.map(|t| t.0)
|
||||
.expect("No `WakerToken` in `PendingOperation`");
|
||||
assert!(
|
||||
RingWakerState::with(|state| state.ops.get(read_token).is_some())
|
||||
.expect("Failed to check `RingWakerState` for pending operation")
|
||||
);
|
||||
|
||||
add_future(Box::pin(cancel_io(read_op)));
|
||||
|
||||
// Write to the pipe so that the kernel operation will complete.
|
||||
let buf = Rc::new(VecIoWrapper::from(vec![0xc2u8; 16])) as Rc<dyn BackingMemory>;
|
||||
let write_op = tx_source
|
||||
.start_write_from_mem(0, Rc::clone(&buf), &[MemRegion { offset: 0, len: 8 }])
|
||||
.expect("failed to start write from mem");
|
||||
add_future(Box::pin(check_result(write_op, 8)));
|
||||
|
||||
ex.run().expect("Failed to run executor");
|
||||
|
||||
assert!(
|
||||
RingWakerState::with(|state| state.ops.get(read_token).is_none())
|
||||
.expect("Failed to check `RingWakerState` for canceled operation")
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,9 +7,9 @@ use std::sync::Arc;
|
|||
|
||||
use futures::task::ArcWake;
|
||||
|
||||
/// Wrapper around a u64 used as a token to uniquely identify a pending waker.
|
||||
#[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Ord)]
|
||||
pub(crate) struct WakerToken(pub(crate) u64);
|
||||
/// Wrapper around a usize used as a token to uniquely identify a pending waker.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct WakerToken(pub(crate) usize);
|
||||
|
||||
/// Raw waker used by executors. Associated with a single future and used to indicate whether that
|
||||
/// future needs to be polled.
|
||||
|
|
Loading…
Reference in a new issue