From 4d5b35060203e1c3526bfa5947ff9cd200247e69 Mon Sep 17 00:00:00 2001 From: Chirantan Ekbote Date: Thu, 20 May 2021 18:53:41 +0900 Subject: [PATCH] wl: Move queue handling to standalone functions This will make it easier to run the virtqueues from a vhost-user process. BUG=b:179755841 TEST=Start crostini and play gnome-mahjongg. Change-Id: Ic4132a66ef7718fe85307f5d743110645288967a Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2910215 Tested-by: kokoro Commit-Queue: Chirantan Ekbote Reviewed-by: Keiichi Watanabe Reviewed-by: Zach Reizner --- devices/src/virtio/wl.rs | 260 +++++++++++++++++++++++---------------- 1 file changed, 151 insertions(+), 109 deletions(-) diff --git a/devices/src/virtio/wl.rs b/devices/src/virtio/wl.rs index e993ec5911..1a4e9460e7 100644 --- a/devices/src/virtio/wl.rs +++ b/devices/src/virtio/wl.rs @@ -51,7 +51,7 @@ use libc::{EBADF, EINVAL}; use data_model::*; use base::{ - error, pipe, round_up_to_page_size, warn, AsRawDescriptor, Error, Event, FileFlags, + error, pipe, round_up_to_page_size, warn, AsRawDescriptor, Error, Event, EventType, FileFlags, FromRawDescriptor, PollToken, RawDescriptor, Result, ScmSocket, SharedMemory, SharedMemoryUnix, Tube, TubeError, WaitContext, }; @@ -69,9 +69,7 @@ use vm_control::GpuMemoryDesc; use super::resource_bridge::{ get_resource_info, BufferInfo, ResourceBridgeError, ResourceInfo, ResourceRequest, }; -use super::{ - DescriptorChain, Interrupt, Queue, Reader, SignalableInterrupt, VirtioDevice, Writer, TYPE_WL, -}; +use super::{Interrupt, Queue, Reader, SignalableInterrupt, VirtioDevice, Writer, TYPE_WL}; use vm_control::{MemSlot, VmMemoryRequest, VmMemoryResponse}; const VIRTWL_SEND_MAX_ALLOCS: usize = 28; @@ -1440,6 +1438,122 @@ impl WlState { } } +#[derive(ThisError, Debug)] +#[error("no descriptors available in queue")] +struct DescriptorsExhausted; + +fn process_in_queue( + interrupt: &I, + in_queue: &mut Queue, + mem: &GuestMemory, + state: &mut WlState, +) -> ::std::result::Result<(), DescriptorsExhausted> { + const MIN_IN_DESC_LEN: u32 = + (size_of::() + size_of::() * VIRTWL_SEND_MAX_ALLOCS) as u32; + + state.process_wait_context(); + + let mut needs_interrupt = false; + let mut exhausted_queue = false; + loop { + let desc = if let Some(d) = in_queue.peek(mem) { + d + } else { + exhausted_queue = true; + break; + }; + if desc.len < MIN_IN_DESC_LEN || desc.is_read_only() { + needs_interrupt = true; + in_queue.pop_peeked(mem); + in_queue.add_used(mem, desc.index, 0); + continue; + } + + let index = desc.index; + let mut should_pop = false; + if let Some(in_resp) = state.next_recv() { + let bytes_written = match Writer::new(mem.clone(), desc) { + Ok(mut writer) => { + match encode_resp(&mut writer, in_resp) { + Ok(()) => { + should_pop = true; + } + Err(e) => { + error!("failed to encode response to descriptor chain: {}", e); + } + }; + writer.bytes_written() as u32 + } + Err(e) => { + error!("invalid descriptor: {}", e); + 0 + } + }; + + needs_interrupt = true; + in_queue.pop_peeked(mem); + in_queue.add_used(mem, index, bytes_written); + } else { + break; + } + if should_pop { + state.pop_recv(); + } + } + + if needs_interrupt { + interrupt.signal_used_queue(in_queue.vector); + } + + if exhausted_queue { + Err(DescriptorsExhausted) + } else { + Ok(()) + } +} + +fn process_out_queue( + interrupt: &I, + out_queue: &mut Queue, + mem: &GuestMemory, + state: &mut WlState, +) { + let mut needs_interrupt = false; + while let Some(desc) = out_queue.pop(mem) { + let desc_index = desc.index; + match ( + Reader::new(mem.clone(), desc.clone()), + Writer::new(mem.clone(), desc), + ) { + (Ok(mut reader), Ok(mut writer)) => { + let resp = match state.execute(&mut reader) { + Ok(r) => r, + Err(e) => WlResp::Err(Box::new(e)), + }; + + match encode_resp(&mut writer, resp) { + Ok(()) => {} + Err(e) => { + error!("failed to encode response to descriptor chain: {}", e); + } + } + + out_queue.add_used(mem, desc_index, writer.bytes_written() as u32); + needs_interrupt = true; + } + (_, Err(e)) | (Err(e), _) => { + error!("invalid descriptor: {}", e); + out_queue.add_used(mem, desc_index, 0); + needs_interrupt = true; + } + } + } + + if needs_interrupt { + interrupt.signal_used_queue(out_queue.vector); + } +} + struct Worker { interrupt: Interrupt, mem: GuestMemory, @@ -1476,8 +1590,6 @@ impl Worker { } fn run(&mut self, mut queue_evts: Vec, kill_evt: Event) { - let mut in_desc_chains: VecDeque = - VecDeque::with_capacity(QUEUE_SIZE as usize); let in_queue_evt = queue_evts.remove(0); let out_queue_evt = queue_evts.remove(0); #[derive(PollToken)] @@ -1511,9 +1623,8 @@ impl Worker { } } + let mut watching_state_ctx = true; 'wait: loop { - let mut signal_used_in = false; - let mut signal_used_out = false; let events = match wait_ctx.wait() { Ok(v) => v, Err(e) => { @@ -1526,119 +1637,50 @@ impl Worker { match event.token { Token::InQueue => { let _ = in_queue_evt.read(); - // Used to buffer descriptor indexes that are invalid for our uses. - let mut rejects = [0u16; QUEUE_SIZE as usize]; - let mut rejects_len = 0; - let min_in_desc_len = (size_of::() - + size_of::() * VIRTWL_SEND_MAX_ALLOCS) - as u32; - in_desc_chains.extend(self.in_queue.iter(&self.mem).filter(|d| { - if d.len >= min_in_desc_len && d.is_write_only() { - true - } else { - // Can not use queue.add_used directly because it's being borrowed - // for the iterator chain, so we buffer the descriptor index in - // rejects. - rejects[rejects_len] = d.index; - rejects_len += 1; - false + if !watching_state_ctx { + if let Err(e) = + wait_ctx.modify(&self.state.wait_ctx, EventType::Read, Token::State) + { + error!("Failed to modify wait_ctx descriptor for WlState: {}", e); + break; } - })); - for &reject in &rejects[..rejects_len] { - signal_used_in = true; - self.in_queue.add_used(&self.mem, reject, 0); + watching_state_ctx = true; } } Token::OutQueue => { let _ = out_queue_evt.read(); - while let Some(desc) = self.out_queue.pop(&self.mem) { - let desc_index = desc.index; - match ( - Reader::new(self.mem.clone(), desc.clone()), - Writer::new(self.mem.clone(), desc), - ) { - (Ok(mut reader), Ok(mut writer)) => { - let resp = match self.state.execute(&mut reader) { - Ok(r) => r, - Err(e) => WlResp::Err(Box::new(e)), - }; - - match encode_resp(&mut writer, resp) { - Ok(()) => {} - Err(e) => { - error!( - "failed to encode response to descriptor chain: {}", - e - ); - } - } - - self.out_queue.add_used( - &self.mem, - desc_index, - writer.bytes_written() as u32, - ); - signal_used_out = true; - } - (_, Err(e)) | (Err(e), _) => { - error!("invalid descriptor: {}", e); - self.out_queue.add_used(&self.mem, desc_index, 0); - signal_used_out = true; - } - } - } + process_out_queue( + &self.interrupt, + &mut self.out_queue, + &self.mem, + &mut self.state, + ); } Token::Kill => break 'wait, - Token::State => self.state.process_wait_context(), + Token::State => { + if let Err(DescriptorsExhausted) = process_in_queue( + &self.interrupt, + &mut self.in_queue, + &self.mem, + &mut self.state, + ) { + if let Err(e) = + wait_ctx.modify(&self.state.wait_ctx, EventType::None, Token::State) + { + error!( + "Failed to stop watching wait_ctx descriptor for WlState: {}", + e + ); + break; + } + watching_state_ctx = false; + } + } Token::InterruptResample => { self.interrupt.interrupt_resample(); } } } - - // Because this loop should be retried after the in queue is usable or after one of the - // VFDs was read, we do it after the poll event responses. - while !in_desc_chains.is_empty() { - let mut should_pop = false; - if let Some(in_resp) = self.state.next_recv() { - // in_desc_chains is not empty (checked by loop condition) so unwrap is safe. - let desc = in_desc_chains.pop_front().unwrap(); - let index = desc.index; - match Writer::new(self.mem.clone(), desc) { - Ok(mut writer) => { - match encode_resp(&mut writer, in_resp) { - Ok(()) => { - should_pop = true; - } - Err(e) => { - error!("failed to encode response to descriptor chain: {}", e); - } - }; - signal_used_in = true; - self.in_queue - .add_used(&self.mem, index, writer.bytes_written() as u32); - } - Err(e) => { - error!("invalid descriptor: {}", e); - self.in_queue.add_used(&self.mem, index, 0); - signal_used_in = true; - } - } - } else { - break; - } - if should_pop { - self.state.pop_recv(); - } - } - - if signal_used_in { - self.interrupt.signal_used_queue(self.in_queue.vector); - } - - if signal_used_out { - self.interrupt.signal_used_queue(self.out_queue.vector); - } } } }