diff --git a/cros_async/src/event.rs b/cros_async/src/event.rs index 852b58eeaa..ad03d77865 100644 --- a/cros_async/src/event.rs +++ b/cros_async/src/event.rs @@ -17,6 +17,13 @@ impl EventAsync { Ok(EventAsync { io_source: new(f)? }) } + /// Like new, but allows the source to be constructed directly. Used for + /// testing only. + #[cfg(test)] + pub(crate) fn new_from_source(io_source: Box + 'static>) -> EventAsync { + EventAsync { io_source } + } + /// Gets the next value from the eventfd. #[allow(dead_code)] pub async fn next_val(&self) -> AsyncResult { @@ -27,6 +34,7 @@ impl EventAsync { #[cfg(test)] mod tests { use super::*; + use crate::io_ext::{new_poll, new_uring}; use base::Event; use futures::pin_mut; @@ -44,4 +52,26 @@ mod tests { let val = crate::run_executor(crate::RunOne::new(fut)).unwrap(); assert_eq!(val, 0xaa); } + + #[test] + fn next_val_reads_value_poll_and_ring() { + async fn go(source: Box + 'static>) -> u64 { + let event_async = EventAsync::new_from_source(source); + event_async.next_val().await.unwrap() + } + + let eventfd = Event::new().unwrap(); + eventfd.write(0xaa).unwrap(); + let fut = go(new_poll(eventfd).unwrap()); + pin_mut!(fut); + let val = crate::run_executor(crate::RunOne::new(fut)).unwrap(); + assert_eq!(val, 0xaa); + + let eventfd = Event::new().unwrap(); + eventfd.write(0xaa).unwrap(); + let fut = go(new_uring(eventfd).unwrap()); + pin_mut!(fut); + let val = crate::run_executor(crate::RunOne::new(fut)).unwrap(); + assert_eq!(val, 0xaa); + } } diff --git a/cros_async/src/io_ext.rs b/cros_async/src/io_ext.rs index ec8abe33e2..0a946e3c12 100644 --- a/cros_async/src/io_ext.rs +++ b/cros_async/src/io_ext.rs @@ -29,9 +29,6 @@ pub enum Error { /// An error with a polled(FD) source. #[error("An error with a poll source: {0}")] Poll(crate::poll_source::Error), - /// An error reading from a wrapped source. - #[error("An error from the source: {0}")] - ReadingInner(isize), /// An error with a uring source. #[error("An error with a uring source: {0}")] Uring(crate::uring_executor::Error), diff --git a/cros_async/src/uring_futures/uring_source.rs b/cros_async/src/uring_futures/uring_source.rs index ef0c3d7e62..15b88b8230 100644 --- a/cros_async/src/uring_futures/uring_source.rs +++ b/cros_async/src/uring_futures/uring_source.rs @@ -2,13 +2,15 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +use std::convert::TryInto; +use std::io; use std::ops::{Deref, DerefMut}; use std::os::unix::io::AsRawFd; use std::rc::Rc; use std::task::{Context, Poll}; use crate::io_source::IoSource; -use crate::uring_executor::{self, PendingOperation, RegisteredSource, Result}; +use crate::uring_executor::{self, Error, PendingOperation, RegisteredSource, Result}; use crate::uring_futures; use crate::uring_mem::{BackingMemory, MemRegion}; use crate::AsyncError; @@ -121,17 +123,19 @@ impl crate::ReadAsync for UringSource { .map_err(AsyncError::Uring) } - /// Reads a single u64 from the current offset. + /// Reads a single u64 (e.g. from an eventfd). async fn read_u64(&self) -> AsyncResult { - crate::IoSourceExt::wait_readable(self).await?; - let mut bytes = 0u64.to_ne_bytes(); - // Safe to read to the buffer of known length. - let ret = - unsafe { libc::read(self.as_raw_fd(), bytes.as_mut_ptr() as *mut _, bytes.len()) }; - if ret < 0 { - return Err(AsyncError::ReadingInner(ret)); + let bytes = 0u64.to_ne_bytes().to_vec(); + let (len, bytes) = self.read_to_vec(0, bytes).await?; + if len != bytes.len() { + Err(AsyncError::Uring(Error::Io(io::Error::new( + io::ErrorKind::Other, + format!("expected to read {} bytes, but read {}", bytes.len(), len), + )))) + } else { + // Will never panic because bytes is of the appropriate size. + Ok(u64::from_ne_bytes(bytes[..].try_into().unwrap())) } - Ok(u64::from_ne_bytes(bytes)) } /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`.