mirror of
https://chromium.googlesource.com/crosvm/crosvm
synced 2025-02-11 04:26:38 +00:00
devices: vvu-proxy: Support reconnecting when a sibling exits
This change will allow the guest to start a new VVU device backend after an old VVU device backend process exited due to sibling disconnection. More specifically, this CL will allow the vvu-proxy to support scenarios like follows: 1. A device backend starts in the guest and a sibling VM connects to the socket. 2. The sibling VM shuts down. Then, the device backend process exists in the guest. And, the vvu-proxy's state is changed from `Running` to `Activated`. 3. Return to 1. Note that we don't support more complicated reconnection scenario such as: * reconnection after sibling's unexpected crash * restart after device backend's unexpected crash To support the reconnection feature, the vvu-proxy device needs to clean its status when a sibling disconnected. Specifically, it needs to * reset virtqueues' state, * unregister memory regions that it registered via VmMemoryRequest, and * update the device state from the worker thread. BUG=b:216407443 TEST=run on workstation; sibling could reconnect after the first instance exits. Change-Id: I4c01e6069484ff74a0d643edd6a3b3231fb5c2d6 Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/3602361 Reviewed-by: Alexandre Courbot <acourbot@chromium.org> Tested-by: kokoro <noreply+kokoro@google.com> Commit-Queue: Keiichi Watanabe <keiichiw@chromium.org>
This commit is contained in:
parent
45ed48f213
commit
77ca3e34e6
2 changed files with 151 additions and 50 deletions
|
@ -331,6 +331,15 @@ impl Queue {
|
|||
self.last_used = Wrapping(0);
|
||||
}
|
||||
|
||||
/// Reset queue's counters.
|
||||
/// This method doesn't change the queue's metadata so it's reusable without initializing it
|
||||
/// again.
|
||||
pub fn reset_counters(&mut self) {
|
||||
self.next_avail = Wrapping(0);
|
||||
self.next_used = Wrapping(0);
|
||||
self.last_used = Wrapping(0);
|
||||
}
|
||||
|
||||
pub fn is_valid(&self, mem: &GuestMemory) -> bool {
|
||||
let queue_size = self.actual_size() as usize;
|
||||
let desc_table = self.desc_table;
|
||||
|
|
|
@ -14,6 +14,7 @@ use std::fmt;
|
|||
use std::fs::File;
|
||||
use std::io::Write;
|
||||
use std::os::unix::net::UnixListener;
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
|
||||
use anyhow::{anyhow, bail, Context};
|
||||
|
@ -24,8 +25,9 @@ use base::{
|
|||
use data_model::{DataInit, Le32};
|
||||
use libc::{recv, MSG_DONTWAIT, MSG_PEEK};
|
||||
use resources::Alloc;
|
||||
use sync::Mutex;
|
||||
use uuid::Uuid;
|
||||
use vm_control::{VmMemoryDestination, VmMemoryRequest, VmMemoryResponse, VmMemorySource};
|
||||
use vm_control::{MemSlot, VmMemoryDestination, VmMemoryRequest, VmMemoryResponse, VmMemorySource};
|
||||
use vm_memory::GuestMemory;
|
||||
use vmm_vhost::{
|
||||
connection::socket::Endpoint as SocketEndpoint,
|
||||
|
@ -227,6 +229,9 @@ struct Worker {
|
|||
|
||||
// Helps with communication and parsing messages from the sibling.
|
||||
slave_req_helper: SlaveReqHelper<SocketEndpoint<MasterReq>>,
|
||||
|
||||
// Stores memory regions that the worker has asked the main thread to register.
|
||||
registered_memory: Vec<MemSlot>,
|
||||
}
|
||||
|
||||
#[derive(EventToken, Debug, Clone)]
|
||||
|
@ -650,27 +655,34 @@ impl Worker {
|
|||
}
|
||||
|
||||
for (region, file) in contexts.iter().zip(files.into_iter()) {
|
||||
let request = VmMemoryRequest::RegisterMemory {
|
||||
source: VmMemorySource::Descriptor {
|
||||
descriptor: SafeDescriptor::from(file),
|
||||
offset: region.mmap_offset,
|
||||
size: region.memory_size,
|
||||
},
|
||||
dest: VmMemoryDestination::ExistingAllocation {
|
||||
allocation: self.pci_bar,
|
||||
offset: self.mem_offset as u64,
|
||||
},
|
||||
read_only: false,
|
||||
let source = VmMemorySource::Descriptor {
|
||||
descriptor: SafeDescriptor::from(file),
|
||||
offset: region.mmap_offset,
|
||||
size: region.memory_size,
|
||||
};
|
||||
self.process_memory_mapping_request(&request)?;
|
||||
let dest = VmMemoryDestination::ExistingAllocation {
|
||||
allocation: self.pci_bar,
|
||||
offset: self.mem_offset as u64,
|
||||
};
|
||||
self.register_memory(source, dest)?;
|
||||
self.mem_offset += region.memory_size as usize;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn register_memory(&mut self, source: VmMemorySource, dest: VmMemoryDestination) -> Result<()> {
|
||||
let request = VmMemoryRequest::RegisterMemory {
|
||||
source,
|
||||
dest,
|
||||
read_only: false,
|
||||
};
|
||||
self.send_memory_request(&request)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Sends memory mapping request to the main process. If successful adds the
|
||||
// mmaped info into |sibling_mem|, else returns error.
|
||||
fn process_memory_mapping_request(&mut self, request: &VmMemoryRequest) -> Result<()> {
|
||||
// mmaped info into `registered_memory`, else returns error.
|
||||
fn send_memory_request(&mut self, request: &VmMemoryRequest) -> Result<()> {
|
||||
self.main_process_tube
|
||||
.send(request)
|
||||
.context("sending mapping request to tube failed")?;
|
||||
|
@ -681,7 +693,12 @@ impl Worker {
|
|||
.context("receiving mapping request from tube failed")?;
|
||||
|
||||
match response {
|
||||
VmMemoryResponse::RegisterMemory { .. } => Ok(()),
|
||||
VmMemoryResponse::Ok => Ok(()),
|
||||
VmMemoryResponse::RegisterMemory { slot, .. } => {
|
||||
// Store the registered memory slot so we can unregister it when the thread ends.
|
||||
self.registered_memory.push(slot);
|
||||
Ok(())
|
||||
}
|
||||
VmMemoryResponse::Err(e) => {
|
||||
bail!("memory mapping failed: {}", e);
|
||||
}
|
||||
|
@ -838,6 +855,17 @@ impl Worker {
|
|||
.with_context(|| format!("failed to write call event for {}-th ring", index))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Clean up memory regions that the worker registered so that the device can start another
|
||||
// worker later.
|
||||
fn cleanup_registered_memory(&mut self) {
|
||||
while let Some(slot) = self.registered_memory.pop() {
|
||||
let req = VmMemoryRequest::UnregisterMemory(slot);
|
||||
if let Err(e) = self.send_memory_request(&req) {
|
||||
error!("failed to unregister memory slot: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Doorbell capability of the proxy device.
|
||||
|
@ -958,7 +986,9 @@ pub struct VirtioVhostUser {
|
|||
pci_address: Option<PciAddress>,
|
||||
|
||||
// The device's state.
|
||||
state: State,
|
||||
// The value is wrapped with `Arc<Mutex<_>>` because it can be modified from the worker thread
|
||||
// as well as the main device thread.
|
||||
state: Arc<Mutex<State>>,
|
||||
}
|
||||
|
||||
impl VirtioVhostUser {
|
||||
|
@ -985,10 +1015,10 @@ impl VirtioVhostUser {
|
|||
pci_bar: None,
|
||||
notification_select: None,
|
||||
notification_msix_vectors: [None; MAX_VHOST_DEVICE_QUEUES],
|
||||
state: State::Initialized {
|
||||
state: Arc::new(Mutex::new(State::Initialized {
|
||||
main_process_tube,
|
||||
listener,
|
||||
},
|
||||
})),
|
||||
pci_address,
|
||||
})
|
||||
}
|
||||
|
@ -1007,7 +1037,7 @@ impl VirtioVhostUser {
|
|||
|
||||
// Handles writes to the DOORBELL region of the BAR as per the VVU spec.
|
||||
fn write_bar_doorbell(&mut self, offset: u64) {
|
||||
match &self.state {
|
||||
match &*self.state.lock() {
|
||||
State::Running {
|
||||
worker_thread_tube, ..
|
||||
} => {
|
||||
|
@ -1096,17 +1126,34 @@ impl VirtioVhostUser {
|
|||
data[..2].copy_from_slice(&d);
|
||||
}
|
||||
|
||||
// Initializes state and starts the worker thread which will process all messages to this device
|
||||
// and send out messages in response.
|
||||
// This method must be called when a state is `State::Activated`.
|
||||
fn start_worker(&mut self) {
|
||||
// Checks the device's state and starts a worker thread if it's ready.
|
||||
// The thread will process all messages to this device and send out messages in response.
|
||||
fn try_starting_worker(&mut self) {
|
||||
let mut state = self.state.lock();
|
||||
|
||||
// Check the device state to decide whether start a new worker thread.
|
||||
// Note that this check cannot be done by the caller of this function because `self.state`
|
||||
// can be modified by another thread technically.
|
||||
match *state {
|
||||
State::Activated { .. } => (),
|
||||
_ => {
|
||||
// If the device is not ready or a thread is already running, do nothing here.
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// We'll prepare values that will be used in a new thread below.
|
||||
|
||||
// Clone to pass it into a worker thread.
|
||||
let state_cloned = Arc::clone(&self.state);
|
||||
|
||||
// Create tube to communicate with the worker thread and update the state.
|
||||
let (worker_thread_tube, main_thread_tube) =
|
||||
Tube::pair().expect("failed to create tube pair");
|
||||
|
||||
// Use `State::Invalid` as the intermediate state while preparing the proper next state.
|
||||
// Once a worker thread is successfully started, `self.state` will be updated to `Running`.
|
||||
let old_state: State = std::mem::replace(&mut self.state, State::Invalid);
|
||||
let old_state: State = std::mem::replace(&mut *state, State::Invalid);
|
||||
|
||||
// Retrieve values stored in the state value.
|
||||
let (
|
||||
|
@ -1139,8 +1186,8 @@ impl VirtioVhostUser {
|
|||
tx_queue_evt,
|
||||
),
|
||||
s => {
|
||||
error!("start_worker was called with invalid state: {}", s);
|
||||
return;
|
||||
// Unreachable because we've checked the state at the beginning of this function.
|
||||
unreachable!("invalid state: {}", s)
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -1193,6 +1240,7 @@ impl VirtioVhostUser {
|
|||
mem_offset: SHARED_MEMORY_OFFSET as usize,
|
||||
vrings,
|
||||
slave_req_helper,
|
||||
registered_memory: Vec::new(),
|
||||
};
|
||||
match worker.run(
|
||||
rx_queue_evt.try_clone().unwrap(),
|
||||
|
@ -1206,7 +1254,30 @@ impl VirtioVhostUser {
|
|||
}
|
||||
Ok(ExitReason::Disconnected) => {
|
||||
info!("worker thread exited: sibling disconnected");
|
||||
// TODO(b/216407443): Handle sibling reconnect events and update the state.
|
||||
|
||||
worker.cleanup_registered_memory();
|
||||
|
||||
let mut state = state_cloned.lock();
|
||||
let Worker {
|
||||
mem,
|
||||
interrupt,
|
||||
rx_queue,
|
||||
tx_queue,
|
||||
main_process_tube,
|
||||
..
|
||||
} = worker;
|
||||
|
||||
*state = State::Activated {
|
||||
main_process_tube,
|
||||
listener,
|
||||
mem,
|
||||
interrupt,
|
||||
rx_queue,
|
||||
tx_queue,
|
||||
rx_queue_evt,
|
||||
tx_queue_evt,
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
|
@ -1222,7 +1293,7 @@ impl VirtioVhostUser {
|
|||
return;
|
||||
}
|
||||
Ok(worker_thread) => {
|
||||
self.state = State::Running {
|
||||
*state = State::Running {
|
||||
worker_thread_tube,
|
||||
kill_evt: self_kill_evt,
|
||||
worker_thread,
|
||||
|
@ -1234,11 +1305,12 @@ impl VirtioVhostUser {
|
|||
|
||||
impl Drop for VirtioVhostUser {
|
||||
fn drop(&mut self) {
|
||||
let mut state = self.state.lock();
|
||||
if let State::Running {
|
||||
kill_evt,
|
||||
worker_thread,
|
||||
..
|
||||
} = std::mem::replace(&mut self.state, State::Invalid)
|
||||
} = std::mem::replace(&mut *state, State::Invalid)
|
||||
{
|
||||
match kill_evt.write(1) {
|
||||
Ok(()) => {
|
||||
|
@ -1263,7 +1335,7 @@ impl VirtioDevice for VirtioVhostUser {
|
|||
fn keep_rds(&self) -> Vec<RawDescriptor> {
|
||||
let mut rds = Vec::new();
|
||||
|
||||
match &self.state {
|
||||
match &*self.state.lock() {
|
||||
State::Initialized {
|
||||
main_process_tube,
|
||||
listener,
|
||||
|
@ -1312,12 +1384,10 @@ impl VirtioDevice for VirtioVhostUser {
|
|||
0, /* src_offset */
|
||||
);
|
||||
|
||||
let is_activated = matches!(self.state, State::Activated { .. });
|
||||
|
||||
// The driver has indicated that it's safe for the Vhost-user sibling to
|
||||
// initiate a connection and send data over.
|
||||
if self.config.is_slave_up() && is_activated {
|
||||
self.start_worker();
|
||||
if self.config.is_slave_up() {
|
||||
self.try_starting_worker();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1387,14 +1457,16 @@ impl VirtioDevice for VirtioVhostUser {
|
|||
return;
|
||||
}
|
||||
|
||||
let mut state = self.state.lock();
|
||||
// Use `State::Invalid` as the intermediate state here.
|
||||
let old_state: State = std::mem::replace(&mut self.state, State::Invalid);
|
||||
let old_state: State = std::mem::replace(&mut *state, State::Invalid);
|
||||
|
||||
match old_state {
|
||||
State::Initialized {
|
||||
listener,
|
||||
main_process_tube,
|
||||
} => {
|
||||
self.state = State::Activated {
|
||||
*state = State::Activated {
|
||||
listener,
|
||||
main_process_tube,
|
||||
mem,
|
||||
|
@ -1409,7 +1481,7 @@ impl VirtioDevice for VirtioVhostUser {
|
|||
// If the old state is not `Initialized`, it becomes `Invalid`.
|
||||
error!("activate() is called in an unexpected state: {}", s);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
fn read_bar(&mut self, bar_index: PciBarIndex, offset: u64, data: &mut [u8]) {
|
||||
|
@ -1441,37 +1513,57 @@ impl VirtioDevice for VirtioVhostUser {
|
|||
}
|
||||
|
||||
fn reset(&mut self) -> bool {
|
||||
let new_state = match std::mem::replace(&mut self.state, State::Invalid) {
|
||||
old_state @ State::Initialized { .. } => old_state,
|
||||
info!("resetting vvu-proxy device");
|
||||
|
||||
let mut state = self.state.lock();
|
||||
match std::mem::replace(&mut *state, State::Invalid) {
|
||||
old_state @ State::Initialized { .. } => {
|
||||
*state = old_state;
|
||||
}
|
||||
State::Activated {
|
||||
listener,
|
||||
main_process_tube,
|
||||
ref mut rx_queue,
|
||||
ref mut tx_queue,
|
||||
..
|
||||
} => State::Initialized {
|
||||
listener,
|
||||
main_process_tube,
|
||||
},
|
||||
} => {
|
||||
rx_queue.reset_counters();
|
||||
tx_queue.reset_counters();
|
||||
*state = State::Initialized {
|
||||
listener,
|
||||
main_process_tube,
|
||||
};
|
||||
}
|
||||
State::Running {
|
||||
kill_evt,
|
||||
worker_thread,
|
||||
..
|
||||
} => {
|
||||
// TODO(b/216407443): The current implementation doesn't support the case where
|
||||
// vvu-proxy is reset while running.
|
||||
// So, the state is changed to `Invalid` in this case below.
|
||||
// We should support this case eventually.
|
||||
// e.g. The VVU device backend in the guest is killed unexpectedly.
|
||||
// To support this case, we might need to reset iommu's state as well.
|
||||
|
||||
if let Err(e) = kill_evt.write(1) {
|
||||
error!("failed to notify the kill event: {}", e);
|
||||
}
|
||||
|
||||
// Drop the lock, as the worker thread might change the state.
|
||||
drop(state);
|
||||
if let Err(e) = worker_thread.join() {
|
||||
error!("failed to get back resources: {:?}", e);
|
||||
}
|
||||
|
||||
// TODO(b/216407443): Support the case where vvu-proxy is reset while running.
|
||||
// e.g. The VVU device backend in the guest is killed unexpectedly.
|
||||
State::Invalid
|
||||
let mut state = self.state.lock();
|
||||
*state = State::Invalid;
|
||||
}
|
||||
State::Invalid => {
|
||||
// TODO(b/216407443): Support this case.
|
||||
}
|
||||
State::Invalid => State::Invalid,
|
||||
};
|
||||
|
||||
self.state = new_state;
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue