diff --git a/Cargo.toml b/Cargo.toml index b656542c6e..849187ef24 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -169,6 +169,7 @@ base = "*" bit_field = { path = "bit_field" } broker_ipc = { path = "broker_ipc" } cfg-if = "1.0.0" +cros_async = { path = "cros_async" } crosvm_plugin = { path = "crosvm_plugin", optional = true } data_model = "*" devices = { path = "devices" } diff --git a/devices/src/virtio/block/asynchronous.rs b/devices/src/virtio/block/asynchronous.rs index 6d47371460..71c0b4e4fc 100644 --- a/devices/src/virtio/block/asynchronous.rs +++ b/devices/src/virtio/block/asynchronous.rs @@ -66,7 +66,7 @@ const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE; NUM_QUEUES as usize]; #[sorted] #[derive(ThisError, Debug)] -enum ExecuteError { +pub enum ExecuteError { #[error("failed to copy ID string: {0}")] CopyId(io::Error), #[error("virtio descriptor error: {0}")] @@ -293,9 +293,24 @@ pub async fn handle_queue( } } +/// handles the disk control requests from the vhost user backend control server. +pub async fn handle_vhost_user_command_tube( + command_tube: AsyncTube, + disk_state: Rc>, +) -> Result<(), ExecuteError> { + // Process the commands. Sets |interrupt| to None since vhost user backend + // currently does not support sending interrupts to the guest kernel. + // TODO(b/191845881): Use backend to frontend vhost user message + // CONFIG_CHANGE_MSG to notify the guest kernel once sending such message + // is supported. + handle_command_tube(&Some(command_tube), None, Rc::clone(&disk_state)).await +} + +// TODO(b/191845881): Update argument |interrupt| from Option to non-Option value +// once we enable sending vhost-user message from the backend to the frontend. async fn handle_command_tube( command_tube: &Option, - interrupt: Rc>, + interrupt: Option>>, disk_state: Rc>, ) -> Result<(), ExecuteError> { let command_tube = match command_tube { @@ -320,7 +335,9 @@ async fn handle_command_tube( .await .map_err(ExecuteError::SendingResponse)?; if let DiskControlResult::Ok = resp { - interrupt.borrow().signal_config_changed(); + if let Some(interrupt) = &interrupt { + interrupt.borrow().signal_config_changed(); + } } } Err(e) => return Err(ExecuteError::ReceivingCommand(e)), @@ -416,7 +433,11 @@ fn run_worker( pin_mut!(resample); // Handles control requests. - let control = handle_command_tube(control_tube, Rc::clone(&interrupt), disk_state.clone()); + let control = handle_command_tube( + control_tube, + Some(Rc::clone(&interrupt)), + disk_state.clone(), + ); pin_mut!(control); // Handle all the queues in one sub-select call. @@ -489,9 +510,9 @@ pub struct BlockAsync { pub(crate) seg_max: u32, pub(crate) block_size: u32, pub(crate) id: Option, + pub(crate) control_tube: Option, kill_evt: Option, worker_thread: Option, Option)>>, - control_tube: Option, } impl BlockAsync { diff --git a/devices/src/virtio/vhost/user/device/block.rs b/devices/src/virtio/vhost/user/device/block.rs index 28720c0fb3..9039302e62 100644 --- a/devices/src/virtio/vhost/user/device/block.rs +++ b/devices/src/virtio/vhost/user/device/block.rs @@ -17,6 +17,7 @@ use base::warn; use base::Event; use base::Timer; use cros_async::sync::Mutex as AsyncMutex; +use cros_async::AsyncTube; use cros_async::EventAsync; use cros_async::Executor; use cros_async::TimerAsync; @@ -32,6 +33,7 @@ use vmm_vhost::message::*; use crate::virtio; use crate::virtio::block::asynchronous::flush_disk; use crate::virtio::block::asynchronous::handle_queue; +use crate::virtio::block::asynchronous::handle_vhost_user_command_tube; use crate::virtio::block::asynchronous::BlockAsync; use crate::virtio::block::build_config_space; use crate::virtio::block::DiskState; @@ -108,6 +110,15 @@ impl VhostUserDevice for BlockAsync { )) .detach(); + if let Some(control_tube) = self.control_tube.take() { + let async_tube = AsyncTube::new(ex, control_tube)?; + ex.spawn_local(handle_vhost_user_command_tube( + async_tube, + Rc::clone(&disk_state), + )) + .detach(); + } + Ok(Box::new(BlockBackend { ex: ex.clone(), disk_state, diff --git a/src/crosvm/sys/unix.rs b/src/crosvm/sys/unix.rs index 328e11db19..9d7fd304df 100644 --- a/src/crosvm/sys/unix.rs +++ b/src/crosvm/sys/unix.rs @@ -55,6 +55,7 @@ use base::UnixSeqpacket; use base::UnixSeqpacketListener; use base::UnlinkUnixSeqpacketListener; use base::*; +use cros_async::Executor; use device_helpers::*; use devices::serial_device::SerialHardware; use devices::vfio::VfioCommonSetup; @@ -2883,6 +2884,51 @@ fn jail_and_start_vu_device( } } +fn process_vhost_user_control_request(tube: Tube, disk_host_tubes: &[Tube]) -> Result<()> { + let command = tube + .recv::() + .context("failed to receive VmRequest")?; + let resp = match command { + VmRequest::DiskCommand { + disk_index, + ref command, + } => match &disk_host_tubes.get(disk_index) { + Some(tube) => handle_disk_command(command, tube), + None => VmResponse::Err(base::Error::new(libc::ENODEV)), + }, + request => { + error!( + "Request {:?} currently not supported in vhost user backend", + request + ); + VmResponse::Err(base::Error::new(libc::EPERM)) + } + }; + + tube.send(&resp).context("failed to send VmResponse")?; + Ok(()) +} + +fn start_vhost_user_control_server( + control_server_socket: UnlinkUnixSeqpacketListener, + disk_host_tubes: Vec, +) { + info!("Start vhost-user control server"); + loop { + match control_server_socket.accept() { + Ok(socket) => { + let tube = Tube::new_from_unix_seqpacket(socket); + if let Err(e) = process_vhost_user_control_request(tube, &disk_host_tubes) { + error!("failed to process control request: {:#}", e); + } + } + Err(e) => { + error!("failed to establish connection: {}", e); + } + } + } +} + pub fn start_devices(opts: DevicesCommand) -> anyhow::Result<()> { struct DeviceJailInfo { // Unique name for the device, in the form `foomatic-0`. @@ -2921,18 +2967,43 @@ pub fn start_devices(opts: DevicesCommand) -> anyhow::Result<()> { Some(opts.jail) }; + // Create control server socket + let control_server_socket = opts.control_socket.map(|path| { + UnlinkUnixSeqpacketListener( + UnixSeqpacketListener::bind(path).expect("Could not bind socket"), + ) + }); + // Create serial devices. for (i, params) in opts.serial.iter().enumerate() { let serial_config = ¶ms.device_params; add_device(i, serial_config, ¶ms.vhost, &jail, &mut devices_jails)?; } + let mut disk_host_tubes = Vec::new(); + let control_socket_exists = control_server_socket.is_some(); // Create block devices. for (i, params) in opts.block.iter().enumerate() { - let disk_config = DiskConfig::new(¶ms.device_params, None); + let tube = if control_socket_exists { + let (host_tube, device_tube) = Tube::pair().context("failed to create tube")?; + disk_host_tubes.push(host_tube); + Some(device_tube) + } else { + None + }; + let disk_config = DiskConfig::new(¶ms.device_params, tube); add_device(i, &disk_config, ¶ms.vhost, &jail, &mut devices_jails)?; } + let ex = Executor::new()?; + if let Some(control_server_socket) = control_server_socket { + // Start the control server in the parent process. + ex.spawn_blocking(move || { + start_vhost_user_control_server(control_server_socket, disk_host_tubes) + }) + .detach(); + } + // Now wait for all device processes to return. while !devices_jails.is_empty() { match base::platform::wait_for_pid(-1, 0) { diff --git a/src/crosvm/sys/unix/cmdline.rs b/src/crosvm/sys/unix/cmdline.rs index 5d4b45e246..0bac9be915 100644 --- a/src/crosvm/sys/unix/cmdline.rs +++ b/src/crosvm/sys/unix/cmdline.rs @@ -2,6 +2,8 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +use std::path::PathBuf; + use argh::FromArgs; use devices::virtio::block::block::DiskOption; use devices::virtio::vhost::user::device; @@ -89,6 +91,10 @@ pub struct DevicesCommand { #[argh(option, arg_name = "block options")] /// start a block device (see help from run command for options) pub block: Vec>, + + #[argh(option, short = 's', arg_name = "PATH")] + /// path to put the control socket. + pub control_socket: Option, } #[derive(FromArgs)] diff --git a/vm_control/src/lib.rs b/vm_control/src/lib.rs index 97ca9f87ea..796dc2a622 100644 --- a/vm_control/src/lib.rs +++ b/vm_control/src/lib.rs @@ -948,6 +948,24 @@ pub enum VmRequest { }, } +pub fn handle_disk_command(command: &DiskControlCommand, disk_host_tube: &Tube) -> VmResponse { + // Forward the request to the block device process via its control socket. + if let Err(e) = disk_host_tube.send(command) { + error!("disk socket send failed: {}", e); + return VmResponse::Err(SysError::new(EINVAL)); + } + + // Wait for the disk control command to be processed + match disk_host_tube.recv() { + Ok(DiskControlResult::Ok) => VmResponse::Ok, + Ok(DiskControlResult::Err(e)) => VmResponse::Err(e), + Err(e) => { + error!("disk socket recv failed: {}", e); + VmResponse::Err(SysError::new(EINVAL)) + } + } +} + /// WARNING: descriptor must be a mapping handle on Windows. fn map_descriptor( descriptor: &dyn AsRawDescriptor, @@ -1151,26 +1169,10 @@ impl VmRequest { VmRequest::DiskCommand { disk_index, ref command, - } => { - // Forward the request to the block device process via its control socket. - if let Some(sock) = disk_host_tubes.get(disk_index) { - if let Err(e) = sock.send(command) { - error!("disk socket send failed: {}", e); - VmResponse::Err(SysError::new(EINVAL)) - } else { - match sock.recv() { - Ok(DiskControlResult::Ok) => VmResponse::Ok, - Ok(DiskControlResult::Err(e)) => VmResponse::Err(e), - Err(e) => { - error!("disk socket recv failed: {}", e); - VmResponse::Err(SysError::new(EINVAL)) - } - } - } - } else { - VmResponse::Err(SysError::new(ENODEV)) - } - } + } => match &disk_host_tubes.get(disk_index) { + Some(tube) => handle_disk_command(command, tube), + None => VmResponse::Err(SysError::new(ENODEV)), + }, VmRequest::UsbCommand(ref cmd) => { let usb_control_tube = match usb_control_tube { Some(t) => t,