devices: Upstream Windows implementation for Serial

This adds a sync thread that will call `fsync` once a second. This is a
safety measure since Window OS handles flushing automatically, but has been
proven to be somewhat unreliable.

TEST=built and presubmits
BUG=b:233951530

Change-Id: I7f5922da09fd95999bf8a7a40abc3b6ae796eafc
Reviewed-on: https://chromium-review.googlesource.com/c/crosvm/crosvm/+/3764466
Reviewed-by: Alexandre Courbot <acourbot@chromium.org>
Tested-by: Richard Zhang <rizhang@google.com>
Commit-Queue: Richard Zhang <rizhang@google.com>
This commit is contained in:
Richard 2022-07-12 15:48:48 -07:00 committed by crosvm LUCI
parent 7159299e9a
commit d569281cc6
5 changed files with 406 additions and 33 deletions

View file

@ -85,6 +85,7 @@ vfio_sys = { path = "../vfio_sys" }
[target.'cfg(windows)'.dependencies]
broker_ipc = { path = "../broker_ipc" }
chrono = "*"
tracing = { path = "../tracing" }
tube_transporter = { path = "../tube_transporter" }
winapi = "*"
@ -94,6 +95,9 @@ version = "*"
features = ["async-await", "std"]
default-features = false
[target.'cfg(windows)'.dev-dependencies]
regex = "*"
[dev-dependencies]
bytes = "1.1.0"
tempfile = "3"

View file

