From d569281cc651d233674bfa1774f98c7e7fb9f36e Mon Sep 17 00:00:00 2001 From: Richard Date: Tue, 12 Jul 2022 15:48:48 -0700 Subject: [PATCH] devices: Upstream Windows implementation for Serial This adds a sync thread that will call `fsync` once a second. This is a safety measure since Window OS handles flushing automatically, but has been proven to be somewhat unreliable. TEST=built and presubmits BUG=b:233951530 Change-Id: I7f5922da09fd95999bf8a7a40abc3b6ae796eafc Reviewed-on: https://chromium-review.googlesource.com/c/crosvm/crosvm/+/3764466 Reviewed-by: Alexandre Courbot Tested-by: Richard Zhang Commit-Queue: Richard Zhang --- devices/Cargo.toml | 4 + devices/src/serial.rs | 52 ++--- devices/src/serial/sys.rs | 4 +- devices/src/serial/sys/unix.rs | 34 +++ devices/src/serial/sys/windows.rs | 345 +++++++++++++++++++++++++++++- 5 files changed, 406 insertions(+), 33 deletions(-) diff --git a/devices/Cargo.toml b/devices/Cargo.toml index 6c26c709a6..aedd20717b 100644 --- a/devices/Cargo.toml +++ b/devices/Cargo.toml @@ -85,6 +85,7 @@ vfio_sys = { path = "../vfio_sys" } [target.'cfg(windows)'.dependencies] broker_ipc = { path = "../broker_ipc" } +chrono = "*" tracing = { path = "../tracing" } tube_transporter = { path = "../tube_transporter" } winapi = "*" @@ -94,6 +95,9 @@ version = "*" features = ["async-await", "std"] default-features = false +[target.'cfg(windows)'.dev-dependencies] +regex = "*" + [dev-dependencies] bytes = "1.1.0" tempfile = "3" diff --git a/devices/src/serial.rs b/devices/src/serial.rs index 5e4a026c0e..237830195f 100644 --- a/devices/src/serial.rs +++ b/devices/src/serial.rs @@ -5,21 +5,20 @@ pub(crate) mod sys; use std::collections::VecDeque; -use std::io::{self, Write}; +use std::io; use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::mpsc::{channel, Receiver, TryRecvError}; use std::sync::Arc; use std::thread::{self}; +use base::{error, Event, Result}; #[cfg(windows)] -use base::named_pipes; -use base::{error, Event, FileSync, RawDescriptor, Result}; -use hypervisor::ProtectionType; +use base::{named_pipes, FileSync}; use crate::bus::BusAccessInfo; use crate::pci::CrosvmDeviceId; use crate::serial_device::SerialInput; -use crate::{BusDevice, DeviceId, SerialDevice}; +use crate::{BusDevice, DeviceId}; const LOOP_SIZE: usize = 0x40; @@ -87,17 +86,16 @@ pub struct Serial { in_channel: Option>, input: Option>, out: Option>, + #[cfg(windows)] + pub system_params: sys::windows::SystemSerialParams, } -impl SerialDevice for Serial { - fn new( - _protected_vm: ProtectionType, +impl Serial { + fn new_common( interrupt_evt: Event, input: Option>, out: Option>, - _sync: Option>, - _out_timestamp: bool, - _keep_rds: Vec, + #[cfg(windows)] system_params: sys::windows::SystemSerialParams, ) -> Serial { Serial { interrupt_enable: Default::default(), @@ -113,19 +111,10 @@ impl SerialDevice for Serial { in_channel: None, input, out, + #[cfg(windows)] + system_params, } } - - #[cfg(windows)] - fn new_with_pipe( - _protected_vm: ProtectionType, - interrupt_evt: Event, - pipe_in: named_pipes::PipeConnection, - pipe_out: named_pipes::PipeConnection, - keep_rds: Vec, - ) -> Serial { - unimplemented!() - } } impl Serial { @@ -317,10 +306,7 @@ impl Serial { self.trigger_recv_interrupt()?; } } else { - if let Some(out) = self.out.as_mut() { - out.write_all(&[v])?; - out.flush()?; - } + self.system_handle_write(v)?; self.trigger_thr_empty()?; } } @@ -350,6 +336,9 @@ impl BusDevice for Serial { return; } + #[cfg(windows)] + self.handle_sync_thread(); + if let Err(e) = self.handle_write(info.offset as u8, data[0]) { error!("serial failed write: {}", e); } @@ -414,15 +403,18 @@ mod tests { use std::io; use std::sync::Arc; + use hypervisor::ProtectionType; use sync::Mutex; + pub use crate::sys::serial_device::SerialDevice; + #[derive(Clone)] - struct SharedBuffer { - buf: Arc>>, + pub(super) struct SharedBuffer { + pub(super) buf: Arc>>, } impl SharedBuffer { - fn new() -> SharedBuffer { + pub(super) fn new() -> SharedBuffer { SharedBuffer { buf: Arc::new(Mutex::new(Vec::new())), } @@ -438,7 +430,7 @@ mod tests { } } - fn serial_bus_address(offset: u8) -> BusAccessInfo { + pub(super) fn serial_bus_address(offset: u8) -> BusAccessInfo { // Serial devices only use the offset of the BusAccessInfo BusAccessInfo { offset: offset as u64, diff --git a/devices/src/serial/sys.rs b/devices/src/serial/sys.rs index 64b5d47b09..327390004d 100644 --- a/devices/src/serial/sys.rs +++ b/devices/src/serial/sys.rs @@ -4,10 +4,10 @@ cfg_if::cfg_if! { if #[cfg(unix)] { - mod unix; + pub(in crate::serial) mod unix; use unix as platform; } else if #[cfg(windows)] { - mod windows; + pub(in crate::serial) mod windows; use windows as platform; } } diff --git a/devices/src/serial/sys/unix.rs b/devices/src/serial/sys/unix.rs index 34a14423c0..2e38244400 100644 --- a/devices/src/serial/sys/unix.rs +++ b/devices/src/serial/sys/unix.rs @@ -2,8 +2,42 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +use std::io; + +use base::{Event, FileSync, RawDescriptor, Result}; +use hypervisor::ProtectionType; + use crate::serial_device::SerialInput; +use crate::sys::serial_device::SerialDevice; +use crate::Serial; // TODO(b/234469655): Remove type alias once ReadNotifier is implemented for // PipeConnection. pub(crate) type InStreamType = Box; + +impl SerialDevice for Serial { + /// Constructs a Serial device ready for input and output. + /// + /// The stream `input` should not block, instead returning 0 bytes if are no bytes available. + fn new( + _protected_vm: ProtectionType, + interrupt_evt: Event, + input: Option>, + out: Option>, + _sync: Option>, + _out_timestamp: bool, + _keep_rds: Vec, + ) -> Serial { + Serial::new_common(interrupt_evt, input, out) + } +} + +impl Serial { + pub(in crate::serial) fn system_handle_write(&mut self, v: u8) -> Result<()> { + if let Some(out) = self.out.as_mut() { + out.write_all(&[v])?; + out.flush()?; + } + Ok(()) + } +} diff --git a/devices/src/serial/sys/windows.rs b/devices/src/serial/sys/windows.rs index f9edddf146..d97b25d80f 100644 --- a/devices/src/serial/sys/windows.rs +++ b/devices/src/serial/sys/windows.rs @@ -2,8 +2,351 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -use base::named_pipes::PipeConnection; +use std::io; +use std::io::Write; +use std::thread::{self, JoinHandle}; +use std::time::Duration; + +use base::{ + error, + named_pipes::{self, PipeConnection}, + Event, EventToken, FileSync, RawDescriptor, Result, WaitContext, +}; +use hypervisor::ProtectionType; + +use crate::bus::BusDevice; +use crate::serial_device::SerialInput; +use crate::sys::serial_device::SerialDevice; +use crate::Serial; // TODO(b/234469655): Remove type alias once ReadNotifier is implemented for // PipeConnection. pub(crate) type InStreamType = Box; + +const TIMESTAMP_PREFIX_FMT: &str = "[ %F %T%.9f ]: "; + +pub enum LineState { + NeverWritten, + Midline, + Newline, +} + +/// Windows specific paramters for the serial device. +pub struct SystemSerialParams { + pub out_timestamp: bool, + pub out_line_state: LineState, + pub in_stream: Option, + pub sync: Option>, + pub sync_thread: Option>, + pub kill_evt: Option, +} + +impl Serial { + // Spawn the worker thread if it hasn't been spawned yet. + pub(in crate::serial) fn handle_sync_thread(&mut self) { + if self.system_params.sync.is_some() { + let sync = match self.system_params.sync.take() { + Some(sync) => sync, + None => return, + }; + + let (self_kill_evt, kill_evt) = match Event::new().and_then(|e| Ok((e.try_clone()?, e))) + { + Ok(v) => v, + Err(e) => { + error!("failed creating kill Event pair: {}", e); + return; + } + }; + self.system_params.kill_evt = Some(self_kill_evt); + + match thread::Builder::new() + .name(format!("{} sync thread", self.debug_label())) + .spawn(move || { + let mut worker = SyncWorker { + kill_evt, + file: sync, + }; + worker.run(); + worker + }) { + Err(e) => { + error!("failed to spawn sync thread: {}", e); + } + Ok(sync_thread) => self.system_params.sync_thread = Some(sync_thread), + }; + } + } + + pub(in crate::serial) fn system_handle_write(&mut self, v: u8) -> Result<()> { + if let Some(out) = self.out.as_mut() { + if self.system_params.out_timestamp { + match self.system_params.out_line_state { + LineState::NeverWritten | LineState::Newline => { + out.write_all( + chrono::Local::now() + .format(TIMESTAMP_PREFIX_FMT) + .to_string() + .as_bytes(), + ) + .expect("Failed to write"); + self.system_params.out_line_state = LineState::Midline; + } + LineState::Midline if v == b'\n' => { + self.system_params.out_line_state = LineState::Newline; + } + _ => {} + } + } + + out.write_all(&[v])?; + out.flush()?; + } + Ok(()) + } +} + +impl SerialDevice for Serial { + /// Constructs a Serial device ready for input and output. + /// + /// The stream `input` should not block, instead returning 0 bytes if are no bytes available. + fn new( + _protected_vm: ProtectionType, + interrupt_evt: Event, + input: Option>, + out: Option>, + sync: Option>, + out_timestamp: bool, + keep_rds: Vec, + ) -> Serial { + let system_params = SystemSerialParams { + out_timestamp, + out_line_state: LineState::NeverWritten, + in_stream: None, + sync, + sync_thread: None, + kill_evt: None, + }; + Serial::new_common(interrupt_evt, input, out, system_params) + } + + /// Constructs a Serial device connected to a named pipe for I/O + /// + /// pipe_in and pipe_out should both refer to the same end of the same pipe, but may have + /// different underlying descriptors. + fn new_with_pipe( + _protected_vm: ProtectionType, + interrupt_evt: Event, + pipe_in: named_pipes::PipeConnection, + pipe_out: named_pipes::PipeConnection, + keep_rds: Vec, + ) -> Serial { + let system_params = SystemSerialParams { + out_timestamp: false, + out_line_state: LineState::NeverWritten, + in_stream: Some(Box::new(pipe_in)), + sync: None, + sync_thread: None, + kill_evt: None, + }; + Serial::new_common(interrupt_evt, None, Some(Box::new(pipe_out)), system_params) + } +} + +impl Drop for Serial { + fn drop(&mut self) { + if let Some(kill_evt) = self.system_params.kill_evt.take() { + // Ignore the result because there is nothing we can do about it. + let _ = kill_evt.write(1); + } + + if let Some(sync_thread) = self.system_params.sync_thread.take() { + let _ = sync_thread.join(); + } + } +} + +/// Worker to help with flusing contents of `file` to disk. +pub struct SyncWorker { + kill_evt: Event, + file: Box, +} + +impl SyncWorker { + pub(in crate::serial) fn run(&mut self) { + let mut timer = match base::Timer::new() { + Err(e) => { + error!("failed to create timer for SyncWorker: {}", e); + return; + } + Ok(timer) => timer, + }; + + if let Err(e) = timer.reset(Duration::from_secs(1), Some(Duration::from_secs(1))) { + error!("failed to set timer for SyncWorker: {}", e); + return; + } + + #[derive(EventToken)] + enum Token { + Sync, + Kill, + } + + let wait_ctx: WaitContext = match WaitContext::build_with(&[ + (&timer, Token::Sync), + (&self.kill_evt, Token::Kill), + ]) { + Ok(ec) => ec, + Err(e) => { + error!("failed creating WaitContext: {}", e); + return; + } + }; + loop { + let events = match wait_ctx.wait() { + Ok(v) => v, + Err(e) => { + error!("failed polling for events: {}", e); + return; + } + }; + + for event in events.iter().filter(|e| e.is_readable) { + match event.token { + Token::Sync => { + if let Err(e) = self.file.fsync() { + error!("failed to fsync serial device, stopping sync thread: {}", e); + return; + } + } + Token::Kill => { + if let Err(e) = self.file.fsync() { + error!("failed to fsync serial device, stopping sync thread: {}", e); + return; + } + return; + } + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use regex::Regex; + use std::sync::Arc; + use sync::Mutex; + + use crate::serial::tests::*; + use crate::serial::*; + + #[cfg(windows)] + fn assert_timestamp_is_present(data: &[u8], serial_message: &str) { + let data_str = String::from_utf8(data.to_vec()).unwrap(); + let re = Regex::new(&format!(r"\[.+\]: {}", serial_message)).unwrap(); + assert!(re.is_match(&data_str)); + } + + #[cfg(windows)] + #[test] + fn serial_output_timestamp() { + let intr_evt = Event::new().unwrap(); + let serial_out = SharedBuffer::new(); + + let mut serial = Serial::new( + ProtectionType::Unprotected, + intr_evt, + None, + Some(Box::new(serial_out.clone())), + None, + true, + Vec::new(), + ); + + serial.write(serial_bus_address(DATA), &[b'a']); + serial.write(serial_bus_address(DATA), &[b'\n']); + assert_timestamp_is_present(serial_out.buf.lock().as_slice(), "a"); + serial_out.buf.lock().clear(); + + serial.write(serial_bus_address(DATA), &[b'b']); + serial.write(serial_bus_address(DATA), &[b'\n']); + assert_timestamp_is_present(serial_out.buf.lock().as_slice(), "b"); + serial_out.buf.lock().clear(); + + serial.write(serial_bus_address(DATA), &[b'c']); + serial.write(serial_bus_address(DATA), &[b'\n']); + assert_timestamp_is_present(serial_out.buf.lock().as_slice(), "c"); + serial_out.buf.lock().clear(); + } + + #[cfg(windows)] + #[test] + fn named_pipe() { + use base::named_pipes; + use base::named_pipes::{BlockingMode, FramingMode}; + use rand::Rng; + + let path_str = format!(r"\\.\pipe\kiwi_test_{}", rand::thread_rng().gen::()); + + let pipe_in = named_pipes::create_server_pipe( + &path_str, + &FramingMode::Byte, + &BlockingMode::NoWait, + 0, // default timeout + named_pipes::DEFAULT_BUFFER_SIZE, + false, + ) + .unwrap(); + + let pipe_out = pipe_in.try_clone().unwrap(); + let event = Event::new().unwrap(); + + let mut device = Serial::new_with_pipe( + ProtectionType::Unprotected, + event, + pipe_in, + pipe_out, + Vec::new(), + ); + + let client_pipe = named_pipes::create_client_pipe( + &path_str, + &FramingMode::Byte, + &BlockingMode::Wait, + false, + ) + .unwrap(); + + unsafe { + // Check that serial output is sent to the pipe + device.write(serial_bus_address(DATA), &[b'T']); + device.write(serial_bus_address(DATA), &[b'D']); + + let mut read_buf: [u8; 2] = [0; 2]; + + assert_eq!(client_pipe.read(&mut read_buf).unwrap(), 2); + assert_eq!(read_buf, [b'T', b'D']); + + // Check that pipe_in is the other end of client_pipe. It's not actually wired up to + // SerialInput in this file so we can't test the data flow + client_pipe + .write(&[1, 2]) + .expect("Failed to write to client pipe"); + assert_eq!( + device + .system_params + .in_stream + .as_mut() + .unwrap() + .read(&mut read_buf) + .unwrap(), + 2 + ); + assert_eq!(read_buf, [1, 2]); + } + } +}