// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// This file makes several casts from u8 pointers into more-aligned pointer types.
// We assume that the kernel will give us suitably aligned memory.
#![allow(clippy::cast_ptr_alignment)]

use std::collections::BTreeMap;
use std::fmt;
use std::fs::File;
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::pin::Pin;
use std::ptr::null_mut;
use std::sync::atomic::{AtomicU32, Ordering};

use base::{MappedRegion, MemoryMapping, MemoryMappingBuilder, WatchingEvents};

use crate::bindings::*;
use crate::syscalls::*;

/// Holds per-operation, user-specified data. The usage is up to the caller. The most common use is
/// for callers to identify each request.
pub type UserData = u64;

#[derive(Debug)]
pub enum Error {
    /// The call to `io_uring_enter` failed with the given errno.
    RingEnter(libc::c_int),
    /// The call to `io_uring_setup` failed with the given errno.
    Setup(libc::c_int),
    /// Failed to map the completion ring.
    MappingCompleteRing(base::MmapError),
    /// Failed to map the submit ring.
    MappingSubmitRing(base::MmapError),
    /// Failed to map submit entries.
    MappingSubmitEntries(base::MmapError),
    /// Too many ops are already queued.
    NoSpace,
}
pub type Result<T> = std::result::Result<T, Error>;

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        use self::Error::*;

        match self {
            RingEnter(e) => write!(f, "Failed to enter io uring: {}", e),
            Setup(e) => write!(f, "Failed to set up io uring: {}", e),
            MappingCompleteRing(e) => write!(f, "Failed to mmap completion ring: {}", e),
            MappingSubmitRing(e) => write!(f, "Failed to mmap submit ring: {}", e),
            MappingSubmitEntries(e) => write!(f, "Failed to mmap submit entries: {}", e),
            NoSpace => write!(
                f,
                "No space for more ring entries, try increasing the size passed to `new`",
            ),
        }
    }
}

/// Basic statistics about the operations that have been submitted to the uring.
#[derive(Default)]
pub struct URingStats {
    total_enter_calls: u64, // Number of times the uring has been entered.
    total_ops: u64,         // Total ops submitted to io_uring.
    total_complete: u64,    // Total ops completed by io_uring.
}

/// Unsafe wrapper for the kernel's io_uring interface. Allows for queueing multiple I/O operations
/// to the kernel and asynchronously handling the completion of these operations.
/// Use the various `add_*` functions to configure operations, then call `wait` to start
/// the operations and get any completed results. Each op is given a u64 user_data argument that is
/// used to identify the result when returned in the iterator provided by `wait`.
///
/// # Example polling an FD for readable status.
///
/// ```
/// # use std::fs::File;
/// # use std::os::unix::io::AsRawFd;
/// # use std::path::Path;
/// # use base::WatchingEvents;
/// # use io_uring::URingContext;
/// let f = File::open(Path::new("/dev/zero")).unwrap();
/// let mut uring = URingContext::new(16).unwrap();
/// uring
///     .add_poll_fd(f.as_raw_fd(), &WatchingEvents::empty().set_read(), 454)
///     .unwrap();
/// let (user_data, res) = uring.wait().unwrap().next().unwrap();
/// assert_eq!(user_data, 454 as io_uring::UserData);
/// assert_eq!(res.unwrap(), 1 as u32);
/// ```
pub struct URingContext {
    ring_file: File, // Holds the io_uring context FD returned from io_uring_setup.
    submit_ring: SubmitQueueState,
    submit_queue_entries: SubmitQueueEntries,
    complete_ring: CompleteQueueState,
    io_vecs: Pin<Box<[libc::iovec]>>,
    in_flight: usize, // The number of pending operations.
    added: usize, // The number of ops added since the last call to `io_uring_enter`.
    num_sqes: usize, // The total number of sqes allocated in shared memory.
    stats: URingStats,
}

impl URingContext {
    /// Creates a `URingContext` where the underlying uring has space for `num_entries`
    /// simultaneous operations.
    pub fn new(num_entries: usize) -> Result<URingContext> {
        let ring_params = io_uring_params::default();
        // The below unsafe block isolates the creation of the URingContext. Each step on its own
        // is unsafe. Using the uring FD for the mapping and the offsets returned by the kernel for
        // base addresses maintains safety guarantees assuming the kernel API guarantees are
        // trusted.
        unsafe {
            // Safe because the kernel is trusted to only modify params and `File` is created with
            // an FD that it takes complete ownership of.
            let fd = io_uring_setup(num_entries, &ring_params).map_err(Error::Setup)?;
            let ring_file = File::from_raw_fd(fd);

            // Mmap the submit and completion queues.
            // Safe because we trust the kernel to set valid sizes in `io_uring_setup` and any error
            // is checked.
            let submit_ring = SubmitQueueState::new(
                MemoryMappingBuilder::new(
                    ring_params.sq_off.array as usize
                        + ring_params.sq_entries as usize * std::mem::size_of::<u32>(),
                )
                .from_descriptor(&ring_file)
                .offset(u64::from(IORING_OFF_SQ_RING))
                .populate()
                .build()
                .map_err(Error::MappingSubmitRing)?,
                &ring_params,
            );

            let num_sqe = ring_params.sq_entries as usize;
            let submit_queue_entries = SubmitQueueEntries {
                mmap: MemoryMappingBuilder::new(
                    ring_params.sq_entries as usize * std::mem::size_of::<io_uring_sqe>(),
                )
                .from_descriptor(&ring_file)
                .offset(u64::from(IORING_OFF_SQES))
                .populate()
                .build()
                .map_err(Error::MappingSubmitEntries)?,
                len: num_sqe,
            };

            let complete_ring = CompleteQueueState::new(
                MemoryMappingBuilder::new(
                    ring_params.cq_off.cqes as usize
                        + ring_params.cq_entries as usize * std::mem::size_of::<io_uring_cqe>(),
                )
                .from_descriptor(&ring_file)
                .offset(u64::from(IORING_OFF_CQ_RING))
                .populate()
                .build()
                .map_err(Error::MappingCompleteRing)?,
                &ring_params,
            );

            Ok(URingContext {
                ring_file,
                submit_ring,
                submit_queue_entries,
                complete_ring,
                io_vecs: Pin::from(
                    vec![
                        libc::iovec {
                            iov_base: null_mut(),
                            iov_len: 0
                        };
                        num_sqe
                    ]
                    .into_boxed_slice(),
                ),
                added: 0,
                num_sqes: ring_params.sq_entries as usize,
                in_flight: 0,
                stats: Default::default(),
            })
        }
    }

    // Call `f` with the next available sqe or return an error if none are available.
    // After `f` returns, the sqe is appended to the kernel's queue.
    fn prep_next_sqe<F>(&mut self, mut f: F) -> Result<()>
    where
        F: FnMut(&mut io_uring_sqe, &mut libc::iovec),
    {
        if self.added == self.num_sqes {
            return Err(Error::NoSpace);
        }

        // Find the next free submission entry in the submit ring and fill it with an iovec.
        // The below raw pointer derefs are safe because the memory the pointers use lives as long
        // as the mmap in self.
        let tail = self.submit_ring.pointers.tail(Ordering::Relaxed);
        let next_tail = tail.wrapping_add(1);
        if next_tail == self.submit_ring.pointers.head(Ordering::Acquire) {
            return Err(Error::NoSpace);
        }
        // `tail` is the next sqe to use.
        let index = (tail & self.submit_ring.ring_mask) as usize;
        let sqe = self.submit_queue_entries.get_mut(index).unwrap();

        f(sqe, &mut self.io_vecs[index]);

        // Tells the kernel to use the new index when processing the entry at that index.
        self.submit_ring.set_array_entry(index, index as u32);
        // Ensure the above writes to sqe are seen before the tail is updated.
        // set_tail uses Release ordering when storing to the ring.
        self.submit_ring.pointers.set_tail(next_tail);

        self.added += 1;

        Ok(())
    }

    unsafe fn add_rw_op(
        &mut self,
        ptr: *const u8,
        len: usize,
        fd: RawFd,
        offset: u64,
        user_data: UserData,
        op: u8,
    ) -> Result<()> {
        self.prep_next_sqe(|sqe, iovec| {
            iovec.iov_base = ptr as *const libc::c_void as *mut _;
            iovec.iov_len = len;
            sqe.opcode = op;
            sqe.addr = iovec as *const _ as *const libc::c_void as u64;
            sqe.len = 1;
            sqe.__bindgen_anon_1.off = offset;
            sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
            sqe.ioprio = 0;
            sqe.user_data = user_data;
            sqe.flags = 0;
            sqe.fd = fd;
        })?;

        Ok(())
    }

    /// Asynchronously writes to `fd` from the address given in `ptr`.
    /// # Safety
    /// `add_write` will write up to `len` bytes of data from the address given by `ptr`. This is
    /// only safe if the caller guarantees that the memory lives until the transaction is complete
    /// and that completion has been returned from the `wait` function. In addition there must not
    /// be other references to the data pointed to by `ptr` until the operation completes. Ensure
    /// that the fd remains open until the op completes as well.
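    ///
    /// # Example
    ///
    /// A minimal sketch of queueing one write and waiting for it (marked `no_run`
    /// because it needs a kernel with io_uring support; `/tmp/uring_write` is just a
    /// placeholder path):
    ///
    /// ```no_run
    /// # use std::fs::OpenOptions;
    /// # use std::os::unix::io::AsRawFd;
    /// # use io_uring::URingContext;
    /// let f = OpenOptions::new().write(true).create(true).open("/tmp/uring_write").unwrap();
    /// let buf = [0x55u8; 4096];
    /// let mut uring = URingContext::new(16).unwrap();
    /// unsafe {
    ///     // Safe because `buf` and `f` outlive the blocking `wait` call below.
    ///     uring
    ///         .add_write(buf.as_ptr(), buf.len(), f.as_raw_fd(), 0, 55)
    ///         .unwrap();
    /// }
    /// let (user_data, res) = uring.wait().unwrap().next().unwrap();
    /// assert_eq!(user_data, 55 as io_uring::UserData);
    /// assert_eq!(res.unwrap(), 4096);
    /// ```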
    pub unsafe fn add_write(
        &mut self,
        ptr: *const u8,
        len: usize,
        fd: RawFd,
        offset: u64,
        user_data: UserData,
    ) -> Result<()> {
        self.add_rw_op(ptr, len, fd, offset, user_data, IORING_OP_WRITEV as u8)
    }

    /// Asynchronously reads from `fd` to the address given in `ptr`.
    /// # Safety
    /// `add_read` will write up to `len` bytes of data to the address given by `ptr`. This is only
    /// safe if the caller guarantees there are no other references to that memory and that the
    /// memory lives until the transaction is complete and that completion has been returned from
    /// the `wait` function. In addition there must not be any mutable references to the data
    /// pointed to by `ptr` until the operation completes. Ensure that the fd remains open until
    /// the op completes as well.
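    ///
    /// # Example
    ///
    /// A minimal sketch of reading 512 zero bytes (marked `no_run`; it requires a
    /// kernel with io_uring support):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use std::os::unix::io::AsRawFd;
    /// # use io_uring::URingContext;
    /// let f = File::open("/dev/zero").unwrap();
    /// let mut buf = [0u8; 512];
    /// let mut uring = URingContext::new(16).unwrap();
    /// unsafe {
    ///     // Safe because `wait` blocks until the kernel is done filling `buf`.
    ///     uring
    ///         .add_read(buf.as_mut_ptr(), buf.len(), f.as_raw_fd(), 0, 7)
    ///         .unwrap();
    /// }
    /// let (user_data, res) = uring.wait().unwrap().next().unwrap();
    /// assert_eq!(user_data, 7 as io_uring::UserData);
    /// assert_eq!(res.unwrap(), 512);
    /// ```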
    pub unsafe fn add_read(
        &mut self,
        ptr: *mut u8,
        len: usize,
        fd: RawFd,
        offset: u64,
        user_data: UserData,
    ) -> Result<()> {
        self.add_rw_op(ptr, len, fd, offset, user_data, IORING_OP_READV as u8)
    }

    /// Like `add_writev`, but accepts an iterator of iovecs so a `Vec` does not have to be
    /// allocated up front when one doesn't already exist.
    /// # Safety
    /// See the safety requirements of `add_writev`.
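    ///
    /// # Example
    ///
    /// A minimal sketch building iovecs from `IoSlice`s, as the tests in this file do
    /// (marked `no_run`; `/tmp/uring_writev` is a placeholder path):
    ///
    /// ```no_run
    /// # use std::fs::OpenOptions;
    /// # use std::io::IoSlice;
    /// # use std::os::unix::io::AsRawFd;
    /// # use io_uring::URingContext;
    /// let f = OpenOptions::new().write(true).create(true).open("/tmp/uring_writev").unwrap();
    /// let buf = [0xaau8; 4096];
    /// let io_vecs = vec![IoSlice::new(&buf)]
    ///     .into_iter()
    ///     .map(|slice| unsafe { std::mem::transmute::<IoSlice, libc::iovec>(slice) });
    /// let mut uring = URingContext::new(16).unwrap();
    /// unsafe {
    ///     // Safe because `wait` blocks until the kernel is done reading from `buf`.
    ///     uring
    ///         .add_writev_iter(io_vecs, f.as_raw_fd(), 0, 9)
    ///         .unwrap();
    /// }
    /// let (user_data, res) = uring.wait().unwrap().next().unwrap();
    /// assert_eq!(user_data, 9 as io_uring::UserData);
    /// assert_eq!(res.unwrap(), 4096);
    /// ```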
    pub unsafe fn add_writev_iter<I>(
        &mut self,
        iovecs: I,
        fd: RawFd,
        offset: u64,
        user_data: UserData,
    ) -> Result<()>
    where
        I: Iterator<Item = libc::iovec>,
    {
        self.add_writev(
            Pin::from(iovecs.collect::<Vec<_>>().into_boxed_slice()),
            fd,
            offset,
            user_data,
        )
    }

    /// Asynchronously writes to `fd` from the addresses given in `iovecs`.
    /// # Safety
    /// `add_writev` will read from the addresses given by `iovecs`. This is only safe if the
    /// caller guarantees there are no other references to that memory and that the memory lives
    /// until the transaction is complete and that completion has been returned from the `wait`
    /// function. In addition there must not be any mutable references to the data pointed to by
    /// `iovecs` until the operation completes. Ensure that the fd remains open until the op
    /// completes as well.
    /// The `iovecs` are held internally until the op completes.
    pub unsafe fn add_writev(
        &mut self,
        iovecs: Pin<Box<[libc::iovec]>>,
        fd: RawFd,
        offset: u64,
        user_data: UserData,
    ) -> Result<()> {
        self.prep_next_sqe(|sqe, _iovec| {
            sqe.opcode = IORING_OP_WRITEV as u8;
            sqe.addr = iovecs.as_ptr() as *const _ as *const libc::c_void as u64;
            sqe.len = iovecs.len() as u32;
            sqe.__bindgen_anon_1.off = offset;
            sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
            sqe.ioprio = 0;
            sqe.user_data = user_data;
            sqe.flags = 0;
            sqe.fd = fd;
        })?;
        self.complete_ring.add_op_data(user_data, iovecs);
        Ok(())
    }

    /// Like `add_readv`, but accepts an iterator of iovecs so a `Vec` does not have to be
    /// allocated up front when one doesn't already exist.
    /// # Safety
    /// See the safety requirements of `add_readv`.
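    ///
    /// # Example
    ///
    /// A minimal sketch reading into two buffers at once (marked `no_run`; it
    /// requires a kernel with io_uring support):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use std::io::IoSliceMut;
    /// # use std::os::unix::io::AsRawFd;
    /// # use io_uring::URingContext;
    /// let f = File::open("/dev/zero").unwrap();
    /// let mut buf = [0u8; 256];
    /// let mut buf2 = [0u8; 256];
    /// let io_vecs = vec![IoSliceMut::new(&mut buf), IoSliceMut::new(&mut buf2)]
    ///     .into_iter()
    ///     .map(|slice| unsafe { std::mem::transmute::<IoSliceMut, libc::iovec>(slice) });
    /// let mut uring = URingContext::new(16).unwrap();
    /// unsafe {
    ///     // Safe because `wait` blocks until the kernel is done filling the buffers.
    ///     uring.add_readv_iter(io_vecs, f.as_raw_fd(), 0, 3).unwrap();
    /// }
    /// let (user_data, res) = uring.wait().unwrap().next().unwrap();
    /// assert_eq!(user_data, 3 as io_uring::UserData);
    /// assert_eq!(res.unwrap(), 512);
    /// ```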
    pub unsafe fn add_readv_iter<I>(
        &mut self,
        iovecs: I,
        fd: RawFd,
        offset: u64,
        user_data: UserData,
    ) -> Result<()>
    where
        I: Iterator<Item = libc::iovec>,
    {
        self.add_readv(
            Pin::from(iovecs.collect::<Vec<_>>().into_boxed_slice()),
            fd,
            offset,
            user_data,
        )
    }

    /// Asynchronously reads from `fd` to the addresses given in `iovecs`.
    /// # Safety
    /// `add_readv` will write to the addresses given by `iovecs`. This is only safe if the caller
    /// guarantees there are no other references to that memory and that the memory lives until the
    /// transaction is complete and that completion has been returned from the `wait` function. In
    /// addition there must not be any references to the data pointed to by `iovecs` until the
    /// operation completes. Ensure that the fd remains open until the op completes as well.
    /// The `iovecs` are held internally until the op completes.
    pub unsafe fn add_readv(
        &mut self,
        iovecs: Pin<Box<[libc::iovec]>>,
        fd: RawFd,
        offset: u64,
        user_data: UserData,
    ) -> Result<()> {
        self.prep_next_sqe(|sqe, _iovec| {
            sqe.opcode = IORING_OP_READV as u8;
            sqe.addr = iovecs.as_ptr() as *const _ as *const libc::c_void as u64;
            sqe.len = iovecs.len() as u32;
            sqe.__bindgen_anon_1.off = offset;
            sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
            sqe.ioprio = 0;
            sqe.user_data = user_data;
            sqe.flags = 0;
            sqe.fd = fd;
        })?;
        self.complete_ring.add_op_data(user_data, iovecs);
        Ok(())
    }

    /// Syncs all completed operations; the ordering with respect to in-flight async ops is not
    /// defined.
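    ///
    /// # Example
    ///
    /// A minimal sketch (marked `no_run`; `/tmp/uring_fsync` is a placeholder path):
    ///
    /// ```no_run
    /// # use std::fs::OpenOptions;
    /// # use std::os::unix::io::AsRawFd;
    /// # use io_uring::URingContext;
    /// let f = OpenOptions::new().write(true).create(true).open("/tmp/uring_fsync").unwrap();
    /// let mut uring = URingContext::new(16).unwrap();
    /// uring.add_fsync(f.as_raw_fd(), 11).unwrap();
    /// let (user_data, res) = uring.wait().unwrap().next().unwrap();
    /// assert_eq!(user_data, 11 as io_uring::UserData);
    /// res.unwrap();
    /// ```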
    pub fn add_fsync(&mut self, fd: RawFd, user_data: UserData) -> Result<()> {
        self.prep_next_sqe(|sqe, _iovec| {
            sqe.opcode = IORING_OP_FSYNC as u8;
            sqe.fd = fd;
            sqe.user_data = user_data;

            sqe.addr = 0;
            sqe.len = 0;
            sqe.__bindgen_anon_1.off = 0;
            sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
            sqe.__bindgen_anon_2.rw_flags = 0;
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// See the usage of `fallocate`; this asynchronously performs the same operation.
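    ///
    /// # Example
    ///
    /// A minimal sketch allocating 1 MiB at the start of a file (marked `no_run`;
    /// `/tmp/uring_falloc` is a placeholder path, and older kernels may not support
    /// this opcode):
    ///
    /// ```no_run
    /// # use std::fs::OpenOptions;
    /// # use std::os::unix::io::AsRawFd;
    /// # use io_uring::URingContext;
    /// let f = OpenOptions::new()
    ///     .read(true)
    ///     .write(true)
    ///     .create(true)
    ///     .open("/tmp/uring_falloc")
    ///     .unwrap();
    /// let mut uring = URingContext::new(16).unwrap();
    /// // Mode 0 is the default "allocate" mode.
    /// uring.add_fallocate(f.as_raw_fd(), 0, 1 << 20, 0, 66).unwrap();
    /// let (user_data, res) = uring.wait().unwrap().next().unwrap();
    /// assert_eq!(user_data, 66 as io_uring::UserData);
    /// res.unwrap();
    /// ```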
    pub fn add_fallocate(
        &mut self,
        fd: RawFd,
        offset: u64,
        len: u64,
        mode: u32,
        user_data: UserData,
    ) -> Result<()> {
        // Note that len for fallocate is passed in the addr field of the sqe and the mode uses the
        // len field.
        self.prep_next_sqe(|sqe, _iovec| {
            sqe.opcode = IORING_OP_FALLOCATE as u8;

            sqe.fd = fd;
            sqe.addr = len;
            sqe.len = mode;
            sqe.__bindgen_anon_1.off = offset;
            sqe.user_data = user_data;

            sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
            sqe.__bindgen_anon_2.rw_flags = 0;
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// Adds an FD to be polled based on the given flags.
    /// The user must keep the FD open until the operation completion is returned from
    /// `wait`.
    /// Note that io_uring is always a one-shot poll. After the fd is returned, it must be re-added
    /// to get future events.
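    ///
    /// # Example
    ///
    /// A minimal sketch showing the one-shot behavior by re-adding the poll after each
    /// completion (marked `no_run`; it requires a kernel with io_uring support):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use std::os::unix::io::AsRawFd;
    /// # use base::WatchingEvents;
    /// # use io_uring::URingContext;
    /// let f = File::open("/dev/zero").unwrap();
    /// let mut uring = URingContext::new(16).unwrap();
    /// for i in 0..2 {
    ///     // The poll fires once; add it again on each iteration.
    ///     uring
    ///         .add_poll_fd(f.as_raw_fd(), &WatchingEvents::empty().set_read(), i)
    ///         .unwrap();
    ///     let (user_data, res) = uring.wait().unwrap().next().unwrap();
    ///     assert_eq!(user_data, i);
    ///     res.unwrap();
    /// }
    /// ```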
    pub fn add_poll_fd(
        &mut self,
        fd: RawFd,
        events: &WatchingEvents,
        user_data: UserData,
    ) -> Result<()> {
        self.prep_next_sqe(|sqe, _iovec| {
            sqe.opcode = IORING_OP_POLL_ADD as u8;
            sqe.fd = fd;
            sqe.user_data = user_data;
            sqe.__bindgen_anon_2.poll_events = events.get_raw() as u16;

            sqe.addr = 0;
            sqe.len = 0;
            sqe.__bindgen_anon_1.off = 0;
            sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// Removes an FD that was previously added with `add_poll_fd`.
    pub fn remove_poll_fd(
        &mut self,
        fd: RawFd,
        events: &WatchingEvents,
        user_data: UserData,
    ) -> Result<()> {
        self.prep_next_sqe(|sqe, _iovec| {
            sqe.opcode = IORING_OP_POLL_REMOVE as u8;
            sqe.fd = fd;
            sqe.user_data = user_data;
            sqe.__bindgen_anon_2.poll_events = events.get_raw() as u16;

            sqe.addr = 0;
            sqe.len = 0;
            sqe.__bindgen_anon_1.off = 0;
            sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// Sends operations added with the `add_*` functions to the kernel.
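    ///
    /// # Example
    ///
    /// A minimal sketch of a non-blocking submit; results can be collected later with
    /// `wait`, or readiness observed by polling the uring FD with epoll (marked
    /// `no_run`; it requires a kernel with io_uring support):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use std::os::unix::io::AsRawFd;
    /// # use base::WatchingEvents;
    /// # use io_uring::URingContext;
    /// let f = File::open("/dev/zero").unwrap();
    /// let mut uring = URingContext::new(16).unwrap();
    /// uring
    ///     .add_poll_fd(f.as_raw_fd(), &WatchingEvents::empty().set_read(), 1)
    ///     .unwrap();
    /// // Kick off the queued op without waiting for any completion.
    /// uring.submit().unwrap();
    /// let (user_data, res) = uring.wait().unwrap().next().unwrap();
    /// assert_eq!(user_data, 1 as io_uring::UserData);
    /// res.unwrap();
    /// ```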
    pub fn submit(&mut self) -> Result<()> {
        self.in_flight += self.added;
        self.stats.total_ops = self.stats.total_ops.wrapping_add(self.added as u64);
        if self.added > 0 {
            self.stats.total_enter_calls = self.stats.total_enter_calls.wrapping_add(1);
            unsafe {
                // Safe because the only memory modified is in the completion queue.
                io_uring_enter(self.ring_file.as_raw_fd(), self.added as u64, 0, 0)
                    .map_err(Error::RingEnter)?;
            }
        }
        self.added = 0;

        Ok(())
    }

    /// Sends operations added with the `add_*` functions to the kernel and returns an iterator to
    /// any completed operations. `wait` blocks until at least one completion is ready. If called
    /// without any new events added, this simply waits for any existing events to complete and
    /// returns as soon as one or more is ready.
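    ///
    /// # Example
    ///
    /// A minimal sketch draining every completion that is ready (marked `no_run`; it
    /// requires a kernel with io_uring support):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use std::os::unix::io::AsRawFd;
    /// # use base::WatchingEvents;
    /// # use io_uring::URingContext;
    /// let f = File::open("/dev/zero").unwrap();
    /// let mut uring = URingContext::new(16).unwrap();
    /// for i in 0..4 {
    ///     uring
    ///         .add_poll_fd(f.as_raw_fd(), &WatchingEvents::empty().set_read(), i)
    ///         .unwrap();
    /// }
    /// // Blocks until at least one op completes, then yields all that are ready.
    /// for (user_data, res) in uring.wait().unwrap() {
    ///     println!("op {} finished with {:?}", user_data, res);
    /// }
    /// ```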
    pub fn wait<'a>(
        &'a mut self,
    ) -> Result<impl Iterator<Item = (UserData, std::io::Result<u32>)> + 'a> {
        let completed = self.complete_ring.num_completed();
        self.stats.total_complete = self.stats.total_complete.wrapping_add(completed as u64);
        self.in_flight -= completed;
        self.stats.total_ops = self.stats.total_ops.wrapping_add(self.added as u64);
        if self.in_flight > 0 || self.added > 0 {
            unsafe {
                self.stats.total_enter_calls = self.stats.total_enter_calls.wrapping_add(1);
                // Safe because the only memory modified is in the completion queue.
                let ret = io_uring_enter(
                    self.ring_file.as_raw_fd(),
                    self.added as u64,
                    1,
                    IORING_ENTER_GETEVENTS,
                );
                match ret {
                    Ok(_) => {
                        self.in_flight += self.added;
                        self.added = 0;
                    }
                    Err(e) => {
                        if e != libc::EBUSY {
                            return Err(Error::RingEnter(e));
                        }
                        // An EBUSY return means that some completed events must be processed
                        // before more can be submitted; in that case, wait for some to finish
                        // without pushing the new sqes.
                        io_uring_enter(self.ring_file.as_raw_fd(), 0, 1, IORING_ENTER_GETEVENTS)
                            .map_err(Error::RingEnter)?;
                    }
                }
            }
        }

        // The CompletionQueue will iterate all completed ops.
        Ok(&mut self.complete_ring)
    }
}

impl AsRawFd for URingContext {
    fn as_raw_fd(&self) -> RawFd {
        self.ring_file.as_raw_fd()
    }
}

struct SubmitQueueEntries {
    mmap: MemoryMapping,
    len: usize,
}

impl SubmitQueueEntries {
    fn get_mut(&mut self, index: usize) -> Option<&mut io_uring_sqe> {
        if index >= self.len {
            return None;
        }
        let mut_ref = unsafe {
            // Safe because the mut borrow of self restricts to one mutable reference at a time and
            // we trust that the kernel has returned enough memory in io_uring_setup and mmap.
            &mut *(self.mmap.as_ptr() as *mut io_uring_sqe).add(index)
        };
        // Clear any state.
        *mut_ref = io_uring_sqe::default();
        Some(mut_ref)
    }
}

struct SubmitQueueState {
    _mmap: MemoryMapping,
    pointers: QueuePointers,
    ring_mask: u32,
    array: *mut u32,
}

impl SubmitQueueState {
    // # Safety
    // Safe iff `mmap` is created by mapping from a uring FD at the SQ_RING offset and params is
    // the params struct passed to io_uring_setup.
    unsafe fn new(mmap: MemoryMapping, params: &io_uring_params) -> SubmitQueueState {
        let ptr = mmap.as_ptr();
        // Casts are safe because a u32 is atomic on all supported architectures and the
        // pointer will live until after self is dropped because the mmap is owned.
        let head = ptr.add(params.sq_off.head as usize) as *const AtomicU32;
        let tail = ptr.add(params.sq_off.tail as usize) as *const AtomicU32;
        // This offset is guaranteed to be within the mmap so unwrap the result.
        let ring_mask = mmap.read_obj(params.sq_off.ring_mask as usize).unwrap();
        let array = ptr.add(params.sq_off.array as usize) as *mut u32;
        SubmitQueueState {
            _mmap: mmap,
            pointers: QueuePointers { head, tail },
            ring_mask,
            array,
        }
    }

    // Sets the kernel's array entry at the given `index` to `value`.
    fn set_array_entry(&self, index: usize, value: u32) {
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to be written.
        unsafe {
            std::ptr::write_volatile(self.array.add(index), value as u32);
        }
    }
}

struct CompleteQueueState {
    mmap: MemoryMapping,
    pointers: QueuePointers,
    ring_mask: u32,
    cqes_offset: u32,
    completed: usize,
    // For ops that pass in arrays of iovecs, they need to be valid for the duration of the
    // operation because the kernel might read them at any time.
    pending_op_addrs: BTreeMap<UserData, Pin<Box<[libc::iovec]>>>,
}

impl CompleteQueueState {
    /// # Safety
    /// Safe iff `mmap` is created by mapping from a uring FD at the CQ_RING offset and params is
    /// the params struct passed to io_uring_setup.
    unsafe fn new(mmap: MemoryMapping, params: &io_uring_params) -> CompleteQueueState {
        let ptr = mmap.as_ptr();
        let head = ptr.add(params.cq_off.head as usize) as *const AtomicU32;
        let tail = ptr.add(params.cq_off.tail as usize) as *const AtomicU32;
        let ring_mask = mmap.read_obj(params.cq_off.ring_mask as usize).unwrap();
        CompleteQueueState {
            mmap,
            pointers: QueuePointers { head, tail },
            ring_mask,
            cqes_offset: params.cq_off.cqes,
            completed: 0,
            pending_op_addrs: BTreeMap::new(),
        }
    }

    fn add_op_data(&mut self, user_data: UserData, addrs: Pin<Box<[libc::iovec]>>) {
        self.pending_op_addrs.insert(user_data, addrs);
    }

    fn get_cqe(&self, head: u32) -> &io_uring_cqe {
        unsafe {
            // Safe because we trust that the kernel has returned enough memory in io_uring_setup
            // and mmap, and the index is kept in range by the ring_mask.
            let cqes = (self.mmap.as_ptr() as *const u8).add(self.cqes_offset as usize)
                as *const io_uring_cqe;

            let index = head & self.ring_mask;

            &*cqes.add(index as usize)
        }
    }

    fn num_completed(&mut self) -> usize {
        std::mem::replace(&mut self.completed, 0)
    }
}

// Return the completed ops with their result.
impl Iterator for CompleteQueueState {
    type Item = (UserData, std::io::Result<u32>);

    fn next(&mut self) -> Option<Self::Item> {
        // Safe because the pointers to the atomics are valid and the cqe must be in range
        // because the kernel provided mask is applied to the index.
        let head = self.pointers.head(Ordering::Relaxed);

        // Synchronize the read of tail after the read of head.
        if head == self.pointers.tail(Ordering::Acquire) {
            return None;
        }

        self.completed += 1;

        let cqe = self.get_cqe(head);
        let user_data = cqe.user_data;
        let res = cqe.res;

        // Free the addrs saved for this op.
        let _ = self.pending_op_addrs.remove(&user_data);

        // Store the new head and ensure the reads above complete before the kernel sees the
        // update to head; `set_head` uses `Release` ordering.
        let new_head = head.wrapping_add(1);
        self.pointers.set_head(new_head);

        let io_res = match res {
            r if r < 0 => Err(std::io::Error::from_raw_os_error(-r)),
            r => Ok(r as u32),
        };
        Some((user_data, io_res))
    }
}

struct QueuePointers {
    head: *const AtomicU32,
    tail: *const AtomicU32,
}

impl QueuePointers {
    // Loads the tail pointer atomically with the given ordering.
    fn tail(&self, ordering: Ordering) -> u32 {
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read.
        unsafe { (*self.tail).load(ordering) }
    }

    // Stores the new value of the tail in the submit queue. This allows the kernel to start
    // processing entries that have been added up until the given tail pointer.
    // Always stores with release ordering as that is the only valid way to use the pointer.
    fn set_tail(&self, next_tail: u32) {
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read and it's used as an atomic to cover mutability concerns.
        unsafe { (*self.tail).store(next_tail, Ordering::Release) }
    }

    // Loads the head pointer atomically with the given ordering.
    fn head(&self, ordering: Ordering) -> u32 {
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read.
        unsafe { (*self.head).load(ordering) }
    }

    // Stores the new value of the head in the completion queue. This tells the kernel that
    // entries up until the given head pointer have been consumed and their slots can be reused.
    // Always stores with release ordering as that is the only valid way to use the pointer.
    fn set_head(&self, next_head: u32) {
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read and it's used as an atomic to cover mutability concerns.
        unsafe { (*self.head).store(next_head, Ordering::Release) }
    }
}

#[cfg(test)]
mod tests {
    use std::fs::OpenOptions;
    use std::io::{IoSlice, IoSliceMut};
    use std::io::{Read, Seek, SeekFrom, Write};
    use std::path::{Path, PathBuf};
    use std::time::Duration;

    use base::PollContext;
    use tempfile::{tempfile, TempDir};

    use super::*;

    fn append_file_name(path: &Path, name: &str) -> PathBuf {
        let mut joined = path.to_path_buf();
        joined.push(name);
        joined
    }

    fn check_one_read(
        uring: &mut URingContext,
        buf: &mut [u8],
        fd: RawFd,
        offset: u64,
        user_data: UserData,
    ) {
        let (user_data_ret, res) = unsafe {
            // Safe because the `wait` call waits until the kernel is done with `buf`.
            uring
                .add_read(buf.as_mut_ptr(), buf.len(), fd, offset, user_data)
                .unwrap();
            uring.wait().unwrap().next().unwrap()
        };
        assert_eq!(user_data_ret, user_data);
        assert_eq!(res.unwrap(), buf.len() as u32);
    }

    fn check_one_readv(
        uring: &mut URingContext,
        buf: &mut [u8],
        fd: RawFd,
        offset: u64,
        user_data: UserData,
    ) {
        let io_vecs = unsafe {
            // Safe to transmute from IoSliceMut to iovec.
            vec![IoSliceMut::new(buf)]
                .into_iter()
                .map(|slice| std::mem::transmute::<IoSliceMut, libc::iovec>(slice))
        };
        let (user_data_ret, res) = unsafe {
            // Safe because the `wait` call waits until the kernel is done with `buf`.
            uring
                .add_readv_iter(io_vecs, fd, offset, user_data)
                .unwrap();
            uring.wait().unwrap().next().unwrap()
        };
        assert_eq!(user_data_ret, user_data);
        assert_eq!(res.unwrap(), buf.len() as u32);
    }

    fn create_test_file(size: u64) -> std::fs::File {
        let f = tempfile().unwrap();
        f.set_len(size).unwrap();
        f
    }

    #[test]
    // Queue as many reads as possible and then collect the completions.
    fn read_parallel() {
        const QUEUE_SIZE: usize = 10;
        const BUF_SIZE: usize = 0x1000;

        let mut uring = URingContext::new(QUEUE_SIZE).unwrap();
        let mut buf = [0u8; BUF_SIZE * QUEUE_SIZE];
        let f = create_test_file((BUF_SIZE * QUEUE_SIZE) as u64);

        // Check that the whole file can be read and that the queue's wrapping is handled by
        // reading many times the queue depth of buffers.
        for i in 0..QUEUE_SIZE * 64 {
            let index = i as u64;
            unsafe {
                let offset = (i % QUEUE_SIZE) * BUF_SIZE;
                match uring.add_read(
                    buf[offset..].as_mut_ptr(),
                    BUF_SIZE,
                    f.as_raw_fd(),
                    offset as u64,
                    index,
                ) {
                    Ok(_) => (),
                    Err(Error::NoSpace) => {
                        let _ = uring.wait().unwrap().next().unwrap();
                    }
                    Err(_) => panic!("unexpected error from uring wait"),
                }
            }
        }
    }

    #[test]
    fn read_readv() {
        let queue_size = 128;

        let mut uring = URingContext::new(queue_size).unwrap();
        let mut buf = [0u8; 0x1000];
        let f = create_test_file(0x1000 * 2);

        // Check that the whole file can be read and that the queue's wrapping is handled by
        // reading double the queue depth of buffers.
        for i in 0..queue_size * 2 {
            let index = i as u64;
            check_one_read(
                &mut uring,
                &mut buf,
                f.as_raw_fd(),
                (index % 2) * 0x1000,
                index,
            );
            check_one_readv(
                &mut uring,
                &mut buf,
                f.as_raw_fd(),
                (index % 2) * 0x1000,
                index,
            );
        }
    }

    #[test]
    fn readv_vec() {
        let queue_size = 128;
        const BUF_SIZE: usize = 0x2000;

        let mut uring = URingContext::new(queue_size).unwrap();
        let mut buf = [0u8; BUF_SIZE];
        let mut buf2 = [0u8; BUF_SIZE];
        let mut buf3 = [0u8; BUF_SIZE];
        let io_vecs = unsafe {
            // Safe to transmute from IoSliceMut to iovec.
            vec![
                IoSliceMut::new(&mut buf),
                IoSliceMut::new(&mut buf2),
                IoSliceMut::new(&mut buf3),
            ]
            .into_iter()
            .map(|slice| std::mem::transmute::<IoSliceMut, libc::iovec>(slice))
            .collect::<Vec<libc::iovec>>()
        };
        let total_len = io_vecs.iter().fold(0, |a, iovec| a + iovec.iov_len);
        let f = create_test_file(total_len as u64 * 2);
        let (user_data_ret, res) = unsafe {
            // Safe because the `wait` call waits until the kernel is done with the buffers.
            uring
                .add_readv_iter(io_vecs.into_iter(), f.as_raw_fd(), 0, 55)
                .unwrap();
            uring.wait().unwrap().next().unwrap()
        };
        assert_eq!(user_data_ret, 55);
        assert_eq!(res.unwrap(), total_len as u32);
    }

    #[test]
    fn write_one_block() {
        let mut uring = URingContext::new(16).unwrap();
        let mut buf = [0u8; 4096];
        let mut f = create_test_file(0);
        f.write(&buf).unwrap();
        f.write(&buf).unwrap();

        unsafe {
            // Safe because the `wait` call waits until the kernel is done with `buf`.
            uring
                .add_write(buf.as_mut_ptr(), buf.len(), f.as_raw_fd(), 0, 55)
                .unwrap();
            let (user_data, res) = uring.wait().unwrap().next().unwrap();
            assert_eq!(user_data, 55 as UserData);
            assert_eq!(res.unwrap(), buf.len() as u32);
        }
    }

    #[test]
    fn write_one_submit_poll() {
        let mut uring = URingContext::new(16).unwrap();
        let mut buf = [0u8; 4096];
        let mut f = create_test_file(0);
        f.write(&buf).unwrap();
        f.write(&buf).unwrap();

        let ctx: PollContext<u64> = PollContext::build_with(&[(&uring, 1)]).unwrap();
        {
            // Test that the uring context isn't readable before any events are complete.
            let events = ctx.wait_timeout(Duration::from_millis(1)).unwrap();
            assert!(events.iter_readable().next().is_none());
        }

        unsafe {
            // Safe because the `wait` call waits until the kernel is done with `buf`.
            uring
                .add_write(buf.as_mut_ptr(), buf.len(), f.as_raw_fd(), 0, 55)
                .unwrap();
            uring.submit().unwrap();
            // Poll for completion with epoll.
            let events = ctx.wait().unwrap();
            let event = events.iter_readable().next().unwrap();
            assert_eq!(event.token(), 1);
            let (user_data, res) = uring.wait().unwrap().next().unwrap();
            assert_eq!(user_data, 55 as UserData);
            assert_eq!(res.unwrap(), buf.len() as u32);
        }
    }

    #[test]
    fn writev_vec() {
        let queue_size = 128;
        const BUF_SIZE: usize = 0x2000;
        const OFFSET: u64 = 0x2000;

        let mut uring = URingContext::new(queue_size).unwrap();
        let buf = [0xaau8; BUF_SIZE];
        let buf2 = [0xffu8; BUF_SIZE];
        let buf3 = [0x55u8; BUF_SIZE];
        let io_vecs = unsafe {
            // Safe to transmute from IoSlice to iovec.
            vec![IoSlice::new(&buf), IoSlice::new(&buf2), IoSlice::new(&buf3)]
                .into_iter()
                .map(|slice| std::mem::transmute::<IoSlice, libc::iovec>(slice))
                .collect::<Vec<libc::iovec>>()
        };
        let total_len = io_vecs.iter().fold(0, |a, iovec| a + iovec.iov_len);
        let mut f = create_test_file(total_len as u64 * 2);
        let (user_data_ret, res) = unsafe {
            // Safe because the `wait` call waits until the kernel is done with the buffers.
            uring
                .add_writev_iter(io_vecs.into_iter(), f.as_raw_fd(), OFFSET, 55)
                .unwrap();
            uring.wait().unwrap().next().unwrap()
        };
        assert_eq!(user_data_ret, 55);
        assert_eq!(res.unwrap(), total_len as u32);

        let mut read_back = [0u8; BUF_SIZE];
        f.seek(SeekFrom::Start(OFFSET)).unwrap();
        f.read(&mut read_back).unwrap();
        assert!(read_back.iter().all(|&b| b == 0xaa));
        f.read(&mut read_back).unwrap();
        assert!(read_back.iter().all(|&b| b == 0xff));
        f.read(&mut read_back).unwrap();
        assert!(read_back.iter().all(|&b| b == 0x55));
    }

    #[test]
    fn fallocate_fsync() {
        let tempdir = TempDir::new().unwrap();
        let file_path = append_file_name(tempdir.path(), "test");

        {
            let buf = [0u8; 4096];
            let mut f = OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .truncate(true)
                .open(&file_path)
                .unwrap();
            f.write(&buf).unwrap();
        }

        let init_size = std::fs::metadata(&file_path).unwrap().len() as usize;
        let set_size = init_size + 1024 * 1024 * 50;
        let f = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(&file_path)
            .unwrap();

        let mut uring = URingContext::new(16).unwrap();
        uring
            .add_fallocate(f.as_raw_fd(), 0, set_size as u64, 0, 66)
            .unwrap();
        let (user_data, res) = uring.wait().unwrap().next().unwrap();
        assert_eq!(user_data, 66 as UserData);
        match res {
            Err(e) => {
                if e.kind() == std::io::ErrorKind::InvalidInput {
                    // Skip on kernels that don't support fallocate.
                    return;
                }
                panic!("Unexpected fallocate error: {}", e);
            }
            Ok(val) => assert_eq!(val, 0 as u32),
        }

        // Add a few writes and then fsync.
        let buf = [0u8; 4096];
        let mut pending = std::collections::BTreeSet::new();
        unsafe {
            uring
                .add_write(buf.as_ptr(), buf.len(), f.as_raw_fd(), 0, 67)
                .unwrap();
            pending.insert(67u64);
            uring
                .add_write(buf.as_ptr(), buf.len(), f.as_raw_fd(), 4096, 68)
                .unwrap();
            pending.insert(68);
            uring
                .add_write(buf.as_ptr(), buf.len(), f.as_raw_fd(), 8192, 69)
                .unwrap();
            pending.insert(69);
        }
        uring.add_fsync(f.as_raw_fd(), 70).unwrap();
        pending.insert(70);

        let mut wait_calls = 0;

        while !pending.is_empty() && wait_calls < 5 {
            let events = uring.wait().unwrap();
            for (user_data, res) in events {
                assert!(res.is_ok());
                assert!(pending.contains(&user_data));
                pending.remove(&user_data);
            }
            wait_calls += 1;
        }
        assert!(pending.is_empty());

        uring
            .add_fallocate(
                f.as_raw_fd(),
                init_size as u64,
                (set_size - init_size) as u64,
                (libc::FALLOC_FL_PUNCH_HOLE | libc::FALLOC_FL_KEEP_SIZE) as u32,
                68,
            )
            .unwrap();
        let (user_data, res) = uring.wait().unwrap().next().unwrap();
        assert_eq!(user_data, 68 as UserData);
        assert_eq!(res.unwrap(), 0 as u32);

        drop(f); // Close to ensure directory entries for metadata are updated.

        let new_size = std::fs::metadata(&file_path).unwrap().len() as usize;
        assert_eq!(new_size, set_size);
    }

    #[test]
    fn dev_zero_readable() {
        let f = File::open(Path::new("/dev/zero")).unwrap();
        let mut uring = URingContext::new(16).unwrap();
        uring
            .add_poll_fd(f.as_raw_fd(), &WatchingEvents::empty().set_read(), 454)
            .unwrap();
        let (user_data, res) = uring.wait().unwrap().next().unwrap();
        assert_eq!(user_data, 454 as UserData);
        assert_eq!(res.unwrap(), 1 as u32);
    }

    #[test]
    fn queue_many_ebusy_retry() {
        let num_entries = 16;
        let f = File::open(Path::new("/dev/zero")).unwrap();
        let mut uring = URingContext::new(num_entries).unwrap();
        // Fill the submit ring.
        for sqe_batch in 0..3 {
            for i in 0..num_entries {
                uring
                    .add_poll_fd(
                        f.as_raw_fd(),
                        &WatchingEvents::empty().set_read(),
                        (sqe_batch * num_entries + i) as u64,
                    )
                    .unwrap();
            }
            uring.submit().unwrap();
        }
        // Adding more than the number of cqes will cause the uring to return EBUSY; make sure
        // that is handled cleanly and wait still returns the completed entries.
        uring
            .add_poll_fd(
                f.as_raw_fd(),
                &WatchingEvents::empty().set_read(),
                (num_entries * 3) as u64,
            )
            .unwrap();
        // The first wait call should return the cqes that are already filled.
        {
            let mut results = uring.wait().unwrap();
            for _i in 0..num_entries * 2 {
                assert_eq!(results.next().unwrap().1.unwrap(), 1 as u32);
            }
            assert!(results.next().is_none());
        }
        // The second will finish submitting any more sqes and return the rest.
        let mut results = uring.wait().unwrap();
        for _i in 0..num_entries + 1 {
            assert_eq!(results.next().unwrap().1.unwrap(), 1 as u32);
        }
        assert!(results.next().is_none());
    }
}