Use a shared buffer for virtio-console data

The strategy of passing Vec<u8> types introduced in crrev.com/c/2470376
can cause panics when the receiving side doesn't have the capacity to
receive the full buffers produced by crosvm. For example, a
virtio-console implementation with only one-byte buffers
(https://android-review.googlesource.com/c/1853082) almost always won't
have the capacity to receive the full buffers.

Using a VecDeque ring buffer allows continually appending data from the
input in the read thread and pulling off as much as the guest can handle
in the guest communication thread, without sacrificing performance when
the guest can handle higher volumes.

Bug: b/182849835
Bug: b/203138623
Test: Pass some input with an unbuffered single-character console driver
Test: Run CtsKeystoreTestCases against cuttlefish (high volume transfers)
Change-Id: I6b52a729d5af82f4626a9b1f29176116299b9297
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/3227733
Tested-by: kokoro <noreply+kokoro@google.com>
Commit-Queue: Cody Schuffelen <schuffelen@google.com>
Reviewed-by: Chirantan Ekbote <chirantan@chromium.org>
Reviewed-by: Daniel Verkamp <dverkamp@chromium.org>
This commit is contained in:
A. Cody Schuffelen 2021-10-15 16:28:33 -07:00 committed by Commit Bot
parent 610158aa55
commit bc50b0420a
2 changed files with 55 additions and 61 deletions

View file

@ -2,14 +2,17 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::collections::VecDeque;
use std::io::{self, Read, Write};
use std::ops::DerefMut;
use std::result;
use std::sync::mpsc::{channel, Receiver, TryRecvError};
use std::sync::Arc;
use std::thread;
use base::{error, Event, PollToken, RawDescriptor, WaitContext};
use data_model::{DataInit, Le16, Le32};
use remain::sorted;
use sync::Mutex;
use thiserror::Error as ThisError;
use vm_memory::GuestMemory;
@ -31,9 +34,6 @@ pub enum ConsoleError {
/// There are no more available descriptors to receive into
#[error("no rx descriptors available")]
RxDescriptorsExhausted,
/// Input channel has been disconnected
#[error("input channel disconnected")]
RxDisconnected,
}
#[derive(Copy, Clone, Debug, Default)]
@ -48,17 +48,18 @@ pub struct virtio_console_config {
// Safe because it only has data and has no implicit padding.
unsafe impl DataInit for virtio_console_config {}
/// Checks for input from `in_channel_opt` and transfers it to the receive queue, if any.
/// Checks for input from `buffer` and transfers it to the receive queue, if any.
///
/// # Arguments
///
/// * `mem` - The GuestMemory to write the data into
/// * `interrupt` - SignalableInterrupt used to signal that the queue has been used
/// * `in_channel_opt` - Optional input channel to read data from
/// * `buffer` - Ring buffer providing data to put into the guest
/// * `receive_queue` - The receive virtio Queue
pub fn handle_input<I: SignalableInterrupt>(
mem: &GuestMemory,
interrupt: &I,
in_channel: &Receiver<Vec<u8>>,
buffer: &mut VecDeque<u8>,
receive_queue: &mut Queue,
) -> result::Result<(), ConsoleError> {
let mut exhausted_queue = false;
@ -81,18 +82,15 @@ pub fn handle_input<I: SignalableInterrupt>(
}
};
let mut disconnected = false;
while writer.available_bytes() > 0 {
match in_channel.try_recv() {
Ok(data) => {
writer.write_all(&data).unwrap();
}
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
disconnected = true;
break;
}
}
while writer.available_bytes() > 0 && !buffer.is_empty() {
let (buffer_front, buffer_back) = buffer.as_slices();
let buffer_chunk = if !buffer_front.is_empty() {
buffer_front
} else {
buffer_back
};
let written = writer.write(buffer_chunk).unwrap();
drop(buffer.drain(..written));
}
let bytes_written = writer.bytes_written() as u32;
@ -103,10 +101,6 @@ pub fn handle_input<I: SignalableInterrupt>(
receive_queue.trigger_interrupt(mem, interrupt);
}
if disconnected {
return Err(ConsoleError::RxDisconnected);
}
if bytes_written == 0 {
break;
}
@ -176,16 +170,22 @@ fn write_output(output: &mut dyn io::Write, data: &[u8]) -> io::Result<()> {
output.flush()
}
/// Starts a thread that reads rx_input and sends the input back via the returned channel.
/// Starts a thread that reads rx and sends the input back via the returned buffer.
///
/// The caller should listen on `in_avail_event` for events. When `in_avail_event` signals that data
/// is available, the caller should lock the returned `Mutex` and read data out of the inner
/// `VecDeque`. The data should be removed from the beginning of the `VecDeque` as it is processed.
///
/// # Arguments
/// * `rx_input` - Data source that the reader thread will wait on to send data back to the channel
/// * `in_avail_evt` - Event triggered by the thread when new input is available on the channel
///
/// * `rx` - Data source that the reader thread will wait on to send data back to the buffer
/// * `in_avail_evt` - Event triggered by the thread when new input is available on the buffer
pub fn spawn_input_thread(
mut rx: Box<dyn io::Read + Send>,
in_avail_evt: &Event,
) -> Option<Receiver<Vec<u8>>> {
let (send_channel, recv_channel) = channel();
) -> Option<Arc<Mutex<VecDeque<u8>>>> {
let buffer = Arc::new(Mutex::new(VecDeque::<u8>::new()));
let buffer_cloned = buffer.clone();
let thread_in_avail_evt = match in_avail_evt.try_clone() {
Ok(evt) => evt,
@ -195,21 +195,16 @@ pub fn spawn_input_thread(
}
};
// The input thread runs in detached mode and will exit when channel is disconnected because
// the console device has been dropped.
// The input thread runs in detached mode.
let res = thread::Builder::new()
.name("console_input".to_string())
.spawn(move || {
let mut rx_buf = [0u8; 1 << 12];
loop {
let mut rx_buf = vec![0u8; 1 << 12];
match rx.read(&mut rx_buf) {
Ok(0) => break, // Assume the stream of input has ended.
Ok(size) => {
rx_buf.truncate(size);
if send_channel.send(rx_buf).is_err() {
// The receiver has disconnected.
break;
}
buffer.lock().extend(&rx_buf[0..size]);
thread_in_avail_evt.write(1).unwrap();
}
Err(e) => {
@ -229,7 +224,7 @@ pub fn spawn_input_thread(
error!("failed to spawn input thread: {}", e);
return None;
}
Some(recv_channel)
Some(buffer_cloned)
}
/// Writes the available data from the reader into the given output queue.
@ -276,7 +271,7 @@ impl Worker {
// generic way to add an io::Read instance to a poll context (it may not be backed by a file
// descriptor). Moving the blocking read call to a separate thread and sending data back to
// the main worker thread with an event for notification bridges this gap.
let mut in_channel = match self.input.take() {
let in_buffer = match self.input.take() {
Some(input) => spawn_input_thread(input, &in_avail_evt),
None => None,
};
@ -336,14 +331,15 @@ impl Worker {
error!("failed reading receive queue Event: {}", e);
break 'wait;
}
if let Some(ch) = in_channel.as_ref() {
match handle_input(&self.mem, &self.interrupt, ch, &mut receive_queue) {
if let Some(in_buf_ref) = in_buffer.as_ref() {
match handle_input(
&self.mem,
&self.interrupt,
in_buf_ref.lock().deref_mut(),
&mut receive_queue,
) {
Ok(()) => {}
Err(ConsoleError::RxDisconnected) => {
// Set in_channel to None so that future handle_input calls exit early.
in_channel.take();
}
// Other console errors are no-ops, so just continue.
// Console errors are no-ops, so just continue.
Err(_) => {
continue;
}
@ -355,14 +351,15 @@ impl Worker {
error!("failed reading in_avail_evt: {}", e);
break 'wait;
}
if let Some(ch) = in_channel.as_ref() {
match handle_input(&self.mem, &self.interrupt, ch, &mut receive_queue) {
if let Some(in_buf_ref) = in_buffer.as_ref() {
match handle_input(
&self.mem,
&self.interrupt,
in_buf_ref.lock().deref_mut(),
&mut receive_queue,
) {
Ok(()) => {}
Err(ConsoleError::RxDisconnected) => {
// Set in_channel to None so that future handle_input calls exit early.
in_channel.take();
}
// Other console errors are no-ops, so just continue.
// Console errors are no-ops, so just continue.
Err(_) => {
continue;
}

View file

@ -2,9 +2,10 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::collections::VecDeque;
use std::io::{self, stdin};
use std::ops::DerefMut;
use std::path::PathBuf;
use std::sync::mpsc::Receiver;
use std::sync::Arc;
use anyhow::{anyhow, bail, Context};
@ -52,7 +53,7 @@ async fn run_rx_queue(
mem: GuestMemory,
call_evt: Arc<Mutex<CallEvent>>,
kick_evt: EventAsync,
in_channel: Receiver<Vec<u8>>,
in_buffer: Arc<Mutex<VecDeque<u8>>>,
in_avail_evt: EventAsync,
) {
loop {
@ -60,7 +61,7 @@ async fn run_rx_queue(
error!("Failed reading in_avail_evt: {}", e);
break;
}
match handle_input(&mem, &call_evt, &in_channel, &mut queue) {
match handle_input(&mem, &call_evt, in_buffer.lock().deref_mut(), &mut queue) {
Ok(()) => {}
Err(ConsoleError::RxDescriptorsExhausted) => {
if let Err(e) = kick_evt.next_val().await {
@ -68,10 +69,6 @@ async fn run_rx_queue(
break;
}
}
Err(e) => {
error!("Failed to process rx queue: {}", e);
break;
}
}
}
}
@ -202,7 +199,7 @@ impl VhostUserBackend for ConsoleBackend {
.input
.take()
.ok_or_else(|| anyhow!("input source unavailable"))?;
let in_channel = spawn_input_thread(input_unpacked, &in_avail_evt)
let in_buffer = spawn_input_thread(input_unpacked, &in_avail_evt)
.take()
.ok_or_else(|| anyhow!("input channel unavailable"))?;
@ -216,7 +213,7 @@ impl VhostUserBackend for ConsoleBackend {
mem,
call_evt,
kick_evt,
in_channel,
in_buffer,
in_avail_async_evt,
),
registration,