@ -5,21 +5,20 @@
pub(crate) mod sys;
use std::collections::VecDeque;
use std::io::{self, Write};
use std::io;
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::mpsc::{channel, Receiver, TryRecvError};
use std::sync::Arc;
use std::thread::{self};
use base::{error, Event, Result};
#[cfg(windows)]
use base::named_pipes;
use base::{error, Event, FileSync, RawDescriptor, Result};
use hypervisor::ProtectionType;
use base::{named_pipes, FileSync};
use crate::bus::BusAccessInfo;
use crate::pci::CrosvmDeviceId;
use crate::serial_device::SerialInput;
use crate::{BusDevice, DeviceId, SerialDevice};
use crate::{BusDevice, DeviceId};
const LOOP_SIZE: usize = 0x40;
@ -87,17 +86,16 @@ pub struct Serial {
in_channel: Option<Receiver<u8>>,
input: Option<Box<dyn SerialInput>>,
out: Option<Box<dyn io::Write + Send>>,
#[cfg(windows)]
pub system_params: sys::windows::SystemSerialParams,
}
impl SerialDevice for Serial {
fn new(
_protected_vm: ProtectionType,
impl Serial {
fn new_common(
interrupt_evt: Event,
input: Option<Box<dyn SerialInput>>,
out: Option<Box<dyn io::Write + Send>>,
_sync: Option<Box<dyn FileSync + Send>>,
_out_timestamp: bool,
_keep_rds: Vec<RawDescriptor>,
#[cfg(windows)] system_params: sys::windows::SystemSerialParams,
) -> Serial {
Serial {
interrupt_enable: Default::default(),
@ -113,18 +111,9 @@ impl SerialDevice for Serial {
in_channel: None,
input,
out,
}
}
#[cfg(windows)]
fn new_with_pipe(
_protected_vm: ProtectionType,
interrupt_evt: Event,
pipe_in: named_pipes::PipeConnection,
pipe_out: named_pipes::PipeConnection,
keep_rds: Vec<RawDescriptor>,
) -> Serial {
unimplemented!()
system_params,
}
}
}
@ -317,10 +306,7 @@ impl Serial {
self.trigger_recv_interrupt()?;
}
} else {
if let Some(out) = self.out.as_mut() {
out.write_all(&[v])?;
out.flush()?;
}
self.system_handle_write(v)?;
self.trigger_thr_empty()?;
}
}
@ -350,6 +336,9 @@ impl BusDevice for Serial {
return;
}
#[cfg(windows)]
self.handle_sync_thread();
if let Err(e) = self.handle_write(info.offset as u8, data[0]) {
error!("serial failed write: {}", e);
}
@ -414,15 +403,18 @@ mod tests {
use std::io;
use std::sync::Arc;
use hypervisor::ProtectionType;
use sync::Mutex;
pub use crate::sys::serial_device::SerialDevice;
#[derive(Clone)]
struct SharedBuffer {
buf: Arc<Mutex<Vec<u8>>>,
pub(super) struct SharedBuffer {
pub(super) buf: Arc<Mutex<Vec<u8>>>,
}
impl SharedBuffer {
fn new() -> SharedBuffer {
pub(super) fn new() -> SharedBuffer {
SharedBuffer {
buf: Arc::new(Mutex::new(Vec::new())),
}
@ -438,7 +430,7 @@ mod tests {
}
}
fn serial_bus_address(offset: u8) -> BusAccessInfo {
pub(super) fn serial_bus_address(offset: u8) -> BusAccessInfo {
// Serial devices only use the offset of the BusAccessInfo
BusAccessInfo {
offset: offset as u64,

View file

@ -4,10 +4,10 @@
cfg_if::cfg_if! {
if #[cfg(unix)] {
mod unix;
pub(in crate::serial) mod unix;
use unix as platform;
} else if #[cfg(windows)] {
mod windows;
pub(in crate::serial) mod windows;
use windows as platform;
}
}

View file

@ -2,8 +2,42 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::io;
use base::{Event, FileSync, RawDescriptor, Result};
use hypervisor::ProtectionType;
use crate::serial_device::SerialInput;
use crate::sys::serial_device::SerialDevice;
use crate::Serial;
// TODO(b/234469655): Remove type alias once ReadNotifier is implemented for
// PipeConnection.
pub(crate) type InStreamType = Box<dyn SerialInput>;
impl SerialDevice for Serial {
/// Constructs a Serial device ready for input and output.
///
/// The stream `input` should not block, instead returning 0 bytes if are no bytes available.
fn new(
_protected_vm: ProtectionType,
interrupt_evt: Event,
input: Option<Box<dyn SerialInput>>,
out: Option<Box<dyn io::Write + Send>>,
_sync: Option<Box<dyn FileSync + Send>>,
_out_timestamp: bool,
_keep_rds: Vec<RawDescriptor>,
) -> Serial {
Serial::new_common(interrupt_evt, input, out)
}
}
impl Serial {
pub(in crate::serial) fn system_handle_write(&mut self, v: u8) -> Result<()> {
if let Some(out) = self.out.as_mut() {
out.write_all(&[v])?;
out.flush()?;
}
Ok(())
}
}

View file

@ -2,8 +2,351 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use base::named_pipes::PipeConnection;
use std::io;
use std::io::Write;
use std::thread::{self, JoinHandle};
use std::time::Duration;
use base::{
error,
named_pipes::{self, PipeConnection},
Event, EventToken, FileSync, RawDescriptor, Result, WaitContext,
};
use hypervisor::ProtectionType;
use crate::bus::BusDevice;
use crate::serial_device::SerialInput;
use crate::sys::serial_device::SerialDevice;
use crate::Serial;
// TODO(b/234469655): Remove type alias once ReadNotifier is implemented for
// PipeConnection.
pub(crate) type InStreamType = Box<PipeConnection>;
const TIMESTAMP_PREFIX_FMT: &str = "[ %F %T%.9f ]: ";
pub enum LineState {
NeverWritten,
Midline,
Newline,
}
/// Windows specific paramters for the serial device.
pub struct SystemSerialParams {
pub out_timestamp: bool,
pub out_line_state: LineState,
pub in_stream: Option<InStreamType>,
pub sync: Option<Box<dyn FileSync + Send>>,
pub sync_thread: Option<JoinHandle<SyncWorker>>,
pub kill_evt: Option<Event>,
}
impl Serial {
// Spawn the worker thread if it hasn't been spawned yet.
pub(in crate::serial) fn handle_sync_thread(&mut self) {
if self.system_params.sync.is_some() {
let sync = match self.system_params.sync.take() {
Some(sync) => sync,
None => return,
};
let (self_kill_evt, kill_evt) = match Event::new().and_then(|e| Ok((e.try_clone()?, e)))
{
Ok(v) => v,
Err(e) => {
error!("failed creating kill Event pair: {}", e);
return;
}
};
self.system_params.kill_evt = Some(self_kill_evt);
match thread::Builder::new()
.name(format!("{} sync thread", self.debug_label()))
.spawn(move || {
let mut worker = SyncWorker {
kill_evt,
file: sync,
};
worker.run();
worker
}) {
Err(e) => {
error!("failed to spawn sync thread: {}", e);
}
Ok(sync_thread) => self.system_params.sync_thread = Some(sync_thread),
};
}
}
pub(in crate::serial) fn system_handle_write(&mut self, v: u8) -> Result<()> {
if let Some(out) = self.out.as_mut() {
if self.system_params.out_timestamp {
match self.system_params.out_line_state {
LineState::NeverWritten | LineState::Newline => {
out.write_all(
chrono::Local::now()
.format(TIMESTAMP_PREFIX_FMT)
.to_string()
.as_bytes(),
)
.expect("Failed to write");
self.system_params.out_line_state = LineState::Midline;
}
LineState::Midline if v == b'\n' => {
self.system_params.out_line_state = LineState::Newline;
}
_ => {}
}
}
out.write_all(&[v])?;
out.flush()?;
}
Ok(())
}
}
impl SerialDevice for Serial {
/// Constructs a Serial device ready for input and output.
///
/// The stream `input` should not block, instead returning 0 bytes if are no bytes available.
fn new(
_protected_vm: ProtectionType,
interrupt_evt: Event,
input: Option<Box<dyn SerialInput>>,
out: Option<Box<dyn io::Write + Send>>,
sync: Option<Box<dyn FileSync + Send>>,
out_timestamp: bool,
keep_rds: Vec<RawDescriptor>,
) -> Serial {
let system_params = SystemSerialParams {
out_timestamp,
out_line_state: LineState::NeverWritten,
in_stream: None,
sync,
sync_thread: None,
kill_evt: None,
};
Serial::new_common(interrupt_evt, input, out, system_params)
}
/// Constructs a Serial device connected to a named pipe for I/O
///
/// pipe_in and pipe_out should both refer to the same end of the same pipe, but may have
/// different underlying descriptors.
fn new_with_pipe(
_protected_vm: ProtectionType,
interrupt_evt: Event,
pipe_in: named_pipes::PipeConnection,
pipe_out: named_pipes::PipeConnection,
keep_rds: Vec<RawDescriptor>,
) -> Serial {
let system_params = SystemSerialParams {
out_timestamp: false,
out_line_state: LineState::NeverWritten,
in_stream: Some(Box::new(pipe_in)),
sync: None,
sync_thread: None,
kill_evt: None,
};
Serial::new_common(interrupt_evt, None, Some(Box::new(pipe_out)), system_params)
}
}
impl Drop for Serial {
fn drop(&mut self) {
if let Some(kill_evt) = self.system_params.kill_evt.take() {
// Ignore the result because there is nothing we can do about it.
let _ = kill_evt.write(1);
}
if let Some(sync_thread) = self.system_params.sync_thread.take() {
let _ = sync_thread.join();
}
}
}
/// Worker to help with flusing contents of `file` to disk.
pub struct SyncWorker {
kill_evt: Event,
file: Box<dyn FileSync + Send>,
}
impl SyncWorker {
pub(in crate::serial) fn run(&mut self) {
let mut timer = match base::Timer::new() {
Err(e) => {
error!("failed to create timer for SyncWorker: {}", e);
return;
}
Ok(timer) => timer,
};
if let Err(e) = timer.reset(Duration::from_secs(1), Some(Duration::from_secs(1))) {
error!("failed to set timer for SyncWorker: {}", e);
return;
}
#[derive(EventToken)]
enum Token {
Sync,
Kill,
}
let wait_ctx: WaitContext<Token> = match WaitContext::build_with(&[
(&timer, Token::Sync),
(&self.kill_evt, Token::Kill),
]) {
Ok(ec) => ec,
Err(e) => {
error!("failed creating WaitContext: {}", e);
return;
}
};
loop {
let events = match wait_ctx.wait() {
Ok(v) => v,
Err(e) => {
error!("failed polling for events: {}", e);
return;
}
};
for event in events.iter().filter(|e| e.is_readable) {
match event.token {
Token::Sync => {
if let Err(e) = self.file.fsync() {
error!("failed to fsync serial device, stopping sync thread: {}", e);
return;
}
}
Token::Kill => {
if let Err(e) = self.file.fsync() {
error!("failed to fsync serial device, stopping sync thread: {}", e);
return;
}
return;
}
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use regex::Regex;
use std::sync::Arc;
use sync::Mutex;
use crate::serial::tests::*;
use crate::serial::*;
#[cfg(windows)]
fn assert_timestamp_is_present(data: &[u8], serial_message: &str) {
let data_str = String::from_utf8(data.to_vec()).unwrap();
let re = Regex::new(&format!(r"\[.+\]: {}", serial_message)).unwrap();
assert!(re.is_match(&data_str));
}
#[cfg(windows)]
#[test]
fn serial_output_timestamp() {
let intr_evt = Event::new().unwrap();
let serial_out = SharedBuffer::new();
let mut serial = Serial::new(
ProtectionType::Unprotected,
intr_evt,
None,
Some(Box::new(serial_out.clone())),
None,
true,
Vec::new(),
);
serial.write(serial_bus_address(DATA), &[b'a']);
serial.write(serial_bus_address(DATA), &[b'\n']);
assert_timestamp_is_present(serial_out.buf.lock().as_slice(), "a");
serial_out.buf.lock().clear();
serial.write(serial_bus_address(DATA), &[b'b']);
serial.write(serial_bus_address(DATA), &[b'\n']);
assert_timestamp_is_present(serial_out.buf.lock().as_slice(), "b");
serial_out.buf.lock().clear();
serial.write(serial_bus_address(DATA), &[b'c']);
serial.write(serial_bus_address(DATA), &[b'\n']);
assert_timestamp_is_present(serial_out.buf.lock().as_slice(), "c");
serial_out.buf.lock().clear();
}
#[cfg(windows)]
#[test]
fn named_pipe() {
use base::named_pipes;
use base::named_pipes::{BlockingMode, FramingMode};
use rand::Rng;
let path_str = format!(r"\\.\pipe\kiwi_test_{}", rand::thread_rng().gen::<u64>());
let pipe_in = named_pipes::create_server_pipe(
&path_str,
&FramingMode::Byte,
&BlockingMode::NoWait,
0, // default timeout
named_pipes::DEFAULT_BUFFER_SIZE,
false,
)
.unwrap();
let pipe_out = pipe_in.try_clone().unwrap();
let event = Event::new().unwrap();
let mut device = Serial::new_with_pipe(
ProtectionType::Unprotected,
event,
pipe_in,
pipe_out,
Vec::new(),
);
let client_pipe = named_pipes::create_client_pipe(
&path_str,
&FramingMode::Byte,
&BlockingMode::Wait,
false,
)
.unwrap();
unsafe {
// Check that serial output is sent to the pipe
device.write(serial_bus_address(DATA), &[b'T']);
device.write(serial_bus_address(DATA), &[b'D']);
let mut read_buf: [u8; 2] = [0; 2];
assert_eq!(client_pipe.read(&mut read_buf).unwrap(), 2);
assert_eq!(read_buf, [b'T', b'D']);
// Check that pipe_in is the other end of client_pipe. It's not actually wired up to
// SerialInput in this file so we can't test the data flow
client_pipe
.write(&[1, 2])
.expect("Failed to write to client pipe");
assert_eq!(
device
.system_params
.in_stream
.as_mut()
.unwrap()
.read(&mut read_buf)
.unwrap(),
2
);
assert_eq!(read_buf, [1, 2]);
}
}
}