mirror of
https://chromium.googlesource.com/crosvm/crosvm
synced 2025-02-11 04:26:38 +00:00
async_core: eventfd: implement a future for the next value
It is useful to have the ability to get a future for only the next value. This is helpful when borrowing the EventFd for the duration of a loop is not feasible. It is also helpful for situations where the future might be dropped. Because dropping a polled eventfd future can leak an FD, and there is no way to implement a custom `Drop` for the future returned by stream, using the new `read_next` is the only way to ensure there aren't any FD leaks if the future might be dropped before completion. TEST=added a unit test that makes use of the new feature and mirrors the existing stream test. cargo test eventfd_write_read Change-Id: I9b20c89be561e4a1ca43f2befc66c16188a91d4b Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2173973 Tested-by: Dylan Reid <dgreid@chromium.org> Tested-by: kokoro <noreply+kokoro@google.com> Commit-Queue: Dylan Reid <dgreid@chromium.org> Reviewed-by: Stephen Barber <smbarber@chromium.org> Reviewed-by: Chirantan Ekbote <chirantan@chromium.org>
This commit is contained in:
parent
4381d04dd9
commit
34c00465d5
1 changed files with 69 additions and 38 deletions
|
@ -2,19 +2,18 @@
|
|||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file.
|
||||
|
||||
use futures::Stream;
|
||||
use std::convert::TryFrom;
|
||||
use std::fmt::{self, Display};
|
||||
use std::future::Future;
|
||||
use std::os::unix::io::AsRawFd;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use libc::{EWOULDBLOCK, O_NONBLOCK};
|
||||
|
||||
use cros_async::fd_executor::{self, add_read_waker, cancel_waker, WakerToken};
|
||||
use sys_util::{self, add_fd_flags};
|
||||
|
||||
use cros_async::fd_executor::{self, add_read_waker};
|
||||
|
||||
/// Errors generated while polling for events.
|
||||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
|
@ -50,21 +49,20 @@ impl Display for Error {
|
|||
}
|
||||
}
|
||||
|
||||
/// Asynchronous version of `sys_util::EventFd`. Provides an implementation of `futures::Stream` so
|
||||
/// that events can be consumed in an async context.
|
||||
/// Asynchronous version of `sys_util::EventFd`. Provides asynchronous values that complete when the
|
||||
/// next event can be read from the eventfd.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```
|
||||
/// use std::convert::TryInto;
|
||||
///
|
||||
/// use async_core::{EventFd };
|
||||
/// use futures::StreamExt;
|
||||
/// use async_core::{EventFd};
|
||||
/// use sys_util::{self};
|
||||
///
|
||||
/// async fn process_events() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||
/// let mut async_events: EventFd = sys_util::EventFd::new()?.try_into()?;
|
||||
/// while let Some(e) = async_events.next().await {
|
||||
/// while let Ok(e) = async_events.read_next().await {
|
||||
/// // Handle event here.
|
||||
/// }
|
||||
/// Ok(())
|
||||
|
@ -72,13 +70,32 @@ impl Display for Error {
|
|||
/// ```
|
||||
pub struct EventFd {
|
||||
inner: sys_util::EventFd,
|
||||
done: bool,
|
||||
}
|
||||
|
||||
impl EventFd {
|
||||
pub fn new() -> Result<EventFd> {
|
||||
Self::try_from(sys_util::EventFd::new().map_err(Error::EventFdCreate)?)
|
||||
}
|
||||
|
||||
/// Asynchronously read the next value from the eventfd.
|
||||
/// Returns a Future that can be `awaited` for the next value.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```
|
||||
/// use async_core::EventFd;
|
||||
/// async fn print_events(mut event_fd: EventFd) {
|
||||
/// loop {
|
||||
/// match event_fd.read_next().await {
|
||||
/// Ok(e) => println!("Got event: {}", e),
|
||||
/// Err(e) => break,
|
||||
/// }
|
||||
/// }
|
||||
/// }
|
||||
/// ```
|
||||
pub fn read_next(&mut self) -> NextValFuture {
|
||||
NextValFuture::new(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<sys_util::EventFd> for EventFd {
|
||||
|
@ -87,58 +104,72 @@ impl TryFrom<sys_util::EventFd> for EventFd {
|
|||
fn try_from(eventfd: sys_util::EventFd) -> Result<EventFd> {
|
||||
let fd = eventfd.as_raw_fd();
|
||||
add_fd_flags(fd, O_NONBLOCK).map_err(Error::SettingNonBlocking)?;
|
||||
Ok(EventFd {
|
||||
inner: eventfd,
|
||||
done: false,
|
||||
})
|
||||
Ok(EventFd { inner: eventfd })
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for EventFd {
|
||||
type Item = Result<u64>;
|
||||
/// A Future that yields the next value from the eventfd when it is ready.
|
||||
pub struct NextValFuture<'a> {
|
||||
eventfd: &'a mut EventFd,
|
||||
waker_token: Option<WakerToken>,
|
||||
}
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
if self.done {
|
||||
return Poll::Ready(None);
|
||||
impl<'a> NextValFuture<'a> {
|
||||
fn new(eventfd: &'a mut EventFd) -> NextValFuture<'a> {
|
||||
NextValFuture {
|
||||
eventfd,
|
||||
waker_token: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Future for NextValFuture<'a> {
|
||||
type Output = Result<u64>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
if let Some(token) = self.waker_token.take() {
|
||||
let _ = cancel_waker(token);
|
||||
}
|
||||
|
||||
let res = self
|
||||
.inner
|
||||
.read()
|
||||
.map(|v| Poll::Ready(Some(Ok(v))))
|
||||
.or_else(|e| {
|
||||
if e.errno() == EWOULDBLOCK {
|
||||
add_read_waker(self.inner.as_raw_fd(), cx.waker().clone())
|
||||
.map(|_token| Poll::Pending)
|
||||
.map_err(Error::AddingWaker)
|
||||
} else {
|
||||
Err(Error::EventFdRead(e))
|
||||
}
|
||||
});
|
||||
|
||||
match res {
|
||||
Ok(v) => v,
|
||||
match self.eventfd.inner.read() {
|
||||
Ok(v) => Poll::Ready(Ok(v)),
|
||||
Err(e) => {
|
||||
self.done = true;
|
||||
Poll::Ready(Some(Err(e)))
|
||||
if e.errno() == EWOULDBLOCK {
|
||||
match add_read_waker(self.eventfd.inner.as_raw_fd(), cx.waker().clone()) {
|
||||
Ok(token) => {
|
||||
self.waker_token = Some(token);
|
||||
Poll::Pending
|
||||
}
|
||||
Err(e) => Poll::Ready(Err(Error::AddingWaker(e))),
|
||||
}
|
||||
} else {
|
||||
Poll::Ready(Err(Error::EventFdRead(e)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Drop for NextValFuture<'a> {
|
||||
fn drop(&mut self) {
|
||||
if let Some(token) = self.waker_token.take() {
|
||||
let _ = cancel_waker(token);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use cros_async::{select2, SelectResult};
|
||||
use futures::future::pending;
|
||||
use futures::pin_mut;
|
||||
use futures::stream::StreamExt;
|
||||
|
||||
#[test]
|
||||
fn eventfd_write_read() {
|
||||
let evt = EventFd::new().unwrap();
|
||||
async fn read_one(mut evt: EventFd) -> u64 {
|
||||
if let Some(Ok(e)) = evt.next().await {
|
||||
if let Ok(e) = evt.read_next().await {
|
||||
e
|
||||
} else {
|
||||
66
|
||||
|
|
Loading…
Reference in a new issue