devices: vhost-user: block: add control server to devices command

Add an in-process control server-like mechanism which runs in the
devices command to enable controlling vhost-user devices with control
commands such as DiskControlCommands. Like the in-process version,
the vhost-user control server waits for connection on the specified
socket, processes the received command, and forwards it to the
corresponding device via established tubes. Currently, DiskCommands are
the only commands supported by the control server, as block devices are
the only devices which need this mechanism.

This patch also adds the mechanism to process forwarded commands in
the vhost-user block backend device, similar to the command processing
mechanism in the worker provided in the in-process block device.

BUG=b:191845881
TEST=cargo run devices -s /path/to/socket.sock --block ..., then run
cargo run disk resize 0 $SIZE /path/to/socket.sock

Change-Id: I7b96e6c4bb7371424ca220da5f95be88e82c1fc0
Reviewed-on: https://chromium-review.googlesource.com/c/crosvm/crosvm/+/3841001
Reviewed-by: Alexandre Courbot <acourbot@chromium.org>
Reviewed-by: Keiichi Watanabe <keiichiw@chromium.org>
Tested-by: Keita Suzuki <suzukikeita@google.com>
Reviewed-by: Morg <morg@chromium.org>
Commit-Queue: Keita Suzuki <suzukikeita@google.com>
This commit is contained in:
Keita Suzuki 2022-08-19 04:31:38 +00:00 committed by crosvm LUCI
parent 513a5484a1
commit 7d1c18ac69
6 changed files with 138 additions and 26 deletions

View file

@ -169,6 +169,7 @@ base = "*"
bit_field = { path = "bit_field" }
broker_ipc = { path = "broker_ipc" }
cfg-if = "1.0.0"
cros_async = { path = "cros_async" }
crosvm_plugin = { path = "crosvm_plugin", optional = true }
data_model = "*"
devices = { path = "devices" }

View file

