diff --git a/devices/src/virtio/console.rs b/devices/src/virtio/console.rs index 9493697287..a3c4512850 100644 --- a/devices/src/virtio/console.rs +++ b/devices/src/virtio/console.rs @@ -2,14 +2,17 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +use std::collections::VecDeque; use std::io::{self, Read, Write}; +use std::ops::DerefMut; use std::result; -use std::sync::mpsc::{channel, Receiver, TryRecvError}; +use std::sync::Arc; use std::thread; use base::{error, Event, PollToken, RawDescriptor, WaitContext}; use data_model::{DataInit, Le16, Le32}; use remain::sorted; +use sync::Mutex; use thiserror::Error as ThisError; use vm_memory::GuestMemory; @@ -31,9 +34,6 @@ pub enum ConsoleError { /// There are no more available descriptors to receive into #[error("no rx descriptors available")] RxDescriptorsExhausted, - /// Input channel has been disconnected - #[error("input channel disconnected")] - RxDisconnected, } #[derive(Copy, Clone, Debug, Default)] @@ -48,17 +48,18 @@ pub struct virtio_console_config { // Safe because it only has data and has no implicit padding. unsafe impl DataInit for virtio_console_config {} -/// Checks for input from `in_channel_opt` and transfers it to the receive queue, if any. +/// Checks for input from `buffer` and transfers it to the receive queue, if any. /// /// # Arguments +/// /// * `mem` - The GuestMemory to write the data into /// * `interrupt` - SignalableInterrupt used to signal that the queue has been used -/// * `in_channel_opt` - Optional input channel to read data from +/// * `buffer` - Ring buffer providing data to put into the guest /// * `receive_queue` - The receive virtio Queue pub fn handle_input( mem: &GuestMemory, interrupt: &I, - in_channel: &Receiver>, + buffer: &mut VecDeque, receive_queue: &mut Queue, ) -> result::Result<(), ConsoleError> { let mut exhausted_queue = false; @@ -81,18 +82,15 @@ pub fn handle_input( } }; - let mut disconnected = false; - while writer.available_bytes() > 0 { - match in_channel.try_recv() { - Ok(data) => { - writer.write_all(&data).unwrap(); - } - Err(TryRecvError::Empty) => break, - Err(TryRecvError::Disconnected) => { - disconnected = true; - break; - } - } + while writer.available_bytes() > 0 && !buffer.is_empty() { + let (buffer_front, buffer_back) = buffer.as_slices(); + let buffer_chunk = if !buffer_front.is_empty() { + buffer_front + } else { + buffer_back + }; + let written = writer.write(buffer_chunk).unwrap(); + drop(buffer.drain(..written)); } let bytes_written = writer.bytes_written() as u32; @@ -103,10 +101,6 @@ pub fn handle_input( receive_queue.trigger_interrupt(mem, interrupt); } - if disconnected { - return Err(ConsoleError::RxDisconnected); - } - if bytes_written == 0 { break; } @@ -176,16 +170,22 @@ fn write_output(output: &mut dyn io::Write, data: &[u8]) -> io::Result<()> { output.flush() } -/// Starts a thread that reads rx_input and sends the input back via the returned channel. +/// Starts a thread that reads rx and sends the input back via the returned buffer. +/// +/// The caller should listen on `in_avail_event` for events. When `in_avail_event` signals that data +/// is available, the caller should lock the returned `Mutex` and read data out of the inner +/// `VecDeque`. The data should be removed from the beginning of the `VecDeque` as it is processed. /// /// # Arguments -/// * `rx_input` - Data source that the reader thread will wait on to send data back to the channel -/// * `in_avail_evt` - Event triggered by the thread when new input is available on the channel +/// +/// * `rx` - Data source that the reader thread will wait on to send data back to the buffer +/// * `in_avail_evt` - Event triggered by the thread when new input is available on the buffer pub fn spawn_input_thread( mut rx: Box, in_avail_evt: &Event, -) -> Option>> { - let (send_channel, recv_channel) = channel(); +) -> Option>>> { + let buffer = Arc::new(Mutex::new(VecDeque::::new())); + let buffer_cloned = buffer.clone(); let thread_in_avail_evt = match in_avail_evt.try_clone() { Ok(evt) => evt, @@ -195,21 +195,16 @@ pub fn spawn_input_thread( } }; - // The input thread runs in detached mode and will exit when channel is disconnected because - // the console device has been dropped. + // The input thread runs in detached mode. let res = thread::Builder::new() .name("console_input".to_string()) .spawn(move || { + let mut rx_buf = [0u8; 1 << 12]; loop { - let mut rx_buf = vec![0u8; 1 << 12]; match rx.read(&mut rx_buf) { Ok(0) => break, // Assume the stream of input has ended. Ok(size) => { - rx_buf.truncate(size); - if send_channel.send(rx_buf).is_err() { - // The receiver has disconnected. - break; - } + buffer.lock().extend(&rx_buf[0..size]); thread_in_avail_evt.write(1).unwrap(); } Err(e) => { @@ -229,7 +224,7 @@ pub fn spawn_input_thread( error!("failed to spawn input thread: {}", e); return None; } - Some(recv_channel) + Some(buffer_cloned) } /// Writes the available data from the reader into the given output queue. @@ -276,7 +271,7 @@ impl Worker { // generic way to add an io::Read instance to a poll context (it may not be backed by a file // descriptor). Moving the blocking read call to a separate thread and sending data back to // the main worker thread with an event for notification bridges this gap. - let mut in_channel = match self.input.take() { + let in_buffer = match self.input.take() { Some(input) => spawn_input_thread(input, &in_avail_evt), None => None, }; @@ -336,14 +331,15 @@ impl Worker { error!("failed reading receive queue Event: {}", e); break 'wait; } - if let Some(ch) = in_channel.as_ref() { - match handle_input(&self.mem, &self.interrupt, ch, &mut receive_queue) { + if let Some(in_buf_ref) = in_buffer.as_ref() { + match handle_input( + &self.mem, + &self.interrupt, + in_buf_ref.lock().deref_mut(), + &mut receive_queue, + ) { Ok(()) => {} - Err(ConsoleError::RxDisconnected) => { - // Set in_channel to None so that future handle_input calls exit early. - in_channel.take(); - } - // Other console errors are no-ops, so just continue. + // Console errors are no-ops, so just continue. Err(_) => { continue; } @@ -355,14 +351,15 @@ impl Worker { error!("failed reading in_avail_evt: {}", e); break 'wait; } - if let Some(ch) = in_channel.as_ref() { - match handle_input(&self.mem, &self.interrupt, ch, &mut receive_queue) { + if let Some(in_buf_ref) = in_buffer.as_ref() { + match handle_input( + &self.mem, + &self.interrupt, + in_buf_ref.lock().deref_mut(), + &mut receive_queue, + ) { Ok(()) => {} - Err(ConsoleError::RxDisconnected) => { - // Set in_channel to None so that future handle_input calls exit early. - in_channel.take(); - } - // Other console errors are no-ops, so just continue. + // Console errors are no-ops, so just continue. Err(_) => { continue; } diff --git a/devices/src/virtio/vhost/user/device/console.rs b/devices/src/virtio/vhost/user/device/console.rs index 75c0339d51..88d11fc7a2 100644 --- a/devices/src/virtio/vhost/user/device/console.rs +++ b/devices/src/virtio/vhost/user/device/console.rs @@ -2,9 +2,10 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +use std::collections::VecDeque; use std::io::{self, stdin}; +use std::ops::DerefMut; use std::path::PathBuf; -use std::sync::mpsc::Receiver; use std::sync::Arc; use anyhow::{anyhow, bail, Context}; @@ -52,7 +53,7 @@ async fn run_rx_queue( mem: GuestMemory, call_evt: Arc>, kick_evt: EventAsync, - in_channel: Receiver>, + in_buffer: Arc>>, in_avail_evt: EventAsync, ) { loop { @@ -60,7 +61,7 @@ async fn run_rx_queue( error!("Failed reading in_avail_evt: {}", e); break; } - match handle_input(&mem, &call_evt, &in_channel, &mut queue) { + match handle_input(&mem, &call_evt, in_buffer.lock().deref_mut(), &mut queue) { Ok(()) => {} Err(ConsoleError::RxDescriptorsExhausted) => { if let Err(e) = kick_evt.next_val().await { @@ -68,10 +69,6 @@ async fn run_rx_queue( break; } } - Err(e) => { - error!("Failed to process rx queue: {}", e); - break; - } } } } @@ -202,7 +199,7 @@ impl VhostUserBackend for ConsoleBackend { .input .take() .ok_or_else(|| anyhow!("input source unavailable"))?; - let in_channel = spawn_input_thread(input_unpacked, &in_avail_evt) + let in_buffer = spawn_input_thread(input_unpacked, &in_avail_evt) .take() .ok_or_else(|| anyhow!("input channel unavailable"))?; @@ -216,7 +213,7 @@ impl VhostUserBackend for ConsoleBackend { mem, call_evt, kick_evt, - in_channel, + in_buffer, in_avail_async_evt, ), registration,