devices: virtio: make Queue track its own Interrupt

When constructing a Queue from a QueueConfig, require the caller to pass
the corresponding Interrupt, and remove the &Interrupt argument from the
Queue::trigger_interrupt() function. This prevents mismatches of Queue
and Interrupt, especially in the case of a re-activation after a reset
where the old and new Interrupt are not the same.

BUG=b:360926085

Change-Id: I31ad9704d4963e46f7ce1b7b7a43aec1a92e342d
Reviewed-on: https://chromium-review.googlesource.com/c/crosvm/crosvm/+/5798666
Commit-Queue: Daniel Verkamp <dverkamp@chromium.org>
Reviewed-by: Frederick Mayle <fmayle@google.com>
This commit is contained in:
Daniel Verkamp 2024-08-19 19:06:00 -07:00 committed by crosvm LUCI
parent 2ac374d114
commit 8cc63c4219
51 changed files with 317 additions and 480 deletions

View file

@ -347,7 +347,6 @@ async fn handle_queue<F>(
mut queue: Queue,
mut queue_event: EventAsync,
release_memory_tube: Option<&Tube>,
interrupt: Interrupt,
mut desc_handler: F,
mut stop_rx: oneshot::Receiver<()>,
) -> Queue
@ -372,7 +371,7 @@ where
error!("balloon: failed to process inflate addresses: {}", e);
}
queue.add_used(avail_desc, 0);
queue.trigger_interrupt(&interrupt);
queue.trigger_interrupt();
}
}
@ -400,7 +399,6 @@ async fn handle_reporting_queue<F>(
mut queue: Queue,
mut queue_event: EventAsync,
release_memory_tube: Option<&Tube>,
interrupt: Interrupt,
mut desc_handler: F,
mut stop_rx: oneshot::Receiver<()>,
) -> Queue
@ -424,7 +422,7 @@ where
error!("balloon: failed to process reported buffer: {}", e);
}
queue.add_used(avail_desc, 0);
queue.trigger_interrupt(&interrupt);
queue.trigger_interrupt();
}
}
@ -453,7 +451,6 @@ async fn handle_stats_queue(
command_tube: &AsyncTube,
#[cfg(feature = "registered_events")] registered_evt_q: Option<&SendTubeAsync>,
state: Arc<AsyncRwLock<BalloonState>>,
interrupt: Interrupt,
mut stop_rx: oneshot::Receiver<()>,
) -> Queue {
let mut avail_desc = match queue
@ -487,7 +484,7 @@ async fn handle_stats_queue(
// Request a new stats_desc to the guest.
queue.add_used(avail_desc, 0);
queue.trigger_interrupt(&interrupt);
queue.trigger_interrupt();
avail_desc = match queue.next_async(&mut queue_event).await {
Err(e) => {
@ -543,7 +540,6 @@ async fn handle_ws_op_queue(
mut queue_event: EventAsync,
mut ws_op_rx: mpsc::Receiver<WSOp>,
state: Arc<AsyncRwLock<BalloonState>>,
interrupt: Interrupt,
mut stop_rx: oneshot::Receiver<()>,
) -> Result<Queue> {
loop {
@ -604,7 +600,7 @@ async fn handle_ws_op_queue(
let len = writer.bytes_written() as u32;
queue.add_used(avail_desc, len);
queue.trigger_interrupt(&interrupt);
queue.trigger_interrupt();
}
Ok(queue)
@ -641,7 +637,6 @@ async fn handle_ws_data_queue(
command_tube: &AsyncTube,
#[cfg(feature = "registered_events")] registered_evt_q: Option<&SendTubeAsync>,
state: Arc<AsyncRwLock<BalloonState>>,
interrupt: Interrupt,
mut stop_rx: oneshot::Receiver<()>,
) -> Result<Queue> {
loop {
@ -682,7 +677,7 @@ async fn handle_ws_data_queue(
}
queue.add_used(avail_desc, 0);
queue.trigger_interrupt(&interrupt);
queue.trigger_interrupt();
}
}
@ -906,7 +901,6 @@ fn run_worker(
inflate_queue,
EventAsync::new(inflate_queue_evt, &ex).expect("failed to create async event"),
release_memory_tube.as_ref(),
interrupt.clone(),
|guest_address, len| {
sys::free_memory(
&guest_address,
@ -932,7 +926,6 @@ fn run_worker(
deflate_queue,
EventAsync::new(deflate_queue_evt, &ex).expect("failed to create async event"),
None,
interrupt.clone(),
|guest_address, len| {
sys::reclaim_memory(
&guest_address,
@ -963,7 +956,6 @@ fn run_worker(
#[cfg(feature = "registered_events")]
registered_evt_q_async.as_ref(),
state.clone(),
interrupt.clone(),
stop_rx,
)
.left_future()
@ -985,7 +977,6 @@ fn run_worker(
reporting_queue,
EventAsync::new(reporting_queue_evt, &ex).expect("failed to create async event"),
release_memory_tube.as_ref(),
interrupt.clone(),
|guest_address, len| {
sys::free_memory(
&guest_address,
@ -1021,7 +1012,6 @@ fn run_worker(
#[cfg(feature = "registered_events")]
registered_evt_q_async.as_ref(),
state.clone(),
interrupt.clone(),
stop_rx,
)
.left_future()
@ -1044,7 +1034,6 @@ fn run_worker(
EventAsync::new(ws_op_queue_evt, &ex).expect("failed to create async event"),
ws_op_rx,
state.clone(),
interrupt.clone(),
stop_rx,
)
.left_future()

View file

@ -275,7 +275,6 @@ async fn process_one_chain(
queue: &RefCell<Queue>,
mut avail_desc: DescriptorChain,
disk_state: &AsyncRwLock<DiskState>,
interrupt: &Interrupt,
flush_timer: &RefCell<TimerAsync<Timer>>,
flush_timer_armed: &RefCell<bool>,
) {
@ -292,7 +291,7 @@ async fn process_one_chain(
let mut queue = queue.borrow_mut();
queue.add_used(avail_desc, len as u32);
queue.trigger_interrupt(interrupt);
queue.trigger_interrupt();
}
// There is one async task running `handle_queue` per virtio queue in use.
@ -302,7 +301,6 @@ async fn handle_queue(
disk_state: Rc<AsyncRwLock<DiskState>>,
queue: Queue,
evt: EventAsync,
interrupt: Interrupt,
flush_timer: Rc<RefCell<TimerAsync<Timer>>>,
flush_timer_armed: Rc<RefCell<bool>>,
mut stop_rx: oneshot::Receiver<()>,
@ -339,7 +337,6 @@ async fn handle_queue(
&queue,
descriptor_chain,
&disk_state,
&interrupt,
&flush_timer,
&flush_timer_armed,
));
@ -448,7 +445,6 @@ enum WorkerCmd {
StartQueue {
index: usize,
queue: Queue,
interrupt: Interrupt,
},
StopQueue {
index: usize,
@ -526,16 +522,16 @@ async fn run_worker(
worker_cmd = worker_rx.next() => {
match worker_cmd {
None => anyhow::bail!("worker control channel unexpectedly closed"),
Some(WorkerCmd::StartQueue{index, queue, interrupt}) => {
Some(WorkerCmd::StartQueue{index, queue}) => {
if matches!(&*resample_future, futures::future::Either::Left(_)) {
resample_future.set(
async_utils::handle_irq_resample(ex, interrupt.clone())
async_utils::handle_irq_resample(ex, queue.interrupt().clone())
.fuse()
.right_future(),
);
}
if control_interrupt.borrow().is_none() {
*control_interrupt.borrow_mut() = Some(interrupt.clone());
*control_interrupt.borrow_mut() = Some(queue.interrupt().clone());
}
let (tx, rx) = oneshot::channel();
@ -544,7 +540,6 @@ async fn run_worker(
Rc::clone(disk_state),
queue,
EventAsync::new(kick_evt, ex).expect("Failed to create async event for queue"),
interrupt,
Rc::clone(&flush_timer),
Rc::clone(&flush_timer_armed),
rx,
@ -1057,15 +1052,10 @@ impl BlockAsync {
idx: usize,
queue: Queue,
_mem: GuestMemory,
doorbell: Interrupt,
) -> anyhow::Result<()> {
let (_, worker_tx) = self.start_worker(idx)?;
worker_tx
.unbounded_send(WorkerCmd::StartQueue {
index: idx,
queue,
interrupt: doorbell,
})
.unbounded_send(WorkerCmd::StartQueue { index: idx, queue })
.expect("worker channel closed early");
self.activated_queues.insert(idx);
Ok(())
@ -1139,11 +1129,11 @@ impl VirtioDevice for BlockAsync {
fn activate(
&mut self,
mem: GuestMemory,
interrupt: Interrupt,
_interrupt: Interrupt,
queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<()> {
for (i, q) in queues {
self.start_queue(i, q, mem.clone(), interrupt.clone())?;
self.start_queue(i, q, mem.clone())?;
}
Ok(())
}
@ -1176,9 +1166,9 @@ impl VirtioDevice for BlockAsync {
&mut self,
queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
) -> anyhow::Result<()> {
if let Some((mem, interrupt, queues)) = queues_state {
if let Some((mem, _interrupt, queues)) = queues_state {
for (i, q) in queues {
self.start_queue(i, q, mem.clone(), interrupt.clone())?
self.start_queue(i, q, mem.clone())?
}
}
Ok(())
@ -1641,25 +1631,23 @@ mod tests {
)
.unwrap();
let interrupt = Interrupt::new_for_test();
// activate with queues of an arbitrary size.
let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
q0.set_ready(true);
let q0 = q0
.activate(&mem, Event::new().unwrap())
.activate(&mem, Event::new().unwrap(), interrupt.clone())
.expect("QueueConfig::activate");
let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
q1.set_ready(true);
let q1 = q1
.activate(&mem, Event::new().unwrap())
.activate(&mem, Event::new().unwrap(), interrupt.clone())
.expect("QueueConfig::activate");
b.activate(
mem.clone(),
Interrupt::new_for_test(),
BTreeMap::from([(0, q0), (1, q1)]),
)
.expect("activate should succeed");
b.activate(mem.clone(), interrupt, BTreeMap::from([(0, q0), (1, q1)]))
.expect("activate should succeed");
// assert resources are consumed
if !enables_multiple_workers {
assert!(
@ -1695,24 +1683,21 @@ mod tests {
assert_eq!(b.id, Some(*b"Block serial number\0"));
// re-activate should succeed
let interrupt = Interrupt::new_for_test();
let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
q0.set_ready(true);
let q0 = q0
.activate(&mem, Event::new().unwrap())
.activate(&mem, Event::new().unwrap(), interrupt.clone())
.expect("QueueConfig::activate");
let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
q1.set_ready(true);
let q1 = q1
.activate(&mem, Event::new().unwrap())
.activate(&mem, Event::new().unwrap(), interrupt.clone())
.expect("QueueConfig::activate");
b.activate(
mem,
Interrupt::new_for_test(),
BTreeMap::from([(0, q0), (1, q1)]),
)
.expect("re-activate should succeed");
b.activate(mem, interrupt, BTreeMap::from([(0, q0), (1, q1)]))
.expect("re-activate should succeed");
}
#[test]
@ -1760,20 +1745,21 @@ mod tests {
)
.unwrap();
let interrupt = Interrupt::new_for_test();
// activate with queues of an arbitrary size.
let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
q0.set_ready(true);
let q0 = q0
.activate(&mem, Event::new().unwrap())
.activate(&mem, Event::new().unwrap(), interrupt.clone())
.expect("QueueConfig::activate");
let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
q1.set_ready(true);
let q1 = q1
.activate(&mem, Event::new().unwrap())
.activate(&mem, Event::new().unwrap(), interrupt.clone())
.expect("QueueConfig::activate");
let interrupt = Interrupt::new_for_test();
b.activate(mem, interrupt.clone(), BTreeMap::from([(0, q0), (1, q1)]))
.expect("activate should succeed");
@ -1861,24 +1847,21 @@ mod tests {
.unwrap();
// activate with queues of an arbitrary size.
let interrupt = Interrupt::new_for_test();
let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
q0.set_ready(true);
let q0 = q0
.activate(&mem, Event::new().unwrap())
.activate(&mem, Event::new().unwrap(), interrupt.clone())
.expect("QueueConfig::activate");
let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
q1.set_ready(true);
let q1 = q1
.activate(&mem, Event::new().unwrap())
.activate(&mem, Event::new().unwrap(), interrupt.clone())
.expect("QueueConfig::activate");
b.activate(
mem.clone(),
Interrupt::new_for_test(),
BTreeMap::from([(0, q0), (1, q1)]),
)
.expect("activate should succeed");
b.activate(mem.clone(), interrupt, BTreeMap::from([(0, q0), (1, q1)]))
.expect("activate should succeed");
assert_eq!(b.worker_threads.len(), 1, "1 threads should be spawned.");
drop(b);
@ -1894,24 +1877,21 @@ mod tests {
let mut b = BlockAsync::new(features, disk_image, &disk_option, None, None, None).unwrap();
// activate should succeed
let interrupt = Interrupt::new_for_test();
let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
q0.set_ready(true);
let q0 = q0
.activate(&mem, Event::new().unwrap())
.activate(&mem, Event::new().unwrap(), interrupt.clone())
.expect("QueueConfig::activate");
let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
q1.set_ready(true);
let q1 = q1
.activate(&mem, Event::new().unwrap())
.activate(&mem, Event::new().unwrap(), interrupt.clone())
.expect("QueueConfig::activate");
b.activate(
mem,
Interrupt::new_for_test(),
BTreeMap::from([(0, q0), (1, q1)]),
)
.expect("activate should succeed");
b.activate(mem, interrupt, BTreeMap::from([(0, q0), (1, q1)]))
.expect("activate should succeed");
assert_eq!(b.worker_threads.len(), 2, "2 threads should be spawned.");
}

View file

@ -87,11 +87,11 @@ impl VirtioDevice for Console {
fn activate(
&mut self,
_mem: GuestMemory,
interrupt: Interrupt,
_interrupt: Interrupt,
queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<()> {
for (idx, queue) in queues.into_iter() {
self.console.start_queue(idx, queue, interrupt.clone())?
self.console.start_queue(idx, queue)?
}
Ok(())
}
@ -128,9 +128,9 @@ impl VirtioDevice for Console {
&mut self,
queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
) -> anyhow::Result<()> {
if let Some((_mem, interrupt, queues)) = queues_state {
if let Some((_mem, _interrupt, queues)) = queues_state {
for (idx, queue) in queues.into_iter() {
self.console.start_queue(idx, queue, interrupt.clone())?;
self.console.start_queue(idx, queue)?;
}
}
Ok(())

View file

@ -21,7 +21,6 @@ use crate::virtio::device_constants::console::VIRTIO_CONSOLE_DEVICE_READY;
use crate::virtio::device_constants::console::VIRTIO_CONSOLE_PORT_NAME;
use crate::virtio::device_constants::console::VIRTIO_CONSOLE_PORT_OPEN;
use crate::virtio::device_constants::console::VIRTIO_CONSOLE_PORT_READY;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
use crate::virtio::Reader;
@ -122,7 +121,6 @@ fn process_control_msg(
pub fn process_control_transmit_queue(
queue: &mut Queue,
interrupt: &Interrupt,
ports: &[WorkerPort],
pending_receive_control_msgs: &mut VecDeque<ControlMsgBytes>,
) {
@ -140,13 +138,12 @@ pub fn process_control_transmit_queue(
}
if needs_interrupt {
queue.trigger_interrupt(interrupt);
queue.trigger_interrupt();
}
}
pub fn process_control_receive_queue(
queue: &mut Queue,
interrupt: &Interrupt,
pending_receive_control_msgs: &mut VecDeque<ControlMsgBytes>,
) {
let mut needs_interrupt = false;
@ -175,6 +172,6 @@ pub fn process_control_receive_queue(
}
if needs_interrupt {
queue.trigger_interrupt(interrupt);
queue.trigger_interrupt();
}
}

View file

@ -116,13 +116,8 @@ impl ConsoleDevice {
}
}
pub fn start_queue(
&mut self,
idx: usize,
queue: Queue,
interrupt: Interrupt,
) -> anyhow::Result<()> {
let worker = self.ensure_worker_started(interrupt);
pub fn start_queue(&mut self, idx: usize, queue: Queue) -> anyhow::Result<()> {
let worker = self.ensure_worker_started(queue.interrupt().clone());
worker.start_queue(idx, queue)
}

View file

@ -7,7 +7,6 @@
use std::collections::VecDeque;
use std::io::Write;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
/// Checks for input from `buffer` and transfers it to the receive queue, if any.
@ -17,11 +16,7 @@ use crate::virtio::Queue;
/// The queue's own tracked `Interrupt` is used to signal that the queue has been used.
/// * `buffer` - Ring buffer providing data to put into the guest
/// * `receive_queue` - The receive virtio Queue
pub fn process_receive_queue(
interrupt: &Interrupt,
buffer: &mut VecDeque<u8>,
receive_queue: &mut Queue,
) {
pub fn process_receive_queue(buffer: &mut VecDeque<u8>, receive_queue: &mut Queue) {
while let Some(mut desc) = receive_queue.peek() {
if buffer.is_empty() {
break;
@ -44,7 +39,7 @@ pub fn process_receive_queue(
if bytes_written > 0 {
let desc = desc.pop();
receive_queue.add_used(desc, bytes_written);
receive_queue.trigger_interrupt(interrupt);
receive_queue.trigger_interrupt();
}
}
}

View file

@ -9,7 +9,6 @@ use std::io::Read;
use base::error;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
use crate::virtio::Reader;
@ -35,11 +34,7 @@ fn process_transmit_request(reader: &mut Reader, output: &mut dyn io::Write) ->
/// The queue's own tracked `Interrupt` is used to signal (if required) that the queue has been used.
/// * `transmit_queue` - The transmit virtio Queue
/// * `output` - The output sink we are going to write the data into
pub fn process_transmit_queue(
interrupt: &Interrupt,
transmit_queue: &mut Queue,
output: &mut dyn io::Write,
) {
pub fn process_transmit_queue(transmit_queue: &mut Queue, output: &mut dyn io::Write) {
let mut needs_interrupt = false;
while let Some(mut avail_desc) = transmit_queue.pop() {
if let Err(e) = process_transmit_request(&mut avail_desc.reader, output) {
@ -51,6 +46,6 @@ pub fn process_transmit_queue(
}
if needs_interrupt {
transmit_queue.trigger_interrupt(interrupt);
transmit_queue.trigger_interrupt();
}
}

View file

@ -172,7 +172,7 @@ impl Worker {
.event()
.wait()
.context("failed reading transmit queue Event")?;
process_transmit_queue(&self.interrupt, transmitq, &mut port.output);
process_transmit_queue(transmitq, &mut port.output);
}
}
Token::ReceiveQueueAvailable(port_id) | Token::InputAvailable(port_id) => {
@ -191,7 +191,7 @@ impl Worker {
if let (Some(port), Some(receiveq)) = (port, receiveq) {
let mut input_buffer = port.input_buffer.lock();
process_receive_queue(&self.interrupt, &mut input_buffer, receiveq);
process_receive_queue(&mut input_buffer, receiveq);
}
}
Token::ControlReceiveQueueAvailable => {
@ -202,7 +202,6 @@ impl Worker {
.context("failed waiting on control event")?;
process_control_receive_queue(
ctrl_receiveq,
&self.interrupt,
&mut self.pending_receive_control_msgs,
);
}
@ -215,7 +214,6 @@ impl Worker {
.context("failed waiting on control event")?;
process_control_transmit_queue(
ctrl_transmitq,
&self.interrupt,
&self.ports,
&mut self.pending_receive_control_msgs,
);
@ -225,7 +223,6 @@ impl Worker {
if let Some(ctrl_receiveq) = self.queues.get_mut(&CONTROL_RECEIVEQ_IDX) {
process_control_receive_queue(
ctrl_receiveq,
&self.interrupt,
&mut self.pending_receive_control_msgs,
)
}

View file

@ -147,7 +147,6 @@ pub struct Worker<F: FileSystem + Sync> {
}
pub fn process_fs_queue<F: FileSystem + Sync>(
interrupt: &Interrupt,
queue: &mut Queue,
server: &Arc<fuse::Server<F>>,
tube: &Arc<Mutex<Tube>>,
@ -159,7 +158,7 @@ pub fn process_fs_queue<F: FileSystem + Sync>(
server.handle_message(&mut avail_desc.reader, &mut avail_desc.writer, &mapper)?;
queue.add_used(avail_desc, total as u32);
queue.trigger_interrupt(interrupt);
queue.trigger_interrupt();
}
Ok(())
@ -252,13 +251,9 @@ impl<F: FileSystem + Sync> Worker<F> {
match event.token {
Token::QueueReady => {
self.queue.event().wait().map_err(Error::ReadQueueEvent)?;
if let Err(e) = process_fs_queue(
&self.irq,
&mut self.queue,
&self.server,
&self.tube,
self.slot,
) {
if let Err(e) =
process_fs_queue(&mut self.queue, &self.server, &self.tube, self.slot)
{
error!("virtio-fs transport error: {}", e);
return Err(e);
}

View file

@ -184,14 +184,12 @@ pub trait QueueReader {
struct LocalQueueReader {
queue: RefCell<Queue>,
interrupt: Interrupt,
}
impl LocalQueueReader {
fn new(queue: Queue, interrupt: Interrupt) -> Self {
fn new(queue: Queue) -> Self {
Self {
queue: RefCell::new(queue),
interrupt,
}
}
}
@ -206,21 +204,19 @@ impl QueueReader for LocalQueueReader {
}
fn signal_used(&self) {
self.queue.borrow_mut().trigger_interrupt(&self.interrupt);
self.queue.borrow_mut().trigger_interrupt();
}
}
#[derive(Clone)]
struct SharedQueueReader {
queue: Arc<Mutex<Queue>>,
interrupt: Interrupt,
}
impl SharedQueueReader {
fn new(queue: Queue, interrupt: Interrupt) -> Self {
fn new(queue: Queue) -> Self {
Self {
queue: Arc::new(Mutex::new(queue)),
interrupt,
}
}
}
@ -235,7 +231,7 @@ impl QueueReader for SharedQueueReader {
}
fn signal_used(&self) {
self.queue.lock().trigger_interrupt(&self.interrupt);
self.queue.lock().trigger_interrupt();
}
}
@ -1755,8 +1751,8 @@ impl VirtioDevice for Gpu {
));
}
let ctrl_queue = SharedQueueReader::new(queues.remove(&0).unwrap(), interrupt.clone());
let cursor_queue = LocalQueueReader::new(queues.remove(&1).unwrap(), interrupt.clone());
let ctrl_queue = SharedQueueReader::new(queues.remove(&0).unwrap());
let cursor_queue = LocalQueueReader::new(queues.remove(&1).unwrap());
match self
.worker_thread

View file

@ -529,10 +529,10 @@ impl<T: EventSource> Worker<T> {
}
if eventq_needs_interrupt {
self.event_queue.trigger_interrupt(&self.interrupt);
self.event_queue.trigger_interrupt();
}
if statusq_needs_interrupt {
self.status_queue.trigger_interrupt(&self.interrupt);
self.status_queue.trigger_interrupt();
}
}

View file

@ -2,6 +2,7 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::fmt;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
@ -75,6 +76,12 @@ pub struct Interrupt {
inner: Arc<InterruptInner>,
}
impl fmt::Debug for Interrupt {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Interrupt")
}
}
#[derive(Serialize, Deserialize)]
pub struct InterruptSnapshot {
interrupt_status: usize,

View file

@ -598,7 +598,6 @@ async fn request_queue(
state: &Rc<RefCell<State>>,
mut queue: Queue,
mut queue_event: EventAsync,
interrupt: Interrupt,
) -> Result<()> {
loop {
let mut avail_desc = queue
@ -628,7 +627,7 @@ async fn request_queue(
}
queue.add_used(avail_desc, len as u32);
queue.trigger_interrupt(&interrupt);
queue.trigger_interrupt();
}
}
@ -651,7 +650,7 @@ fn run(
.expect("Failed to clone queue event");
let req_evt = EventAsync::new(req_evt, &ex).expect("Failed to create async event for queue");
let f_resample = async_utils::handle_irq_resample(&ex, interrupt.clone());
let f_resample = async_utils::handle_irq_resample(&ex, interrupt);
let f_kill = async_utils::await_and_exit(&ex, kill_evt);
let request_tube = translate_request_rx
@ -669,7 +668,7 @@ fn run(
let f_handle_translate_request =
sys::handle_translate_request(&ex, &state, request_tube, response_tubes);
let f_request = request_queue(&state, req_queue, req_evt, interrupt);
let f_request = request_queue(&state, req_queue, req_evt);
let command_tube = AsyncTube::new(&ex, iommu_device_tube).unwrap();
// Future to handle command messages from host, such as passing vfio containers.

View file

@ -283,7 +283,6 @@ fn process_ctrl_request<T: TapT>(
}
pub fn process_ctrl<T: TapT>(
interrupt: &Interrupt,
ctrl_queue: &mut Queue,
tap: &mut T,
acked_features: u64,
@ -307,7 +306,7 @@ pub fn process_ctrl<T: TapT>(
ctrl_queue.add_used(desc_chain, len);
}
ctrl_queue.trigger_interrupt(interrupt);
ctrl_queue.trigger_interrupt();
Ok(())
}
@ -352,7 +351,7 @@ where
T: TapT + ReadNotifier,
{
fn process_tx(&mut self) {
process_tx(&self.interrupt, &mut self.tx_queue, &mut self.tap)
process_tx(&mut self.tx_queue, &mut self.tap)
}
fn process_ctrl(&mut self) -> Result<(), NetError> {
@ -362,7 +361,6 @@ where
};
process_ctrl(
&self.interrupt,
ctrl_queue,
&mut self.tap,
self.acked_features,

View file

@ -17,7 +17,6 @@ use virtio_sys::virtio_net::virtio_net_hdr_v1;
use super::super::super::net::NetError;
use super::super::super::net::Token;
use super::super::super::net::Worker;
use super::super::super::Interrupt;
use super::super::super::Queue;
// Ensure that the tap interface has the correct flags and sets the offload and VNET header size
@ -81,11 +80,7 @@ pub fn virtio_features_to_tap_offload(features: u64) -> u32 {
tap_offloads
}
pub fn process_rx<T: TapT>(
interrupt: &Interrupt,
rx_queue: &mut Queue,
mut tap: &mut T,
) -> result::Result<(), NetError> {
pub fn process_rx<T: TapT>(rx_queue: &mut Queue, mut tap: &mut T) -> result::Result<(), NetError> {
let mut needs_interrupt = false;
let mut exhausted_queue = false;
@ -128,7 +123,7 @@ pub fn process_rx<T: TapT>(
}
if needs_interrupt {
rx_queue.trigger_interrupt(interrupt);
rx_queue.trigger_interrupt();
}
if exhausted_queue {
@ -138,7 +133,7 @@ pub fn process_rx<T: TapT>(
}
}
pub fn process_tx<T: TapT>(interrupt: &Interrupt, tx_queue: &mut Queue, mut tap: &mut T) {
pub fn process_tx<T: TapT>(tx_queue: &mut Queue, mut tap: &mut T) {
while let Some(mut desc_chain) = tx_queue.pop() {
let reader = &mut desc_chain.reader;
let expected_count = reader.available_bytes();
@ -160,7 +155,7 @@ pub fn process_tx<T: TapT>(interrupt: &Interrupt, tx_queue: &mut Queue, mut tap:
tx_queue.add_used(desc_chain, 0);
}
tx_queue.trigger_interrupt(interrupt);
tx_queue.trigger_interrupt();
}
impl<T> Worker<T>
@ -195,6 +190,6 @@ where
Ok(())
}
pub(super) fn process_rx(&mut self) -> result::Result<(), NetError> {
process_rx(&self.interrupt, &mut self.rx_queue, &mut self.tap)
process_rx(&mut self.rx_queue, &mut self.tap)
}
}

View file

@ -77,7 +77,6 @@ fn rx_single_frame(rx_queue: &mut Queue, rx_buf: &mut [u8], rx_count: usize) ->
}
pub fn process_rx<T: TapT>(
interrupt: &Interrupt,
rx_queue: &mut Queue,
tap: &mut T,
rx_buf: &mut [u8],
@ -103,7 +102,7 @@ pub fn process_rx<T: TapT>(
*deferred_rx = true;
break;
} else if first_frame {
interrupt.signal_used_queue(rx_queue.vector());
rx_queue.trigger_interrupt();
first_frame = false;
} else {
needs_interrupt = true;
@ -155,7 +154,7 @@ pub fn process_rx<T: TapT>(
needs_interrupt
}
pub fn process_tx<T: TapT>(interrupt: &Interrupt, tx_queue: &mut Queue, tap: &mut T) {
pub fn process_tx<T: TapT>(tx_queue: &mut Queue, tap: &mut T) {
// Reads up to `buf.len()` bytes or until there is no more data in `r`, whichever
// is smaller.
fn read_to_end(r: &mut Reader, buf: &mut [u8]) -> io::Result<usize> {
@ -187,7 +186,7 @@ pub fn process_tx<T: TapT>(interrupt: &Interrupt, tx_queue: &mut Queue, tap: &mu
tx_queue.add_used(desc_chain, 0);
}
tx_queue.trigger_interrupt(interrupt);
tx_queue.trigger_interrupt();
}
impl<T> Worker<T>
@ -196,7 +195,6 @@ where
{
pub(super) fn process_rx_slirp(&mut self) -> bool {
process_rx(
&self.interrupt,
&mut self.rx_queue,
&mut self.tap,
&mut self.rx_buf,

View file

@ -86,7 +86,7 @@ impl Worker {
self.queue.add_used(avail_desc, len);
}
self.queue.trigger_interrupt(&self.interrupt);
self.queue.trigger_interrupt();
Ok(())
}

View file

@ -264,7 +264,6 @@ fn handle_request(
async fn handle_queue(
queue: &mut Queue,
mut queue_event: EventAsync,
interrupt: Interrupt,
pmem_device_tube: &Tube,
mapping_arena_slot: u32,
mapping_size: usize,
@ -291,7 +290,7 @@ async fn handle_queue(
}
};
queue.add_used(avail_desc, written as u32);
queue.trigger_interrupt(&interrupt);
queue.trigger_interrupt();
}
}
@ -316,7 +315,6 @@ fn run_worker(
let queue_fut = handle_queue(
queue,
queue_evt,
interrupt.clone(),
pmem_device_tube,
mapping_arena_slot,
mapping_size,

View file

@ -854,7 +854,7 @@ fn run_main_worker(
};
set_pvclock_page_queue.add_used(desc_chain, len);
set_pvclock_page_queue.trigger_interrupt(&interrupt);
set_pvclock_page_queue.trigger_interrupt();
}
Token::SuspendResume => {
let req = match suspend_tube.recv::<PvClockCommand>() {
@ -1078,11 +1078,17 @@ mod tests {
let mut fake_queue = QueueConfig::new(TEST_QUEUE_SIZE, 0);
fake_queue.set_ready(true);
let mem = GuestMemory::new(&[(GuestAddress(0), 0x10000)]).unwrap();
let interrupt = make_interrupt();
pvclock_device
.activate(
mem.clone(),
make_interrupt(),
BTreeMap::from([(0, fake_queue.activate(&mem, Event::new().unwrap()).unwrap())]),
interrupt.clone(),
BTreeMap::from([(
0,
fake_queue
.activate(&mem, Event::new().unwrap(), interrupt)
.unwrap(),
)]),
)
.expect("activate should succeed");
let queues = pvclock_device
@ -1103,9 +1109,15 @@ mod tests {
// by the device in these tests.
let mut wake_queues = BTreeMap::new();
let mut fake_queue = QueueConfig::new(TEST_QUEUE_SIZE, 0);
let interrupt = make_interrupt();
fake_queue.set_ready(true);
wake_queues.insert(0, fake_queue.activate(mem, Event::new().unwrap()).unwrap());
let queues_state = (mem.clone(), make_interrupt(), wake_queues);
wake_queues.insert(
0,
fake_queue
.activate(mem, Event::new().unwrap(), interrupt.clone())
.unwrap(),
);
let queues_state = (mem.clone(), interrupt, wake_queues);
pvclock_device
.virtio_wake(Some(queues_state))
.expect("wake should succeed");

View file

@ -260,7 +260,12 @@ impl QueueConfig {
}
/// Convert the queue configuration into an active queue.
pub fn activate(&mut self, mem: &GuestMemory, event: Event) -> Result<Queue> {
pub fn activate(
&mut self,
mem: &GuestMemory,
event: Event,
interrupt: Interrupt,
) -> Result<Queue> {
if !self.ready {
bail!("attempted to activate a non-ready queue");
}
@ -271,12 +276,12 @@ impl QueueConfig {
// If VIRTIO_F_RING_PACKED feature bit is set, create a packed queue, otherwise create a
// split queue
let queue: Queue = if ((self.acked_features >> VIRTIO_F_RING_PACKED) & 1) != 0 {
let pq =
PackedQueue::new(self, mem, event).context("Failed to create a packed queue.")?;
let pq = PackedQueue::new(self, mem, event, interrupt)
.context("Failed to create a packed queue.")?;
Queue::PackedVirtQueue(pq)
} else {
let sq =
SplitQueue::new(self, mem, event).context("Failed to create a split queue.")?;
let sq = SplitQueue::new(self, mem, event, interrupt)
.context("Failed to create a split queue.")?;
Queue::SplitVirtQueue(sq)
};
@ -436,10 +441,10 @@ impl Queue {
/// inject interrupt into guest on this queue
/// return true: interrupt is injected into guest for this queue
/// false: interrupt isn't injected
pub fn trigger_interrupt(&mut self, interrupt: &Interrupt) -> bool {
pub fn trigger_interrupt(&mut self) -> bool {
match self {
Queue::SplitVirtQueue(sq) => sq.trigger_interrupt(interrupt),
Queue::PackedVirtQueue(pq) => pq.trigger_interrupt(interrupt),
Queue::SplitVirtQueue(sq) => sq.trigger_interrupt(),
Queue::PackedVirtQueue(pq) => pq.trigger_interrupt(),
}
}
@ -449,11 +454,12 @@ impl Queue {
queue_value: serde_json::Value,
mem: &GuestMemory,
event: Event,
interrupt: Interrupt,
) -> anyhow::Result<Queue> {
if queue_config.acked_features & 1 << VIRTIO_F_RING_PACKED != 0 {
PackedQueue::restore(queue_value, mem, event).map(Queue::PackedVirtQueue)
PackedQueue::restore(queue_value, mem, event, interrupt).map(Queue::PackedVirtQueue)
} else {
SplitQueue::restore(queue_value, mem, event).map(Queue::SplitVirtQueue)
SplitQueue::restore(queue_value, mem, event, interrupt).map(Queue::SplitVirtQueue)
}
}
@ -517,6 +523,12 @@ impl Queue {
&Event,
);
define_queue_method!(
/// Get a reference to the queue's interrupt.
interrupt,
&Interrupt,
);
define_queue_method!(
/// Puts an available descriptor head into the used ring for use by the guest.
add_used,

View file

@ -85,6 +85,7 @@ pub struct PackedQueue {
mem: GuestMemory,
event: Event,
interrupt: Interrupt,
// The queue size in elements the driver selected
size: u16,
@ -125,7 +126,12 @@ pub struct PackedQueueSnapshot {
impl PackedQueue {
/// Constructs an empty virtio queue with the given `max_size`.
pub fn new(config: &QueueConfig, mem: &GuestMemory, event: Event) -> Result<Self> {
pub fn new(
config: &QueueConfig,
mem: &GuestMemory,
event: Event,
interrupt: Interrupt,
) -> Result<Self> {
let size = config.size();
let desc_table = config.desc_table();
@ -154,6 +160,7 @@ impl PackedQueue {
Ok(PackedQueue {
mem: mem.clone(),
event,
interrupt,
size,
vector: config.vector(),
desc_table: config.desc_table(),
@ -206,6 +213,11 @@ impl PackedQueue {
&self.event
}
/// Get a reference to the queue's interrupt
pub fn interrupt(&self) -> &Interrupt {
&self.interrupt
}
fn area_sizes(
queue_size: u16,
desc_table: GuestAddress,
@ -396,9 +408,9 @@ impl PackedQueue {
/// inject interrupt into guest on this queue
/// return true: interrupt is injected into guest for this queue
/// false: interrupt isn't injected
pub fn trigger_interrupt(&mut self, interrupt: &Interrupt) -> bool {
pub fn trigger_interrupt(&mut self) -> bool {
if self.queue_wants_interrupt() {
interrupt.signal_used_queue(self.vector);
self.interrupt.signal_used_queue(self.vector);
true
} else {
false
@ -422,6 +434,7 @@ impl PackedQueue {
_queue_value: serde_json::Value,
_mem: &GuestMemory,
_event: Event,
_interrupt: Interrupt,
) -> Result<PackedQueue> {
bail!("Restore for packed virtqueue not implemented.");
}

View file

@ -37,6 +37,7 @@ pub struct SplitQueue {
mem: GuestMemory,
event: Event,
interrupt: Interrupt,
/// The queue size in elements the driver selected. This is always guaranteed to be a power of
/// two, as required for split virtqueues.
@ -84,7 +85,12 @@ struct virtq_used_elem {
impl SplitQueue {
/// Constructs an activated split virtio queue with the given configuration.
pub fn new(config: &QueueConfig, mem: &GuestMemory, event: Event) -> Result<SplitQueue> {
pub fn new(
config: &QueueConfig,
mem: &GuestMemory,
event: Event,
interrupt: Interrupt,
) -> Result<SplitQueue> {
let size = config.size();
if !size.is_power_of_two() {
bail!("split queue size {size} is not a power of 2");
@ -114,6 +120,7 @@ impl SplitQueue {
Ok(SplitQueue {
mem: mem.clone(),
event,
interrupt,
size,
vector: config.vector(),
desc_table: config.desc_table(),
@ -180,6 +187,11 @@ impl SplitQueue {
&self.event
}
/// Get a reference to the queue's interrupt
pub fn interrupt(&self) -> &Interrupt {
&self.interrupt
}
// Return `index` modulo the currently configured queue size.
fn wrap_queue_index(&self, index: Wrapping<u16>) -> u16 {
// We know that `self.size` is a power of two (enforced by `new()`), so the modulus can
@ -473,10 +485,10 @@ impl SplitQueue {
/// inject interrupt into guest on this queue
/// return true: interrupt is injected into guest for this queue
/// false: interrupt isn't injected
pub fn trigger_interrupt(&mut self, interrupt: &Interrupt) -> bool {
pub fn trigger_interrupt(&mut self) -> bool {
if self.queue_wants_interrupt() {
self.last_used = self.next_used;
interrupt.signal_used_queue(self.vector);
self.interrupt.signal_used_queue(self.vector);
true
} else {
false
@ -502,11 +514,13 @@ impl SplitQueue {
queue_value: serde_json::Value,
mem: &GuestMemory,
event: Event,
interrupt: Interrupt,
) -> anyhow::Result<SplitQueue> {
let s: SplitQueueSnapshot = serde_json::from_value(queue_value)?;
let queue = SplitQueue {
mem: mem.clone(),
event,
interrupt,
size: s.size,
vector: s.vector,
desc_table: s.desc_table,
@ -537,7 +551,6 @@ mod tests {
use crate::virtio::Desc;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
use crate::IrqLevelEvent;
const GUEST_MEMORY_SIZE: u64 = 0x10000;
const DESC_OFFSET: u64 = 0;
@ -625,7 +638,7 @@ mod tests {
queue.set_ready(true);
queue
.activate(mem, Event::new().unwrap())
.activate(mem, Event::new().unwrap(), Interrupt::new_for_test())
.expect("QueueConfig::activate failed")
}
@ -642,14 +655,6 @@ mod tests {
let mem = GuestMemory::new(&[(memory_start_addr, GUEST_MEMORY_SIZE)]).unwrap();
let mut queue = setup_vq(&mut queue, &mem);
let interrupt = Interrupt::new(
IrqLevelEvent::new().unwrap(),
None,
10,
#[cfg(target_arch = "x86_64")]
None,
);
// Offset of used_event within Avail structure
let used_event_offset = offset_of!(Avail, used_event) as u64;
let used_event_address = GuestAddress(AVAIL_OFFSET + used_event_offset);
@ -663,7 +668,7 @@ mod tests {
// At this moment driver hasn't handled any interrupts yet, so it
// should inject interrupt.
assert_eq!(queue.trigger_interrupt(&interrupt), true);
assert_eq!(queue.trigger_interrupt(), true);
// Driver handle all the interrupts and update avail.used_event to 0x100
let mut driver_handled = device_generate;
@ -671,7 +676,7 @@ mod tests {
// At this moment driver have handled all the interrupts, and
// device doesn't generate more data, so interrupt isn't needed.
assert_eq!(queue.trigger_interrupt(&interrupt), false);
assert_eq!(queue.trigger_interrupt(), false);
// Assume driver submit another u16::MAX - 0x100 req to device,
// Device has handled all of them, so increase self.next_used to u16::MAX
@ -682,7 +687,7 @@ mod tests {
// At this moment driver just handled 0x100 interrupts, so it
// should inject interrupt.
assert_eq!(queue.trigger_interrupt(&interrupt), true);
assert_eq!(queue.trigger_interrupt(), true);
// driver handle all the interrupts and update avail.used_event to u16::MAX
driver_handled = device_generate;
@ -690,7 +695,7 @@ mod tests {
// At this moment driver have handled all the interrupts, and
// device doesn't generate more data, so interrupt isn't needed.
assert_eq!(queue.trigger_interrupt(&interrupt), false);
assert_eq!(queue.trigger_interrupt(), false);
// Assume driver submit another 1 request,
// device has handled it, so wrap self.next_used to 0
@ -699,7 +704,7 @@ mod tests {
// At this moment driver has handled all the previous interrupts, so it
// should inject interrupt again.
assert_eq!(queue.trigger_interrupt(&interrupt), true);
assert_eq!(queue.trigger_interrupt(), true);
// driver handle that interrupts and update avail.used_event to 0
driver_handled = device_generate;
@ -707,7 +712,7 @@ mod tests {
// At this moment driver have handled all the interrupts, and
// device doesn't generate more data, so interrupt isn't needed.
assert_eq!(queue.trigger_interrupt(&interrupt), false);
assert_eq!(queue.trigger_interrupt(), false);
}
#[test]
@ -718,14 +723,6 @@ mod tests {
let mem = GuestMemory::new(&[(memory_start_addr, GUEST_MEMORY_SIZE)]).unwrap();
let mut queue = setup_vq(&mut queue, &mem);
let interrupt = Interrupt::new(
IrqLevelEvent::new().unwrap(),
None,
10,
#[cfg(target_arch = "x86_64")]
None,
);
// Offset of used_event within Avail structure
let used_event_offset = offset_of!(Avail, used_event) as u64;
let used_event_address = GuestAddress(AVAIL_OFFSET + used_event_offset);
@ -739,7 +736,7 @@ mod tests {
// At this moment driver hasn't handled any interrupts yet, so it
// should inject interrupt.
assert_eq!(queue.trigger_interrupt(&interrupt), true);
assert_eq!(queue.trigger_interrupt(), true);
// Driver handle part of the interrupts and update avail.used_event to 0x80
let mut driver_handled = Wrapping(0x80);
@ -747,7 +744,7 @@ mod tests {
// At this moment driver hasn't finished last interrupt yet,
// so interrupt isn't needed.
assert_eq!(queue.trigger_interrupt(&interrupt), false);
assert_eq!(queue.trigger_interrupt(), false);
// Assume driver submit another 1 request,
// device has handled it, so increment self.next_used.
@ -756,7 +753,7 @@ mod tests {
// At this moment driver hasn't finished last interrupt yet,
// so interrupt isn't needed.
assert_eq!(queue.trigger_interrupt(&interrupt), false);
assert_eq!(queue.trigger_interrupt(), false);
// Assume driver submit another u16::MAX - 0x101 req to device,
// Device has handled all of them, so increase self.next_used to u16::MAX
@ -767,7 +764,7 @@ mod tests {
// At this moment driver hasn't finished last interrupt yet,
// so interrupt isn't needed.
assert_eq!(queue.trigger_interrupt(&interrupt), false);
assert_eq!(queue.trigger_interrupt(), false);
// driver handle most of the interrupts and update avail.used_event to u16::MAX - 1,
driver_handled = device_generate - Wrapping(1);
@ -780,7 +777,7 @@ mod tests {
// At this moment driver has already finished the last interrupt(0x100),
// and device service other request, so new interrupt is needed.
assert_eq!(queue.trigger_interrupt(&interrupt), true);
assert_eq!(queue.trigger_interrupt(), true);
// Assume driver submit another 1 request,
// device has handled it, so increment self.next_used to 1
@ -789,7 +786,7 @@ mod tests {
// At this moment driver hasn't finished last interrupt((Wrapping(0)) yet,
// so interrupt isn't needed.
assert_eq!(queue.trigger_interrupt(&interrupt), false);
assert_eq!(queue.trigger_interrupt(), false);
// driver handle all the remain interrupts and wrap avail.used_event to 0x1.
driver_handled = device_generate;
@ -797,7 +794,7 @@ mod tests {
// At this moment driver has handled all the interrupts, and
// device doesn't generate more data, so interrupt isn't needed.
assert_eq!(queue.trigger_interrupt(&interrupt), false);
assert_eq!(queue.trigger_interrupt(), false);
// Assume driver submit another 1 request,
// device has handled it, so increase self.next_used.
@ -806,6 +803,6 @@ mod tests {
// At this moment driver has finished all the previous interrupts, so it
// should inject interrupt again.
assert_eq!(queue.trigger_interrupt(&interrupt), true);
assert_eq!(queue.trigger_interrupt(), true);
}
}

View file

@ -57,7 +57,7 @@ impl Worker {
}
if needs_interrupt {
self.queue.trigger_interrupt(&self.interrupt);
self.queue.trigger_interrupt();
}
}

View file

@ -759,7 +759,6 @@ async fn run_worker(
let queue_handler = handle_queue(
Rc::new(RefCell::new(queue)),
EventAsync::new(kick_evt, ex).expect("Failed to create async event for queue"),
interrupt,
queue_type,
sense_size,
cdb_size,
@ -777,7 +776,6 @@ async fn run_worker(
async fn handle_queue(
queue: Rc<RefCell<Queue>>,
evt: EventAsync,
interrupt: Interrupt,
queue_type: QueueType,
sense_size: u32,
cdb_size: u32,
@ -800,7 +798,6 @@ async fn handle_queue(
background_tasks.push(process_one_chain(
&queue,
chain,
&interrupt,
&queue_type,
sense_size,
cdb_size,
@ -812,7 +809,6 @@ async fn handle_queue(
async fn process_one_chain(
queue: &RefCell<Queue>,
mut avail_desc: DescriptorChain,
interrupt: &Interrupt,
queue_type: &QueueType,
sense_size: u32,
cdb_size: u32,
@ -821,7 +817,7 @@ async fn process_one_chain(
let len = process_one_request(&mut avail_desc, queue_type, sense_size, cdb_size).await;
let mut queue = queue.borrow_mut();
queue.add_used(avail_desc, len as u32);
queue.trigger_interrupt(interrupt);
queue.trigger_interrupt();
}
async fn process_one_request(

View file

@ -41,7 +41,6 @@ use crate::virtio::snd::common_backend::PcmResponse;
use crate::virtio::snd::constants::*;
use crate::virtio::snd::layout::*;
use crate::virtio::DescriptorChain;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
use crate::virtio::Reader;
use crate::virtio::Writer;
@ -524,7 +523,6 @@ async fn defer_pcm_response_to_worker(
fn send_pcm_response(
mut desc_chain: DescriptorChain,
queue: &mut Queue,
interrupt: &Interrupt,
status: virtio_snd_pcm_status,
) -> Result<(), Error> {
let writer = &mut desc_chain.writer;
@ -537,7 +535,7 @@ fn send_pcm_response(
writer.write_obj(status).map_err(Error::WriteResponse)?;
let len = writer.bytes_written() as u32;
queue.add_used(desc_chain, len);
queue.trigger_interrupt(interrupt);
queue.trigger_interrupt();
Ok(())
}
@ -556,7 +554,6 @@ async fn await_reset_signal(reset_signal_option: Option<&(AsyncRwLock<bool>, Con
pub async fn send_pcm_response_worker(
queue: Rc<AsyncRwLock<Queue>>,
interrupt: Interrupt,
recv: &mut mpsc::UnboundedReceiver<PcmResponse>,
reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>,
) -> Result<(), Error> {
@ -573,7 +570,7 @@ pub async fn send_pcm_response_worker(
};
if let Some(r) = res {
send_pcm_response(r.desc_chain, &mut *queue.lock().await, &interrupt, r.status)?;
send_pcm_response(r.desc_chain, &mut *queue.lock().await, r.status)?;
// Resume pcm_worker
if let Some(done) = r.done {
@ -680,7 +677,6 @@ pub async fn handle_ctrl_queue(
snd_data: &SndData,
queue: Rc<AsyncRwLock<Queue>>,
queue_event: &mut EventAsync,
interrupt: Interrupt,
tx_send: mpsc::UnboundedSender<PcmResponse>,
rx_send: mpsc::UnboundedSender<PcmResponse>,
card_index: usize,
@ -932,7 +928,7 @@ pub async fn handle_ctrl_queue(
handle_ctrl_msg.await?;
let len = writer.bytes_written() as u32;
queue.add_used(desc_chain, len);
queue.trigger_interrupt(&interrupt);
queue.trigger_interrupt();
}
Ok(())
}
@ -941,7 +937,6 @@ pub async fn handle_ctrl_queue(
pub async fn handle_event_queue(
mut queue: Queue,
mut queue_event: EventAsync,
interrupt: Interrupt,
) -> Result<(), Error> {
loop {
let desc_chain = queue
@ -951,6 +946,6 @@ pub async fn handle_event_queue(
// TODO(woodychow): Poll and forward events from cras asynchronously (API to be added)
queue.add_used(desc_chain, 0);
queue.trigger_interrupt(&interrupt);
queue.trigger_interrupt();
}
}

View file

@ -657,7 +657,6 @@ fn run_worker(
if run_worker_once(
&ex,
&streams,
interrupt.clone(),
&snd_data,
&mut f_kill,
&mut f_resample,
@ -680,7 +679,6 @@ fn run_worker(
if let Err(e) = reset_streams(
&ex,
&streams,
interrupt.clone(),
&tx_queue,
&mut tx_recv,
&rx_queue,
@ -744,7 +742,6 @@ async fn notify_reset_signal(reset_signal: &(AsyncRwLock<bool>, Condvar)) {
fn run_worker_once(
ex: &Executor,
streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,
interrupt: Interrupt,
snd_data: &SndData,
mut f_kill: &mut (impl FusedFuture<Output = anyhow::Result<()>> + Unpin),
mut f_resample: &mut (impl FusedFuture<Output = anyhow::Result<()>> + Unpin),
@ -771,7 +768,6 @@ fn run_worker_once(
snd_data,
ctrl_queue,
ctrl_queue_evt,
interrupt.clone(),
tx_send,
rx_send,
card_index,
@ -795,8 +791,7 @@ fn run_worker_once(
Some(&reset_signal),
)
.fuse();
let f_tx_response =
send_pcm_response_worker(tx_queue, interrupt.clone(), tx_recv, Some(&reset_signal)).fuse();
let f_tx_response = send_pcm_response_worker(tx_queue, tx_recv, Some(&reset_signal)).fuse();
let f_rx = handle_pcm_queue(
streams,
rx_send2,
@ -806,8 +801,7 @@ fn run_worker_once(
Some(&reset_signal),
)
.fuse();
let f_rx_response =
send_pcm_response_worker(rx_queue, interrupt, rx_recv, Some(&reset_signal)).fuse();
let f_rx_response = send_pcm_response_worker(rx_queue, rx_recv, Some(&reset_signal)).fuse();
pin_mut!(f_ctrl, f_tx, f_tx_response, f_rx, f_rx_response);
@ -870,7 +864,6 @@ fn run_worker_once(
fn reset_streams(
ex: &Executor,
streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,
interrupt: Interrupt,
tx_queue: &Rc<AsyncRwLock<Queue>>,
tx_recv: &mut mpsc::UnboundedReceiver<PcmResponse>,
rx_queue: &Rc<AsyncRwLock<Queue>>,
@ -902,26 +895,16 @@ fn reset_streams(
// Run these in a loop to ensure that they will survive until do_reset is finished
let f_tx_response = async {
while send_pcm_response_worker(
tx_queue.clone(),
interrupt.clone(),
tx_recv,
Some(&reset_signal),
)
.await
.is_err()
while send_pcm_response_worker(tx_queue.clone(), tx_recv, Some(&reset_signal))
.await
.is_err()
{}
};
let f_rx_response = async {
while send_pcm_response_worker(
rx_queue.clone(),
interrupt.clone(),
rx_recv,
Some(&reset_signal),
)
.await
.is_err()
while send_pcm_response_worker(rx_queue.clone(), rx_recv, Some(&reset_signal))
.await
.is_err()
{}
};

View file

@ -28,7 +28,6 @@ use crate::virtio::snd::common::from_virtio_frame_rate;
use crate::virtio::snd::constants::*;
use crate::virtio::snd::layout::*;
use crate::virtio::DescriptorChain;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
/// Messages that the worker can send to the stream (thread).
@ -58,7 +57,6 @@ pub struct Stream {
vios_client: Arc<Mutex<VioSClient>>,
control_queue: Arc<Mutex<Queue>>,
io_queue: Arc<Mutex<Queue>>,
interrupt: Interrupt,
capture: bool,
current_state: StreamState,
period: Duration,
@ -79,7 +77,6 @@ impl Stream {
pub fn try_new(
stream_id: u32,
vios_client: Arc<Mutex<VioSClient>>,
interrupt: Interrupt,
control_queue: Arc<Mutex<Queue>>,
io_queue: Arc<Mutex<Queue>>,
capture: bool,
@ -111,7 +108,6 @@ impl Stream {
vios_client: vios_client.clone(),
control_queue,
io_queue,
interrupt,
capture,
current_state,
period,
@ -249,7 +245,7 @@ impl Stream {
return Ok(false);
}
};
reply_control_op_status(code, desc, &self.control_queue, &self.interrupt)?;
reply_control_op_status(code, desc, &self.control_queue)?;
self.current_state = next_state;
Ok(true)
}
@ -310,7 +306,7 @@ impl Stream {
{
let mut io_queue_lock = self.io_queue.lock();
io_queue_lock.add_used(desc, len);
io_queue_lock.trigger_interrupt(&self.interrupt);
io_queue_lock.trigger_interrupt();
}
}
}
@ -322,13 +318,7 @@ impl Stream {
// guarantee if a buffer arrives after release is requested. Luckily it seems to
// work fine if the buffer is released after the release command is completed.
while let Some(desc) = self.buffer_queue.pop_front() {
reply_pcm_buffer_status(
VIRTIO_SND_S_OK,
0,
desc,
&self.io_queue,
&self.interrupt,
)?;
reply_pcm_buffer_status(VIRTIO_SND_S_OK, 0, desc, &self.io_queue)?;
}
}
StreamState::Prepared => {} // Do nothing, any buffers will be processed after start
@ -351,13 +341,7 @@ impl Drop for Stream {
// Also release any pending buffer
while let Some(desc) = self.buffer_queue.pop_front() {
if let Err(e) = reply_pcm_buffer_status(
VIRTIO_SND_S_IO_ERR,
0,
desc,
&self.io_queue,
&self.interrupt,
) {
if let Err(e) = reply_pcm_buffer_status(VIRTIO_SND_S_IO_ERR, 0, desc, &self.io_queue) {
error!(
"virtio-snd: Failed to reply buffer on stream {}: {}",
self.stream_id, e
@ -443,7 +427,6 @@ pub fn reply_control_op_status(
code: u32,
mut desc: DescriptorChain,
queue: &Arc<Mutex<Queue>>,
interrupt: &Interrupt,
) -> Result<()> {
let writer = &mut desc.writer;
writer
@ -455,7 +438,7 @@ pub fn reply_control_op_status(
{
let mut queue_lock = queue.lock();
queue_lock.add_used(desc, len);
queue_lock.trigger_interrupt(interrupt);
queue_lock.trigger_interrupt();
}
Ok(())
}
@ -466,7 +449,6 @@ pub fn reply_pcm_buffer_status(
latency_bytes: u32,
mut desc: DescriptorChain,
queue: &Arc<Mutex<Queue>>,
interrupt: &Interrupt,
) -> Result<()> {
let writer = &mut desc.writer;
if writer.available_bytes() > std::mem::size_of::<virtio_snd_pcm_status>() {
@ -483,7 +465,7 @@ pub fn reply_pcm_buffer_status(
{
let mut queue_lock = queue.lock();
queue_lock.add_used(desc, len);
queue_lock.trigger_interrupt(interrupt);
queue_lock.trigger_interrupt();
}
Ok(())
}

View file

@ -67,7 +67,6 @@ impl Worker {
streams.push(Stream::try_new(
stream_id,
vios_client.clone(),
interrupt.clone(),
control_queue.clone(),
io_queue.clone(),
capture,
@ -79,7 +78,6 @@ impl Worker {
.and_then(|e| Ok((e.try_clone()?, e)))
.map_err(SoundError::CreateEvent)?;
let interrupt_clone = interrupt.clone();
let senders: Vec<Sender<Box<StreamMsg>>> =
streams.iter().map(|sp| sp.msg_sender().clone()).collect();
let tx_queue_thread = tx_queue.clone();
@ -89,13 +87,7 @@ impl Worker {
.spawn(move || {
try_set_real_time_priority();
io_loop(
interrupt_clone,
tx_queue_thread,
rx_queue_thread,
senders,
kill_io,
)
io_loop(tx_queue_thread, rx_queue_thread, senders, kill_io)
})
.map_err(SoundError::CreateThread)?;
Ok(Worker {
@ -223,7 +215,6 @@ impl Worker {
VIRTIO_SND_S_BAD_MSG,
avail_desc,
&self.control_queue,
&self.interrupt,
);
}
let mut read_buf = vec![0u8; available_bytes];
@ -300,7 +291,7 @@ impl Worker {
{
let mut queue_lock = self.control_queue.lock();
queue_lock.add_used(avail_desc, len);
queue_lock.trigger_interrupt(&self.interrupt);
queue_lock.trigger_interrupt();
}
}
VIRTIO_SND_R_CHMAP_INFO => {
@ -386,7 +377,6 @@ impl Worker {
VIRTIO_SND_S_NOT_SUPP,
avail_desc,
&self.control_queue,
&self.interrupt,
)?;
}
}
@ -401,7 +391,7 @@ impl Worker {
writer.write_obj(evt).map_err(SoundError::QueueIO)?;
let len = writer.bytes_written() as u32;
event_queue.add_used(desc, len);
event_queue.trigger_interrupt(&self.interrupt);
event_queue.trigger_interrupt();
} else {
warn!("virtio-snd: Dropping event because there are no buffers in virtqueue");
}
@ -431,12 +421,7 @@ impl Worker {
"virtio-snd: The driver sent a buffer of the wrong size for a set_params struct: {}",
read_buf.len()
);
return reply_control_op_status(
VIRTIO_SND_S_BAD_MSG,
desc,
&self.control_queue,
&self.interrupt,
);
return reply_control_op_status(VIRTIO_SND_S_BAD_MSG, desc, &self.control_queue);
}
let mut params: virtio_snd_pcm_set_params = Default::default();
params.as_bytes_mut().copy_from_slice(read_buf);
@ -448,12 +433,7 @@ impl Worker {
"virtio-snd: Driver requested operation on invalid stream: {}",
stream_id
);
reply_control_op_status(
VIRTIO_SND_S_BAD_MSG,
desc,
&self.control_queue,
&self.interrupt,
)
reply_control_op_status(VIRTIO_SND_S_BAD_MSG, desc, &self.control_queue)
}
}
@ -474,7 +454,6 @@ impl Worker {
_ => panic!("virtio-snd: Can't handle message. This is a BUG!!"),
},
&self.control_queue,
&self.interrupt,
);
}
let mut pcm_hdr: virtio_snd_pcm_hdr = Default::default();
@ -497,7 +476,6 @@ impl Worker {
_ => panic!("virtio-snd: Can't handle message. This is a BUG!!"),
},
&self.control_queue,
&self.interrupt,
)
}
}
@ -521,7 +499,7 @@ impl Worker {
{
let mut queue_lock = self.control_queue.lock();
queue_lock.add_used(desc, len);
queue_lock.trigger_interrupt(&self.interrupt);
queue_lock.trigger_interrupt();
}
Ok(())
}
@ -534,7 +512,6 @@ impl Drop for Worker {
}
fn io_loop(
interrupt: Interrupt,
tx_queue: Arc<Mutex<Queue>>,
rx_queue: Arc<Mutex<Queue>>,
senders: Vec<Sender<Box<StreamMsg>>>,
@ -587,7 +564,7 @@ fn io_loop(
"virtio-snd: Driver sent buffer for invalid stream: {}",
stream_id
);
reply_pcm_buffer_status(VIRTIO_SND_S_IO_ERR, 0, avail_desc, queue, &interrupt)?;
reply_pcm_buffer_status(VIRTIO_SND_S_IO_ERR, 0, avail_desc, queue)?;
} else {
StreamProxy::send_msg(
&senders[stream_id as usize],

View file

@ -137,7 +137,7 @@ impl Worker {
}
}
if needs_interrupt == NeedsInterrupt::Yes {
self.queue.trigger_interrupt(&self.interrupt);
self.queue.trigger_interrupt();
}
}
}

View file

@ -423,24 +423,21 @@ pub mod tests {
fn activate() {
let mut net = create_net_common();
let guest_memory = create_guest_memory().unwrap();
let interrupt = Interrupt::new_for_test();
let mut q0 = QueueConfig::new(1, 0);
q0.set_ready(true);
let q0 = q0
.activate(&guest_memory, Event::new().unwrap())
.activate(&guest_memory, Event::new().unwrap(), interrupt.clone())
.expect("QueueConfig::activate");
let mut q1 = QueueConfig::new(1, 0);
q1.set_ready(true);
let q1 = q1
.activate(&guest_memory, Event::new().unwrap())
.activate(&guest_memory, Event::new().unwrap(), interrupt.clone())
.expect("QueueConfig::activate");
// Just testing that we don't panic, for now
let _ = net.activate(
guest_memory,
Interrupt::new_for_test(),
BTreeMap::from([(0, q0), (1, q1)]),
);
let _ = net.activate(guest_memory, interrupt, BTreeMap::from([(0, q0), (1, q1)]));
}
}

View file

@ -18,7 +18,6 @@ use crate::virtio::block::asynchronous::BlockAsync;
use crate::virtio::vhost::user::device::handler::DeviceRequestHandler;
use crate::virtio::vhost::user::device::handler::VhostUserDevice;
use crate::virtio::vhost::user::device::VhostUserDeviceBuilder;
use crate::virtio::Interrupt;
use crate::virtio::VirtioDevice;
const NUM_QUEUES: u16 = 16;
@ -79,9 +78,8 @@ impl VhostUserDevice for BlockBackend {
idx: usize,
queue: virtio::Queue,
mem: GuestMemory,
doorbell: Interrupt,
) -> anyhow::Result<()> {
self.inner.start_queue(idx, queue, mem, doorbell)
self.inner.start_queue(idx, queue, mem)
}
fn stop_queue(&mut self, idx: usize) -> anyhow::Result<virtio::Queue> {

View file

@ -26,7 +26,6 @@ use crate::virtio::vhost::user::device::handler::VhostUserDevice;
use crate::virtio::vhost::user::device::listener::sys::VhostUserListener;
use crate::virtio::vhost::user::device::listener::VhostUserListenerTrait;
use crate::virtio::vhost::user::device::VhostUserDeviceBuilder;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
use crate::SerialHardware;
use crate::SerialParameters;
@ -99,14 +98,8 @@ impl VhostUserDevice for ConsoleBackend {
}
}
fn start_queue(
&mut self,
idx: usize,
queue: Queue,
_mem: GuestMemory,
doorbell: Interrupt,
) -> anyhow::Result<()> {
self.device.console.start_queue(idx, queue, doorbell)
fn start_queue(&mut self, idx: usize, queue: Queue, _mem: GuestMemory) -> anyhow::Result<()> {
self.device.console.start_queue(idx, queue)
}
fn stop_queue(&mut self, idx: usize) -> anyhow::Result<Queue> {

View file

@ -39,14 +39,12 @@ use crate::virtio::fs::Config;
use crate::virtio::vhost::user::device::handler::Error as DeviceError;
use crate::virtio::vhost::user::device::handler::VhostUserDevice;
use crate::virtio::vhost::user::device::handler::WorkerState;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
const MAX_QUEUE_NUM: usize = 2; /* worker queue and high priority queue */
async fn handle_fs_queue(
queue: Rc<RefCell<virtio::Queue>>,
doorbell: Interrupt,
kick_evt: EventAsync,
server: Arc<fuse::Server<PassthroughFs>>,
tube: Arc<Mutex<Tube>>,
@ -59,7 +57,7 @@ async fn handle_fs_queue(
error!("Failed to read kick event for fs queue: {}", e);
break;
}
if let Err(e) = process_fs_queue(&doorbell, &mut queue.borrow_mut(), &server, &tube, slot) {
if let Err(e) = process_fs_queue(&mut queue.borrow_mut(), &server, &tube, slot) {
error!("Process FS queue failed: {}", e);
break;
}
@ -144,7 +142,6 @@ impl VhostUserDevice for FsBackend {
idx: usize,
queue: virtio::Queue,
_mem: GuestMemory,
doorbell: Interrupt,
) -> anyhow::Result<()> {
if self.workers[idx].is_some() {
warn!("Starting new queue handler without stopping old handler");
@ -162,7 +159,6 @@ impl VhostUserDevice for FsBackend {
let queue = Rc::new(RefCell::new(queue));
let queue_task = self.ex.spawn_local(handle_fs_queue(
queue.clone(),
doorbell,
kick_evt,
self.server.clone(),
Arc::new(Mutex::new(fs_device_tube)),

View file

@ -33,7 +33,6 @@ use crate::virtio::vhost::user::device::handler::VhostUserDevice;
use crate::virtio::vhost::user::device::handler::WorkerState;
use crate::virtio::DescriptorChain;
use crate::virtio::Gpu;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
use crate::virtio::SharedMemoryMapper;
use crate::virtio::SharedMemoryRegion;
@ -44,7 +43,6 @@ const MAX_QUEUE_NUM: usize = NUM_QUEUES;
#[derive(Clone)]
struct SharedReader {
queue: Arc<Mutex<Queue>>,
doorbell: Interrupt,
}
impl gpu::QueueReader for SharedReader {
@ -57,7 +55,7 @@ impl gpu::QueueReader for SharedReader {
}
fn signal_used(&self) {
self.queue.lock().trigger_interrupt(&self.doorbell);
self.queue.lock().trigger_interrupt();
}
}
@ -132,18 +130,14 @@ impl VhostUserDevice for GpuBackend {
self.gpu.borrow_mut().write_config(offset, data)
}
fn start_queue(
&mut self,
idx: usize,
queue: Queue,
mem: GuestMemory,
doorbell: Interrupt,
) -> anyhow::Result<()> {
fn start_queue(&mut self, idx: usize, queue: Queue, mem: GuestMemory) -> anyhow::Result<()> {
if self.queue_workers[idx].is_some() {
warn!("Starting new queue handler without stopping old handler");
self.stop_queue(idx)?;
}
let doorbell = queue.interrupt().clone();
// Create a refcounted queue. The GPU control queue uses a SharedReader which allows us to
// handle fences in the RutabagaFenceHandler, and also handle queue messages in
// `run_ctrl_queue`.
@ -164,7 +158,6 @@ impl VhostUserDevice for GpuBackend {
.context("failed to create EventAsync for kick_evt")?;
let reader = SharedReader {
queue: queue.clone(),
doorbell: doorbell.clone(),
};
let state = if let Some(s) = self.state.as_ref() {

View file

@ -157,13 +157,7 @@ pub trait VhostUserDevice {
/// Indicates that the backend should start processing requests for virtio queue number `idx`.
/// This method must not block the current thread so device backends should either spawn an
/// async task or another thread to handle messages from the Queue.
fn start_queue(
&mut self,
idx: usize,
queue: Queue,
mem: GuestMemory,
doorbell: Interrupt,
) -> anyhow::Result<()>;
fn start_queue(&mut self, idx: usize, queue: Queue, mem: GuestMemory) -> anyhow::Result<()>;
/// Indicates that the backend should stop processing requests for virtio queue number `idx`.
/// This method should return the queue passed to `start_queue` for the corresponding `idx`.
@ -579,7 +573,9 @@ impl<T: VhostUserDevice> vmm_vhost::Backend for DeviceRequestHandler<T> {
.cloned()
.ok_or(VhostError::InvalidOperation)?;
let queue = match vring.queue.activate(&mem, kick_evt) {
let doorbell = vring.doorbell.clone().ok_or(VhostError::InvalidOperation)?;
let queue = match vring.queue.activate(&mem, kick_evt, doorbell) {
Ok(queue) => queue,
Err(e) => {
error!("failed to activate vring: {:#}", e);
@ -587,12 +583,7 @@ impl<T: VhostUserDevice> vmm_vhost::Backend for DeviceRequestHandler<T> {
}
};
let doorbell = vring.doorbell.clone().ok_or(VhostError::InvalidOperation)?;
if let Err(e) = self
.backend
.start_queue(index as usize, queue, mem, doorbell)
{
if let Err(e) = self.backend.start_queue(index as usize, queue, mem) {
error!("Failed to start queue {}: {}", index, e);
return Err(VhostError::BackendInternalError);
}
@ -1122,7 +1113,6 @@ mod tests {
idx: usize,
queue: Queue,
_mem: GuestMemory,
_doorbell: Interrupt,
) -> anyhow::Result<()> {
self.active_queues[idx] = Some(queue);
Ok(())
@ -1218,7 +1208,7 @@ mod tests {
let mut queue = QueueConfig::new(0x10, 0);
queue.set_ready(true);
let queue = queue
.activate(&mem, Event::new().unwrap())
.activate(&mem, Event::new().unwrap(), interrupt.clone())
.expect("QueueConfig::activate");
queues.insert(idx, queue);
}

View file

@ -35,7 +35,6 @@ use crate::virtio::vhost::user::device::handler::DeviceRequestHandler;
use crate::virtio::vhost::user::device::handler::Error as DeviceError;
use crate::virtio::vhost::user::device::handler::VhostUserDevice;
use crate::virtio::vhost::user::VhostUserDeviceBuilder;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
thread_local! {
@ -49,7 +48,6 @@ const MAX_QUEUE_NUM: usize = 3; /* rx, tx, ctrl */
async fn run_tx_queue<T: TapT>(
mut queue: Queue,
mut tap: T,
doorbell: Interrupt,
kick_evt: EventAsync,
mut stop_rx: oneshot::Receiver<()>,
) -> Queue {
@ -69,7 +67,7 @@ async fn run_tx_queue<T: TapT>(
}
}
process_tx(&doorbell, &mut queue, &mut tap);
process_tx(&mut queue, &mut tap);
}
queue
}
@ -77,7 +75,6 @@ async fn run_tx_queue<T: TapT>(
async fn run_ctrl_queue<T: TapT>(
mut queue: Queue,
mut tap: T,
doorbell: Interrupt,
kick_evt: EventAsync,
acked_features: u64,
vq_pairs: u16,
@ -99,7 +96,7 @@ async fn run_ctrl_queue<T: TapT>(
}
}
if let Err(e) = process_ctrl(&doorbell, &mut queue, &mut tap, acked_features, vq_pairs) {
if let Err(e) = process_ctrl(&mut queue, &mut tap, acked_features, vq_pairs) {
error!("Failed to process ctrl queue: {}", e);
break;
}
@ -178,9 +175,8 @@ where
idx: usize,
queue: virtio::Queue,
mem: GuestMemory,
doorbell: Interrupt,
) -> anyhow::Result<()> {
sys::start_queue(self, idx, queue, mem, doorbell)
sys::start_queue(self, idx, queue, mem)
}
fn stop_queue(&mut self, idx: usize) -> anyhow::Result<virtio::Queue> {

View file

@ -41,7 +41,6 @@ use crate::virtio::vhost::user::device::net::run_ctrl_queue;
use crate::virtio::vhost::user::device::net::run_tx_queue;
use crate::virtio::vhost::user::device::net::NetBackend;
use crate::virtio::vhost::user::device::net::NET_EXECUTOR;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
struct TapConfig {
@ -141,7 +140,6 @@ where
async fn run_rx_queue<T: TapT>(
mut queue: Queue,
mut tap: IoSource<T>,
doorbell: Interrupt,
kick_evt: EventAsync,
mut stop_rx: oneshot::Receiver<()>,
) -> Queue {
@ -162,7 +160,7 @@ async fn run_rx_queue<T: TapT>(
}
}
match process_rx(&doorbell, &mut queue, tap.as_source_mut()) {
match process_rx(&mut queue, tap.as_source_mut()) {
Ok(()) => {}
Err(NetError::RxDescriptorsExhausted) => {
if let Err(e) = kick_evt.next_val().await {
@ -185,7 +183,6 @@ pub(in crate::virtio::vhost::user::device::net) fn start_queue<T: 'static + Into
idx: usize,
queue: virtio::Queue,
_mem: GuestMemory,
doorbell: Interrupt,
) -> anyhow::Result<()> {
if backend.workers[idx].is_some() {
warn!("Starting new queue handler without stopping old handler");
@ -214,14 +211,14 @@ pub(in crate::virtio::vhost::user::device::net) fn start_queue<T: 'static + Into
let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
(
ex.spawn_local(run_rx_queue(queue, tap, doorbell, kick_evt, stop_rx)),
ex.spawn_local(run_rx_queue(queue, tap, kick_evt, stop_rx)),
stop_tx,
)
}
1 => {
let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
(
ex.spawn_local(run_tx_queue(queue, tap, doorbell, kick_evt, stop_rx)),
ex.spawn_local(run_tx_queue(queue, tap, kick_evt, stop_rx)),
stop_tx,
)
}
@ -231,7 +228,6 @@ pub(in crate::virtio::vhost::user::device::net) fn start_queue<T: 'static + Into
ex.spawn_local(run_ctrl_queue(
queue,
tap,
doorbell,
kick_evt,
backend.acked_features,
1, /* vq_pairs */

View file

@ -86,7 +86,6 @@ where
async fn run_rx_queue<T: TapT>(
mut queue: Queue,
mut tap: IoSource<T>,
call_evt: Interrupt,
kick_evt: EventAsync,
read_notifier: EventAsync,
mut overlapped_wrapper: OverlappedWrapper,
@ -132,7 +131,6 @@ async fn run_rx_queue<T: TapT>(
}
let needs_interrupt = process_rx(
&call_evt,
&mut queue,
tap.as_source_mut(),
&mut rx_buf,
@ -141,7 +139,7 @@ async fn run_rx_queue<T: TapT>(
&mut overlapped_wrapper,
);
if needs_interrupt {
call_evt.signal_used_queue(queue.vector());
queue.trigger_interrupt();
}
// There aren't any RX descriptors available for us to write packets to. Wait for the guest
@ -171,7 +169,6 @@ pub(in crate::virtio::vhost::user::device::net) fn start_queue<T: 'static + Into
idx: usize,
queue: virtio::Queue,
_mem: GuestMemory,
doorbell: Interrupt,
) -> anyhow::Result<()> {
if backend.workers.get(idx).is_some() {
warn!("Starting new queue handler without stopping old handler");
@ -213,7 +210,6 @@ pub(in crate::virtio::vhost::user::device::net) fn start_queue<T: 'static + Into
ex.spawn_local(run_rx_queue(
queue,
tap,
doorbell,
kick_evt,
read_notifier,
overlapped_wrapper,
@ -225,7 +221,7 @@ pub(in crate::virtio::vhost::user::device::net) fn start_queue<T: 'static + Into
1 => {
let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
(
ex.spawn_local(run_tx_queue(queue, tap, doorbell, kick_evt, stop_rx)),
ex.spawn_local(run_tx_queue(queue, tap, kick_evt, stop_rx)),
stop_tx,
)
}
@ -235,7 +231,6 @@ pub(in crate::virtio::vhost::user::device::net) fn start_queue<T: 'static + Into
ex.spawn_local(run_ctrl_queue(
queue,
tap,
doorbell,
kick_evt,
backend.acked_features,
1, /* vq_pairs */

View file

@ -51,7 +51,6 @@ use crate::virtio::vhost::user::device::handler::Error as DeviceError;
use crate::virtio::vhost::user::device::handler::VhostUserDevice;
use crate::virtio::vhost::user::device::handler::WorkerState;
use crate::virtio::vhost::user::VhostUserDeviceBuilder;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
// Async workers:
@ -177,7 +176,6 @@ impl VhostUserDevice for SndBackend {
idx: usize,
queue: virtio::Queue,
_mem: GuestMemory,
doorbell: Interrupt,
) -> anyhow::Result<()> {
if self.workers[idx].is_some() {
warn!(
@ -214,7 +212,6 @@ impl VhostUserDevice for SndBackend {
&snd_data,
ctrl_queue,
&mut kick_evt,
doorbell,
tx_send,
rx_send,
card_index,
@ -248,7 +245,7 @@ impl VhostUserDevice for SndBackend {
let queue_response_queue = queue.clone();
let response_queue_task = self.ex.spawn_local(async move {
send_pcm_response_worker(queue_response_queue, doorbell, &mut recv, None).await
send_pcm_response_worker(queue_response_queue, &mut recv, None).await
});
self.response_workers[idx - PCM_RESPONSE_WORKER_IDX_OFFSET] = Some(WorkerState {

View file

@ -46,13 +46,11 @@ use crate::virtio::vhost::user::device::handler::WorkerState;
use crate::virtio::vhost::user::device::listener::sys::VhostUserListener;
use crate::virtio::vhost::user::device::listener::VhostUserListenerTrait;
use crate::virtio::wl;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
use crate::virtio::SharedMemoryRegion;
async fn run_out_queue(
queue: Rc<RefCell<Queue>>,
doorbell: Interrupt,
kick_evt: EventAsync,
wlstate: Rc<RefCell<wl::WlState>>,
) {
@ -62,17 +60,12 @@ async fn run_out_queue(
break;
}
wl::process_out_queue(
&doorbell,
&mut queue.borrow_mut(),
&mut wlstate.borrow_mut(),
);
wl::process_out_queue(&mut queue.borrow_mut(), &mut wlstate.borrow_mut());
}
}
async fn run_in_queue(
queue: Rc<RefCell<Queue>>,
doorbell: Interrupt,
kick_evt: EventAsync,
wlstate: Rc<RefCell<wl::WlState>>,
wlstate_ctx: IoSource<AsyncWrapper<SafeDescriptor>>,
@ -86,11 +79,8 @@ async fn run_in_queue(
break;
}
if wl::process_in_queue(
&doorbell,
&mut queue.borrow_mut(),
&mut wlstate.borrow_mut(),
) == Err(wl::DescriptorsExhausted)
if wl::process_in_queue(&mut queue.borrow_mut(), &mut wlstate.borrow_mut())
== Err(wl::DescriptorsExhausted)
{
if let Err(e) = kick_evt.next_val().await {
error!("Failed to read kick event for in queue: {}", e);
@ -172,13 +162,7 @@ impl VhostUserDevice for WlBackend {
fn read_config(&self, _offset: u64, _dst: &mut [u8]) {}
fn start_queue(
&mut self,
idx: usize,
queue: Queue,
_mem: GuestMemory,
doorbell: Interrupt,
) -> anyhow::Result<()> {
fn start_queue(&mut self, idx: usize, queue: Queue, _mem: GuestMemory) -> anyhow::Result<()> {
if self.workers[idx].is_some() {
warn!("Starting new queue handler without stopping old handler");
self.stop_queue(idx)?;
@ -248,17 +232,12 @@ impl VhostUserDevice for WlBackend {
.context("failed to create async WaitContext")
})?;
self.ex.spawn_local(run_in_queue(
queue.clone(),
doorbell,
kick_evt,
wlstate,
wlstate_ctx,
))
self.ex
.spawn_local(run_in_queue(queue.clone(), kick_evt, wlstate, wlstate_ctx))
}
1 => self
.ex
.spawn_local(run_out_queue(queue.clone(), doorbell, kick_evt, wlstate)),
.spawn_local(run_out_queue(queue.clone(), kick_evt, wlstate)),
_ => bail!("attempted to start unknown queue: {}", idx),
};
self.workers[idx] = Some(WorkerState { queue_task, queue });

View file

@ -213,7 +213,7 @@ impl VirtioDevice for Vsock {
.expect("failed to write transport reset event");
let len = avail_desc.writer.bytes_written() as u32;
event_queue.add_used(avail_desc, len);
event_queue.trigger_interrupt(&interrupt);
event_queue.trigger_interrupt();
}
self.event_queue = Some(event_queue);

View file

@ -214,7 +214,7 @@ impl VirtioDevice for VideoDevice {
fn activate(
&mut self,
mem: GuestMemory,
interrupt: Interrupt,
_interrupt: Interrupt,
mut queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<()> {
if queues.len() != QUEUE_SIZES.len() {
@ -237,7 +237,7 @@ impl VirtioDevice for VideoDevice {
.resource_bridge
.take()
.context("no resource bridge is passed")?;
let mut worker = Worker::new(cmd_queue, interrupt.clone(), event_queue, interrupt);
let mut worker = Worker::new(cmd_queue, event_queue);
let worker_result = match &self.device_type {
#[cfg(feature = "video-decoder")]

View file

@ -36,19 +36,14 @@ use crate::virtio::video::response::Response;
use crate::virtio::video::Error;
use crate::virtio::video::Result;
use crate::virtio::DescriptorChain;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
/// Worker that takes care of running the virtio video device.
pub struct Worker {
/// VirtIO queue for Command queue
cmd_queue: Queue,
/// Device-to-driver notification for command queue
cmd_queue_interrupt: Interrupt,
/// VirtIO queue for Event queue
event_queue: Queue,
/// Device-to-driver notification for the event queue.
event_queue_interrupt: Interrupt,
/// Stores descriptor chains in which responses for asynchronous commands will be written
desc_map: AsyncCmdDescMap,
}
@ -57,17 +52,10 @@ pub struct Worker {
type WritableResp = (DescriptorChain, response::CmdResponse);
impl Worker {
pub fn new(
cmd_queue: Queue,
cmd_queue_interrupt: Interrupt,
event_queue: Queue,
event_queue_interrupt: Interrupt,
) -> Self {
pub fn new(cmd_queue: Queue, event_queue: Queue) -> Self {
Self {
cmd_queue,
cmd_queue_interrupt,
event_queue,
event_queue_interrupt,
desc_map: Default::default(),
}
}
@ -87,7 +75,7 @@ impl Worker {
let len = desc.writer.bytes_written() as u32;
self.cmd_queue.add_used(desc, len);
}
self.cmd_queue.trigger_interrupt(&self.cmd_queue_interrupt);
self.cmd_queue.trigger_interrupt();
Ok(())
}
@ -103,8 +91,7 @@ impl Worker {
.map_err(|error| Error::WriteEventFailure { event, error })?;
let len = desc.writer.bytes_written() as u32;
self.event_queue.add_used(desc, len);
self.event_queue
.trigger_interrupt(&self.event_queue_interrupt);
self.event_queue.trigger_interrupt();
Ok(())
}
@ -334,7 +321,7 @@ impl Worker {
.and_then(|wc| {
// resampling event exists per-PCI-INTx basis, so the two queues have the same event.
// Thus, checking only cmd_queue_interrupt suffices.
if let Some(resample_evt) = self.cmd_queue_interrupt.get_resample_evt() {
if let Some(resample_evt) = self.cmd_queue.interrupt().get_resample_evt() {
wc.add(resample_evt, Token::InterruptResample)?;
}
Ok(wc)
@ -364,11 +351,12 @@ impl Worker {
// resample exists. resampling event exists per-PCI-INTx basis, so the
// two queues have the same event.
let _ = self
.cmd_queue_interrupt
.cmd_queue
.interrupt()
.get_resample_evt()
.expect("resample event for the command queue doesn't exist")
.wait();
self.cmd_queue_interrupt.do_interrupt_resample();
self.cmd_queue.interrupt().do_interrupt_resample();
}
Token::Kill => return Ok(()),
}

View file

@ -278,6 +278,7 @@ macro_rules! suspendable_virtio_tests {
num_queues: usize,
queue_size: u16,
mem: &GuestMemory,
interrupt: Interrupt,
) -> BTreeMap<usize, Queue> {
let mut queues = BTreeMap::new();
for i in 0..num_queues {
@ -285,7 +286,7 @@ macro_rules! suspendable_virtio_tests {
let mut queue = QueueConfig::new(queue_size, 0);
queue.set_ready(true);
let queue = queue
.activate(mem, base::Event::new().unwrap())
.activate(mem, base::Event::new().unwrap(), interrupt.clone())
.expect("QueueConfig::activate");
queues.insert(i, queue);
}
@ -314,6 +315,7 @@ macro_rules! suspendable_virtio_tests {
.cloned()
.expect("missing queue size"),
&mem,
interrupt.clone(),
);
device
.activate(mem.clone(), interrupt.clone(), queues)
@ -341,6 +343,7 @@ macro_rules! suspendable_virtio_tests {
.cloned()
.expect("missing queue size"),
&mem,
interrupt.clone(),
);
device
.activate(mem.clone(), interrupt.clone(), queues)

View file

@ -156,7 +156,7 @@ impl VirtioMmioDevice {
Ok((
queue_index,
queue
.activate(&mem, queue_evt)
.activate(&mem, queue_evt, interrupt.clone())
.context("failed to activate queue")?,
))
})

View file

@ -651,7 +651,7 @@ impl VirtioPciDevice {
Ok((
queue_index,
queue
.activate(&self.mem, queue_evt)
.activate(&self.mem, queue_evt, interrupt.clone())
.context("failed to activate queue")?,
))
})
@ -1316,6 +1316,30 @@ impl Suspendable for VirtioPciDevice {
self.msix_config.lock().restore(deser.msix_config)?;
self.common_config = deser.common_config;
// Restore the interrupt. This must be done after restoring the MSI-X configuration, but
// before restoring the queues.
if let Some(deser_interrupt) = deser.interrupt {
self.interrupt = Some(Interrupt::new_from_snapshot(
self.interrupt_evt
.as_ref()
.ok_or_else(|| anyhow!("{} interrupt_evt is none", self.debug_label()))?
.try_clone()
.with_context(|| {
format!("{} failed to clone interrupt_evt", self.debug_label())
})?,
Some(self.msix_config.clone()),
self.common_config.msix_config,
deser_interrupt,
#[cfg(target_arch = "x86_64")]
Some((
PmWakeupEvent::new(self.vm_control_tube.clone(), self.pm_config.clone()),
MetricEventType::VirtioWakeup {
virtio_id: self.device.device_type() as u32,
},
)),
));
}
assert_eq!(
self.queues.len(),
deser.queues.len(),
@ -1337,6 +1361,10 @@ impl Suspendable for VirtioPciDevice {
};
// Restore `sleep_state`.
if let Some(activated_queues_snapshot) = deser.activated_queues {
let interrupt = self
.interrupt
.as_ref()
.context("tried to restore active queues without an interrupt")?;
let mut activated_queues = BTreeMap::new();
for (index, queue_snapshot) in activated_queues_snapshot {
let queue_config = self
@ -1352,7 +1380,13 @@ impl Suspendable for VirtioPciDevice {
.context("failed to clone queue event")?;
activated_queues.insert(
index,
Queue::restore(queue_config, queue_snapshot, &self.mem, queue_evt)?,
Queue::restore(
queue_config,
queue_snapshot,
&self.mem,
queue_evt,
interrupt.clone(),
)?,
);
}
@ -1362,32 +1396,6 @@ impl Suspendable for VirtioPciDevice {
self.sleep_state = Some(SleepState::Inactive);
}
// Also replicate the other work in activate: initialize the interrupt and queues
// events. This could just as easily be done in `wake` instead.
// NOTE: Needs to be done last in `restore` because it relies on the other VirtioPciDevice
// fields.
if let Some(deser_interrupt) = deser.interrupt {
self.interrupt = Some(Interrupt::new_from_snapshot(
self.interrupt_evt
.as_ref()
.ok_or_else(|| anyhow!("{} interrupt_evt is none", self.debug_label()))?
.try_clone()
.with_context(|| {
format!("{} failed to clone interrupt_evt", self.debug_label())
})?,
Some(self.msix_config.clone()),
self.common_config.msix_config,
deser_interrupt,
#[cfg(target_arch = "x86_64")]
Some((
PmWakeupEvent::new(self.vm_control_tube.clone(), self.pm_config.clone()),
MetricEventType::VirtioWakeup {
virtio_id: self.device.device_type() as u32,
},
)),
));
}
// Call register_io_events for the activated queue events.
let bar0 = self.config_regs.get_bar_addr(self.settings_bar);
let notify_base = bar0 + NOTIFICATION_BAR_OFFSET;

View file

@ -721,7 +721,7 @@ impl Worker {
}
queue.add_used(avail_desc, 0);
queue.trigger_interrupt(&self.interrupt);
queue.trigger_interrupt();
}
Ok(queue)
@ -1379,7 +1379,7 @@ impl Worker {
let bytes_written = writer.bytes_written() as u32;
if bytes_written > 0 {
queue.add_used(desc_chain, bytes_written);
queue.trigger_interrupt(&self.interrupt);
queue.trigger_interrupt();
Ok(())
} else {
error!("vsock: failed to write bytes to queue");

View file

@ -1765,7 +1765,6 @@ pub struct DescriptorsExhausted;
/// Handle incoming events and forward them to the VM over the input queue.
pub fn process_in_queue(
interrupt: &Interrupt,
in_queue: &mut Queue,
state: &mut WlState,
) -> ::std::result::Result<(), DescriptorsExhausted> {
@ -1804,7 +1803,7 @@ pub fn process_in_queue(
}
if needs_interrupt {
in_queue.trigger_interrupt(interrupt);
in_queue.trigger_interrupt();
}
if exhausted_queue {
@ -1815,7 +1814,7 @@ pub fn process_in_queue(
}
/// Handle messages from the output queue and forward them to the display sever, if necessary.
pub fn process_out_queue(interrupt: &Interrupt, out_queue: &mut Queue, state: &mut WlState) {
pub fn process_out_queue(out_queue: &mut Queue, state: &mut WlState) {
let mut needs_interrupt = false;
while let Some(mut desc) = out_queue.pop() {
let resp = match state.execute(&mut desc.reader) {
@ -1836,7 +1835,7 @@ pub fn process_out_queue(interrupt: &Interrupt, out_queue: &mut Queue, state: &m
}
if needs_interrupt {
out_queue.trigger_interrupt(interrupt);
out_queue.trigger_interrupt();
}
}
@ -1927,12 +1926,12 @@ impl Worker {
}
Token::OutQueue => {
let _ = self.out_queue.event().wait();
process_out_queue(&self.interrupt, &mut self.out_queue, &mut self.state);
process_out_queue(&mut self.out_queue, &mut self.state);
}
Token::Kill => break 'wait,
Token::State => {
if let Err(DescriptorsExhausted) =
process_in_queue(&self.interrupt, &mut self.in_queue, &mut self.state)
process_in_queue(&mut self.in_queue, &mut self.state)
{
if let Err(e) =
wait_ctx.modify(&self.state.wait_ctx, EventType::None, Token::State)

View file

@ -123,7 +123,7 @@ pub fn process_fs_queue<F: FileSystem + Sync>(
let total = server.handle_message(reader, writer, &mapper)?;
queue.add_used(mem, avail_desc.index, total as u32);
queue.trigger_interrupt(mem, &*interrupt);
queue.trigger_interrupt();
}
```
@ -157,7 +157,7 @@ pub fn process_fs_queue<F: FileSystem + Sync>(
let total = server.handle_message(reader, writer, &mapper)?;
queue.add_used(mem, avail_desc.index, total as u32);
queue.trigger_interrupt(mem, &*interrupt);
queue.trigger_interrupt();
}
```

View file

@ -79,11 +79,19 @@ fuzz_target!(|bytes| {
return;
}
let interrupt = Interrupt::new(
IrqLevelEvent::new().unwrap(),
None, // msix_config
0xFFFF, // VIRTIO_MSI_NO_VECTOR
#[cfg(target_arch = "x86_64")]
None,
);
let mut q = QueueConfig::new(QUEUE_SIZE, 0);
q.set_size(QUEUE_SIZE / 2);
q.set_ready(true);
let q = q
.activate(&mem, Event::new().unwrap())
.activate(&mem, Event::new().unwrap(), interrupt.clone())
.expect("QueueConfig::activate");
let queue_evt = q.event().try_clone().unwrap();
@ -102,17 +110,7 @@ fuzz_target!(|bytes| {
.unwrap();
block
.activate(
mem,
Interrupt::new(
IrqLevelEvent::new().unwrap(),
None, // msix_config
0xFFFF, // VIRTIO_MSI_NO_VECTOR
#[cfg(target_arch = "x86_64")]
None,
),
BTreeMap::from([(0, q)]),
)
.activate(mem, interrupt, BTreeMap::from([(0, q)]))
.unwrap();
queue_evt.signal().unwrap(); // Rings the doorbell

View file

@ -12,7 +12,9 @@ use std::mem::size_of;
use base::Event;
use crosvm_fuzz::fuzz_target;
use crosvm_fuzz::rand::FuzzRng;
use devices::virtio::Interrupt;
use devices::virtio::QueueConfig;
use devices::IrqLevelEvent;
use rand::Rng;
use rand::RngCore;
use vm_memory::GuestAddress;
@ -58,6 +60,14 @@ struct virtq_used {
}
fuzz_target!(|data: &[u8]| {
let interrupt = Interrupt::new(
IrqLevelEvent::new().unwrap(),
None, // msix_config
0xFFFF, // VIRTIO_MSI_NO_VECTOR
#[cfg(target_arch = "x86_64")]
None,
);
let mut q = QueueConfig::new(MAX_QUEUE_SIZE, 0);
let mut rng = FuzzRng::new(data);
q.set_size(rng.gen());
@ -75,7 +85,7 @@ fuzz_target!(|data: &[u8]| {
q.set_ready(true);
GUEST_MEM.with(|mem| {
let mut q = if let Ok(q) = q.activate(mem, Event::new().unwrap()) {
let mut q = if let Ok(q) = q.activate(mem, Event::new().unwrap(), interrupt) {
q
} else {
return;