@ -66,7 +66,7 @@ const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE; NUM_QUEUES as usize];
#[sorted]
#[derive(ThisError, Debug)]
enum ExecuteError {
pub enum ExecuteError {
#[error("failed to copy ID string: {0}")]
CopyId(io::Error),
#[error("virtio descriptor error: {0}")]
@ -293,9 +293,24 @@ pub async fn handle_queue<I: SignalableInterrupt + Clone + 'static>(
}
}
/// Handles disk control requests arriving from the vhost-user backend control
/// server, forwarding each one to the block device's command-processing loop.
///
/// Passes `None` for the interrupt since the vhost-user backend currently has
/// no way to signal a config change to the guest kernel.
// TODO(b/191845881): Use the backend-to-frontend vhost-user message
// CONFIG_CHANGE_MSG to notify the guest kernel once sending such a message
// is supported.
pub async fn handle_vhost_user_command_tube(
    command_tube: AsyncTube,
    disk_state: Rc<AsyncMutex<DiskState>>,
) -> Result<(), ExecuteError> {
    // `disk_state` is owned here, so it can be moved into the call directly;
    // the extra `Rc::clone` followed by dropping the original was redundant.
    handle_command_tube(&Some(command_tube), None, disk_state).await
}
// TODO(b/191845881): Update argument |interrupt| from Option to non-Option value
// once we enable sending vhost-user message from the backend to the frontend.
async fn handle_command_tube(
command_tube: &Option<AsyncTube>,
interrupt: Rc<RefCell<Interrupt>>,
interrupt: Option<Rc<RefCell<Interrupt>>>,
disk_state: Rc<AsyncMutex<DiskState>>,
) -> Result<(), ExecuteError> {
let command_tube = match command_tube {
@ -320,7 +335,9 @@ async fn handle_command_tube(
.await
.map_err(ExecuteError::SendingResponse)?;
if let DiskControlResult::Ok = resp {
interrupt.borrow().signal_config_changed();
if let Some(interrupt) = &interrupt {
interrupt.borrow().signal_config_changed();
}
}
}
Err(e) => return Err(ExecuteError::ReceivingCommand(e)),
@ -416,7 +433,11 @@ fn run_worker(
pin_mut!(resample);
// Handles control requests.
let control = handle_command_tube(control_tube, Rc::clone(&interrupt), disk_state.clone());
let control = handle_command_tube(
control_tube,
Some(Rc::clone(&interrupt)),
disk_state.clone(),
);
pin_mut!(control);
// Handle all the queues in one sub-select call.
@ -489,9 +510,9 @@ pub struct BlockAsync {
pub(crate) seg_max: u32,
pub(crate) block_size: u32,
pub(crate) id: Option<BlockId>,
pub(crate) control_tube: Option<Tube>,
kill_evt: Option<Event>,
worker_thread: Option<thread::JoinHandle<(Box<dyn ToAsyncDisk>, Option<Tube>)>>,
control_tube: Option<Tube>,
}
impl BlockAsync {

View file

@ -17,6 +17,7 @@ use base::warn;
use base::Event;
use base::Timer;
use cros_async::sync::Mutex as AsyncMutex;
use cros_async::AsyncTube;
use cros_async::EventAsync;
use cros_async::Executor;
use cros_async::TimerAsync;
@ -32,6 +33,7 @@ use vmm_vhost::message::*;
use crate::virtio;
use crate::virtio::block::asynchronous::flush_disk;
use crate::virtio::block::asynchronous::handle_queue;
use crate::virtio::block::asynchronous::handle_vhost_user_command_tube;
use crate::virtio::block::asynchronous::BlockAsync;
use crate::virtio::block::build_config_space;
use crate::virtio::block::DiskState;
@ -108,6 +110,15 @@ impl VhostUserDevice for BlockAsync {
))
.detach();
if let Some(control_tube) = self.control_tube.take() {
let async_tube = AsyncTube::new(ex, control_tube)?;
ex.spawn_local(handle_vhost_user_command_tube(
async_tube,
Rc::clone(&disk_state),
))
.detach();
}
Ok(Box::new(BlockBackend {
ex: ex.clone(),
disk_state,

View file

@ -55,6 +55,7 @@ use base::UnixSeqpacket;
use base::UnixSeqpacketListener;
use base::UnlinkUnixSeqpacketListener;
use base::*;
use cros_async::Executor;
use device_helpers::*;
use devices::serial_device::SerialHardware;
use devices::vfio::VfioCommonSetup;
@ -2883,6 +2884,51 @@ fn jail_and_start_vu_device<T: VirtioDeviceBuilder>(
}
}
/// Receives a single `VmRequest` from `tube`, dispatches it, and sends the
/// resulting `VmResponse` back on the same tube.
///
/// Only `VmRequest::DiskCommand` is currently supported; any other request is
/// rejected with `EPERM`. A `DiskCommand` whose `disk_index` has no matching
/// control tube is rejected with `ENODEV`.
fn process_vhost_user_control_request(tube: Tube, disk_host_tubes: &[Tube]) -> Result<()> {
    let command = tube
        .recv::<VmRequest>()
        .context("failed to receive VmRequest")?;
    let resp = match command {
        VmRequest::DiskCommand {
            disk_index,
            ref command,
        // `get` already yields `Option<&Tube>`; no extra borrow of the
        // `Option` is needed (avoids a `&&Tube` binding).
        } => match disk_host_tubes.get(disk_index) {
            Some(tube) => handle_disk_command(command, tube),
            None => VmResponse::Err(base::Error::new(libc::ENODEV)),
        },
        request => {
            error!(
                "Request {:?} currently not supported in vhost user backend",
                request
            );
            VmResponse::Err(base::Error::new(libc::EPERM))
        }
    };
    tube.send(&resp).context("failed to send VmResponse")?;
    Ok(())
}
/// Accept loop for the vhost-user control server.
///
/// Blocks forever: each accepted connection is wrapped in a `Tube` and handed
/// to `process_vhost_user_control_request`; failures on a single connection
/// are logged and do not stop the server.
fn start_vhost_user_control_server(
    control_server_socket: UnlinkUnixSeqpacketListener,
    disk_host_tubes: Vec<Tube>,
) {
    info!("Start vhost-user control server");
    loop {
        // Wait for the next client; a failed accept only skips this iteration.
        let socket = match control_server_socket.accept() {
            Ok(socket) => socket,
            Err(e) => {
                error!("failed to establish connection: {}", e);
                continue;
            }
        };
        let tube = Tube::new_from_unix_seqpacket(socket);
        if let Err(e) = process_vhost_user_control_request(tube, &disk_host_tubes) {
            error!("failed to process control request: {:#}", e);
        }
    }
}
pub fn start_devices(opts: DevicesCommand) -> anyhow::Result<()> {
struct DeviceJailInfo {
// Unique name for the device, in the form `foomatic-0`.
@ -2921,18 +2967,43 @@ pub fn start_devices(opts: DevicesCommand) -> anyhow::Result<()> {
Some(opts.jail)
};
// Create control server socket
let control_server_socket = opts.control_socket.map(|path| {
UnlinkUnixSeqpacketListener(
UnixSeqpacketListener::bind(path).expect("Could not bind socket"),
)
});
// Create serial devices.
for (i, params) in opts.serial.iter().enumerate() {
let serial_config = &params.device_params;
add_device(i, serial_config, &params.vhost, &jail, &mut devices_jails)?;
}
let mut disk_host_tubes = Vec::new();
let control_socket_exists = control_server_socket.is_some();
// Create block devices.
for (i, params) in opts.block.iter().enumerate() {
let disk_config = DiskConfig::new(&params.device_params, None);
let tube = if control_socket_exists {
let (host_tube, device_tube) = Tube::pair().context("failed to create tube")?;
disk_host_tubes.push(host_tube);
Some(device_tube)
} else {
None
};
let disk_config = DiskConfig::new(&params.device_params, tube);
add_device(i, &disk_config, &params.vhost, &jail, &mut devices_jails)?;
}
let ex = Executor::new()?;
if let Some(control_server_socket) = control_server_socket {
// Start the control server in the parent process.
ex.spawn_blocking(move || {
start_vhost_user_control_server(control_server_socket, disk_host_tubes)
})
.detach();
}
// Now wait for all device processes to return.
while !devices_jails.is_empty() {
match base::platform::wait_for_pid(-1, 0) {

View file

@ -2,6 +2,8 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::path::PathBuf;
use argh::FromArgs;
use devices::virtio::block::block::DiskOption;
use devices::virtio::vhost::user::device;
@ -89,6 +91,10 @@ pub struct DevicesCommand {
#[argh(option, arg_name = "block options")]
/// start a block device (see help from run command for options)
pub block: Vec<VhostUserParams<DiskOption>>,
#[argh(option, short = 's', arg_name = "PATH")]
/// path to put the control socket.
pub control_socket: Option<PathBuf>,
}
#[derive(FromArgs)]

View file

@ -948,6 +948,24 @@ pub enum VmRequest {
},
}
pub fn handle_disk_command(command: &DiskControlCommand, disk_host_tube: &Tube) -> VmResponse {
// Forward the request to the block device process via its control socket.
if let Err(e) = disk_host_tube.send(command) {
error!("disk socket send failed: {}", e);
return VmResponse::Err(SysError::new(EINVAL));
}
// Wait for the disk control command to be processed
match disk_host_tube.recv() {
Ok(DiskControlResult::Ok) => VmResponse::Ok,
Ok(DiskControlResult::Err(e)) => VmResponse::Err(e),
Err(e) => {
error!("disk socket recv failed: {}", e);
VmResponse::Err(SysError::new(EINVAL))
}
}
}
/// WARNING: descriptor must be a mapping handle on Windows.
fn map_descriptor(
descriptor: &dyn AsRawDescriptor,
@ -1151,26 +1169,10 @@ impl VmRequest {
VmRequest::DiskCommand {
disk_index,
ref command,
} => {
// Forward the request to the block device process via its control socket.
if let Some(sock) = disk_host_tubes.get(disk_index) {
if let Err(e) = sock.send(command) {
error!("disk socket send failed: {}", e);
VmResponse::Err(SysError::new(EINVAL))
} else {
match sock.recv() {
Ok(DiskControlResult::Ok) => VmResponse::Ok,
Ok(DiskControlResult::Err(e)) => VmResponse::Err(e),
Err(e) => {
error!("disk socket recv failed: {}", e);
VmResponse::Err(SysError::new(EINVAL))
}
}
}
} else {
VmResponse::Err(SysError::new(ENODEV))
}
}
} => match &disk_host_tubes.get(disk_index) {
Some(tube) => handle_disk_command(command, tube),
None => VmResponse::Err(SysError::new(ENODEV)),
},
VmRequest::UsbCommand(ref cmd) => {
let usb_control_tube = match usb_control_tube {
Some(t) => t,