wl: Move queue handling to standalone functions

This will make it easier to run the virtqueues from a vhost-user
process.

BUG=b:179755841
TEST=Start crostini and play gnome-mahjongg.

Change-Id: Ic4132a66ef7718fe85307f5d743110645288967a
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2910215
Tested-by: kokoro <noreply+kokoro@google.com>
Commit-Queue: Chirantan Ekbote <chirantan@chromium.org>
Reviewed-by: Keiichi Watanabe <keiichiw@chromium.org>
Reviewed-by: Zach Reizner <zachr@chromium.org>
This commit is contained in:
Chirantan Ekbote 2021-05-20 18:53:41 +09:00 committed by Commit Bot
parent b47434378d
commit 4d5b350602

View file

@ -51,7 +51,7 @@ use libc::{EBADF, EINVAL};
use data_model::*;
use base::{
error, pipe, round_up_to_page_size, warn, AsRawDescriptor, Error, Event, FileFlags,
error, pipe, round_up_to_page_size, warn, AsRawDescriptor, Error, Event, EventType, FileFlags,
FromRawDescriptor, PollToken, RawDescriptor, Result, ScmSocket, SharedMemory, SharedMemoryUnix,
Tube, TubeError, WaitContext,
};
@ -69,9 +69,7 @@ use vm_control::GpuMemoryDesc;
use super::resource_bridge::{
get_resource_info, BufferInfo, ResourceBridgeError, ResourceInfo, ResourceRequest,
};
use super::{
DescriptorChain, Interrupt, Queue, Reader, SignalableInterrupt, VirtioDevice, Writer, TYPE_WL,
};
use super::{Interrupt, Queue, Reader, SignalableInterrupt, VirtioDevice, Writer, TYPE_WL};
use vm_control::{MemSlot, VmMemoryRequest, VmMemoryResponse};
const VIRTWL_SEND_MAX_ALLOCS: usize = 28;
@ -1440,6 +1438,122 @@ impl WlState {
}
}
#[derive(ThisError, Debug)]
#[error("no descriptors available in queue")]
struct DescriptorsExhausted;
fn process_in_queue<I: SignalableInterrupt>(
interrupt: &I,
in_queue: &mut Queue,
mem: &GuestMemory,
state: &mut WlState,
) -> ::std::result::Result<(), DescriptorsExhausted> {
const MIN_IN_DESC_LEN: u32 =
(size_of::<CtrlVfdRecv>() + size_of::<Le32>() * VIRTWL_SEND_MAX_ALLOCS) as u32;
state.process_wait_context();
let mut needs_interrupt = false;
let mut exhausted_queue = false;
loop {
let desc = if let Some(d) = in_queue.peek(mem) {
d
} else {
exhausted_queue = true;
break;
};
if desc.len < MIN_IN_DESC_LEN || desc.is_read_only() {
needs_interrupt = true;
in_queue.pop_peeked(mem);
in_queue.add_used(mem, desc.index, 0);
continue;
}
let index = desc.index;
let mut should_pop = false;
if let Some(in_resp) = state.next_recv() {
let bytes_written = match Writer::new(mem.clone(), desc) {
Ok(mut writer) => {
match encode_resp(&mut writer, in_resp) {
Ok(()) => {
should_pop = true;
}
Err(e) => {
error!("failed to encode response to descriptor chain: {}", e);
}
};
writer.bytes_written() as u32
}
Err(e) => {
error!("invalid descriptor: {}", e);
0
}
};
needs_interrupt = true;
in_queue.pop_peeked(mem);
in_queue.add_used(mem, index, bytes_written);
} else {
break;
}
if should_pop {
state.pop_recv();
}
}
if needs_interrupt {
interrupt.signal_used_queue(in_queue.vector);
}
if exhausted_queue {
Err(DescriptorsExhausted)
} else {
Ok(())
}
}
fn process_out_queue<I: SignalableInterrupt>(
interrupt: &I,
out_queue: &mut Queue,
mem: &GuestMemory,
state: &mut WlState,
) {
let mut needs_interrupt = false;
while let Some(desc) = out_queue.pop(mem) {
let desc_index = desc.index;
match (
Reader::new(mem.clone(), desc.clone()),
Writer::new(mem.clone(), desc),
) {
(Ok(mut reader), Ok(mut writer)) => {
let resp = match state.execute(&mut reader) {
Ok(r) => r,
Err(e) => WlResp::Err(Box::new(e)),
};
match encode_resp(&mut writer, resp) {
Ok(()) => {}
Err(e) => {
error!("failed to encode response to descriptor chain: {}", e);
}
}
out_queue.add_used(mem, desc_index, writer.bytes_written() as u32);
needs_interrupt = true;
}
(_, Err(e)) | (Err(e), _) => {
error!("invalid descriptor: {}", e);
out_queue.add_used(mem, desc_index, 0);
needs_interrupt = true;
}
}
}
if needs_interrupt {
interrupt.signal_used_queue(out_queue.vector);
}
}
struct Worker {
interrupt: Interrupt,
mem: GuestMemory,
@ -1476,8 +1590,6 @@ impl Worker {
}
fn run(&mut self, mut queue_evts: Vec<Event>, kill_evt: Event) {
let mut in_desc_chains: VecDeque<DescriptorChain> =
VecDeque::with_capacity(QUEUE_SIZE as usize);
let in_queue_evt = queue_evts.remove(0);
let out_queue_evt = queue_evts.remove(0);
#[derive(PollToken)]
@ -1511,9 +1623,8 @@ impl Worker {
}
}
let mut watching_state_ctx = true;
'wait: loop {
let mut signal_used_in = false;
let mut signal_used_out = false;
let events = match wait_ctx.wait() {
Ok(v) => v,
Err(e) => {
@ -1526,119 +1637,50 @@ impl Worker {
match event.token {
Token::InQueue => {
let _ = in_queue_evt.read();
// Used to buffer descriptor indexes that are invalid for our uses.
let mut rejects = [0u16; QUEUE_SIZE as usize];
let mut rejects_len = 0;
let min_in_desc_len = (size_of::<CtrlVfdRecv>()
+ size_of::<Le32>() * VIRTWL_SEND_MAX_ALLOCS)
as u32;
in_desc_chains.extend(self.in_queue.iter(&self.mem).filter(|d| {
if d.len >= min_in_desc_len && d.is_write_only() {
true
} else {
// Can not use queue.add_used directly because it's being borrowed
// for the iterator chain, so we buffer the descriptor index in
// rejects.
rejects[rejects_len] = d.index;
rejects_len += 1;
false
if !watching_state_ctx {
if let Err(e) =
wait_ctx.modify(&self.state.wait_ctx, EventType::Read, Token::State)
{
error!("Failed to modify wait_ctx descriptor for WlState: {}", e);
break;
}
}));
for &reject in &rejects[..rejects_len] {
signal_used_in = true;
self.in_queue.add_used(&self.mem, reject, 0);
watching_state_ctx = true;
}
}
Token::OutQueue => {
let _ = out_queue_evt.read();
while let Some(desc) = self.out_queue.pop(&self.mem) {
let desc_index = desc.index;
match (
Reader::new(self.mem.clone(), desc.clone()),
Writer::new(self.mem.clone(), desc),
) {
(Ok(mut reader), Ok(mut writer)) => {
let resp = match self.state.execute(&mut reader) {
Ok(r) => r,
Err(e) => WlResp::Err(Box::new(e)),
};
match encode_resp(&mut writer, resp) {
Ok(()) => {}
Err(e) => {
error!(
"failed to encode response to descriptor chain: {}",
e
);
}
}
self.out_queue.add_used(
&self.mem,
desc_index,
writer.bytes_written() as u32,
);
signal_used_out = true;
}
(_, Err(e)) | (Err(e), _) => {
error!("invalid descriptor: {}", e);
self.out_queue.add_used(&self.mem, desc_index, 0);
signal_used_out = true;
}
}
}
process_out_queue(
&self.interrupt,
&mut self.out_queue,
&self.mem,
&mut self.state,
);
}
Token::Kill => break 'wait,
Token::State => self.state.process_wait_context(),
Token::State => {
if let Err(DescriptorsExhausted) = process_in_queue(
&self.interrupt,
&mut self.in_queue,
&self.mem,
&mut self.state,
) {
if let Err(e) =
wait_ctx.modify(&self.state.wait_ctx, EventType::None, Token::State)
{
error!(
"Failed to stop watching wait_ctx descriptor for WlState: {}",
e
);
break;
}
watching_state_ctx = false;
}
}
Token::InterruptResample => {
self.interrupt.interrupt_resample();
}
}
}
// Because this loop should be retried after the in queue is usable or after one of the
// VFDs was read, we do it after the poll event responses.
while !in_desc_chains.is_empty() {
let mut should_pop = false;
if let Some(in_resp) = self.state.next_recv() {
// in_desc_chains is not empty (checked by loop condition) so unwrap is safe.
let desc = in_desc_chains.pop_front().unwrap();
let index = desc.index;
match Writer::new(self.mem.clone(), desc) {
Ok(mut writer) => {
match encode_resp(&mut writer, in_resp) {
Ok(()) => {
should_pop = true;
}
Err(e) => {
error!("failed to encode response to descriptor chain: {}", e);
}
};
signal_used_in = true;
self.in_queue
.add_used(&self.mem, index, writer.bytes_written() as u32);
}
Err(e) => {
error!("invalid descriptor: {}", e);
self.in_queue.add_used(&self.mem, index, 0);
signal_used_in = true;
}
}
} else {
break;
}
if should_pop {
self.state.pop_recv();
}
}
if signal_used_in {
self.interrupt.signal_used_queue(self.in_queue.vector);
}
if signal_used_out {
self.interrupt.signal_used_queue(self.out_queue.vector);
}
}
}
}